blob: 41b49572a77f24b5f8343197c6dfa96e2bd235e0 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
20
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070021import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070022import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070023import net.onrc.onos.ofcontroller.util.FlowEntry;
24import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070025import net.onrc.onos.ofcontroller.util.FlowId;
26import net.onrc.onos.ofcontroller.util.FlowPath;
27import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
28
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070029import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import com.hazelcast.config.Config;
33import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070034import com.hazelcast.core.EntryEvent;
35import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.core.Hazelcast;
37import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070039import com.hazelcast.instance.GroupProperties;
40
41/**
42 * A datagrid service that uses Hazelcast as a datagrid.
43 * The relevant data is stored in the Hazelcast datagrid and shared as
44 * appropriate in a multi-node cluster.
45 */
46public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070047 private final static int MAX_BUFFER_SIZE = 64*1024;
48
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070049 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070050 protected IFloodlightProviderService floodlightProvider;
51
52 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 private Config hazelcastConfig = null;
55
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070056 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070057 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058
59 // State related to the Flow map
60 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private IMap<Long, byte[]> mapFlow = null;
62 private MapFlowListener mapFlowListener = null;
63 private String mapFlowListenerId = null;
64
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070065 // State related to the Flow Entry map
66 protected static final String mapFlowEntryName = "mapFlowEntry";
67 private IMap<Long, byte[]> mapFlowEntry = null;
68 private MapFlowEntryListener mapFlowEntryListener = null;
69 private String mapFlowEntryListenerId = null;
70
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070071 // State related to the Network Topology map
72 protected static final String mapTopologyName = "mapTopology";
73 private IMap<String, byte[]> mapTopology = null;
74 private MapTopologyListener mapTopologyListener = null;
75 private String mapTopologyListenerId = null;
76
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070077 /**
78 * Class for receiving notifications for Flow state.
79 *
80 * The datagrid map is:
81 * - Key : Flow ID (Long)
82 * - Value : Serialized Flow (byte[])
83 */
84 class MapFlowListener implements EntryListener<Long, byte[]> {
85 /**
86 * Receive a notification that an entry is added.
87 *
88 * @param event the notification event for the entry.
89 */
90 public void entryAdded(EntryEvent event) {
91 Long keyLong = (Long)event.getKey();
92 byte[] valueBytes = (byte[])event.getValue();
93
94 //
95 // Decode the value and deliver the notification
96 //
97 Kryo kryo = kryoFactory.newKryo();
98 Input input = new Input(valueBytes);
99 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
100 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700101 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 }
103
104 /**
105 * Receive a notification that an entry is removed.
106 *
107 * @param event the notification event for the entry.
108 */
109 public void entryRemoved(EntryEvent event) {
110 Long keyLong = (Long)event.getKey();
111 byte[] valueBytes = (byte[])event.getValue();
112
113 //
114 // Decode the value and deliver the notification
115 //
116 Kryo kryo = kryoFactory.newKryo();
117 Input input = new Input(valueBytes);
118 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
119 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700120 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700121 }
122
123 /**
124 * Receive a notification that an entry is updated.
125 *
126 * @param event the notification event for the entry.
127 */
128 public void entryUpdated(EntryEvent event) {
129 Long keyLong = (Long)event.getKey();
130 byte[] valueBytes = (byte[])event.getValue();
131
132 //
133 // Decode the value and deliver the notification
134 //
135 Kryo kryo = kryoFactory.newKryo();
136 Input input = new Input(valueBytes);
137 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
138 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700139 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700140 }
141
142 /**
143 * Receive a notification that an entry is evicted.
144 *
145 * @param event the notification event for the entry.
146 */
147 public void entryEvicted(EntryEvent event) {
148 // NOTE: We don't use eviction for this map
149 }
150 }
151
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700152 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700153 * Class for receiving notifications for FlowEntry state.
154 *
155 * The datagrid map is:
156 * - Key : FlowEntry ID (Long)
157 * - Value : Serialized FlowEntry (byte[])
158 */
159 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
160 /**
161 * Receive a notification that an entry is added.
162 *
163 * @param event the notification event for the entry.
164 */
165 public void entryAdded(EntryEvent event) {
166 Long keyLong = (Long)event.getKey();
167 byte[] valueBytes = (byte[])event.getValue();
168
169 //
170 // Decode the value and deliver the notification
171 //
172 Kryo kryo = kryoFactory.newKryo();
173 Input input = new Input(valueBytes);
174 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
175 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700176 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700177 }
178
179 /**
180 * Receive a notification that an entry is removed.
181 *
182 * @param event the notification event for the entry.
183 */
184 public void entryRemoved(EntryEvent event) {
185 Long keyLong = (Long)event.getKey();
186 byte[] valueBytes = (byte[])event.getValue();
187
188 //
189 // Decode the value and deliver the notification
190 //
191 Kryo kryo = kryoFactory.newKryo();
192 Input input = new Input(valueBytes);
193 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
194 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700195 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700196 }
197
198 /**
199 * Receive a notification that an entry is updated.
200 *
201 * @param event the notification event for the entry.
202 */
203 public void entryUpdated(EntryEvent event) {
204 Long keyLong = (Long)event.getKey();
205 byte[] valueBytes = (byte[])event.getValue();
206
207 //
208 // Decode the value and deliver the notification
209 //
210 Kryo kryo = kryoFactory.newKryo();
211 Input input = new Input(valueBytes);
212 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
213 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700214 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700215 }
216
217 /**
218 * Receive a notification that an entry is evicted.
219 *
220 * @param event the notification event for the entry.
221 */
222 public void entryEvicted(EntryEvent event) {
223 // NOTE: We don't use eviction for this map
224 }
225 }
226
227 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700228 * Class for receiving notifications for Network Topology state.
229 *
230 * The datagrid map is:
231 * - Key: TopologyElement ID (String)
232 * - Value: Serialized TopologyElement (byte[])
233 */
234 class MapTopologyListener implements EntryListener<String, byte[]> {
235 /**
236 * Receive a notification that an entry is added.
237 *
238 * @param event the notification event for the entry.
239 */
240 public void entryAdded(EntryEvent event) {
241 String keyString = (String)event.getKey();
242 byte[] valueBytes = (byte[])event.getValue();
243
244 //
245 // Decode the value and deliver the notification
246 //
247 Kryo kryo = kryoFactory.newKryo();
248 Input input = new Input(valueBytes);
249 TopologyElement topologyElement =
250 kryo.readObject(input, TopologyElement.class);
251 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700252 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700253 }
254
255 /**
256 * Receive a notification that an entry is removed.
257 *
258 * @param event the notification event for the entry.
259 */
260 public void entryRemoved(EntryEvent event) {
261 String keyString = (String)event.getKey();
262 byte[] valueBytes = (byte[])event.getValue();
263
264 //
265 // Decode the value and deliver the notification
266 //
267 Kryo kryo = kryoFactory.newKryo();
268 Input input = new Input(valueBytes);
269 TopologyElement topologyElement =
270 kryo.readObject(input, TopologyElement.class);
271 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700272 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700273 }
274
275 /**
276 * Receive a notification that an entry is updated.
277 *
278 * @param event the notification event for the entry.
279 */
280 public void entryUpdated(EntryEvent event) {
281 String keyString = (String)event.getKey();
282 byte[] valueBytes = (byte[])event.getValue();
283
284 //
285 // Decode the value and deliver the notification
286 //
287 Kryo kryo = kryoFactory.newKryo();
288 Input input = new Input(valueBytes);
289 TopologyElement topologyElement =
290 kryo.readObject(input, TopologyElement.class);
291 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700292 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700293 }
294
295 /**
296 * Receive a notification that an entry is evicted.
297 *
298 * @param event the notification event for the entry.
299 */
300 public void entryEvicted(EntryEvent event) {
301 // NOTE: We don't use eviction for this map
302 }
303 }
304
305 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700306 * Initialize the Hazelcast Datagrid operation.
307 *
308 * @param conf the configuration filename.
309 */
310 public void init(String configFilename) {
311 /*
312 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
313 System.setProperty("hazelcast.socket.send.buffer.size", "32");
314 */
315 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
316
317 // Init from configuration file
318 try {
319 hazelcastConfig = new FileSystemXmlConfig(configFilename);
320 } catch (FileNotFoundException e) {
321 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
322 }
323 /*
324 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
325 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
326 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
327 */
328 //
329 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
330 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
331 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
332 }
333
334 /**
335 * Shutdown the Hazelcast Datagrid operation.
336 */
337 public void finalize() {
338 close();
339 }
340
341 /**
342 * Shutdown the Hazelcast Datagrid operation.
343 */
344 public void close() {
345 Hazelcast.shutdownAll();
346 }
347
348 /**
349 * Get the collection of offered module services.
350 *
351 * @return the collection of offered module services.
352 */
353 @Override
354 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
355 Collection<Class<? extends IFloodlightService>> l =
356 new ArrayList<Class<? extends IFloodlightService>>();
357 l.add(IDatagridService.class);
358 return l;
359 }
360
361 /**
362 * Get the collection of implemented services.
363 *
364 * @return the collection of implemented services.
365 */
366 @Override
367 public Map<Class<? extends IFloodlightService>, IFloodlightService>
368 getServiceImpls() {
369 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700370 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700371 new HashMap<Class<? extends IFloodlightService>,
372 IFloodlightService>();
373 m.put(IDatagridService.class, this);
374 return m;
375 }
376
377 /**
378 * Get the collection of modules this module depends on.
379 *
380 * @return the collection of modules this module depends on.
381 */
382 @Override
383 public Collection<Class<? extends IFloodlightService>>
384 getModuleDependencies() {
385 Collection<Class<? extends IFloodlightService>> l =
386 new ArrayList<Class<? extends IFloodlightService>>();
387 l.add(IFloodlightProviderService.class);
388 return l;
389 }
390
391 /**
392 * Initialize the module.
393 *
394 * @param context the module context to use for the initialization.
395 */
396 @Override
397 public void init(FloodlightModuleContext context)
398 throws FloodlightModuleException {
399 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
400
401 // Get the configuration file name and configure the Datagrid
402 Map<String, String> configMap = context.getConfigParams(this);
403 String configFilename = configMap.get(HazelcastConfigFile);
404 this.init(configFilename);
405 }
406
407 /**
408 * Startup module operation.
409 *
410 * @param context the module context to use for the startup.
411 */
412 @Override
413 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700414 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
415 }
416
417 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700418 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700419 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700420 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700421 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700422 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700423 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700424 */
425 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700426 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
427 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700428
429 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700430 mapFlowListener = new MapFlowListener();
431 mapFlow = hazelcastInstance.getMap(mapFlowName);
432 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700433
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700434 // Initialize the FlowEntry-related map state
435 mapFlowEntryListener = new MapFlowEntryListener();
436 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
437 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
438
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700439 // Initialize the Topology-related map state
440 mapTopologyListener = new MapTopologyListener();
441 mapTopology = hazelcastInstance.getMap(mapTopologyName);
442 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700443 }
444
445 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700446 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700447 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700448 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700449 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700450 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700451 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700452 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700453 */
454 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700455 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700456 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700457 mapFlow.removeEntryListener(mapFlowListenerId);
458 mapFlow = null;
459 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700460
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700461 // Clear the FlowEntry-related map state
462 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
463 mapFlowEntry = null;
464 mapFlowEntryListener = null;
465
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700466 // Clear the Topology-related map state
467 mapTopology.removeEntryListener(mapTopologyListenerId);
468 mapTopology = null;
469 mapTopologyListener = null;
470
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700471 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700472 }
473
474 /**
475 * Get all Flows that are currently in the datagrid.
476 *
477 * @return all Flows that are currently in the datagrid.
478 */
479 @Override
480 public Collection<FlowPath> getAllFlows() {
481 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
482
483 //
484 // Get all current entries
485 //
486 Collection<byte[]> values = mapFlow.values();
487 Kryo kryo = kryoFactory.newKryo();
488 for (byte[] valueBytes : values) {
489 //
490 // Decode the value
491 //
492 Input input = new Input(valueBytes);
493 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
494 allFlows.add(flowPath);
495 }
496 kryoFactory.deleteKryo(kryo);
497
498 return allFlows;
499 }
500
501 /**
502 * Send a notification that a Flow is added.
503 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700504 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700505 */
506 @Override
507 public void notificationSendFlowAdded(FlowPath flowPath) {
508 //
509 // Encode the value
510 //
511 byte[] buffer = new byte[MAX_BUFFER_SIZE];
512 Kryo kryo = kryoFactory.newKryo();
513 Output output = new Output(buffer, -1);
514 kryo.writeObject(output, flowPath);
515 byte[] valueBytes = output.toBytes();
516 kryoFactory.deleteKryo(kryo);
517
518 //
519 // Put the entry:
520 // - Key : Flow ID (Long)
521 // - Value : Serialized Flow (byte[])
522 //
523 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
524 }
525
526 /**
527 * Send a notification that a Flow is removed.
528 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700529 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700530 */
531 @Override
532 public void notificationSendFlowRemoved(FlowId flowId) {
533 //
534 // Remove the entry:
535 // - Key : Flow ID (Long)
536 // - Value : Serialized Flow (byte[])
537 //
538 mapFlow.removeAsync(flowId.value());
539 }
540
541 /**
542 * Send a notification that a Flow is updated.
543 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700544 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700545 */
546 @Override
547 public void notificationSendFlowUpdated(FlowPath flowPath) {
548 // NOTE: Adding an entry with an existing key automatically updates it
549 notificationSendFlowAdded(flowPath);
550 }
551
552 /**
553 * Send a notification that all Flows are removed.
554 */
555 @Override
556 public void notificationSendAllFlowsRemoved() {
557 //
558 // Remove all entries
559 // NOTE: We remove the entries one-by-one so the per-entry
560 // notifications will be delivered.
561 //
562 // mapFlow.clear();
563 Set<Long> keySet = mapFlow.keySet();
564 for (Long key : keySet) {
565 mapFlow.removeAsync(key);
566 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700567 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700568
569 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700570 * Get all Flow Entries that are currently in the datagrid.
571 *
572 * @return all Flow Entries that are currently in the datagrid.
573 */
574 @Override
575 public Collection<FlowEntry> getAllFlowEntries() {
576 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
577
578 //
579 // Get all current entries
580 //
581 Collection<byte[]> values = mapFlowEntry.values();
582 Kryo kryo = kryoFactory.newKryo();
583 for (byte[] valueBytes : values) {
584 //
585 // Decode the value
586 //
587 Input input = new Input(valueBytes);
588 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
589 allFlowEntries.add(flowEntry);
590 }
591 kryoFactory.deleteKryo(kryo);
592
593 return allFlowEntries;
594 }
595
596 /**
597 * Send a notification that a FlowEntry is added.
598 *
599 * @param flowEntry the FlowEntry that is added.
600 */
601 @Override
602 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
603 //
604 // Encode the value
605 //
606 byte[] buffer = new byte[MAX_BUFFER_SIZE];
607 Kryo kryo = kryoFactory.newKryo();
608 Output output = new Output(buffer, -1);
609 kryo.writeObject(output, flowEntry);
610 byte[] valueBytes = output.toBytes();
611 kryoFactory.deleteKryo(kryo);
612
613 //
614 // Put the entry:
615 // - Key : FlowEntry ID (Long)
616 // - Value : Serialized FlowEntry (byte[])
617 //
618 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
619 }
620
621 /**
622 * Send a notification that a FlowEntry is removed.
623 *
624 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
625 */
626 @Override
627 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
628 //
629 // Remove the entry:
630 // - Key : FlowEntry ID (Long)
631 // - Value : Serialized FlowEntry (byte[])
632 //
633 mapFlowEntry.removeAsync(flowEntryId.value());
634 }
635
636 /**
637 * Send a notification that a FlowEntry is updated.
638 *
639 * @param flowEntry the FlowEntry that is updated.
640 */
641 @Override
642 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
643 // NOTE: Adding an entry with an existing key automatically updates it
644 notificationSendFlowEntryAdded(flowEntry);
645 }
646
647 /**
648 * Send a notification that all Flow Entries are removed.
649 */
650 @Override
651 public void notificationSendAllFlowEntriesRemoved() {
652 //
653 // Remove all entries
654 // NOTE: We remove the entries one-by-one so the per-entry
655 // notifications will be delivered.
656 //
657 // mapFlowEntry.clear();
658 Set<Long> keySet = mapFlowEntry.keySet();
659 for (Long key : keySet) {
660 mapFlowEntry.removeAsync(key);
661 }
662 }
663
664 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700665 * Get all Topology Elements that are currently in the datagrid.
666 *
667 * @return all Topology Elements that are currently in the datagrid.
668 */
669 @Override
670 public Collection<TopologyElement> getAllTopologyElements() {
671 Collection<TopologyElement> allTopologyElements =
672 new LinkedList<TopologyElement>();
673
674 //
675 // Get all current entries
676 //
677 Collection<byte[]> values = mapTopology.values();
678 Kryo kryo = kryoFactory.newKryo();
679 for (byte[] valueBytes : values) {
680 //
681 // Decode the value
682 //
683 Input input = new Input(valueBytes);
684 TopologyElement topologyElement =
685 kryo.readObject(input, TopologyElement.class);
686 allTopologyElements.add(topologyElement);
687 }
688 kryoFactory.deleteKryo(kryo);
689
690 return allTopologyElements;
691 }
692
693 /**
694 * Send a notification that a Topology Element is added.
695 *
696 * @param topologyElement the Topology Element that is added.
697 */
698 @Override
699 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
700 //
701 // Encode the value
702 //
703 byte[] buffer = new byte[MAX_BUFFER_SIZE];
704 Kryo kryo = kryoFactory.newKryo();
705 Output output = new Output(buffer, -1);
706 kryo.writeObject(output, topologyElement);
707 byte[] valueBytes = output.toBytes();
708 kryoFactory.deleteKryo(kryo);
709
710 //
711 // Put the entry:
712 // - Key : TopologyElement ID (String)
713 // - Value : Serialized TopologyElement (byte[])
714 //
715 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
716 }
717
718 /**
719 * Send a notification that a Topology Element is removed.
720 *
721 * @param topologyElement the Topology Element that is removed.
722 */
723 @Override
724 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
725 //
726 // Remove the entry:
727 // - Key : TopologyElement ID (String)
728 // - Value : Serialized TopologyElement (byte[])
729 //
730 mapTopology.removeAsync(topologyElement.elementId());
731 }
732
733 /**
734 * Send a notification that a Topology Element is updated.
735 *
736 * @param topologyElement the Topology Element that is updated.
737 */
738 @Override
739 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
740 // NOTE: Adding an entry with an existing key automatically updates it
741 notificationSendTopologyElementAdded(topologyElement);
742 }
743
744 /**
745 * Send a notification that all Topology Elements are removed.
746 */
747 @Override
748 public void notificationSendAllTopologyElementsRemoved() {
749 //
750 // Remove all entries
751 // NOTE: We remove the entries one-by-one so the per-entry
752 // notifications will be delivered.
753 //
754 // mapTopology.clear();
755 Set<String> keySet = mapTopology.keySet();
756 for (String key : keySet) {
757 mapTopology.removeAsync(key);
758 }
759 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700760}