blob: d9fb7c3cb882153ac591cccf8cb5c32d2fb85b04 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070020import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070021
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070022import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070023import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070024import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
29import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
30
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import com.hazelcast.config.Config;
35import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070036import com.hazelcast.core.EntryEvent;
37import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.core.Hazelcast;
39import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070041import com.hazelcast.instance.GroupProperties;
42
43/**
44 * A datagrid service that uses Hazelcast as a datagrid.
45 * The relevant data is stored in the Hazelcast datagrid and shared as
46 * appropriate in a multi-node cluster.
47 */
48public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070049 private final static int MAX_BUFFER_SIZE = 64*1024;
50
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070051 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070052 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070053 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054
55 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070056 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070057 private Config hazelcastConfig = null;
58
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070060 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061
62 // State related to the Flow map
63 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070064 private IMap<Long, byte[]> mapFlow = null;
65 private MapFlowListener mapFlowListener = null;
66 private String mapFlowListenerId = null;
67
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070068 // State related to the Flow Entry map
69 protected static final String mapFlowEntryName = "mapFlowEntry";
70 private IMap<Long, byte[]> mapFlowEntry = null;
71 private MapFlowEntryListener mapFlowEntryListener = null;
72 private String mapFlowEntryListenerId = null;
73
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070074 // State related to the Network Topology map
75 protected static final String mapTopologyName = "mapTopology";
76 private IMap<String, byte[]> mapTopology = null;
77 private MapTopologyListener mapTopologyListener = null;
78 private String mapTopologyListenerId = null;
79
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070080 /**
81 * Class for receiving notifications for Flow state.
82 *
83 * The datagrid map is:
84 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080085 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070086 */
87 class MapFlowListener implements EntryListener<Long, byte[]> {
88 /**
89 * Receive a notification that an entry is added.
90 *
91 * @param event the notification event for the entry.
92 */
93 public void entryAdded(EntryEvent event) {
94 Long keyLong = (Long)event.getKey();
95 byte[] valueBytes = (byte[])event.getValue();
96
97 //
98 // Decode the value and deliver the notification
99 //
100 Kryo kryo = kryoFactory.newKryo();
101 Input input = new Input(valueBytes);
102 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
103 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700104 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700105 }
106
107 /**
108 * Receive a notification that an entry is removed.
109 *
110 * @param event the notification event for the entry.
111 */
112 public void entryRemoved(EntryEvent event) {
113 Long keyLong = (Long)event.getKey();
114 byte[] valueBytes = (byte[])event.getValue();
115
116 //
117 // Decode the value and deliver the notification
118 //
119 Kryo kryo = kryoFactory.newKryo();
120 Input input = new Input(valueBytes);
121 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
122 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700123 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700124 }
125
126 /**
127 * Receive a notification that an entry is updated.
128 *
129 * @param event the notification event for the entry.
130 */
131 public void entryUpdated(EntryEvent event) {
132 Long keyLong = (Long)event.getKey();
133 byte[] valueBytes = (byte[])event.getValue();
134
135 //
136 // Decode the value and deliver the notification
137 //
138 Kryo kryo = kryoFactory.newKryo();
139 Input input = new Input(valueBytes);
140 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
141 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700142 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700143 }
144
145 /**
146 * Receive a notification that an entry is evicted.
147 *
148 * @param event the notification event for the entry.
149 */
150 public void entryEvicted(EntryEvent event) {
151 // NOTE: We don't use eviction for this map
152 }
153 }
154
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700155 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700156 * Class for receiving notifications for FlowEntry state.
157 *
158 * The datagrid map is:
159 * - Key : FlowEntry ID (Long)
160 * - Value : Serialized FlowEntry (byte[])
161 */
162 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
163 /**
164 * Receive a notification that an entry is added.
165 *
166 * @param event the notification event for the entry.
167 */
168 public void entryAdded(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700169 //
170 // NOTE: Ignore Flow Entries Events originated by this instance
171 //
172 if (event.getMember().localMember())
173 return;
174
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700175 Long keyLong = (Long)event.getKey();
176 byte[] valueBytes = (byte[])event.getValue();
177
178 //
179 // Decode the value and deliver the notification
180 //
181 Kryo kryo = kryoFactory.newKryo();
182 Input input = new Input(valueBytes);
183 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
184 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700185 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700186 }
187
188 /**
189 * Receive a notification that an entry is removed.
190 *
191 * @param event the notification event for the entry.
192 */
193 public void entryRemoved(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700194 //
195 // NOTE: Ignore Flow Entries Events originated by this instance
196 //
197 if (event.getMember().localMember())
198 return;
199
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700200 Long keyLong = (Long)event.getKey();
201 byte[] valueBytes = (byte[])event.getValue();
202
203 //
204 // Decode the value and deliver the notification
205 //
206 Kryo kryo = kryoFactory.newKryo();
207 Input input = new Input(valueBytes);
208 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
209 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700210 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700211 }
212
213 /**
214 * Receive a notification that an entry is updated.
215 *
216 * @param event the notification event for the entry.
217 */
218 public void entryUpdated(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700219 //
220 // NOTE: Ignore Flow Entries Events originated by this instance
221 //
222 if (event.getMember().localMember())
223 return;
224
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700225 Long keyLong = (Long)event.getKey();
226 byte[] valueBytes = (byte[])event.getValue();
227
228 //
229 // Decode the value and deliver the notification
230 //
231 Kryo kryo = kryoFactory.newKryo();
232 Input input = new Input(valueBytes);
233 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
234 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700235 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700236 }
237
238 /**
239 * Receive a notification that an entry is evicted.
240 *
241 * @param event the notification event for the entry.
242 */
243 public void entryEvicted(EntryEvent event) {
244 // NOTE: We don't use eviction for this map
245 }
246 }
247
248 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700249 * Class for receiving notifications for Network Topology state.
250 *
251 * The datagrid map is:
252 * - Key: TopologyElement ID (String)
253 * - Value: Serialized TopologyElement (byte[])
254 */
255 class MapTopologyListener implements EntryListener<String, byte[]> {
256 /**
257 * Receive a notification that an entry is added.
258 *
259 * @param event the notification event for the entry.
260 */
261 public void entryAdded(EntryEvent event) {
262 String keyString = (String)event.getKey();
263 byte[] valueBytes = (byte[])event.getValue();
264
265 //
266 // Decode the value and deliver the notification
267 //
268 Kryo kryo = kryoFactory.newKryo();
269 Input input = new Input(valueBytes);
270 TopologyElement topologyElement =
271 kryo.readObject(input, TopologyElement.class);
272 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700273 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700274 }
275
276 /**
277 * Receive a notification that an entry is removed.
278 *
279 * @param event the notification event for the entry.
280 */
281 public void entryRemoved(EntryEvent event) {
282 String keyString = (String)event.getKey();
283 byte[] valueBytes = (byte[])event.getValue();
284
285 //
286 // Decode the value and deliver the notification
287 //
288 Kryo kryo = kryoFactory.newKryo();
289 Input input = new Input(valueBytes);
290 TopologyElement topologyElement =
291 kryo.readObject(input, TopologyElement.class);
292 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700293 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700294 }
295
296 /**
297 * Receive a notification that an entry is updated.
298 *
299 * @param event the notification event for the entry.
300 */
301 public void entryUpdated(EntryEvent event) {
302 String keyString = (String)event.getKey();
303 byte[] valueBytes = (byte[])event.getValue();
304
305 //
306 // Decode the value and deliver the notification
307 //
308 Kryo kryo = kryoFactory.newKryo();
309 Input input = new Input(valueBytes);
310 TopologyElement topologyElement =
311 kryo.readObject(input, TopologyElement.class);
312 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700313 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700314 }
315
316 /**
317 * Receive a notification that an entry is evicted.
318 *
319 * @param event the notification event for the entry.
320 */
321 public void entryEvicted(EntryEvent event) {
322 // NOTE: We don't use eviction for this map
323 }
324 }
325
326 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700327 * Initialize the Hazelcast Datagrid operation.
328 *
329 * @param conf the configuration filename.
330 */
331 public void init(String configFilename) {
332 /*
333 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
334 System.setProperty("hazelcast.socket.send.buffer.size", "32");
335 */
336 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
337
338 // Init from configuration file
339 try {
340 hazelcastConfig = new FileSystemXmlConfig(configFilename);
341 } catch (FileNotFoundException e) {
342 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
343 }
344 /*
345 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
346 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
347 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
348 */
349 //
350 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
351 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
352 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
353 }
354
355 /**
356 * Shutdown the Hazelcast Datagrid operation.
357 */
358 public void finalize() {
359 close();
360 }
361
362 /**
363 * Shutdown the Hazelcast Datagrid operation.
364 */
365 public void close() {
366 Hazelcast.shutdownAll();
367 }
368
369 /**
370 * Get the collection of offered module services.
371 *
372 * @return the collection of offered module services.
373 */
374 @Override
375 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
376 Collection<Class<? extends IFloodlightService>> l =
377 new ArrayList<Class<? extends IFloodlightService>>();
378 l.add(IDatagridService.class);
379 return l;
380 }
381
382 /**
383 * Get the collection of implemented services.
384 *
385 * @return the collection of implemented services.
386 */
387 @Override
388 public Map<Class<? extends IFloodlightService>, IFloodlightService>
389 getServiceImpls() {
390 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700391 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700392 new HashMap<Class<? extends IFloodlightService>,
393 IFloodlightService>();
394 m.put(IDatagridService.class, this);
395 return m;
396 }
397
398 /**
399 * Get the collection of modules this module depends on.
400 *
401 * @return the collection of modules this module depends on.
402 */
403 @Override
404 public Collection<Class<? extends IFloodlightService>>
405 getModuleDependencies() {
406 Collection<Class<? extends IFloodlightService>> l =
407 new ArrayList<Class<? extends IFloodlightService>>();
408 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700409 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700410 return l;
411 }
412
413 /**
414 * Initialize the module.
415 *
416 * @param context the module context to use for the initialization.
417 */
418 @Override
419 public void init(FloodlightModuleContext context)
420 throws FloodlightModuleException {
421 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700422 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700423
424 // Get the configuration file name and configure the Datagrid
425 Map<String, String> configMap = context.getConfigParams(this);
426 String configFilename = configMap.get(HazelcastConfigFile);
427 this.init(configFilename);
428 }
429
430 /**
431 * Startup module operation.
432 *
433 * @param context the module context to use for the startup.
434 */
435 @Override
436 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700437 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700438
439 restApi.addRestletRoutable(new DatagridWebRoutable());
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700440 }
441
442 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700443 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700444 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700445 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700446 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700447 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700448 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700449 */
450 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700451 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
452 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700453
454 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700455 mapFlowListener = new MapFlowListener();
456 mapFlow = hazelcastInstance.getMap(mapFlowName);
457 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700458
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700459 // Initialize the FlowEntry-related map state
460 mapFlowEntryListener = new MapFlowEntryListener();
461 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
462 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
463
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700464 // Initialize the Topology-related map state
465 mapTopologyListener = new MapTopologyListener();
466 mapTopology = hazelcastInstance.getMap(mapTopologyName);
467 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700468 }
469
470 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700471 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700472 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700473 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700474 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700475 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700476 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700477 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700478 */
479 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700480 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700481 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700482 mapFlow.removeEntryListener(mapFlowListenerId);
483 mapFlow = null;
484 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700485
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700486 // Clear the FlowEntry-related map state
487 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
488 mapFlowEntry = null;
489 mapFlowEntryListener = null;
490
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700491 // Clear the Topology-related map state
492 mapTopology.removeEntryListener(mapTopologyListenerId);
493 mapTopology = null;
494 mapTopologyListener = null;
495
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700496 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700497 }
498
499 /**
500 * Get all Flows that are currently in the datagrid.
501 *
502 * @return all Flows that are currently in the datagrid.
503 */
504 @Override
505 public Collection<FlowPath> getAllFlows() {
506 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
507
508 //
509 // Get all current entries
510 //
511 Collection<byte[]> values = mapFlow.values();
512 Kryo kryo = kryoFactory.newKryo();
513 for (byte[] valueBytes : values) {
514 //
515 // Decode the value
516 //
517 Input input = new Input(valueBytes);
518 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
519 allFlows.add(flowPath);
520 }
521 kryoFactory.deleteKryo(kryo);
522
523 return allFlows;
524 }
525
526 /**
527 * Send a notification that a Flow is added.
528 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700529 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700530 */
531 @Override
532 public void notificationSendFlowAdded(FlowPath flowPath) {
533 //
534 // Encode the value
535 //
536 byte[] buffer = new byte[MAX_BUFFER_SIZE];
537 Kryo kryo = kryoFactory.newKryo();
538 Output output = new Output(buffer, -1);
539 kryo.writeObject(output, flowPath);
540 byte[] valueBytes = output.toBytes();
541 kryoFactory.deleteKryo(kryo);
542
543 //
544 // Put the entry:
545 // - Key : Flow ID (Long)
546 // - Value : Serialized Flow (byte[])
547 //
548 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
549 }
550
551 /**
552 * Send a notification that a Flow is removed.
553 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700554 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700555 */
556 @Override
557 public void notificationSendFlowRemoved(FlowId flowId) {
558 //
559 // Remove the entry:
560 // - Key : Flow ID (Long)
561 // - Value : Serialized Flow (byte[])
562 //
563 mapFlow.removeAsync(flowId.value());
564 }
565
566 /**
567 * Send a notification that a Flow is updated.
568 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700569 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700570 */
571 @Override
572 public void notificationSendFlowUpdated(FlowPath flowPath) {
573 // NOTE: Adding an entry with an existing key automatically updates it
574 notificationSendFlowAdded(flowPath);
575 }
576
577 /**
578 * Send a notification that all Flows are removed.
579 */
580 @Override
581 public void notificationSendAllFlowsRemoved() {
582 //
583 // Remove all entries
584 // NOTE: We remove the entries one-by-one so the per-entry
585 // notifications will be delivered.
586 //
587 // mapFlow.clear();
588 Set<Long> keySet = mapFlow.keySet();
589 for (Long key : keySet) {
590 mapFlow.removeAsync(key);
591 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700592 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700593
594 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700595 * Get all Flow Entries that are currently in the datagrid.
596 *
597 * @return all Flow Entries that are currently in the datagrid.
598 */
599 @Override
600 public Collection<FlowEntry> getAllFlowEntries() {
601 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
602
603 //
604 // Get all current entries
605 //
606 Collection<byte[]> values = mapFlowEntry.values();
607 Kryo kryo = kryoFactory.newKryo();
608 for (byte[] valueBytes : values) {
609 //
610 // Decode the value
611 //
612 Input input = new Input(valueBytes);
613 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
614 allFlowEntries.add(flowEntry);
615 }
616 kryoFactory.deleteKryo(kryo);
617
618 return allFlowEntries;
619 }
620
621 /**
622 * Send a notification that a FlowEntry is added.
623 *
624 * @param flowEntry the FlowEntry that is added.
625 */
626 @Override
627 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
628 //
629 // Encode the value
630 //
631 byte[] buffer = new byte[MAX_BUFFER_SIZE];
632 Kryo kryo = kryoFactory.newKryo();
633 Output output = new Output(buffer, -1);
634 kryo.writeObject(output, flowEntry);
635 byte[] valueBytes = output.toBytes();
636 kryoFactory.deleteKryo(kryo);
637
638 //
639 // Put the entry:
640 // - Key : FlowEntry ID (Long)
641 // - Value : Serialized FlowEntry (byte[])
642 //
643 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
644 }
645
646 /**
647 * Send a notification that a FlowEntry is removed.
648 *
649 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
650 */
651 @Override
652 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
653 //
654 // Remove the entry:
655 // - Key : FlowEntry ID (Long)
656 // - Value : Serialized FlowEntry (byte[])
657 //
658 mapFlowEntry.removeAsync(flowEntryId.value());
659 }
660
661 /**
662 * Send a notification that a FlowEntry is updated.
663 *
664 * @param flowEntry the FlowEntry that is updated.
665 */
666 @Override
667 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
668 // NOTE: Adding an entry with an existing key automatically updates it
669 notificationSendFlowEntryAdded(flowEntry);
670 }
671
672 /**
673 * Send a notification that all Flow Entries are removed.
674 */
675 @Override
676 public void notificationSendAllFlowEntriesRemoved() {
677 //
678 // Remove all entries
679 // NOTE: We remove the entries one-by-one so the per-entry
680 // notifications will be delivered.
681 //
682 // mapFlowEntry.clear();
683 Set<Long> keySet = mapFlowEntry.keySet();
684 for (Long key : keySet) {
685 mapFlowEntry.removeAsync(key);
686 }
687 }
688
689 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700690 * Get all Topology Elements that are currently in the datagrid.
691 *
692 * @return all Topology Elements that are currently in the datagrid.
693 */
694 @Override
695 public Collection<TopologyElement> getAllTopologyElements() {
696 Collection<TopologyElement> allTopologyElements =
697 new LinkedList<TopologyElement>();
698
699 //
700 // Get all current entries
701 //
702 Collection<byte[]> values = mapTopology.values();
703 Kryo kryo = kryoFactory.newKryo();
704 for (byte[] valueBytes : values) {
705 //
706 // Decode the value
707 //
708 Input input = new Input(valueBytes);
709 TopologyElement topologyElement =
710 kryo.readObject(input, TopologyElement.class);
711 allTopologyElements.add(topologyElement);
712 }
713 kryoFactory.deleteKryo(kryo);
714
715 return allTopologyElements;
716 }
717
718 /**
719 * Send a notification that a Topology Element is added.
720 *
721 * @param topologyElement the Topology Element that is added.
722 */
723 @Override
724 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
725 //
726 // Encode the value
727 //
728 byte[] buffer = new byte[MAX_BUFFER_SIZE];
729 Kryo kryo = kryoFactory.newKryo();
730 Output output = new Output(buffer, -1);
731 kryo.writeObject(output, topologyElement);
732 byte[] valueBytes = output.toBytes();
733 kryoFactory.deleteKryo(kryo);
734
735 //
736 // Put the entry:
737 // - Key : TopologyElement ID (String)
738 // - Value : Serialized TopologyElement (byte[])
739 //
740 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
741 }
742
743 /**
744 * Send a notification that a Topology Element is removed.
745 *
746 * @param topologyElement the Topology Element that is removed.
747 */
748 @Override
749 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
750 //
751 // Remove the entry:
752 // - Key : TopologyElement ID (String)
753 // - Value : Serialized TopologyElement (byte[])
754 //
755 mapTopology.removeAsync(topologyElement.elementId());
756 }
757
758 /**
759 * Send a notification that a Topology Element is updated.
760 *
761 * @param topologyElement the Topology Element that is updated.
762 */
763 @Override
764 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
765 // NOTE: Adding an entry with an existing key automatically updates it
766 notificationSendTopologyElementAdded(topologyElement);
767 }
768
769 /**
770 * Send a notification that all Topology Elements are removed.
771 */
772 @Override
773 public void notificationSendAllTopologyElementsRemoved() {
774 //
775 // Remove all entries
776 // NOTE: We remove the entries one-by-one so the per-entry
777 // notifications will be delivered.
778 //
779 // mapTopology.clear();
780 Set<String> keySet = mapTopology.keySet();
781 for (String key : keySet) {
782 mapTopology.removeAsync(key);
783 }
784 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700785}