blob: 7f8a7a7777c27848c9dcee2fb79a92e0396f7f70 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080021import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070022import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070023import net.onrc.onos.ofcontroller.util.FlowEntry;
24import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070025import net.onrc.onos.ofcontroller.util.FlowId;
26import net.onrc.onos.ofcontroller.util.FlowPath;
27import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
28
Jonathan Hart18ad55c2013-11-11 22:49:55 -080029import org.openflow.util.HexString;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070076 // State related to the Network Topology map
77 protected static final String mapTopologyName = "mapTopology";
78 private IMap<String, byte[]> mapTopology = null;
79 private MapTopologyListener mapTopologyListener = null;
80 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080081
82 // State related to the ARP map
83 protected static final String arpMapName = "arpMap";
84 private IMap<byte[], byte[]> arpMap = null;
85 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
86 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070087
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070088 /**
89 * Class for receiving notifications for Flow state.
90 *
91 * The datagrid map is:
92 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080093 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 */
95 class MapFlowListener implements EntryListener<Long, byte[]> {
96 /**
97 * Receive a notification that an entry is added.
98 *
99 * @param event the notification event for the entry.
100 */
101 public void entryAdded(EntryEvent event) {
102 Long keyLong = (Long)event.getKey();
103 byte[] valueBytes = (byte[])event.getValue();
104
105 //
106 // Decode the value and deliver the notification
107 //
108 Kryo kryo = kryoFactory.newKryo();
109 Input input = new Input(valueBytes);
110 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
111 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700112 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700113 }
114
115 /**
116 * Receive a notification that an entry is removed.
117 *
118 * @param event the notification event for the entry.
119 */
120 public void entryRemoved(EntryEvent event) {
121 Long keyLong = (Long)event.getKey();
122 byte[] valueBytes = (byte[])event.getValue();
123
124 //
125 // Decode the value and deliver the notification
126 //
127 Kryo kryo = kryoFactory.newKryo();
128 Input input = new Input(valueBytes);
129 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
130 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700131 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700132 }
133
134 /**
135 * Receive a notification that an entry is updated.
136 *
137 * @param event the notification event for the entry.
138 */
139 public void entryUpdated(EntryEvent event) {
140 Long keyLong = (Long)event.getKey();
141 byte[] valueBytes = (byte[])event.getValue();
142
143 //
144 // Decode the value and deliver the notification
145 //
146 Kryo kryo = kryoFactory.newKryo();
147 Input input = new Input(valueBytes);
148 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
149 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700150 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700151 }
152
153 /**
154 * Receive a notification that an entry is evicted.
155 *
156 * @param event the notification event for the entry.
157 */
158 public void entryEvicted(EntryEvent event) {
159 // NOTE: We don't use eviction for this map
160 }
161 }
162
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700163 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700164 * Class for receiving notifications for FlowEntry state.
165 *
166 * The datagrid map is:
167 * - Key : FlowEntry ID (Long)
168 * - Value : Serialized FlowEntry (byte[])
169 */
170 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
171 /**
172 * Receive a notification that an entry is added.
173 *
174 * @param event the notification event for the entry.
175 */
176 public void entryAdded(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700177 //
178 // NOTE: Ignore Flow Entries Events originated by this instance
179 //
180 if (event.getMember().localMember())
181 return;
182
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700183 Long keyLong = (Long)event.getKey();
184 byte[] valueBytes = (byte[])event.getValue();
185
186 //
187 // Decode the value and deliver the notification
188 //
189 Kryo kryo = kryoFactory.newKryo();
190 Input input = new Input(valueBytes);
191 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
192 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700193 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700194 }
195
196 /**
197 * Receive a notification that an entry is removed.
198 *
199 * @param event the notification event for the entry.
200 */
201 public void entryRemoved(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700202 //
203 // NOTE: Ignore Flow Entries Events originated by this instance
204 //
205 if (event.getMember().localMember())
206 return;
207
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700208 Long keyLong = (Long)event.getKey();
209 byte[] valueBytes = (byte[])event.getValue();
210
211 //
212 // Decode the value and deliver the notification
213 //
214 Kryo kryo = kryoFactory.newKryo();
215 Input input = new Input(valueBytes);
216 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
217 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700218 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700219 }
220
221 /**
222 * Receive a notification that an entry is updated.
223 *
224 * @param event the notification event for the entry.
225 */
226 public void entryUpdated(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700227 //
228 // NOTE: Ignore Flow Entries Events originated by this instance
229 //
230 if (event.getMember().localMember())
231 return;
232
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700233 Long keyLong = (Long)event.getKey();
234 byte[] valueBytes = (byte[])event.getValue();
235
236 //
237 // Decode the value and deliver the notification
238 //
239 Kryo kryo = kryoFactory.newKryo();
240 Input input = new Input(valueBytes);
241 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
242 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700243 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700244 }
245
246 /**
247 * Receive a notification that an entry is evicted.
248 *
249 * @param event the notification event for the entry.
250 */
251 public void entryEvicted(EntryEvent event) {
252 // NOTE: We don't use eviction for this map
253 }
254 }
255
256 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700257 * Class for receiving notifications for Network Topology state.
258 *
259 * The datagrid map is:
260 * - Key: TopologyElement ID (String)
261 * - Value: Serialized TopologyElement (byte[])
262 */
263 class MapTopologyListener implements EntryListener<String, byte[]> {
264 /**
265 * Receive a notification that an entry is added.
266 *
267 * @param event the notification event for the entry.
268 */
269 public void entryAdded(EntryEvent event) {
270 String keyString = (String)event.getKey();
271 byte[] valueBytes = (byte[])event.getValue();
272
273 //
274 // Decode the value and deliver the notification
275 //
276 Kryo kryo = kryoFactory.newKryo();
277 Input input = new Input(valueBytes);
278 TopologyElement topologyElement =
279 kryo.readObject(input, TopologyElement.class);
280 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700281 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700282 }
283
284 /**
285 * Receive a notification that an entry is removed.
286 *
287 * @param event the notification event for the entry.
288 */
289 public void entryRemoved(EntryEvent event) {
290 String keyString = (String)event.getKey();
291 byte[] valueBytes = (byte[])event.getValue();
292
293 //
294 // Decode the value and deliver the notification
295 //
296 Kryo kryo = kryoFactory.newKryo();
297 Input input = new Input(valueBytes);
298 TopologyElement topologyElement =
299 kryo.readObject(input, TopologyElement.class);
300 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700301 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700302 }
303
304 /**
305 * Receive a notification that an entry is updated.
306 *
307 * @param event the notification event for the entry.
308 */
309 public void entryUpdated(EntryEvent event) {
310 String keyString = (String)event.getKey();
311 byte[] valueBytes = (byte[])event.getValue();
312
313 //
314 // Decode the value and deliver the notification
315 //
316 Kryo kryo = kryoFactory.newKryo();
317 Input input = new Input(valueBytes);
318 TopologyElement topologyElement =
319 kryo.readObject(input, TopologyElement.class);
320 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700321 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700322 }
323
324 /**
325 * Receive a notification that an entry is evicted.
326 *
327 * @param event the notification event for the entry.
328 */
329 public void entryEvicted(EntryEvent event) {
330 // NOTE: We don't use eviction for this map
331 }
332 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800333
334 /**
335 * Class for receiving notifications for ARP requests.
336 *
337 * The datagrid map is:
338 * - Key: Request ID (String)
339 * - Value: ARP request packet (byte[])
340 */
341 class ArpMapListener implements EntryListener<byte[], byte[]> {
342 /**
343 * Receive a notification that an entry is added.
344 *
345 * @param event the notification event for the entry.
346 */
347 public void entryAdded(EntryEvent<byte[], byte[]> event) {
348 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
349 arpEventHandler.arpRequestNotification(event.getKey());
350 }
351
352 //
353 // Decode the value and deliver the notification
354 //
355 /*
356 Kryo kryo = kryoFactory.newKryo();
357 Input input = new Input(valueBytes);
358 TopologyElement topologyElement =
359 kryo.readObject(input, TopologyElement.class);
360 kryoFactory.deleteKryo(kryo);
361 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
362 */
363 }
364
365 /**
366 * Receive a notification that an entry is removed.
367 *
368 * @param event the notification event for the entry.
369 */
370 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
371 /*
372 String keyString = (String)event.getKey();
373 byte[] valueBytes = (byte[])event.getValue();
374
375 //
376 // Decode the value and deliver the notification
377 //
378 Kryo kryo = kryoFactory.newKryo();
379 Input input = new Input(valueBytes);
380 TopologyElement topologyElement =
381 kryo.readObject(input, TopologyElement.class);
382 kryoFactory.deleteKryo(kryo);
383 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
384 */
385 }
386
387 /**
388 * Receive a notification that an entry is updated.
389 *
390 * @param event the notification event for the entry.
391 */
392 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
393 /*
394 String keyString = (String)event.getKey();
395 byte[] valueBytes = (byte[])event.getValue();
396
397 //
398 // Decode the value and deliver the notification
399 //
400 Kryo kryo = kryoFactory.newKryo();
401 Input input = new Input(valueBytes);
402 TopologyElement topologyElement =
403 kryo.readObject(input, TopologyElement.class);
404 kryoFactory.deleteKryo(kryo);
405 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
406 */
407 }
408
409 /**
410 * Receive a notification that an entry is evicted.
411 *
412 * @param event the notification event for the entry.
413 */
414 public void entryEvicted(EntryEvent<byte[], byte[]> event) {
415 // NOTE: We don't use eviction for this map
416 }
417 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700418
419 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700420 * Initialize the Hazelcast Datagrid operation.
421 *
422 * @param conf the configuration filename.
423 */
424 public void init(String configFilename) {
425 /*
426 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
427 System.setProperty("hazelcast.socket.send.buffer.size", "32");
428 */
429 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
430
431 // Init from configuration file
432 try {
433 hazelcastConfig = new FileSystemXmlConfig(configFilename);
434 } catch (FileNotFoundException e) {
435 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
436 }
437 /*
438 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
439 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
440 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
441 */
442 //
443 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
444 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
445 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
446 }
447
448 /**
449 * Shutdown the Hazelcast Datagrid operation.
450 */
451 public void finalize() {
452 close();
453 }
454
455 /**
456 * Shutdown the Hazelcast Datagrid operation.
457 */
458 public void close() {
459 Hazelcast.shutdownAll();
460 }
461
462 /**
463 * Get the collection of offered module services.
464 *
465 * @return the collection of offered module services.
466 */
467 @Override
468 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
469 Collection<Class<? extends IFloodlightService>> l =
470 new ArrayList<Class<? extends IFloodlightService>>();
471 l.add(IDatagridService.class);
472 return l;
473 }
474
475 /**
476 * Get the collection of implemented services.
477 *
478 * @return the collection of implemented services.
479 */
480 @Override
481 public Map<Class<? extends IFloodlightService>, IFloodlightService>
482 getServiceImpls() {
483 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700484 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700485 new HashMap<Class<? extends IFloodlightService>,
486 IFloodlightService>();
487 m.put(IDatagridService.class, this);
488 return m;
489 }
490
491 /**
492 * Get the collection of modules this module depends on.
493 *
494 * @return the collection of modules this module depends on.
495 */
496 @Override
497 public Collection<Class<? extends IFloodlightService>>
498 getModuleDependencies() {
499 Collection<Class<? extends IFloodlightService>> l =
500 new ArrayList<Class<? extends IFloodlightService>>();
501 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700502 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700503 return l;
504 }
505
506 /**
507 * Initialize the module.
508 *
509 * @param context the module context to use for the initialization.
510 */
511 @Override
512 public void init(FloodlightModuleContext context)
513 throws FloodlightModuleException {
514 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700515 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700516
517 // Get the configuration file name and configure the Datagrid
518 Map<String, String> configMap = context.getConfigParams(this);
519 String configFilename = configMap.get(HazelcastConfigFile);
520 this.init(configFilename);
521 }
522
523 /**
524 * Startup module operation.
525 *
526 * @param context the module context to use for the startup.
527 */
528 @Override
529 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700530 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700531
532 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800533
534 arpMap = hazelcastInstance.getMap(arpMapName);
535 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700536 }
537
538 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700539 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700540 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700541 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700542 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700543 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700544 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700545 */
546 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700547 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
548 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700549
550 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700551 mapFlowListener = new MapFlowListener();
552 mapFlow = hazelcastInstance.getMap(mapFlowName);
553 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700554
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700555 // Initialize the FlowEntry-related map state
556 mapFlowEntryListener = new MapFlowEntryListener();
557 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
558 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
559
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700560 // Initialize the Topology-related map state
561 mapTopologyListener = new MapTopologyListener();
562 mapTopology = hazelcastInstance.getMap(mapTopologyName);
563 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700564 }
565
566 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700567 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700568 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700569 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700570 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700571 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700572 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700573 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700574 */
575 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700576 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700577 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700578 mapFlow.removeEntryListener(mapFlowListenerId);
579 mapFlow = null;
580 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700581
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700582 // Clear the FlowEntry-related map state
583 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
584 mapFlowEntry = null;
585 mapFlowEntryListener = null;
586
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700587 // Clear the Topology-related map state
588 mapTopology.removeEntryListener(mapTopologyListenerId);
589 mapTopology = null;
590 mapTopologyListener = null;
591
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700592 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700593 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800594
595 @Override
596 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
597 if (arpEventHandler != null) {
598 arpEventHandlers.add(arpEventHandler);
599 }
600 }
601
602 @Override
603 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
604 arpEventHandlers.remove(arpEventHandler);
605 }
606
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700607 /**
608 * Get all Flows that are currently in the datagrid.
609 *
610 * @return all Flows that are currently in the datagrid.
611 */
612 @Override
613 public Collection<FlowPath> getAllFlows() {
614 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
615
616 //
617 // Get all current entries
618 //
619 Collection<byte[]> values = mapFlow.values();
620 Kryo kryo = kryoFactory.newKryo();
621 for (byte[] valueBytes : values) {
622 //
623 // Decode the value
624 //
625 Input input = new Input(valueBytes);
626 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
627 allFlows.add(flowPath);
628 }
629 kryoFactory.deleteKryo(kryo);
630
631 return allFlows;
632 }
633
634 /**
635 * Send a notification that a Flow is added.
636 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700637 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700638 */
639 @Override
640 public void notificationSendFlowAdded(FlowPath flowPath) {
641 //
642 // Encode the value
643 //
644 byte[] buffer = new byte[MAX_BUFFER_SIZE];
645 Kryo kryo = kryoFactory.newKryo();
646 Output output = new Output(buffer, -1);
647 kryo.writeObject(output, flowPath);
648 byte[] valueBytes = output.toBytes();
649 kryoFactory.deleteKryo(kryo);
650
651 //
652 // Put the entry:
653 // - Key : Flow ID (Long)
654 // - Value : Serialized Flow (byte[])
655 //
656 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
657 }
658
659 /**
660 * Send a notification that a Flow is removed.
661 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700662 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700663 */
664 @Override
665 public void notificationSendFlowRemoved(FlowId flowId) {
666 //
667 // Remove the entry:
668 // - Key : Flow ID (Long)
669 // - Value : Serialized Flow (byte[])
670 //
671 mapFlow.removeAsync(flowId.value());
672 }
673
674 /**
675 * Send a notification that a Flow is updated.
676 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700677 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700678 */
679 @Override
680 public void notificationSendFlowUpdated(FlowPath flowPath) {
681 // NOTE: Adding an entry with an existing key automatically updates it
682 notificationSendFlowAdded(flowPath);
683 }
684
685 /**
686 * Send a notification that all Flows are removed.
687 */
688 @Override
689 public void notificationSendAllFlowsRemoved() {
690 //
691 // Remove all entries
692 // NOTE: We remove the entries one-by-one so the per-entry
693 // notifications will be delivered.
694 //
695 // mapFlow.clear();
696 Set<Long> keySet = mapFlow.keySet();
697 for (Long key : keySet) {
698 mapFlow.removeAsync(key);
699 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700700 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700701
702 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700703 * Get all Flow Entries that are currently in the datagrid.
704 *
705 * @return all Flow Entries that are currently in the datagrid.
706 */
707 @Override
708 public Collection<FlowEntry> getAllFlowEntries() {
709 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
710
711 //
712 // Get all current entries
713 //
714 Collection<byte[]> values = mapFlowEntry.values();
715 Kryo kryo = kryoFactory.newKryo();
716 for (byte[] valueBytes : values) {
717 //
718 // Decode the value
719 //
720 Input input = new Input(valueBytes);
721 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
722 allFlowEntries.add(flowEntry);
723 }
724 kryoFactory.deleteKryo(kryo);
725
726 return allFlowEntries;
727 }
728
729 /**
730 * Send a notification that a FlowEntry is added.
731 *
732 * @param flowEntry the FlowEntry that is added.
733 */
734 @Override
735 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
736 //
737 // Encode the value
738 //
739 byte[] buffer = new byte[MAX_BUFFER_SIZE];
740 Kryo kryo = kryoFactory.newKryo();
741 Output output = new Output(buffer, -1);
742 kryo.writeObject(output, flowEntry);
743 byte[] valueBytes = output.toBytes();
744 kryoFactory.deleteKryo(kryo);
745
746 //
747 // Put the entry:
748 // - Key : FlowEntry ID (Long)
749 // - Value : Serialized FlowEntry (byte[])
750 //
751 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
752 }
753
754 /**
755 * Send a notification that a FlowEntry is removed.
756 *
757 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
758 */
759 @Override
760 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
761 //
762 // Remove the entry:
763 // - Key : FlowEntry ID (Long)
764 // - Value : Serialized FlowEntry (byte[])
765 //
766 mapFlowEntry.removeAsync(flowEntryId.value());
767 }
768
769 /**
770 * Send a notification that a FlowEntry is updated.
771 *
772 * @param flowEntry the FlowEntry that is updated.
773 */
774 @Override
775 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
776 // NOTE: Adding an entry with an existing key automatically updates it
777 notificationSendFlowEntryAdded(flowEntry);
778 }
779
780 /**
781 * Send a notification that all Flow Entries are removed.
782 */
783 @Override
784 public void notificationSendAllFlowEntriesRemoved() {
785 //
786 // Remove all entries
787 // NOTE: We remove the entries one-by-one so the per-entry
788 // notifications will be delivered.
789 //
790 // mapFlowEntry.clear();
791 Set<Long> keySet = mapFlowEntry.keySet();
792 for (Long key : keySet) {
793 mapFlowEntry.removeAsync(key);
794 }
795 }
796
797 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700798 * Get all Topology Elements that are currently in the datagrid.
799 *
800 * @return all Topology Elements that are currently in the datagrid.
801 */
802 @Override
803 public Collection<TopologyElement> getAllTopologyElements() {
804 Collection<TopologyElement> allTopologyElements =
805 new LinkedList<TopologyElement>();
806
807 //
808 // Get all current entries
809 //
810 Collection<byte[]> values = mapTopology.values();
811 Kryo kryo = kryoFactory.newKryo();
812 for (byte[] valueBytes : values) {
813 //
814 // Decode the value
815 //
816 Input input = new Input(valueBytes);
817 TopologyElement topologyElement =
818 kryo.readObject(input, TopologyElement.class);
819 allTopologyElements.add(topologyElement);
820 }
821 kryoFactory.deleteKryo(kryo);
822
823 return allTopologyElements;
824 }
825
826 /**
827 * Send a notification that a Topology Element is added.
828 *
829 * @param topologyElement the Topology Element that is added.
830 */
831 @Override
832 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
833 //
834 // Encode the value
835 //
836 byte[] buffer = new byte[MAX_BUFFER_SIZE];
837 Kryo kryo = kryoFactory.newKryo();
838 Output output = new Output(buffer, -1);
839 kryo.writeObject(output, topologyElement);
840 byte[] valueBytes = output.toBytes();
841 kryoFactory.deleteKryo(kryo);
842
843 //
844 // Put the entry:
845 // - Key : TopologyElement ID (String)
846 // - Value : Serialized TopologyElement (byte[])
847 //
848 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
849 }
850
851 /**
852 * Send a notification that a Topology Element is removed.
853 *
854 * @param topologyElement the Topology Element that is removed.
855 */
856 @Override
857 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
858 //
859 // Remove the entry:
860 // - Key : TopologyElement ID (String)
861 // - Value : Serialized TopologyElement (byte[])
862 //
863 mapTopology.removeAsync(topologyElement.elementId());
864 }
865
866 /**
867 * Send a notification that a Topology Element is updated.
868 *
869 * @param topologyElement the Topology Element that is updated.
870 */
871 @Override
872 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
873 // NOTE: Adding an entry with an existing key automatically updates it
874 notificationSendTopologyElementAdded(topologyElement);
875 }
876
877 /**
878 * Send a notification that all Topology Elements are removed.
879 */
880 @Override
881 public void notificationSendAllTopologyElementsRemoved() {
882 //
883 // Remove all entries
884 // NOTE: We remove the entries one-by-one so the per-entry
885 // notifications will be delivered.
886 //
887 // mapTopology.clear();
888 Set<String> keySet = mapTopology.keySet();
889 for (String key : keySet) {
890 mapTopology.removeAsync(key);
891 }
892 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800893
894 @Override
895 public void sendArpRequest(byte[] arpRequest) {
896 log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
897 arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
898 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700899}