View Javadoc
1   /**
2    * Copyright (C) 2010-14 pvmanager developers. See COPYRIGHT.TXT
3    * All rights reserved. Use is subject to license terms. See LICENSE.TXT
4    */
5   package org.epics.pvmanager.jca;
6   
7   import org.epics.pvmanager.MultiplexedChannelHandler;
8   import org.epics.pvmanager.ChannelWriteCallback;
9   import org.epics.pvmanager.ValueCache;
10  import gov.aps.jca.CAException;
11  import gov.aps.jca.Channel;
12  import gov.aps.jca.Monitor;
13  import gov.aps.jca.dbr.*;
14  import gov.aps.jca.event.AccessRightsEvent;
15  import gov.aps.jca.event.AccessRightsListener;
16  import gov.aps.jca.event.ConnectionEvent;
17  import gov.aps.jca.event.ConnectionListener;
18  import gov.aps.jca.event.GetEvent;
19  import gov.aps.jca.event.GetListener;
20  import gov.aps.jca.event.MonitorEvent;
21  import gov.aps.jca.event.MonitorListener;
22  import gov.aps.jca.event.PutEvent;
23  import gov.aps.jca.event.PutListener;
24  import java.util.Arrays;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.logging.Level;
29  import java.util.logging.Logger;
30  import java.util.regex.Matcher;
31  import java.util.regex.Pattern;
32  import org.epics.pvmanager.*;
33  import org.epics.util.array.CollectionNumbers;
34  import org.epics.util.array.ListNumber;
35  
36  /**
37   * A ChannelHandler for the JCADataSource.
38   * <p>
39   * NOTE: this class is extensible as per Bastian request so that DESY can hook
40   * a different type factory. This is a temporary measure until the problem
41   * is solved in better, more general way, so that data sources
42   * can work only with data source specific types, while allowing
43   * conversions to normalized type through operators. The contract of this
44   * class is, therefore, expected to change.
45   * <p>
46   * Related changes are marked so that they are not accidentally removed in the
47   * meantime, and can be intentionally removed when a better solution is implemented.
48   *
49   * @author carcassi
50   */
51  class JCAChannelHandler extends MultiplexedChannelHandler<JCAConnectionPayload, JCAMessagePayload> {
52  
53      private static final int LARGE_ARRAY = 100000;
54      private final JCADataSource jcaDataSource;
55      private final String jcaChannelName;
56      // TODO: probably all volatile members could be guarded by this
57      private volatile Channel channel;
58      // TODO: needs monitor can probably be removed
59      private volatile boolean needsMonitor;
60      private Monitor valueMonitor;
61      private Monitor metadataMonitor;
62      private volatile boolean largeArray = false;
63      private volatile boolean sentReadOnlyException = false;
64      private final boolean putCallback;
65      private final boolean longString;
66      
67      // For the AccessChaneListener we need to guard it differently
68      private final AtomicBoolean needsAccessChangeListener = new AtomicBoolean(false);
69      
70      public static Pattern longStringPattern = Pattern.compile(".+\\..*\\$.*");
71      private final static Pattern hasOptions = Pattern.compile("(.*) (\\{.*\\})");
72      
73      private final static Logger log = Logger.getLogger(JCAChannelHandler.class.getName());
74  
75      public JCAChannelHandler(String channelName, JCADataSource jcaDataSource) {
76          super(channelName);
77          setProcessMessageOnReconnect(false);
78          this.jcaDataSource = jcaDataSource;
79          
80          boolean longStringName = longStringPattern.matcher(channelName).matches();
81          
82          // Parse parameters
83          // Done here so that they can be immutable
84          Matcher matcher = hasOptions.matcher(getChannelName());
85          if (matcher.matches()) {
86              jcaChannelName = matcher.group(1);
87              String clientOptions = matcher.group(2);
88              // TODO: Hack, this should have a real JSON parser
89              switch (clientOptions) {
90                  case "{\"putCallback\":true}":
91                      putCallback = true;
92                      longString = longStringName;
93                      break;
94                  case "{\"putCallback\":false}":
95                      putCallback = false;
96                      longString = longStringName;
97                      break;
98                  case "{\"longString\":true}":
99                      putCallback = false;
100                     longString = true;
101                     break;
102                 case "{\"longString\":false}":
103                     putCallback = false;
104                     longString = false;
105                     break;
106                 default:
107                     throw new IllegalArgumentException("Option not recognized for " + getChannelName());
108             }
109         } else {
110             longString = longStringName;
111             putCallback = false;
112             jcaChannelName = channelName;
113         }
114     }
115 
116     /**
117      * Whether this channel should be written using a put callback.
118      * 
119      * @return true if a put callback should be used
120      */
121     public boolean isPutCallback() {
122         return putCallback;
123     }
124 
125     /**
126      * Return whether this channel should be treated as a long string,
127      * meaning a BYTE[] that really represents an encoded string.
128      * 
129      * @return true if the channel should be handled as a long string
130      */
131     public boolean isLongString() {
132         return longString;
133     }
134 
135     /**
136      * The datasource this channel refers to.
137      * 
138      * @return a jca data source
139      */
140     public JCADataSource getJcaDataSource() {
141         return jcaDataSource;
142     }
143  
144     /**
145      * The name used for the actual connection.
146      * 
147      * @return the name of the ca channel
148      */
149     public String getJcaChannelName() {
150         return jcaChannelName;
151     }
152     
153     @Override
154     protected JCATypeAdapter findTypeAdapter(ValueCache<?> cache, JCAConnectionPayload connPayload) {
155         return jcaDataSource.getTypeSupport().find(cache, connPayload);
156     }
157 
158     @Override
159     public synchronized void connect() {
160         needsMonitor = true;
161         needsAccessChangeListener.set(true);
162         
163         try {
164             // Give the listener right away so that no event gets lost
165 	    // If it's a large array, connect using lower priority
166 	    if (largeArray) {
167                 channel = jcaDataSource.getContext().createChannel(getJcaChannelName(), connectionListener, Channel.PRIORITY_MIN);
168 	    } else {
169                 channel = jcaDataSource.getContext().createChannel(getJcaChannelName(), connectionListener, (short) (Channel.PRIORITY_MIN + 1));
170 	    }
171         } catch (CAException ex) {
172             throw new RuntimeException("JCA Connection failed", ex);
173         }
174     }
175 
176     private void putWithCallback(Object newValue, final ChannelWriteCallback callback) throws CAException {
177         PutListener listener = new PutListener() {
178 
179             @Override
180             public void putCompleted(PutEvent ev) {
181                 if (log.isLoggable(Level.FINEST)) {
182                     log.log(Level.FINEST, "JCA putCompleted for channel {0} event {1}", new Object[] {getChannelName(), ev});
183                 }
184                 
185                 if (ev.getStatus().isSuccessful()) {
186                     callback.channelWritten(null);
187                 } else {
188                     callback.channelWritten(new Exception(ev.toString()));
189                 }
190             }
191         };
192         // If it's a ListNumber, extract the array
193         if (newValue instanceof ListNumber) {
194             ListNumber data = (ListNumber) newValue;
195             Object wrappedArray = CollectionNumbers.wrappedArray(data);
196             if (wrappedArray == null) {
197                 newValue = CollectionNumbers.doubleArrayCopyOf(data);
198             } else {
199                 newValue = wrappedArray;
200             }
201         }
202         if (newValue instanceof String) {
203             if (isLongString()) {
204                 channel.put(toBytes(newValue.toString()), listener);
205             } else {
206                 if (channel.getFieldType().isBYTE() && channel.getElementCount() > 1) {
207                     log.warning("You are writing the String " + newValue + " to BYTE channel " + getChannelName() + ": use {\"longString\":true} for support");
208                     channel.put(toBytes(newValue.toString()), listener);
209                 } else {
210                     channel.put(newValue.toString(), listener);
211                 }
212             }
213         } else if (newValue instanceof byte[]) {
214             channel.put((byte[]) newValue, listener);
215         } else if (newValue instanceof short[]) {
216             channel.put((short[]) newValue, listener);
217         } else if (newValue instanceof int[]) {
218             channel.put((int[]) newValue, listener);
219         } else if (newValue instanceof float[]) {
220             channel.put((float[]) newValue, listener);
221         } else if (newValue instanceof double[]) {
222             channel.put((double[]) newValue, listener);
223         } else if (newValue instanceof Byte) {
224             channel.put((Byte) newValue, listener);
225         } else if (newValue instanceof Short) {
226             channel.put((Short) newValue, listener);
227         } else if (newValue instanceof Integer) {
228             channel.put((Integer) newValue, listener);
229         } else if (newValue instanceof Long) {
230             // XXX: Channel access does not support 64 bit integers
231             // If fits 32 bits, use int. Use double otherwise
232             long value64 = (Long) newValue;
233             int value32 = (int) value64;
234             if (value32 == value64) {
235                 channel.put(value32, listener);
236             } else {
237                 channel.put((double) value64, listener);
238             }
239         } else if (newValue instanceof Float) {
240             channel.put((Float) newValue, listener);
241         } else if (newValue instanceof Double) {
242             channel.put((Double) newValue, listener);
243         } else {
244             throw new RuntimeException("Unsupported type for CA: " + newValue.getClass());
245         }
246         jcaDataSource.getContext().flushIO();
247     }
248 
249     private void put(Object newValue, final ChannelWriteCallback callback) throws CAException {
250         // If it's a ListNumber, extract the array
251         if (newValue instanceof ListNumber) {
252             ListNumber data = (ListNumber) newValue;
253             Object wrappedArray = CollectionNumbers.wrappedArray(data);
254             if (wrappedArray == null) {
255                 newValue = CollectionNumbers.doubleArrayCopyOf(data);
256             } else {
257                 newValue = wrappedArray;
258             }
259         }
260         if (newValue instanceof Double[]) {
261             log.warning("You are writing a Double[] to channel " + getChannelName() + ": use org.epics.util.array.ListDouble instead");
262             final Double dbl[] = (Double[]) newValue;
263             final double val[] = new double[dbl.length];
264             for (int i = 0; i < val.length; ++i) {
265                 val[i] = dbl[i].doubleValue();
266             }
267             newValue = val;
268         }
269         if (newValue instanceof Integer[]) {
270             log.warning("You are writing a Integer[] to channel " + getChannelName() + ": use org.epics.util.array.ListInt instead");
271             final Integer ival[] = (Integer[]) newValue;
272             final int val[] = new int[ival.length];
273             for (int i = 0; i < val.length; ++i) {
274                 val[i] = ival[i].intValue();
275             }
276             newValue = val;
277         }
278         
279         if (newValue instanceof String) {
280             if (isLongString()) {
281                 channel.put(toBytes(newValue.toString()));
282             } else {
283                 if (channel.getFieldType().isBYTE() && channel.getElementCount() > 1) {
284                     log.warning("You are writing the String " + newValue + " to BYTE channel " + getChannelName() + ": use {\"longString\":true} for support");
285                     channel.put(toBytes(newValue.toString()));
286                 } else {
287                     channel.put(newValue.toString());
288                 }
289             }
290         } else if (newValue instanceof byte[]) {
291             channel.put((byte[]) newValue);
292         } else if (newValue instanceof short[]) {
293             channel.put((short[]) newValue);
294         } else if (newValue instanceof int[]) {
295             channel.put((int[]) newValue);
296         } else if (newValue instanceof float[]) {
297             channel.put((float[]) newValue);
298         } else if (newValue instanceof double[]) {
299             channel.put((double[]) newValue);
300         } else if (newValue instanceof Byte) {
301             channel.put((Byte) newValue);
302         } else if (newValue instanceof Short) {
303             channel.put((Short) newValue);
304         } else if (newValue instanceof Integer) {
305             channel.put((Integer) newValue);
306         } else if (newValue instanceof Long) {
307             // XXX: Channel access does not support 64 bit integers
308             // If fits 32 bits, use int. Use double otherwise
309             long value64 = (Long) newValue;
310             int value32 = (int) value64;
311             if (value32 == value64) {
312                 channel.put(value32);
313             } else {
314                 channel.put((double) value64);
315             }
316         } else if (newValue instanceof Float) {
317             channel.put((Float) newValue);
318         } else if (newValue instanceof Double) {
319             channel.put((Double) newValue);
320         } else {
321             callback.channelWritten(new Exception(new RuntimeException("Unsupported type for CA: " + newValue.getClass())));
322             return;
323         }
324         jcaDataSource.getContext().flushIO();
325         callback.channelWritten(null);
326     }
327 
328     private void setup(Channel channel) throws CAException {
329         DBRType metaType = metadataFor(channel);
330         
331         // If metadata is needed, get it
332         if (metaType != null) {
333             // Need to use callback for the listener instead of doing a synchronous get
334             // (which seemed to perform better) because JCA (JNI implementation)
335             // would return an empty list of labels for the Enum metadata
336             channel.get(metaType, 1, new GetListener() {
337 
338                 @Override
339                 public void getCompleted(GetEvent ev) {
340                     synchronized(JCAChannelHandler.this) {
341                         if (log.isLoggable(Level.FINEST)) {
342                             log.log(Level.FINEST, "JCA metadata getCompleted for channel {0} event {1}", new Object[] {getChannelName(), ev});
343                         }
344                         
345                         // In case the metadata arrives after the monitor
346                         MonitorEvent event = null;
347                         if (getLastMessagePayload() != null) {
348                             event = getLastMessagePayload().getEvent();
349                         }
350                         processMessage(new JCAMessagePayload(ev.getDBR(), event));
351                     }
352                 }
353             });
354         }
355 
356         if (needsMonitor) {
357             // At each (re)connect, we need to create a new monitor:
358             // since the type could be changed, we would have a type mismatch
359             // between the current type and the old type when the monitor was
360             // created
361             
362             // XXX: Ideally, we would destroy the monitor on reconnect,
363             // but currently this does not work with CAJ (you get an
364             // IllegalStateException because the transport is not there
365             // anymore). So, for now, we destroy the monitor during the 
366             // the connection callback.
367             
368             // XXX: Ideally, we should just close (clear) the monitor, but
369             // this would cause one last event to reach the monitorListener.
370             // So, we remove the monitorListener right before the clear.
371             
372             // TODO: we could remember the previous type, and reconnect
373             // only if the type actually changed
374             if (valueMonitor != null) {
375                 valueMonitor.removeMonitorListener(monitorListener);
376                 valueMonitor.clear();
377                 valueMonitor = null;
378             }
379             
380             valueMonitor = channel.addMonitor(valueTypeFor(channel), countFor(channel), jcaDataSource.getMonitorMask(), monitorListener);
381             needsMonitor = false;
382         }
383 
384         // Remove current metadata monitor
385         if (metadataMonitor != null) {
386             metadataMonitor.removeMonitorListener(metadataListener);
387             metadataMonitor.clear();
388             metadataMonitor = null;
389         }
390         
391         // Setup metadata monitor if required
392         if (jcaDataSource.isDbePropertySupported() && metaType != null) {
393             metadataMonitor = channel.addMonitor(metaType, 1, Monitor.PROPERTY, metadataListener);
394         }
395 
396         // Flush the entire context (it's the best we can do)
397         channel.getContext().flushIO();
398     }
399     
400     private final ConnectionListener connectionListener = new ConnectionListener() {
401 
402             @Override
403             public void connectionChanged(ConnectionEvent ev) {
404                 synchronized(JCAChannelHandler.this) {
405                     try {
406                         if (log.isLoggable(Level.FINEST)) {
407                             log.log(Level.FINEST, "JCA connectionChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
408                         }
409 
410                         // Take the channel from the event so that there is no
411                         // synchronization problem
412                         Channel channel = (Channel) ev.getSource();
413 
414                         // Check whether the channel is large and was opened
415                         // as large. Reconnect if does not match
416                         if (ev.isConnected() && channel.getElementCount() >= LARGE_ARRAY && !largeArray) {
417                             disconnect();
418                             largeArray = true;
419                             connect();
420                             return;
421                         }
422 
423                         processConnection(new JCAConnectionPayload(JCAChannelHandler.this, channel, getConnectionPayload()));
424                         if (ev.isConnected()) {
425                             // If connected, no write access and exception was not sent, notify writers
426                             if (!channel.getWriteAccess() && !sentReadOnlyException) {
427                                 reportExceptionToAllWriters(createReadOnlyException());
428                                 sentReadOnlyException = true;
429                             }
430                             
431                             // Setup monitors on connection
432                             setup(channel);
433                         } else {
434                             resetMessage();
435                             // Next connection, resend the read only exception if that's the case
436                             sentReadOnlyException = false;
437                             needsMonitor = true;
438                         }
439                         
440                     } catch (Exception ex) {
441                         reportExceptionToAllReadersAndWriters(ex);
442                     }
443                 }
444                 
445                 // XXX: because of the JNI implementation this section cannot
446                 // be part of the previous atomic section. The problem is that
447                 // adding the listener causes the listener to be called
448                 // right away on a different thread, and the addAccessRightsListener
449                 // seems to return only after the event is processed. This
450                 // means that you cannot serialize the addListener call
451                 // and the listener callback.
452                 // Since we have to make a choice between having either the add
453                 // or the callback properly synchronized, we choose the callback
454                 boolean addListener = needsAccessChangeListener.getAndSet(false);
455                 if (addListener) {
456                     try {
457                         Channel channel = (Channel) ev.getSource();
458                         channel.addAccessRightsListener(new AccessRightsListener() {
459 
460                             @Override
461                             public void accessRightsChanged(AccessRightsEvent ev) {
462                                 if (log.isLoggable(Level.FINEST)) {
463                                     log.log(Level.FINEST, "JCA accessRightsChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
464                                 }
465                                 
466                                 // Some JNI implementation lock if calling getState
467                                 // from within this callback. We context switch in that case
468                                 final Channel channel = (Channel) ev.getSource();
469                                 Runnable task = new Runnable() {
470 
471                                     @Override
472                                     public void run() {
473                                         synchronized(JCAChannelHandler.this) {
474                                             processConnection(new JCAConnectionPayload(JCAChannelHandler.this, channel, getConnectionPayload()));
475                                             if (!sentReadOnlyException && !channel.getWriteAccess()) {
476                                                 reportExceptionToAllWriters(createReadOnlyException());
477                                                 sentReadOnlyException = true;
478                                             }
479                                         }
480                                     }
481                                 };
482                                 if (jcaDataSource.useContextSwitchForAccessRightCallback()) {
483                                     jcaDataSource.getContextSwitch().submit(task);
484                                 } else {
485                                     task.run();
486                                 }
487                             }
488                         });
489                     } catch (Exception ex) {
490                         reportExceptionToAllReadersAndWriters(ex);
491                     }
492                 }
493             }
494         };;
495     
496     private String toStringDBR(DBR value) {
497         StringBuilder builder = new StringBuilder();
498         if (value == null) {
499             return "null";
500         }
501         if (value.getValue() instanceof double[]) {
502             builder.append(Arrays.toString((double[]) value.getValue()));
503         } else if (value.getValue() instanceof short[]) {
504             builder.append(Arrays.toString((short[]) value.getValue()));
505         } else if (value.getValue() instanceof String[]) {
506             builder.append(Arrays.toString((String[]) value.getValue()));
507         } else {
508             builder.append(value.getValue().toString());
509         }
510         return builder.toString();
511     }
512     
513     private final MonitorListener monitorListener = new MonitorListener() {
514 
515         @Override
516         public void monitorChanged(MonitorEvent event) {
517             synchronized(JCAChannelHandler.this) {
518                 if (log.isLoggable(Level.FINEST)) {
519                     log.log(Level.FINEST, "JCA value monitorChanged for channel {0} value {1}, event {2}", new Object[] {getChannelName(), toStringDBR(event.getDBR()), event});
520                 }
521                 
522                 DBR metadata = null;
523                 if (getLastMessagePayload() != null) {
524                     metadata = getLastMessagePayload().getMetadata();
525                 }
526                 processMessage(new JCAMessagePayload(metadata, event));
527             }
528         }
529     };
530     
531     private final MonitorListener metadataListener = new MonitorListener() {
532 
533         @Override
534         public void monitorChanged(MonitorEvent ev) {
535             synchronized(JCAChannelHandler.this) {
536                 if (log.isLoggable(Level.FINEST)) {
537                     log.log(Level.FINEST, "JCA metadata monitorChanged for channel {0} event {1}", new Object[] {getChannelName(), ev});
538                 }
539 
540                 // In case the metadata arrives after the monitor
541                 MonitorEvent event = null;
542                 if (getLastMessagePayload() != null) {
543                     event = getLastMessagePayload().getEvent();
544                 }
545                 processMessage(new JCAMessagePayload(ev.getDBR(), event));
546             }
547         }
548     };
549 
550     @Override
551     public synchronized void disconnect() {
552         try {
553             // Close the channel
554             // Need to guard because the channel may be closed if the
555             // context was already destroyed
556             if (channel.getConnectionState() != Channel.ConnectionState.CLOSED) {
557                 channel.removeConnectionListener(connectionListener);
558                 channel.destroy();
559             }
560         } catch (CAException ex) {
561             throw new RuntimeException("JCA Disconnect fail", ex);
562         } finally {
563             channel = null;
564             sentReadOnlyException = false;
565             processConnection(null);
566         }
567     }
568 
569     @Override
570     public void write(Object newValue, final ChannelWriteCallback callback) {
571         try {
572             if (isPutCallback())
573                 putWithCallback(newValue, callback);
574             else
575                 put(newValue, callback);
576         } catch (CAException ex) {
577             callback.channelWritten(ex);
578         }
579     }
580     
581     @Override
582     protected boolean isConnected(JCAConnectionPayload connPayload) {
583         return connPayload != null && connPayload.isChannelConnected();
584     }
585 
586     @Override
587     protected boolean isWriteConnected(JCAConnectionPayload connPayload) {
588         return connPayload != null && connPayload.isWriteConnected();
589     }
590 
591     @Override
592     protected synchronized void addWriter(ChannelHandlerWriteSubscription subscription) {
593         super.addWriter(subscription);
594         // If already connected and read only, we need to notify this writer
595         if (sentReadOnlyException) {
596             subscription.getExceptionWriteFunction().writeValue(createReadOnlyException());
597         }
598     }
599     
600     private Exception createReadOnlyException() {
601         return new RuntimeException("'" + getJcaChannelName() + "' is read-only");
602     }
603 
604     @Override
605     public synchronized Map<String, Object> getProperties() {
606         Map<String, Object> properties = new HashMap<String, Object>();
607         if (channel != null) {
608             properties.put("CA Channel name", channel.getName());
609             properties.put("CA Connection state", channel.getConnectionState().getName());
610             if (channel.getConnectionState() == Channel.ConnectionState.CONNECTED) {
611                 properties.put("CA Hostname", channel.getHostName());
612                 properties.put("CA Channel type", channel.getFieldType().getName());
613                 properties.put("CA Element count", channel.getElementCount());
614                 properties.put("CA Read access", channel.getReadAccess());
615                 properties.put("CA Write access", channel.getWriteAccess());
616             }
617             properties.put("isLongString", isLongString());
618             properties.put("isPutCallback", isPutCallback());
619             properties.put("Connected", isConnected());
620             properties.put("Write Connected", isWriteConnected());
621             properties.put("Connection payload", getConnectionPayload());
622             properties.put("Last message payload", getLastMessagePayload());
623         }
624         return properties;
625     }
626 
627     protected DBRType metadataFor(Channel channel) {
628         DBRType type = channel.getFieldType();
629         
630         if (type.isBYTE() || type.isSHORT() || type.isINT() || type.isFLOAT() || type.isDOUBLE())
631             return DBR_CTRL_Double.TYPE;
632         
633         if (type.isENUM())
634             return DBR_LABELS_Enum.TYPE;
635         
636         return null;
637     }
638 
639     protected int countFor(Channel channel) {
640         if (channel.getElementCount() == 1)
641             return 1;
642         
643         if (jcaDataSource.isVarArraySupported())
644             return 0;
645         else
646             return channel.getElementCount();
647     }
648     
649     static Pattern rtypeStringPattern = Pattern.compile(".+\\.RTYP.*");
650 
651     protected DBRType valueTypeFor(Channel channel) {
652         DBRType type = channel.getFieldType();
653         
654         if (type.isBYTE()) {
655             return DBR_TIME_Byte.TYPE;
656         } else if (type.isSHORT()) {
657             return DBR_TIME_Short.TYPE;
658         } else if (type.isINT()) {
659             return DBR_TIME_Int.TYPE;
660         } else if (type.isFLOAT()) {
661             return DBR_TIME_Float.TYPE;
662         } else if (type.isDOUBLE()) {
663             return DBR_TIME_Double.TYPE;
664         } else if (type.isENUM()) {
665             return DBR_TIME_Enum.TYPE;
666         } else if (type.isSTRING()) {
667             if (jcaDataSource.isRtypValueOnly() &&
668                     rtypeStringPattern.matcher(channel.getName()).matches()) {
669                 return DBR_String.TYPE;
670             }
671             return DBR_TIME_String.TYPE;
672         }
673         
674         throw new IllegalArgumentException("Unsupported type " + type);
675     }
676     
677     /**
678      * Converts a String into byte array.
679      * 
680      * @param text the string to be converted
681      * @return byte array, always including '\0' termination
682      */
683     static byte[] toBytes(final String text) {
684         // TODO: it's unclear what encoding is used and how
685         
686         // Write string as byte array WITH '\0' TERMINATION!
687         final byte[] bytes = new byte[text.length() + 1];
688         System.arraycopy(text.getBytes(), 0, bytes, 0, text.length());
689         bytes[text.length()] = '\0';
690         return bytes;
691     }
692     
693     /**
694      * Converts a byte array into a String. It
695      * 
696      * @param data the array to be converted
697      * @return the string
698      */
699     static String toString(byte[] data) {
700         int index = 0;
701         while (index < data.length && data[index] != '\0') {
702             index++;
703         }
704         
705         return new String(data, 0, index);
706     }
707 }