1
2
3
4
5 package org.epics.pvmanager;
6
7 import java.util.*;
8 import java.util.concurrent.*;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.logging.Level;
11 import java.util.logging.Logger;
12 import static org.epics.pvmanager.util.Executors.*;
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public abstract class DataSource {
31
32 private static final Logger log = Logger.getLogger(DataSource.class.getName());
33
34 private final boolean writeable;
35
36
37
38
39
40
41
42 public boolean isWriteable() {
43 return writeable;
44 }
45
46
47
48
49
50
51 public DataSource(boolean writeable) {
52 this.writeable = writeable;
53 }
54
55
56 private Map<String, ChannelHandler> usedChannels = new ConcurrentHashMap<String, ChannelHandler>();
57
58
59
60
61
62
63
64
65 ChannelHandler channel(String channelName) {
66 ChannelHandler channel = usedChannels.get(channelHandlerLookupName(channelName));
67 if (channel == null) {
68 channel = createChannel(channelName);
69 if (channel == null)
70 return null;
71 usedChannels.put(channelHandlerRegisterName(channelName, channel), channel);
72 }
73 return channel;
74 }
75
76
77
78
79
80
81
82
83
84
85
86
87 protected String channelHandlerLookupName(String channelName) {
88 return channelName;
89 }
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 protected String channelHandlerRegisterName(String channelName, ChannelHandler handler) {
105 return channelHandlerLookupName(channelName);
106 }
107
108
109
110
111
112
113
114
115 protected abstract ChannelHandler createChannel(String channelName);
116
117
118
119
120
121
122 private static ExecutorService exec = Executors.newSingleThreadExecutor(namedPool("PVMgr DataSource Worker "));
123
124
125
126 private Set<ChannelReadRecipe> readRecipes = Collections.synchronizedSet(new HashSet<ChannelReadRecipe>());
127 private Set<ChannelWriteRecipe> writeRecipes = Collections.synchronizedSet(new HashSet<ChannelWriteRecipe>());
128
129
130
131
132
133
134
135
136
137
138 public void connectRead(final ReadRecipe readRecipe) {
139
140
141
142 readRecipes.addAll(readRecipe.getChannelReadRecipes());
143
144
145
146 final Map<ChannelHandler, Collection<ChannelReadRecipe>> handlersWithSubscriptions =
147 new HashMap<>();
148 for (final ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
149 try {
150 String channelName = channelRecipe.getChannelName();
151 ChannelHandler channelHandler = channel(channelName);
152 if (channelHandler == null) {
153 throw new RuntimeException("Channel named '" + channelName + "' not found");
154 }
155 Collection<ChannelReadRecipe> channelSubscriptions = handlersWithSubscriptions.get(channelHandler);
156 if (channelSubscriptions == null) {
157 channelSubscriptions = new HashSet<>();
158 handlersWithSubscriptions.put(channelHandler, channelSubscriptions);
159 }
160 channelSubscriptions.add(channelRecipe);
161 } catch (Exception ex) {
162
163
164 channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
165 }
166
167 }
168
169
170
171 exec.execute(new Runnable() {
172
173 @Override
174 public void run() {
175 for (Map.Entry<ChannelHandler, Collection<ChannelReadRecipe>> entry : handlersWithSubscriptions.entrySet()) {
176 ChannelHandler channelHandler = entry.getKey();
177 Collection<ChannelReadRecipe> channelRecipes = entry.getValue();
178 for (ChannelReadRecipe channelRecipe : channelRecipes) {
179 try {
180 channelHandler.addReader(channelRecipe.getReadSubscription());
181 } catch(Exception ex) {
182
183
184 channelRecipe.getReadSubscription().getExceptionWriteFunction().writeValue(ex);
185 }
186 }
187 }
188 }
189 });
190 }
191
192
193
194
195
196
197
198
199
200
201 public void disconnectRead(final ReadRecipe readRecipe) {
202
203 final Map<ChannelHandler, ChannelHandlerReadSubscription> handlers = new HashMap<>();
204 for (ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
205 if (!readRecipes.contains(channelRecipe)) {
206 log.log(Level.WARNING, "ChannelReadRecipe {0} was disconnected but was never connected. Ignoring it.", channelRecipe);
207 } else {
208 String channelName = channelRecipe.getChannelName();
209 ChannelHandler channelHandler = channel(channelName);
210
211
212
213 if (channelHandler != null) {
214 handlers.put(channelHandler, channelRecipe.getReadSubscription());
215 }
216 readRecipes.remove(channelRecipe);
217 }
218 }
219
220
221 exec.execute(new Runnable() {
222
223 @Override
224 public void run() {
225 for (Map.Entry<ChannelHandler, ChannelHandlerReadSubscription> entry : handlers.entrySet()) {
226 ChannelHandler channelHandler = entry.getKey();
227 ChannelHandlerReadSubscription channelHandlerReadSubscription = entry.getValue();
228 channelHandler.removeReader(channelHandlerReadSubscription);
229 }
230 }
231
232 });
233
234 }
235
236
237
238
239
240
241
242
243
244 public void connectWrite(final WriteRecipe writeRecipe) {
245 if (!isWriteable()) {
246 throw new RuntimeException("Data source is read only");
247 }
248
249
250
251 writeRecipes.addAll(writeRecipe.getChannelWriteRecipes());
252
253
254
255 final Map<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> handlers = new HashMap<>();
256 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
257 try {
258 String channelName = channelWriteRecipe.getChannelName();
259 ChannelHandler handler = channel(channelName);
260 if (handler == null) {
261 throw new RuntimeException("Channel " + channelName + " does not exist");
262 }
263 Collection<ChannelHandlerWriteSubscription> channelSubscriptions = handlers.get(handler);
264 if (channelSubscriptions == null) {
265 channelSubscriptions = new HashSet<>();
266 handlers.put(handler, channelSubscriptions);
267 }
268 channelSubscriptions.add(channelWriteRecipe.getWriteSubscription());
269 } catch (Exception ex) {
270 channelWriteRecipe.getWriteSubscription().getExceptionWriteFunction().writeValue(ex);
271 }
272 }
273
274
275 exec.execute(new Runnable() {
276
277 @Override
278 public void run() {
279 for (Map.Entry<ChannelHandler, Collection<ChannelHandlerWriteSubscription>> entry : handlers.entrySet()) {
280 ChannelHandler channelHandler = entry.getKey();
281 Collection<ChannelHandlerWriteSubscription> subscriptions = entry.getValue();
282 for (ChannelHandlerWriteSubscription subscription : subscriptions) {
283 try {
284 channelHandler.addWriter(subscription);
285 } catch (Exception ex) {
286
287
288 subscription.getExceptionWriteFunction().writeValue(ex);
289 }
290 }
291 }
292 }
293 });
294 }
295
296
297
298
299
300
301
302
303 public void disconnectWrite(final WriteRecipe writeRecipe) {
304 if (!isWriteable()) {
305 throw new RuntimeException("Data source is read only");
306 }
307
308 final Map<ChannelHandler, ChannelHandlerWriteSubscription> handlers = new HashMap<ChannelHandler, ChannelHandlerWriteSubscription>();
309 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
310 if (!writeRecipes.contains(channelWriteRecipe)) {
311 log.log(Level.WARNING, "ChannelWriteRecipe {0} was unregistered but was never registered. Ignoring it.", channelWriteRecipe);
312 } else {
313 try {
314 String channelName = channelWriteRecipe.getChannelName();
315 ChannelHandler handler = channel(channelName);
316
317
318
319 if (handler != null) {
320 handlers.put(handler, channelWriteRecipe.getWriteSubscription());
321 }
322 } catch (Exception ex) {
323
324
325 log.log(Level.WARNING, "Error while preparing channel '" + channelWriteRecipe.getChannelName() + "' for closing.", ex);
326 }
327 writeRecipes.remove(channelWriteRecipe);
328 }
329 }
330
331
332 exec.execute(new Runnable() {
333
334 @Override
335 public void run() {
336 for (Map.Entry<ChannelHandler, ChannelHandlerWriteSubscription> entry : handlers.entrySet()) {
337 ChannelHandler channelHandler = entry.getKey();
338 ChannelHandlerWriteSubscription channelHandlerWriteSubscription = entry.getValue();
339 channelHandler.removeWrite(channelHandlerWriteSubscription);
340 }
341 }
342 });
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356 public void write(final WriteRecipe writeRecipe, final Runnable callback, final ExceptionHandler exceptionHandler) {
357 if (!isWriteable())
358 throw new UnsupportedOperationException("This data source is read only");
359
360 final WritePlanner planner = new WritePlanner();
361 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
362 ChannelHandler channel = channel(channelWriteRecipe.getChannelName());
363 planner.addChannel(channel, channelWriteRecipe.getWriteSubscription().getWriteCache().getValue(),
364 channelWriteRecipe.getWriteSubscription().getWriteCache().getPrecedingChannels());
365 }
366
367
368 exec.execute(new Runnable() {
369
370 private void scheduleNext() {
371 for (Map.Entry<ChannelHandler, Object> entry : planner.nextChannels().entrySet()) {
372 final String channelName = entry.getKey().getChannelName();
373 try {
374 entry.getKey().write(entry.getValue(), new ChannelWriteCallback() {
375
376 AtomicInteger counter = new AtomicInteger();
377
378 @Override
379 public void channelWritten(Exception ex) {
380 planner.removeChannel(channelName);
381
382
383
384 if (ex != null) {
385 exceptionHandler.handleException(ex);
386 return;
387 }
388
389
390 if (planner.isDone()) {
391 callback.run();
392 } else {
393 scheduleNext();
394 }
395 }
396 });
397 } catch (RuntimeException ex) {
398 exceptionHandler.handleException(ex);
399 }
400 }
401 }
402
403 @Override
404 public void run() {
405 scheduleNext();
406 }
407 });
408 }
409
410
411
412
413
414
415 public Map<String, ChannelHandler> getChannels() {
416 return Collections.unmodifiableMap(usedChannels);
417 }
418
419
420
421
422 public void close() {
423 }
424
425 }