View Javadoc
1   /**
2    * Copyright (C) 2010-14 pvmanager developers. See COPYRIGHT.TXT
3    * All rights reserved. Use is subject to license terms. See LICENSE.TXT
4    */
5   package org.epics.pvmanager;
6   
7   import java.util.*;
8   import java.util.Map.Entry;
9   import java.util.concurrent.ConcurrentHashMap;
10  import java.util.logging.Logger;
11  
12  /**
13   * A data source that can dispatch a request to multiple different
14   * data sources.
15   *
16   * @author carcassi
17   */
18  public class CompositeDataSource extends DataSource {
19      
20      private static Logger log = Logger.getLogger(CompositeDataSource.class.getName());
21  
22      // Stores all data sources by name
23      private Map<String, DataSource> dataSources = new ConcurrentHashMap<String, DataSource>();
24  
25      private volatile String delimiter = "://";
26      private volatile String defaultDataSource;
27  
28      /**
29       * Creates a new CompositeDataSource.
30       */
31      public CompositeDataSource() {
32          super(true);
33      }
34  
35      /**
36       * Returns the delimeter that divides the data source name from the
37       * channel name. Default is "://" so that "epics://pv1" corresponds
38       * to the "pv1" channel from the "epics" datasource.
39       *
40       * @return data source delimeter; can't be null
41       */
42      public String getDelimiter() {
43          return delimiter;
44      }
45  
46      /**
47       * Changes the data source delimiter.
48       *
49       * @param delimiter new data source delimiter; can't be null
50       */
51      public void setDelimiter(String delimiter) {
52          this.delimiter = delimiter;
53      }
54  
55      /**
56       * Adds/replaces the data source corresponding to the given name.
57       *
58       * @param name the name of the data source
59       * @param dataSource the data source to add/replace
60       */
61      public void putDataSource(String name, DataSource dataSource) {
62          dataSources.put(name, dataSource);
63      }
64  
65      /**
66       * Returns which data source is used if no data source is specified in the
67       * channel name.
68       *
69       * @return the default data source, or null if it was never set
70       */
71      public String getDefaultDataSource() {
72          return defaultDataSource;
73      }
74      
75      /**
76       * Returns the data sources registered to this composite data source.
77       * 
78       * @return the registered data sources
79       */
80      public Map<String, DataSource> getDataSources() {
81          return Collections.unmodifiableMap(dataSources);
82      }
83  
84      /**
85       * Sets the data source to be used if the channel does not specify
86       * one explicitely. The data source must have already been added.
87       *
88       * @param defaultDataSource the default data source
89       */
90      public void setDefaultDataSource(String defaultDataSource) {
91          if (!dataSources.containsKey(defaultDataSource))
92              throw new IllegalArgumentException("The data source " + defaultDataSource + " was not previously added, and therefore cannot be set as default");
93  
94          this.defaultDataSource = defaultDataSource;
95      }
96      
97      private String nameOf(String channelName) {
98          int indexDelimiter = channelName.indexOf(delimiter);
99          if (indexDelimiter == -1) {
100             return channelName;
101         } else {
102             return channelName.substring(indexDelimiter + delimiter.length());
103         }
104     }
105     
106     private String sourceOf(String channelName) {
107         int indexDelimiter = channelName.indexOf(delimiter);
108         if (indexDelimiter == -1) {
109             if (defaultDataSource == null)
110                 throw new IllegalArgumentException("Channel " + channelName + " uses the default data source but one was never set.");
111             return defaultDataSource;
112         } else {
113             String source = channelName.substring(0, indexDelimiter);
114             if (dataSources.containsKey(source))
115                 return source;
116             throw new IllegalArgumentException("Data source " + source + " for " + channelName + " was not configured.");
117         }
118     }
119     
120     private Map<String, ReadRecipe> splitRecipe(ReadRecipe readRecipe) {
121         Map<String, ReadRecipe> splitRecipe = new HashMap<String, ReadRecipe>();
122 
123         // Iterate through the recipe to understand how to distribute
124         // the calls
125         Map<String, Collection<ChannelReadRecipe>> routingRecipes = new HashMap<String, Collection<ChannelReadRecipe>>();
126         for (ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
127             String name = nameOf(channelRecipe.getChannelName());
128             String dataSource = sourceOf(channelRecipe.getChannelName());
129 
130             if (dataSource == null)
131                 throw new IllegalArgumentException("Channel " + name + " uses the default data source but one was never set.");
132 
133             // Add recipe for the target dataSource
134             if (routingRecipes.get(dataSource) == null) {
135                 routingRecipes.put(dataSource, new HashSet<ChannelReadRecipe>());
136             }
137             routingRecipes.get(dataSource).add(new ChannelReadRecipe(name, channelRecipe.getReadSubscription()));
138         }
139         
140         // Create the recipes
141         for (Entry<String, Collection<ChannelReadRecipe>> entry : routingRecipes.entrySet()) {
142             splitRecipe.put(entry.getKey(), new ReadRecipe(entry.getValue()));
143         }
144         
145         return splitRecipe;
146     }
147 
148     @Override
149     public void connectRead(ReadRecipe readRecipe) {
150         Map<String, ReadRecipe> splitRecipe = splitRecipe(readRecipe);
151 
152         // Dispatch calls to all the data sources
153         for (Map.Entry<String, ReadRecipe> entry : splitRecipe.entrySet()) {
154             try {
155                 DataSource dataSource = dataSources.get(entry.getKey());
156                 if (dataSource == null) {
157                     throw new IllegalArgumentException("DataSource '" + entry.getKey() + delimiter + "' was not configured.");
158                 }
159                 dataSource.connectRead(entry.getValue());
160             } catch (RuntimeException ex) {
161                 // If data source fail, still go and connect the others
162                 readRecipe.getChannelReadRecipes().iterator().next().getReadSubscription().getExceptionWriteFunction().writeValue(ex);
163             }
164         }
165     }
166 
167     @Override
168     public void disconnectRead(ReadRecipe readRecipe) {
169         Map<String, ReadRecipe> splitRecipe = splitRecipe(readRecipe);
170 
171         // Dispatch calls to all the data sources
172         for (Map.Entry<String, ReadRecipe> entry : splitRecipe.entrySet()) {
173             try {
174                 dataSources.get(entry.getKey()).disconnectRead(entry.getValue());
175             } catch(RuntimeException ex) {
176                 // If a data source fails, still go and disconnect the others
177                 readRecipe.getChannelReadRecipes().iterator().next().getReadSubscription().getExceptionWriteFunction().writeValue(ex);
178             }
179         }
180     }
181     
182     private Map<String, WriteRecipe> splitRecipe(WriteRecipe writeRecipe) {
183         // Chop the recipe along different data sources
184         Map<String, Collection<ChannelWriteRecipe>> recipes = new HashMap<String, Collection<ChannelWriteRecipe>>();
185         for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
186             String channelName = nameOf(channelWriteRecipe.getChannelName());
187             String dataSource = sourceOf(channelWriteRecipe.getChannelName());
188             Collection<ChannelWriteRecipe> channelWriteRecipes = recipes.get(dataSource);
189             if (channelWriteRecipes == null) {
190                 channelWriteRecipes = new ArrayList<ChannelWriteRecipe>();
191                 recipes.put(dataSource, channelWriteRecipes);
192             }
193             channelWriteRecipes.add(new ChannelWriteRecipe(channelName, channelWriteRecipe.getWriteSubscription()));
194         }
195         
196         Map<String, WriteRecipe> splitRecipes = new HashMap<String, WriteRecipe>();
197         for (Map.Entry<String, Collection<ChannelWriteRecipe>> en : recipes.entrySet()) {
198             String dataSource = en.getKey();
199             Collection<ChannelWriteRecipe> val = en.getValue();
200             WriteRecipe newWriteRecipe = new WriteRecipe(val);
201             splitRecipes.put(dataSource, newWriteRecipe);
202         }
203         
204         return splitRecipes;
205     }
206 
207     @Override
208     public void connectWrite(WriteRecipe writeRecipe) {
209         Map<String, WriteRecipe> splitRecipes = splitRecipe(writeRecipe);
210         for (Entry<String, WriteRecipe> entry : splitRecipes.entrySet()) {
211             String dataSource = entry.getKey();
212             WriteRecipe splitWriteRecipe = entry.getValue();
213             dataSources.get(dataSource).connectWrite(splitWriteRecipe);
214         }
215     }
216 
217     @Override
218     public void disconnectWrite(WriteRecipe writeRecipe) {
219         Map<String, WriteRecipe> splitRecipe = splitRecipe(writeRecipe);
220         
221         for (Map.Entry<String, WriteRecipe> en : splitRecipe.entrySet()) {
222             String dataSource = en.getKey();
223             WriteRecipe splitWriteRecipe = en.getValue();
224             dataSources.get(dataSource).disconnectWrite(splitWriteRecipe);
225         }
226     }
227     
228 
229     @Override
230     ChannelHandler channel(String channelName) {
231         String name = nameOf(channelName);
232         String dataSource = sourceOf(channelName);
233         return dataSources.get(dataSource).channel(name);
234     }
235     
236     @Override
237     protected ChannelHandler createChannel(String channelName) {
238         throw new UnsupportedOperationException("Composite data source can't create channels directly.");
239     }
240 
241     /**
242      * Closes all DataSources that are registered in the composite.
243      */
244     @Override
245     public void close() {
246         for (DataSource dataSource : dataSources.values()) {
247             dataSource.close();
248         }
249     }
250 
251     @Override
252     public Map<String, ChannelHandler> getChannels() {
253         Map<String, ChannelHandler> channels = new HashMap<String, ChannelHandler>();
254         for (Entry<String, DataSource> entry : dataSources.entrySet()) {
255             String dataSourceName = entry.getKey();
256             DataSource dataSource = entry.getValue();
257             for (Entry<String, ChannelHandler> channelEntry : dataSource.getChannels().entrySet()) {
258                 String channelName = channelEntry.getKey();
259                 ChannelHandler channelHandler = channelEntry.getValue();
260                 channels.put(dataSourceName + delimiter + channelName, channelHandler);
261             }
262         }
263         
264         return channels;
265     }
266 
267 }