/**
 * Copyright (C) 2010-14 pvmanager developers. See COPYRIGHT.TXT
 * All rights reserved. Use is subject to license terms. See LICENSE.TXT
 */
package org.epics.pvmanager;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.epics.pvmanager.util.Executors.*;

/**
 * A source for data that is going to be processed by the PVManager.
 * PVManager can work with more than one source at a time. Support
 * for each different source can be added by external libraries.
 * <p>
 * To implement a datasource, one has to implement the {@link #createChannel(java.lang.String) }
 * method, and the requests will be forwarded to the appropriate channel.
 * The channels are automatically cached and reused. The names under which
 * the channels are looked up in the cache and registered in the cache are configurable.
 * <p>
 * Channel handlers can be implemented from scratch, or one can use the {@link MultiplexedChannelHandler}
 * for handlers that want to open a single connection which is going to be
 * shared by all readers and writers.
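 * <p>
 * As an illustration only (the names {@code DemoDataSource} and
 * {@code DemoChannelHandler} are hypothetical, not part of this API),
 * a minimal read-only data source could look like:
 * <pre>
 * public class DemoDataSource extends DataSource {
 *     public DemoDataSource() {
 *         super(false); // read-only: no write operations
 *     }
 *
 *     protected ChannelHandler createChannel(String channelName) {
 *         // One handler per channel name; DataSource caches and reuses it
 *         return new DemoChannelHandler(channelName);
 *     }
 * }
 * </pre>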
 *
 * @author carcassi
 */
public abstract class DataSource {

    private static final Logger log = Logger.getLogger(DataSource.class.getName());

    private final boolean writeable;

    /**
     * Returns true if the channels of this data source can be
     * written to.
     * 
     * @return true if the data source accepts write operations
     */
    public boolean isWriteable() {
        return writeable;
    }

    /**
     * Creates a new data source.
     * 
     * @param writeable whether the data source implements write operations
     */
    public DataSource(boolean writeable) {
        this.writeable = writeable;
    }

    // Keeps track of the currently created channels
    private Map<String, ChannelHandler> usedChannels = new ConcurrentHashMap<String, ChannelHandler>();

    /**
     * Returns the channel handler for the given name, either taken from
     * the cache or created if it does not exist yet.
     * 
     * @param channelName name of a channel
     * @return a new or cached handler
     */
    ChannelHandler channel(String channelName) {
        ChannelHandler channel = usedChannels.get(channelHandlerLookupName(channelName));
        if (channel == null) {
            channel = createChannel(channelName);
            if (channel == null)
                return null;
            usedChannels.put(channelHandlerRegisterName(channelName, channel), channel);
        }
        return channel;
    }

    /**
     * Returns the lookup name to use to find the channel handler in
     * the cache. By default, it returns the channel name itself.
     * If a datasource needs multiple different channel names to map to
     * the same channel handler (e.g. because parts of the channel name
     * are initialization parameters), it can override this method
     * to change the lookup.
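     * <p>
     * For example (an illustrative sketch only), a datasource in which
     * everything after a question mark is an initialization parameter
     * could share one handler per base name:
     * <pre>
     * protected String channelHandlerLookupName(String channelName) {
     *     int queryStart = channelName.indexOf('?');
     *     if (queryStart == -1) {
     *         return channelName;
     *     }
     *     return channelName.substring(0, queryStart);
     * }
     * </pre>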
     *
     * @param channelName the channel name
     * @return the name under which to look up the channel handler in the cache
     */
    protected String channelHandlerLookupName(String channelName) {
        return channelName;
    }

    /**
     * Returns the name the given handler should be registered as.
     * By default, it returns the lookup name, so that lookup and
     * registration in the cache are consistent. If a datasource
     * needs multiple different channel names to map to the same
     * channel handler (e.g. because parts of the channel name are
     * read/write parameters), it can override this method to change
     * the registration.
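     * <p>
     * For example (an illustrative sketch only), a datasource could
     * register the handler under the name the handler itself reports,
     * regardless of the read/write parameters embedded in the requested
     * name:
     * <pre>
     * protected String channelHandlerRegisterName(String channelName, ChannelHandler handler) {
     *     return handler.getChannelName();
     * }
     * </pre>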
     *
     * @param channelName the name under which the ChannelHandler was created
     * @param handler the handler to register
     * @return the name under which to register in the cache
     */
    protected String channelHandlerRegisterName(String channelName, ChannelHandler handler) {
        return channelHandlerLookupName(channelName);
    }

    /**
     * Creates a channel handler for the given name. In the simplest
     * case, this is the only method a data source needs to implement.
     * 
     * @param channelName the name for a new channel
     * @return a new handler
     */
    protected abstract ChannelHandler createChannel(String channelName);

    // The executor used by the data source to perform asynchronous operations,
    // such as connections and writes. A single thread is currently shared by
    // all data sources; this can be changed if needed.
    // Since it's a single executor for all data sources, it should not be
    // shut down when a data source is closed.
    private static ExecutorService exec = Executors.newSingleThreadExecutor(namedPool("PVMgr DataSource Worker "));
    
    // Keeps track of the recipes that were opened with
    // this data source.
    private Set<ChannelReadRecipe> readRecipes = Collections.synchronizedSet(new HashSet<ChannelReadRecipe>());
    private Set<ChannelWriteRecipe> writeRecipes = Collections.synchronizedSet(new HashSet<ChannelWriteRecipe>());

    /**
     * Connects to a set of channels based on the given recipe.
     * <p>
     * The data source must update the value caches relative to each channel.
     * Before updating any cache, it must lock the collector relative to that
     * cache, and after any update it must notify the collector.
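     * <p>
     * A minimal usage sketch (illustrative only; building the recipe is
     * outside the scope of this example, and the pause simply stands in
     * for the time during which values are read):
     * <pre>
     * void readBriefly(DataSource dataSource, ReadRecipe recipe) throws InterruptedException {
     *     dataSource.connectRead(recipe);
     *     Thread.sleep(1000); // the value caches are updated in the meantime
     *     // The same recipe object must be passed to disconnect
     *     dataSource.disconnectRead(recipe);
     * }
     * </pre>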
     *
     * @param readRecipe the instructions for the data connection
     */
    public void connectRead(final ReadRecipe readRecipe) {
        // Add the recipe first, so that if a problem comes up
        // while processing the request, we still keep
        // track of it.
        readRecipes.addAll(readRecipe.getChannelReadRecipes());

        // Let's go through all the recipes first, so that if something
        // breaks unexpectedly, either everything works or nothing works
        final Map<ChannelHandler, Collection<ChannelReadRecipe>> handlersWithSubscriptions =
                new HashMap<>();
        for (final ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
            try {
                String channelName = channelRecipe.getChannelName();
                ChannelHandler channelHandler = channel(channelName);
                if (channelHandler == null) {
                    throw new RuntimeException("Channel named '" + channelName + "' not found");
                }
                Collection<ChannelReadRecipe> channelSubscriptions = handlersWithSubscriptions.get(channelHandler);
                if (channelSubscriptions == null) {
                    channelSubscriptions = new HashSet<>();
                    handlersWithSubscriptions.put(channelHandler, channelSubscriptions);
                }
                channelSubscriptions.add(channelRecipe);
            } catch (Exception ex) {
                // If any error happens while creating the channel,
                // report it to the exception handler of that channel
                channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
            }

        }

        // Now that we went through all channels,
        // add a monitor to the ones that were found
        exec.execute(new Runnable() {

            @Override
            public void run() {
                for (Map.Entry<ChannelHandler, Collection<ChannelReadRecipe>> entry : handlersWithSubscriptions.entrySet()) {
                    ChannelHandler channelHandler = entry.getKey();
                    Collection<ChannelReadRecipe> channelRecipes = entry.getValue();
                    for (ChannelReadRecipe channelRecipe : channelRecipes) {
                        try {
                            channelHandler.addReader(channelRecipe.getReadSubscription());
                        } catch(Exception ex) {
                            // If an error happens while adding the read subscription,
                            // notify the appropriate handler
                            channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
                        }
                    }
                }
            }
        });
    }

    /**
     * Disconnects the set of channels given by the recipe.
     * <p>
     * The disconnect call is guaranteed to be given the same object,
     * so that the recipe itself can be used as a key in a map to retrieve
     * the list of resources that need to be closed.
     *
     * @param readRecipe the instructions for the data connection
     */
    public void disconnectRead(final ReadRecipe readRecipe) {
        // Find the channels to disconnect
        final Map<ChannelHandler, ChannelHandlerReadSubscription> handlers = new HashMap<>();
        for (ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
            if (!readRecipes.contains(channelRecipe)) {
                log.log(Level.WARNING, "ChannelReadRecipe {0} was disconnected but was never connected. Ignoring it.", channelRecipe);
            } else {
                String channelName = channelRecipe.getChannelName();
                ChannelHandler channelHandler = channel(channelName);
                // If the channel is not found, it means it was not found during
                // connection and a proper notification was sent then. Silently
                // ignore it.
                if (channelHandler != null) {
                    handlers.put(channelHandler, channelRecipe.getReadSubscription());
                }
                readRecipes.remove(channelRecipe);
            }
        }

        // Schedule disconnection and return right away.
        exec.execute(new Runnable() {

            @Override
            public void run() {
                for (Map.Entry<ChannelHandler, ChannelHandlerReadSubscription> entry : handlers.entrySet()) {
                    ChannelHandler channelHandler = entry.getKey();
                    ChannelHandlerReadSubscription channelHandlerReadSubscription = entry.getValue();
                    channelHandler.removeReader(channelHandlerReadSubscription);
                }
            }

        });

    }

    /**
     * Prepares the channels defined in the write recipe for writes.
     * <p>
     * If the channels are over the network, it will create the
     * network connections with the underlying libraries.
     * 
     * @param writeRecipe the recipe that will contain the write data
     */
    public void connectWrite(final WriteRecipe writeRecipe) {
        if (!isWriteable()) {
            throw new RuntimeException("Data source is read only");
        }
        
        // Register right away, so that if a failure happens
        // we still keep track of it
        writeRecipes.addAll(writeRecipe.getChannelWriteRecipes());
        
        // Let's go through the whole request first, so that if something
        // breaks unexpectedly, either everything works or nothing works
        final Map<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> handlers = new HashMap<>();
        for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
            try {
                String channelName = channelWriteRecipe.getChannelName();
                ChannelHandler handler = channel(channelName);
                if (handler == null) {
                    throw new RuntimeException("Channel " + channelName + " does not exist");
                }
                Collection<ChannelHandlerWriteSubscription> channelSubscriptions = handlers.get(handler);
                if (channelSubscriptions == null) {
                    channelSubscriptions = new HashSet<>();
                    handlers.put(handler, channelSubscriptions);
                }
                channelSubscriptions.add(channelWriteRecipe.getWriteSubscription());
            } catch (Exception ex) {
                channelWriteRecipe.getWriteSubscription().getExceptionWriteFunction().writeValue(ex);
            }
        }

        // Connect using another thread
        exec.execute(new Runnable() {

            @Override
            public void run() {
                for (Map.Entry<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> entry : handlers.entrySet()) {
                    ChannelHandler channelHandler = entry.getKey();
                    Collection<ChannelHandlerWriteSubscription> subscriptions = entry.getValue();
                    for (ChannelHandlerWriteSubscription subscription : subscriptions) {
                        try {
                            channelHandler.addWriter(subscription);
                        } catch (Exception ex) {
                            // If an error happens while adding the write subscription,
                            // notify the appropriate handler
                            subscription.getExceptionWriteFunction().writeValue(ex);
                        }
                    }
                }
            }
        });
    }

    /**
     * Releases the resources associated with the given write recipe.
     * <p>
     * Will close network channels and deallocate the memory that is no longer needed.
     * 
     * @param writeRecipe the recipe that will no longer be used
     */
    public void disconnectWrite(final WriteRecipe writeRecipe) {
        if (!isWriteable()) {
            throw new RuntimeException("Data source is read only");
        }
        
        final Map<ChannelHandler, ChannelHandlerWriteSubscription> handlers = new HashMap<ChannelHandler, ChannelHandlerWriteSubscription>();
        for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
            if (!writeRecipes.contains(channelWriteRecipe)) {
                log.log(Level.WARNING, "ChannelWriteRecipe {0} was unregistered but was never registered. Ignoring it.", channelWriteRecipe);
            } else {
                try {
                    String channelName = channelWriteRecipe.getChannelName();
                    ChannelHandler handler = channel(channelName);
                    // If the channel does not exist, simply skip it: it must not
                    // have been there when the write was prepared, so an appropriate
                    // notification has already been sent
                    if (handler != null) {
                        handlers.put(handler, channelWriteRecipe.getWriteSubscription());
                    }
                } catch (Exception ex) {
                    // No point in sending the exception through the exception handler:
                    // nothing will be listening by now. Just log the exception
                    log.log(Level.WARNING, "Error while preparing channel '" + channelWriteRecipe.getChannelName() + "' for closing.", ex);
                }
                writeRecipes.remove(channelWriteRecipe);
            }
        }

        // Disconnect using another thread
        exec.execute(new Runnable() {

            @Override
            public void run() {
                for (Map.Entry<ChannelHandler, ChannelHandlerWriteSubscription> entry : handlers.entrySet()) {
                    ChannelHandler channelHandler = entry.getKey();
                    ChannelHandlerWriteSubscription channelHandlerWriteSubscription = entry.getValue();
                    channelHandler.removeWrite(channelHandlerWriteSubscription);
                }
            }
        });
    }

    /**
     * Writes the contents of the given write recipe to the channels
     * of this data source.
     * <p>
     * The write recipe must first be prepared with {@link #connectWrite(org.epics.pvmanager.WriteRecipe) }
     * and then cleaned up with {@link #disconnectWrite(org.epics.pvmanager.WriteRecipe) }.
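     * <p>
     * A minimal usage sketch (illustrative only; building the recipe and
     * the exception handler is outside the scope of this example, and
     * error handling is omitted):
     * <pre>
     * void writeOnce(final DataSource dataSource, final WriteRecipe recipe,
     *         ExceptionHandler exceptionHandler) {
     *     dataSource.connectWrite(recipe);
     *     dataSource.write(recipe, new Runnable() {
     *         public void run() {
     *             // All channels were written: release the connections
     *             dataSource.disconnectWrite(recipe);
     *         }
     *     }, exceptionHandler);
     * }
     * </pre>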
     * 
     * @param writeRecipe the recipe containing the data to write
     * @param callback function to call when the write is concluded
     * @param exceptionHandler where to report the exceptions
     */
    public void write(final WriteRecipe writeRecipe, final Runnable callback, final ExceptionHandler exceptionHandler) {
        if (!isWriteable())
            throw new UnsupportedOperationException("This data source is read only");
        
        final WritePlanner planner = new WritePlanner();
        for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
            ChannelHandler channel = channel(channelWriteRecipe.getChannelName());
            planner.addChannel(channel, channelWriteRecipe.getWriteSubscription().getWriteCache().getValue(),
                    channelWriteRecipe.getWriteSubscription().getWriteCache().getPrecedingChannels());
        }

        // Write using another thread
        exec.execute(new Runnable() {

            // Writes the next batch of channels, i.e. those whose
            // preceding channels have all been written
            private void scheduleNext() {
                for (Map.Entry<ChannelHandler, Object> entry : planner.nextChannels().entrySet()) {
                    final String channelName = entry.getKey().getChannelName();
                    try {
                        entry.getKey().write(entry.getValue(), new ChannelWriteCallback() {

                            AtomicInteger counter = new AtomicInteger();

                            @Override
                            public void channelWritten(Exception ex) {
                                planner.removeChannel(channelName);

                                // If there was an error, notify the exception
                                // and don't schedule anything else
                                if (ex != null) {
                                    exceptionHandler.handleException(ex);
                                    return;
                                }

                                // Notify only when the last channel was written
                                if (planner.isDone()) {
                                    callback.run();
                                } else {
                                    scheduleNext();
                                }
                            }
                        });
                    } catch (RuntimeException ex) {
                        exceptionHandler.handleException(ex);
                    }
                }
            }

            @Override
            public void run() {
                scheduleNext();
            }
        });
    }

    /**
     * Returns the channel handlers for this data source.
     * 
     * @return an unmodifiable collection
     */
    public Map<String, ChannelHandler> getChannels() {
        return Collections.unmodifiableMap(usedChannels);
    }

    /**
     * Closes the DataSource and the resources associated with it.
     * The default implementation does nothing; subclasses should
     * override it to release the resources they hold.
     */
    public void close() {
    }

}