1
2
3
4
5 package org.epics.pvmanager.jca;
6
7 import org.epics.pvmanager.MultiplexedChannelHandler;
8 import org.epics.pvmanager.ChannelWriteCallback;
9 import org.epics.pvmanager.ValueCache;
10 import gov.aps.jca.CAException;
11 import gov.aps.jca.Channel;
12 import gov.aps.jca.Monitor;
13 import gov.aps.jca.dbr.*;
14 import gov.aps.jca.event.AccessRightsEvent;
15 import gov.aps.jca.event.AccessRightsListener;
16 import gov.aps.jca.event.ConnectionEvent;
17 import gov.aps.jca.event.ConnectionListener;
18 import gov.aps.jca.event.GetEvent;
19 import gov.aps.jca.event.GetListener;
20 import gov.aps.jca.event.MonitorEvent;
21 import gov.aps.jca.event.MonitorListener;
22 import gov.aps.jca.event.PutEvent;
23 import gov.aps.jca.event.PutListener;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
32 import org.epics.pvmanager.*;
33 import org.epics.util.array.CollectionNumbers;
34 import org.epics.util.array.ListNumber;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 class JCAChannelHandler extends MultiplexedChannelHandler<JCAConnectionPayload, JCAMessagePayload> {
52
53 private static final int LARGE_ARRAY = 100000;
54 private final JCADataSource jcaDataSource;
55 private final String jcaChannelName;
56
57 private volatile Channel channel;
58
59 private volatile boolean needsMonitor;
60 private Monitor valueMonitor;
61 private Monitor metadataMonitor;
62 private volatile boolean largeArray = false;
63 private volatile boolean sentReadOnlyException = false;
64 private final boolean putCallback;
65 private final boolean longString;
66
67
68 private final AtomicBoolean needsAccessChangeListener = new AtomicBoolean(false);
69
70 public static Pattern longStringPattern = Pattern.compile(".+\\..*\\$.*");
71 private final static Pattern hasOptions = Pattern.compile("(.*) (\\{.*\\})");
72
73 private final static Logger log = Logger.getLogger(JCAChannelHandler.class.getName());
74
75 public JCAChannelHandler(String channelName, JCADataSource jcaDataSource) {
76 super(channelName);
77 setProcessMessageOnReconnect(false);
78 this.jcaDataSource = jcaDataSource;
79
80 boolean longStringName = longStringPattern.matcher(channelName).matches();
81
82
83
84 Matcher matcher = hasOptions.matcher(getChannelName());
85 if (matcher.matches()) {
86 jcaChannelName = matcher.group(1);
87 String clientOptions = matcher.group(2);
88
89 switch (clientOptions) {
90 case "{\"putCallback\":true}":
91 putCallback = true;
92 longString = longStringName;
93 break;
94 case "{\"putCallback\":false}":
95 putCallback = false;
96 longString = longStringName;
97 break;
98 case "{\"longString\":true}":
99 putCallback = false;
100 longString = true;
101 break;
102 case "{\"longString\":false}":
103 putCallback = false;
104 longString = false;
105 break;
106 default:
107 throw new IllegalArgumentException("Option not recognized for " + getChannelName());
108 }
109 } else {
110 longString = longStringName;
111 putCallback = false;
112 jcaChannelName = channelName;
113 }
114 }
115
116
117
118
119
120
121 public boolean isPutCallback() {
122 return putCallback;
123 }
124
125
126
127
128
129
130
131 public boolean isLongString() {
132 return longString;
133 }
134
135
136
137
138
139
140 public JCADataSource getJcaDataSource() {
141 return jcaDataSource;
142 }
143
144
145
146
147
148
149 public String getJcaChannelName() {
150 return jcaChannelName;
151 }
152
153 @Override
154 protected JCATypeAdapter findTypeAdapter(ValueCache<?> cache, JCAConnectionPayload connPayload) {
155 return jcaDataSource.getTypeSupport().find(cache, connPayload);
156 }
157
158 @Override
159 public synchronized void connect() {
160 needsMonitor = true;
161 needsAccessChangeListener.set(true);
162
163 try {
164
165
166 if (largeArray) {
167 channel = jcaDataSource.getContext().createChannel(getJcaChannelName(), connectionListener, Channel.PRIORITY_MIN);
168 } else {
169 channel = jcaDataSource.getContext().createChannel(getJcaChannelName(), connectionListener, (short) (Channel.PRIORITY_MIN + 1));
170 }
171 } catch (CAException ex) {
172 throw new RuntimeException("JCA Connection failed", ex);
173 }
174 }
175
176 private void putWithCallback(Object newValue, final ChannelWriteCallback callback) throws CAException {
177 PutListener listener = new PutListener() {
178
179 @Override
180 public void putCompleted(PutEvent ev) {
181 if (log.isLoggable(Level.FINEST)) {
182 log.log(Level.FINEST, "JCA putCompleted for channel {0} event {1}", new Object[] {getChannelName(), ev});
183 }
184
185 if (ev.getStatus().isSuccessful()) {
186 callback.channelWritten(null);
187 } else {
188 callback.channelWritten(new Exception(ev.toString()));
189 }
190 }
191 };
192
193 if (newValue instanceof ListNumber) {
194 ListNumber data = (ListNumber) newValue;
195 Object wrappedArray = CollectionNumbers.wrappedArray(data);
196 if (wrappedArray == null) {
197 newValue = CollectionNumbers.doubleArrayCopyOf(data);
198 } else {
199 newValue = wrappedArray;
200 }
201 }
202 if (newValue instanceof String) {
203 if (isLongString()) {
204 channel.put(toBytes(newValue.toString()), listener);
205 } else {
206 if (channel.getFieldType().isBYTE() && channel.getElementCount() > 1) {
207 log.warning("You are writing the String " + newValue + " to BYTE channel " + getChannelName() + ": use {\"longString\":true} for support");
208 channel.put(toBytes(newValue.toString()), listener);
209 } else {
210 channel.put(newValue.toString(), listener);
211 }
212 }
213 } else if (newValue instanceof byte[]) {
214 channel.put((byte[]) newValue, listener);
215 } else if (newValue instanceof short[]) {
216 channel.put((short[]) newValue, listener);
217 } else if (newValue instanceof int[]) {
218 channel.put((int[]) newValue, listener);
219 } else if (newValue instanceof float[]) {
220 channel.put((float[]) newValue, listener);
221 } else if (newValue instanceof double[]) {
222 channel.put((double[]) newValue, listener);
223 } else if (newValue instanceof Byte) {
224 channel.put((Byte) newValue, listener);
225 } else if (newValue instanceof Short) {
226 channel.put((Short) newValue, listener);
227 } else if (newValue instanceof Integer) {
228 channel.put((Integer) newValue, listener);
229 } else if (newValue instanceof Long) {
230
231
232 long value64 = (Long) newValue;
233 int value32 = (int) value64;
234 if (value32 == value64) {
235 channel.put(value32, listener);
236 } else {
237 channel.put((double) value64, listener);
238 }
239 } else if (newValue instanceof Float) {
240 channel.put((Float) newValue, listener);
241 } else if (newValue instanceof Double) {
242 channel.put((Double) newValue, listener);
243 } else {
244 throw new RuntimeException("Unsupported type for CA: " + newValue.getClass());
245 }
246 jcaDataSource.getContext().flushIO();
247 }
248
249 private void put(Object newValue, final ChannelWriteCallback callback) throws CAException {
250
251 if (newValue instanceof ListNumber) {
252 ListNumber data = (ListNumber) newValue;
253 Object wrappedArray = CollectionNumbers.wrappedArray(data);
254 if (wrappedArray == null) {
255 newValue = CollectionNumbers.doubleArrayCopyOf(data);
256 } else {
257 newValue = wrappedArray;
258 }
259 }
260 if (newValue instanceof Double[]) {
261 log.warning("You are writing a Double[] to channel " + getChannelName() + ": use org.epics.util.array.ListDouble instead");
262 final Double dbl[] = (Double[]) newValue;
263 final double val[] = new double[dbl.length];
264 for (int i = 0; i < val.length; ++i) {
265 val[i] = dbl[i].doubleValue();
266 }
267 newValue = val;
268 }
269 if (newValue instanceof Integer[]) {
270 log.warning("You are writing a Integer[] to channel " + getChannelName() + ": use org.epics.util.array.ListInt instead");
271 final Integer ival[] = (Integer[]) newValue;
272 final int val[] = new int[ival.length];
273 for (int i = 0; i < val.length; ++i) {
274 val[i] = ival[i].intValue();
275 }
276 newValue = val;
277 }
278
279 if (newValue instanceof String) {
280 if (isLongString()) {
281 channel.put(toBytes(newValue.toString()));
282 } else {
283 if (channel.getFieldType().isBYTE() && channel.getElementCount() > 1) {
284 log.warning("You are writing the String " + newValue + " to BYTE channel " + getChannelName() + ": use {\"longString\":true} for support");
285 channel.put(toBytes(newValue.toString()));
286 } else {
287 channel.put(newValue.toString());
288 }
289 }
290 } else if (newValue instanceof byte[]) {
291 channel.put((byte[]) newValue);
292 } else if (newValue instanceof short[]) {
293 channel.put((short[]) newValue);
294 } else if (newValue instanceof int[]) {
295 channel.put((int[]) newValue);
296 } else if (newValue instanceof float[]) {
297 channel.put((float[]) newValue);
298 } else if (newValue instanceof double[]) {
299 channel.put((double[]) newValue);
300 } else if (newValue instanceof Byte) {
301 channel.put((Byte) newValue);
302 } else if (newValue instanceof Short) {
303 channel.put((Short) newValue);
304 } else if (newValue instanceof Integer) {
305 channel.put((Integer) newValue);
306 } else if (newValue instanceof Long) {
307
308
309 long value64 = (Long) newValue;
310 int value32 = (int) value64;
311 if (value32 == value64) {
312 channel.put(value32);
313 } else {
314 channel.put((double) value64);
315 }
316 } else if (newValue instanceof Float) {
317 channel.put((Float) newValue);
318 } else if (newValue instanceof Double) {
319 channel.put((Double) newValue);
320 } else {
321 callback.channelWritten(new Exception(new RuntimeException("Unsupported type for CA: " + newValue.getClass())));
322 return;
323 }
324 jcaDataSource.getContext().flushIO();
325 callback.channelWritten(null);
326 }
327
328 private void setup(Channel channel) throws CAException {
329 DBRType metaType = metadataFor(channel);
330
331
332 if (metaType != null) {
333
334
335
336 channel.get(metaType, 1, new GetListener() {
337
338 @Override
339 public void getCompleted(GetEvent ev) {
340 synchronized(JCAChannelHandler.this) {
341 if (log.isLoggable(Level.FINEST)) {
342 log.log(Level.FINEST, "JCA metadata getCompleted for channel {0} event {1}", new Object[] {getChannelName(), ev});
343 }
344
345
346 MonitorEvent event = null;
347 if (getLastMessagePayload() != null) {
348 event = getLastMessagePayload().getEvent();
349 }
350 processMessage(new JCAMessagePayload(ev.getDBR(), event));
351 }
352 }
353 });
354 }
355
356 if (needsMonitor) {
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374 if (valueMonitor != null) {
375 valueMonitor.removeMonitorListener(monitorListener);
376 valueMonitor.clear();
377 valueMonitor = null;
378 }
379
380 valueMonitor = channel.addMonitor(valueTypeFor(channel), countFor(channel), jcaDataSource.getMonitorMask(), monitorListener);
381 needsMonitor = false;
382 }
383
384
385 if (metadataMonitor != null) {
386 metadataMonitor.removeMonitorListener(metadataListener);
387 metadataMonitor.clear();
388 metadataMonitor = null;
389 }
390
391
392 if (jcaDataSource.isDbePropertySupported() && metaType != null) {
393 metadataMonitor = channel.addMonitor(metaType, 1, Monitor.PROPERTY, metadataListener);
394 }
395
396
397 channel.getContext().flushIO();
398 }
399
400 private final ConnectionListener connectionListener = new ConnectionListener() {
401
402 @Override
403 public void connectionChanged(ConnectionEvent ev) {
404 synchronized(JCAChannelHandler.this) {
405 try {
406 if (log.isLoggable(Level.FINEST)) {
407 log.log(Level.FINEST, "JCA connectionChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
408 }
409
410
411
412 Channel channel = (Channel) ev.getSource();
413
414
415
416 if (ev.isConnected() && channel.getElementCount() >= LARGE_ARRAY && !largeArray) {
417 disconnect();
418 largeArray = true;
419 connect();
420 return;
421 }
422
423 processConnection(new JCAConnectionPayload(JCAChannelHandler.this, channel, getConnectionPayload()));
424 if (ev.isConnected()) {
425
426 if (!channel.getWriteAccess() && !sentReadOnlyException) {
427 reportExceptionToAllWriters(createReadOnlyException());
428 sentReadOnlyException = true;
429 }
430
431
432 setup(channel);
433 } else {
434 resetMessage();
435
436 sentReadOnlyException = false;
437 needsMonitor = true;
438 }
439
440 } catch (Exception ex) {
441 reportExceptionToAllReadersAndWriters(ex);
442 }
443 }
444
445
446
447
448
449
450
451
452
453
454 boolean addListener = needsAccessChangeListener.getAndSet(false);
455 if (addListener) {
456 try {
457 Channel channel = (Channel) ev.getSource();
458 channel.addAccessRightsListener(new AccessRightsListener() {
459
460 @Override
461 public void accessRightsChanged(AccessRightsEvent ev) {
462 if (log.isLoggable(Level.FINEST)) {
463 log.log(Level.FINEST, "JCA accessRightsChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
464 }
465
466
467
468 final Channel channel = (Channel) ev.getSource();
469 Runnable task = new Runnable() {
470
471 @Override
472 public void run() {
473 synchronized(JCAChannelHandler.this) {
474 processConnection(new JCAConnectionPayload(JCAChannelHandler.this, channel, getConnectionPayload()));
475 if (!sentReadOnlyException && !channel.getWriteAccess()) {
476 reportExceptionToAllWriters(createReadOnlyException());
477 sentReadOnlyException = true;
478 }
479 }
480 }
481 };
482 if (jcaDataSource.useContextSwitchForAccessRightCallback()) {
483 jcaDataSource.getContextSwitch().submit(task);
484 } else {
485 task.run();
486 }
487 }
488 });
489 } catch (Exception ex) {
490 reportExceptionToAllReadersAndWriters(ex);
491 }
492 }
493 }
494 };;
495
496 private String toStringDBR(DBR value) {
497 StringBuilder builder = new StringBuilder();
498 if (value == null) {
499 return "null";
500 }
501 if (value.getValue() instanceof double[]) {
502 builder.append(Arrays.toString((double[]) value.getValue()));
503 } else if (value.getValue() instanceof short[]) {
504 builder.append(Arrays.toString((short[]) value.getValue()));
505 } else if (value.getValue() instanceof String[]) {
506 builder.append(Arrays.toString((String[]) value.getValue()));
507 } else {
508 builder.append(value.getValue().toString());
509 }
510 return builder.toString();
511 }
512
513 private final MonitorListener monitorListener = new MonitorListener() {
514
515 @Override
516 public void monitorChanged(MonitorEvent event) {
517 synchronized(JCAChannelHandler.this) {
518 if (log.isLoggable(Level.FINEST)) {
519 log.log(Level.FINEST, "JCA value monitorChanged for channel {0} value {1}, event {2}", new Object[] {getChannelName(), toStringDBR(event.getDBR()), event});
520 }
521
522 DBR metadata = null;
523 if (getLastMessagePayload() != null) {
524 metadata = getLastMessagePayload().getMetadata();
525 }
526 processMessage(new JCAMessagePayload(metadata, event));
527 }
528 }
529 };
530
531 private final MonitorListener metadataListener = new MonitorListener() {
532
533 @Override
534 public void monitorChanged(MonitorEvent ev) {
535 synchronized(JCAChannelHandler.this) {
536 if (log.isLoggable(Level.FINEST)) {
537 log.log(Level.FINEST, "JCA metadata monitorChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
538 }
539
540
541 MonitorEvent event = null;
542 if (getLastMessagePayload() != null) {
543 event = getLastMessagePayload().getEvent();
544 }
545 processMessage(new JCAMessagePayload(ev.getDBR(), event));
546 }
547 }
548 };
549
550 @Override
551 public synchronized void disconnect() {
552 try {
553
554
555
556 if (channel.getConnectionState() != Channel.ConnectionState.CLOSED) {
557 channel.removeConnectionListener(connectionListener);
558 channel.destroy();
559 }
560 } catch (CAException ex) {
561 throw new RuntimeException("JCA Disconnect fail", ex);
562 } finally {
563 channel = null;
564 sentReadOnlyException = false;
565 processConnection(null);
566 }
567 }
568
569 @Override
570 public void write(Object newValue, final ChannelWriteCallback callback) {
571 try {
572 if (isPutCallback())
573 putWithCallback(newValue, callback);
574 else
575 put(newValue, callback);
576 } catch (CAException ex) {
577 callback.channelWritten(ex);
578 }
579 }
580
581 @Override
582 protected boolean isConnected(JCAConnectionPayload connPayload) {
583 return connPayload != null && connPayload.isChannelConnected();
584 }
585
586 @Override
587 protected boolean isWriteConnected(JCAConnectionPayload connPayload) {
588 return connPayload != null && connPayload.isWriteConnected();
589 }
590
591 @Override
592 protected synchronized void addWriter(ChannelHandlerWriteSubscription subscription) {
593 super.addWriter(subscription);
594
595 if (sentReadOnlyException) {
596 subscription.getExceptionWriteFunction().writeValue(createReadOnlyException());
597 }
598 }
599
600 private Exception createReadOnlyException() {
601 return new RuntimeException("'" + getJcaChannelName() + "' is read-only");
602 }
603
604 @Override
605 public synchronized Map<String, Object> getProperties() {
606 Map<String, Object> properties = new HashMap<String, Object>();
607 if (channel != null) {
608 properties.put("CA Channel name", channel.getName());
609 properties.put("CA Connection state", channel.getConnectionState().getName());
610 if (channel.getConnectionState() == Channel.ConnectionState.CONNECTED) {
611 properties.put("CA Hostname", channel.getHostName());
612 properties.put("CA Channel type", channel.getFieldType().getName());
613 properties.put("CA Element count", channel.getElementCount());
614 properties.put("CA Read access", channel.getReadAccess());
615 properties.put("CA Write access", channel.getWriteAccess());
616 }
617 properties.put("isLongString", isLongString());
618 properties.put("isPutCallback", isPutCallback());
619 properties.put("Connected", isConnected());
620 properties.put("Write Connected", isWriteConnected());
621 properties.put("Connection payload", getConnectionPayload());
622 properties.put("Last message payload", getLastMessagePayload());
623 }
624 return properties;
625 }
626
627 protected DBRType metadataFor(Channel channel) {
628 DBRType type = channel.getFieldType();
629
630 if (type.isBYTE() || type.isSHORT() || type.isINT() || type.isFLOAT() || type.isDOUBLE())
631 return DBR_CTRL_Double.TYPE;
632
633 if (type.isENUM())
634 return DBR_LABELS_Enum.TYPE;
635
636 return null;
637 }
638
639 protected int countFor(Channel channel) {
640 if (channel.getElementCount() == 1)
641 return 1;
642
643 if (jcaDataSource.isVarArraySupported())
644 return 0;
645 else
646 return channel.getElementCount();
647 }
648
649 static Pattern rtypeStringPattern = Pattern.compile(".+\\.RTYP.*");
650
651 protected DBRType valueTypeFor(Channel channel) {
652 DBRType type = channel.getFieldType();
653
654 if (type.isBYTE()) {
655 return DBR_TIME_Byte.TYPE;
656 } else if (type.isSHORT()) {
657 return DBR_TIME_Short.TYPE;
658 } else if (type.isINT()) {
659 return DBR_TIME_Int.TYPE;
660 } else if (type.isFLOAT()) {
661 return DBR_TIME_Float.TYPE;
662 } else if (type.isDOUBLE()) {
663 return DBR_TIME_Double.TYPE;
664 } else if (type.isENUM()) {
665 return DBR_TIME_Enum.TYPE;
666 } else if (type.isSTRING()) {
667 if (jcaDataSource.isRtypValueOnly() &&
668 rtypeStringPattern.matcher(channel.getName()).matches()) {
669 return DBR_String.TYPE;
670 }
671 return DBR_TIME_String.TYPE;
672 }
673
674 throw new IllegalArgumentException("Unsupported type " + type);
675 }
676
677
678
679
680
681
682
683 static byte[] toBytes(final String text) {
684
685
686
687 final byte[] bytes = new byte[text.length() + 1];
688 System.arraycopy(text.getBytes(), 0, bytes, 0, text.length());
689 bytes[text.length()] = '\0';
690 return bytes;
691 }
692
693
694
695
696
697
698
699 static String toString(byte[] data) {
700 int index = 0;
701 while (index < data.length && data[index] != '\0') {
702 index++;
703 }
704
705 return new String(data, 0, index);
706 }
707 }