1 /**
2 * Copyright (C) 2010-14 pvmanager developers. See COPYRIGHT.TXT
3 * All rights reserved. Use is subject to license terms. See LICENSE.TXT
4 */
5 package org.epics.pvmanager;
6
7 import java.util.*;
8 import java.util.concurrent.*;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.logging.Level;
11 import java.util.logging.Logger;
12 import static org.epics.pvmanager.util.Executors.*;
13
14 /**
15 * A source for data that is going to be processed by the PVManager.
16 * PVManager can work with more than one source at a time. Support
17 * for each different source can be added by external libraries.
18 * <p>
 * To implement a datasource, one has to implement the {@link #createChannel(java.lang.String) }
 * method, and the requests will be forwarded to the channel accordingly.
21 * The channels are automatically cached and reused. The name under which
22 * the channels are looked up in the cache or registered in the cache is configurable.
23 * <p>
24 * Channel handlers can be implemented from scratch, or one can use the {@link MultiplexedChannelHandler}
25 * for handlers that want to open a single connection which is going to be
26 * shared by all readers and writers.
27 *
28 * @author carcassi
29 */
30 public abstract class DataSource {
31
32 private static final Logger log = Logger.getLogger(DataSource.class.getName());
33
34 private final boolean writeable;
35
36 /**
37 * Returns true whether the channels of this data source can be
38 * written to.
39 *
40 * @return true if data source accept write operations
41 */
42 public boolean isWriteable() {
43 return writeable;
44 }
45
46 /**
47 * Creates a new data source.
48 *
49 * @param writeable whether the data source implements write operations
50 */
51 public DataSource(boolean writeable) {
52 this.writeable = writeable;
53 }
54
55 // Keeps track of the currently created channels
56 private Map<String, ChannelHandler> usedChannels = new ConcurrentHashMap<String, ChannelHandler>();
57
58 /**
59 * Returns a channel from the given name, either cached or it
60 * will create it.
61 *
62 * @param channelName name of a channel
63 * @return a new or cached handler
64 */
65 ChannelHandler channel(String channelName) {
66 ChannelHandler channel = usedChannels.get(channelHandlerLookupName(channelName));
67 if (channel == null) {
68 channel = createChannel(channelName);
69 if (channel == null)
70 return null;
71 usedChannels.put(channelHandlerRegisterName(channelName, channel), channel);
72 }
73 return channel;
74 }
75
76 /**
77 * Returns the lookup name to use to find the channel handler in
78 * the cache. By default, it returns the channel name itself.
79 * If a datasource needs multiple different channel names to
80 * be the same channel handler (e.g. parts of the channel name
81 * are initialization parameters) then it can override this method
82 * to change the lookup.
83 *
84 * @param channelName the channel name
85 * @return the channel handler to look up in the cache
86 */
87 protected String channelHandlerLookupName(String channelName) {
88 return channelName;
89 }
90
91 /**
92 * Returns the name the given handler should be registered as.
93 * By default, it returns the lookup name, so that lookup and
94 * registration in the cache are consistent. If a datasource
95 * needs multiple different channel names to be the same
96 * channel handler (e.g. parts of the channel name are read/write
97 * parameters) then it can override this method to change the
98 * registration.
99 *
100 * @param channelName the name under which the ChannelHandler was created
101 * @param handler the handler to register
102 * @return the name under which to register in the cache
103 */
104 protected String channelHandlerRegisterName(String channelName, ChannelHandler handler) {
105 return channelHandlerLookupName(channelName);
106 }
107
108 /**
109 * Creates a channel handler for the given name. In the simplest
110 * case, this is the only method a data source needs to implement.
111 *
112 * @param channelName the name for a new channel
113 * @return a new handler
114 */
115 protected abstract ChannelHandler createChannel(String channelName);
116
117 // The executor used by the data source to perform asynchronous operations,
118 // such as connections and writes. I am current using a single thread for
119 // all data sources, which can be changed if needed.
120 // Since it's a single executor for all data sources, it should not be
121 // shut down at data source close.
122 private static ExecutorService exec = Executors.newSingleThreadExecutor(namedPool("PVMgr DataSource Worker "));
123
124 // Keeps track of the recipes that were opened with
125 // this data source.
126 private Set<ChannelReadRecipe> readRecipes = Collections.synchronizedSet(new HashSet<ChannelReadRecipe>());
127 private Set<ChannelWriteRecipe> writeRecipes = Collections.synchronizedSet(new HashSet<ChannelWriteRecipe>());
128
129 /**
130 * Connects to a set of channels based on the given recipe.
131 * <p>
132 * The data source must update the value caches relative to each channel.
133 * Before updating any cache, it must lock the collector relative to that
134 * cache and after any update it must notify the collector.
135 *
136 * @param readRecipe the instructions for the data connection
137 */
138 public void connectRead(final ReadRecipe readRecipe) {
139 // Add the recipe first, so that if a problem comes out
140 // while processing the request, we still keep
141 // track of it.
142 readRecipes.addAll(readRecipe.getChannelReadRecipes());
143
144 // Let's go through all the recipes first, so if something
145 // breaks unexpectadely, either everything works or nothing works
146 final Map<ChannelHandler, Collection<ChannelReadRecipe>> handlersWithSubscriptions =
147 new HashMap<>();
148 for (final ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
149 try {
150 String channelName = channelRecipe.getChannelName();
151 ChannelHandler channelHandler = channel(channelName);
152 if (channelHandler == null) {
153 throw new RuntimeException("Channel named '" + channelName + "' not found");
154 }
155 Collection<ChannelReadRecipe> channelSubscriptions = handlersWithSubscriptions.get(channelHandler);
156 if (channelSubscriptions == null) {
157 channelSubscriptions = new HashSet<>();
158 handlersWithSubscriptions.put(channelHandler, channelSubscriptions);
159 }
160 channelSubscriptions.add(channelRecipe);
161 } catch (Exception ex) {
162 // If any error happens while creating the channel,
163 // report it to the exception handler of that channel
164 channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
165 }
166
167 }
168
169 // Now that we went through all channels,
170 // add a monitor to the ones that were found
171 exec.execute(new Runnable() {
172
173 @Override
174 public void run() {
175 for (Map.Entry<ChannelHandler, Collection<ChannelReadRecipe>> entry : handlersWithSubscriptions.entrySet()) {
176 ChannelHandler channelHandler = entry.getKey();
177 Collection<ChannelReadRecipe> channelRecipes = entry.getValue();
178 for (ChannelReadRecipe channelRecipe : channelRecipes) {
179 try {
180 channelHandler.addReader(channelRecipe.getReadSubscription());
181 } catch(Exception ex) {
182 // If an error happens while adding the read subscription,
183 // notify the appropriate handler
184 channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
185 }
186 }
187 }
188 }
189 });
190 }
191
192 /**
193 * Disconnects the set of channels given by the recipe.
194 * <p>
195 * The disconnect call is guaranteed to be given the same object,
196 * so that the recipe itself can be used as a key in a map to retrieve
197 * the list of resources needed to be closed.
198 *
199 * @param readRecipe the instructions for the data connection
200 */
201 public void disconnectRead(final ReadRecipe readRecipe) {
202 // Find the channels to disconnect
203 final Map<ChannelHandler, ChannelHandlerReadSubscription> handlers = new HashMap<>();
204 for (ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
205 if (!readRecipes.contains(channelRecipe)) {
206 log.log(Level.WARNING, "ChannelReadRecipe {0} was disconnected but was never connected. Ignoring it.", channelRecipe);
207 } else {
208 String channelName = channelRecipe.getChannelName();
209 ChannelHandler channelHandler = channel(channelName);
210 // If the channel is not found, it means it was not found during
211 // connection and a proper notification was sent then. Silently
212 // ignore it.
213 if (channelHandler != null) {
214 handlers.put(channelHandler, channelRecipe.getReadSubscription());
215 }
216 readRecipes.remove(channelRecipe);
217 }
218 }
219
220 // Schedule disconnection and return right away.
221 exec.execute(new Runnable() {
222
223 @Override
224 public void run() {
225 for (Map.Entry<ChannelHandler, ChannelHandlerReadSubscription> entry : handlers.entrySet()) {
226 ChannelHandler channelHandler = entry.getKey();
227 ChannelHandlerReadSubscription channelHandlerReadSubscription = entry.getValue();
228 channelHandler.removeReader(channelHandlerReadSubscription);
229 }
230 }
231
232 });
233
234 }
235
236 /**
237 * Prepares the channels defined in the write recipe for writes.
238 * <p>
239 * If these are channels over the network, it will create the
240 * network connections with the underlying libraries.
241 *
242 * @param writeRecipe the recipe that will contain the write data
243 */
244 public void connectWrite(final WriteRecipe writeRecipe) {
245 if (!isWriteable()) {
246 throw new RuntimeException("Data source is read only");
247 }
248
249 // Register right away, so that if a failure happen
250 // we still keep track of it
251 writeRecipes.addAll(writeRecipe.getChannelWriteRecipes());
252
253 // Let's go through the whole request first, so if something
254 // breaks unexpectadely, either everything works or nothing works
255 final Map<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> handlers = new HashMap<>();
256 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
257 try {
258 String channelName = channelWriteRecipe.getChannelName();
259 ChannelHandler handler = channel(channelName);
260 if (handler == null) {
261 throw new RuntimeException("Channel " + channelName + " does not exist");
262 }
263 Collection<ChannelHandlerWriteSubscription> channelSubscriptions = handlers.get(handler);
264 if (channelSubscriptions == null) {
265 channelSubscriptions = new HashSet<>();
266 handlers.put(handler, channelSubscriptions);
267 }
268 channelSubscriptions.add(channelWriteRecipe.getWriteSubscription());
269 } catch (Exception ex) {
270 channelWriteRecipe.getWriteSubscription().getExceptionWriteFunction().writeValue(ex);
271 }
272 }
273
274 // Connect using another thread
275 exec.execute(new Runnable() {
276
277 @Override
278 public void run() {
279 for (Map.Entry<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> entry : handlers.entrySet()) {
280 ChannelHandler channelHandler = entry.getKey();
281 Collection<ChannelHandlerWriteSubscription> subscriptions = entry.getValue();
282 for (ChannelHandlerWriteSubscription subscription : subscriptions) {
283 try {
284 channelHandler.addWriter(subscription);
285 } catch (Exception ex) {
286 // If an error happens while adding the write subscription,
287 // notify the appropriate handler
288 subscription.getExceptionWriteFunction().writeValue(ex);
289 }
290 }
291 }
292 }
293 });
294 }
295
296 /**
297 * Releases the resources associated with the given write recipe.
298 * <p>
299 * Will close network channels and deallocate memory needed.
300 *
301 * @param writeRecipe the recipe that will no longer be used
302 */
303 public void disconnectWrite(final WriteRecipe writeRecipe) {
304 if (!isWriteable()) {
305 throw new RuntimeException("Data source is read only");
306 }
307
308 final Map<ChannelHandler, ChannelHandlerWriteSubscription> handlers = new HashMap<ChannelHandler, ChannelHandlerWriteSubscription>();
309 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
310 if (!writeRecipes.contains(channelWriteRecipe)) {
311 log.log(Level.WARNING, "ChannelWriteRecipe {0} was unregistered but was never registered. Ignoring it.", channelWriteRecipe);
312 } else {
313 try {
314 String channelName = channelWriteRecipe.getChannelName();
315 ChannelHandler handler = channel(channelName);
316 // If the channel does not exist, simply skip it: it must have
317 // not be there while preparing the write, so an appropriate
318 // notification has already been sent
319 if (handler != null) {
320 handlers.put(handler, channelWriteRecipe.getWriteSubscription());
321 }
322 } catch (Exception ex) {
323 // No point in sending the exception through the exception handler:
324 // nothing will be listening by now. Just log the exception
325 log.log(Level.WARNING, "Error while preparing channel '" + channelWriteRecipe.getChannelName() + "' for closing.", ex);
326 }
327 writeRecipes.remove(channelWriteRecipe);
328 }
329 }
330
331 // Disconnect using another thread
332 exec.execute(new Runnable() {
333
334 @Override
335 public void run() {
336 for (Map.Entry<ChannelHandler, ChannelHandlerWriteSubscription> entry : handlers.entrySet()) {
337 ChannelHandler channelHandler = entry.getKey();
338 ChannelHandlerWriteSubscription channelHandlerWriteSubscription = entry.getValue();
339 channelHandler.removeWrite(channelHandlerWriteSubscription);
340 }
341 }
342 });
343 }
344
345 /**
346 * Writes the contents in the given write recipe to the channels
347 * of this data sources.
348 * <p>
349 * The write recipe needs to be first prepared with {@link #connectWrite(org.epics.pvmanager.WriteRecipe) }
350 * and then cleaned up with {@link #disconnectWrite(org.epics.pvmanager.WriteRecipe) }.
351 *
352 * @param writeRecipe the recipe containing the data to write
353 * @param callback function to call when the write is concluded
354 * @param exceptionHandler where to report the exceptions
355 */
356 public void write(final WriteRecipe writeRecipe, final Runnable callback, final ExceptionHandler exceptionHandler) {
357 if (!isWriteable())
358 throw new UnsupportedOperationException("This data source is read only");
359
360 final WritePlanner planner = new WritePlanner();
361 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
362 ChannelHandler channel = channel(channelWriteRecipe.getChannelName());
363 planner.addChannel(channel, channelWriteRecipe.getWriteSubscription().getWriteCache().getValue(),
364 channelWriteRecipe.getWriteSubscription().getWriteCache().getPrecedingChannels());
365 }
366
367 // Connect using another thread
368 exec.execute(new Runnable() {
369
370 private void scheduleNext() {
371 for (Map.Entry<ChannelHandler, Object> entry : planner.nextChannels().entrySet()) {
372 final String channelName = entry.getKey().getChannelName();
373 try {
374 entry.getKey().write(entry.getValue(), new ChannelWriteCallback() {
375
376 AtomicInteger counter = new AtomicInteger();
377
378 @Override
379 public void channelWritten(Exception ex) {
380 planner.removeChannel(channelName);
381
382 // If there was an error, notify the exception
383 // and don't schedule anything else
384 if (ex != null) {
385 exceptionHandler.handleException(ex);
386 return;
387 }
388
389 // Notify only when the last channel was written
390 if (planner.isDone()) {
391 callback.run();
392 } else {
393 scheduleNext();
394 }
395 }
396 });
397 } catch (RuntimeException ex) {
398 exceptionHandler.handleException(ex);
399 }
400 }
401 }
402
403 @Override
404 public void run() {
405 scheduleNext();
406 }
407 });
408 }
409
410 /**
411 * Returns the channel handlers for this data source.
412 *
413 * @return an unmodifiable collection
414 */
415 public Map<String, ChannelHandler> getChannels() {
416 return Collections.unmodifiableMap(usedChannels);
417 }
418
419 /**
420 * Closes the DataSource and the resources associated with it.
421 */
422 public void close() {
423 }
424
425 }