1
2
3
4
5 package org.epics.pvmanager;
6
7 import java.util.*;
8 import java.util.Map.Entry;
9 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.logging.Logger;
11
12
13
14
15
16
17
18 public class CompositeDataSource extends DataSource {
19
20 private static Logger log = Logger.getLogger(CompositeDataSource.class.getName());
21
22
23 private Map<String, DataSource> dataSources = new ConcurrentHashMap<String, DataSource>();
24
25 private volatile String delimiter = "://";
26 private volatile String defaultDataSource;
27
28
29
30
31 public CompositeDataSource() {
32 super(true);
33 }
34
35
36
37
38
39
40
41
42 public String getDelimiter() {
43 return delimiter;
44 }
45
46
47
48
49
50
51 public void setDelimiter(String delimiter) {
52 this.delimiter = delimiter;
53 }
54
55
56
57
58
59
60
61 public void putDataSource(String name, DataSource dataSource) {
62 dataSources.put(name, dataSource);
63 }
64
65
66
67
68
69
70
71 public String getDefaultDataSource() {
72 return defaultDataSource;
73 }
74
75
76
77
78
79
80 public Map<String, DataSource> getDataSources() {
81 return Collections.unmodifiableMap(dataSources);
82 }
83
84
85
86
87
88
89
90 public void setDefaultDataSource(String defaultDataSource) {
91 if (!dataSources.containsKey(defaultDataSource))
92 throw new IllegalArgumentException("The data source " + defaultDataSource + " was not previously added, and therefore cannot be set as default");
93
94 this.defaultDataSource = defaultDataSource;
95 }
96
97 private String nameOf(String channelName) {
98 int indexDelimiter = channelName.indexOf(delimiter);
99 if (indexDelimiter == -1) {
100 return channelName;
101 } else {
102 return channelName.substring(indexDelimiter + delimiter.length());
103 }
104 }
105
106 private String sourceOf(String channelName) {
107 int indexDelimiter = channelName.indexOf(delimiter);
108 if (indexDelimiter == -1) {
109 if (defaultDataSource == null)
110 throw new IllegalArgumentException("Channel " + channelName + " uses the default data source but one was never set.");
111 return defaultDataSource;
112 } else {
113 String source = channelName.substring(0, indexDelimiter);
114 if (dataSources.containsKey(source))
115 return source;
116 throw new IllegalArgumentException("Data source " + source + " for " + channelName + " was not configured.");
117 }
118 }
119
120 private Map<String, ReadRecipe> splitRecipe(ReadRecipe readRecipe) {
121 Map<String, ReadRecipe> splitRecipe = new HashMap<String, ReadRecipe>();
122
123
124
125 Map<String, Collection<ChannelReadRecipe>> routingRecipes = new HashMap<String, Collection<ChannelReadRecipe>>();
126 for (ChannelReadRecipe channelRecipe : readRecipe.getChannelReadRecipes()) {
127 String name = nameOf(channelRecipe.getChannelName());
128 String dataSource = sourceOf(channelRecipe.getChannelName());
129
130 if (dataSource == null)
131 throw new IllegalArgumentException("Channel " + name + " uses the default data source but one was never set.");
132
133
134 if (routingRecipes.get(dataSource) == null) {
135 routingRecipes.put(dataSource, new HashSet<ChannelReadRecipe>());
136 }
137 routingRecipes.get(dataSource).add(new ChannelReadRecipe(name, channelRecipe.getReadSubscription()));
138 }
139
140
141 for (Entry<String, Collection<ChannelReadRecipe>> entry : routingRecipes.entrySet()) {
142 splitRecipe.put(entry.getKey(), new ReadRecipe(entry.getValue()));
143 }
144
145 return splitRecipe;
146 }
147
148 @Override
149 public void connectRead(ReadRecipe readRecipe) {
150 Map<String, ReadRecipe> splitRecipe = splitRecipe(readRecipe);
151
152
153 for (Map.Entry<String, ReadRecipe> entry : splitRecipe.entrySet()) {
154 try {
155 DataSource dataSource = dataSources.get(entry.getKey());
156 if (dataSource == null) {
157 throw new IllegalArgumentException("DataSource '" + entry.getKey() + delimiter + "' was not configured.");
158 }
159 dataSource.connectRead(entry.getValue());
160 } catch (RuntimeException ex) {
161
162 readRecipe.getChannelReadRecipes().iterator().next().getReadSubscription().getExceptionWriteFunction().writeValue(ex);
163 }
164 }
165 }
166
167 @Override
168 public void disconnectRead(ReadRecipe readRecipe) {
169 Map<String, ReadRecipe> splitRecipe = splitRecipe(readRecipe);
170
171
172 for (Map.Entry<String, ReadRecipe> entry : splitRecipe.entrySet()) {
173 try {
174 dataSources.get(entry.getKey()).disconnectRead(entry.getValue());
175 } catch(RuntimeException ex) {
176
177 readRecipe.getChannelReadRecipes().iterator().next().getReadSubscription().getExceptionWriteFunction().writeValue(ex);
178 }
179 }
180 }
181
182 private Map<String, WriteRecipe> splitRecipe(WriteRecipe writeRecipe) {
183
184 Map<String, Collection<ChannelWriteRecipe>> recipes = new HashMap<String, Collection<ChannelWriteRecipe>>();
185 for (ChannelWriteRecipe channelWriteRecipe : writeRecipe.getChannelWriteRecipes()) {
186 String channelName = nameOf(channelWriteRecipe.getChannelName());
187 String dataSource = sourceOf(channelWriteRecipe.getChannelName());
188 Collection<ChannelWriteRecipe> channelWriteRecipes = recipes.get(dataSource);
189 if (channelWriteRecipes == null) {
190 channelWriteRecipes = new ArrayList<ChannelWriteRecipe>();
191 recipes.put(dataSource, channelWriteRecipes);
192 }
193 channelWriteRecipes.add(new ChannelWriteRecipe(channelName, channelWriteRecipe.getWriteSubscription()));
194 }
195
196 Map<String, WriteRecipe> splitRecipes = new HashMap<String, WriteRecipe>();
197 for (Map.Entry<String, Collection<ChannelWriteRecipe>> en : recipes.entrySet()) {
198 String dataSource = en.getKey();
199 Collection<ChannelWriteRecipe> val = en.getValue();
200 WriteRecipe newWriteRecipe = new WriteRecipe(val);
201 splitRecipes.put(dataSource, newWriteRecipe);
202 }
203
204 return splitRecipes;
205 }
206
207 @Override
208 public void connectWrite(WriteRecipe writeRecipe) {
209 Map<String, WriteRecipe> splitRecipes = splitRecipe(writeRecipe);
210 for (Entry<String, WriteRecipe> entry : splitRecipes.entrySet()) {
211 String dataSource = entry.getKey();
212 WriteRecipe splitWriteRecipe = entry.getValue();
213 dataSources.get(dataSource).connectWrite(splitWriteRecipe);
214 }
215 }
216
217 @Override
218 public void disconnectWrite(WriteRecipe writeRecipe) {
219 Map<String, WriteRecipe> splitRecipe = splitRecipe(writeRecipe);
220
221 for (Map.Entry<String, WriteRecipe> en : splitRecipe.entrySet()) {
222 String dataSource = en.getKey();
223 WriteRecipe splitWriteRecipe = en.getValue();
224 dataSources.get(dataSource).disconnectWrite(splitWriteRecipe);
225 }
226 }
227
228
229 @Override
230 ChannelHandler channel(String channelName) {
231 String name = nameOf(channelName);
232 String dataSource = sourceOf(channelName);
233 return dataSources.get(dataSource).channel(name);
234 }
235
236 @Override
237 protected ChannelHandler createChannel(String channelName) {
238 throw new UnsupportedOperationException("Composite data source can't create channels directly.");
239 }
240
241
242
243
244 @Override
245 public void close() {
246 for (DataSource dataSource : dataSources.values()) {
247 dataSource.close();
248 }
249 }
250
251 @Override
252 public Map<String, ChannelHandler> getChannels() {
253 Map<String, ChannelHandler> channels = new HashMap<String, ChannelHandler>();
254 for (Entry<String, DataSource> entry : dataSources.entrySet()) {
255 String dataSourceName = entry.getKey();
256 DataSource dataSource = entry.getValue();
257 for (Entry<String, ChannelHandler> channelEntry : dataSource.getChannels().entrySet()) {
258 String channelName = channelEntry.getKey();
259 ChannelHandler channelHandler = channelEntry.getValue();
260 channels.put(dataSourceName + delimiter + channelName, channelHandler);
261 }
262 }
263
264 return channels;
265 }
266
267 }