blob: 04e001e37a44f43613edb2b5bbd9630b2df005da [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080021import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070022import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070023import net.onrc.onos.ofcontroller.util.FlowEntry;
24import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070025import net.onrc.onos.ofcontroller.util.FlowId;
26import net.onrc.onos.ofcontroller.util.FlowPath;
27import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
28
Jonathan Hart18ad55c2013-11-11 22:49:55 -080029import org.openflow.util.HexString;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070076 // State related to the Network Topology map
77 protected static final String mapTopologyName = "mapTopology";
78 private IMap<String, byte[]> mapTopology = null;
79 private MapTopologyListener mapTopologyListener = null;
80 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080081
82 // State related to the ARP map
83 protected static final String arpMapName = "arpMap";
84 private IMap<byte[], byte[]> arpMap = null;
85 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
86 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070087
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070088 /**
89 * Class for receiving notifications for Flow state.
90 *
91 * The datagrid map is:
92 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080093 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 */
95 class MapFlowListener implements EntryListener<Long, byte[]> {
96 /**
97 * Receive a notification that an entry is added.
98 *
99 * @param event the notification event for the entry.
100 */
101 public void entryAdded(EntryEvent event) {
102 Long keyLong = (Long)event.getKey();
103 byte[] valueBytes = (byte[])event.getValue();
104
105 //
106 // Decode the value and deliver the notification
107 //
108 Kryo kryo = kryoFactory.newKryo();
109 Input input = new Input(valueBytes);
110 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
111 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700112 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700113 }
114
115 /**
116 * Receive a notification that an entry is removed.
117 *
118 * @param event the notification event for the entry.
119 */
120 public void entryRemoved(EntryEvent event) {
121 Long keyLong = (Long)event.getKey();
122 byte[] valueBytes = (byte[])event.getValue();
123
124 //
125 // Decode the value and deliver the notification
126 //
127 Kryo kryo = kryoFactory.newKryo();
128 Input input = new Input(valueBytes);
129 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
130 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700131 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700132 }
133
134 /**
135 * Receive a notification that an entry is updated.
136 *
137 * @param event the notification event for the entry.
138 */
139 public void entryUpdated(EntryEvent event) {
140 Long keyLong = (Long)event.getKey();
141 byte[] valueBytes = (byte[])event.getValue();
142
143 //
144 // Decode the value and deliver the notification
145 //
146 Kryo kryo = kryoFactory.newKryo();
147 Input input = new Input(valueBytes);
148 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
149 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700150 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700151 }
152
153 /**
154 * Receive a notification that an entry is evicted.
155 *
156 * @param event the notification event for the entry.
157 */
158 public void entryEvicted(EntryEvent event) {
159 // NOTE: We don't use eviction for this map
160 }
161 }
162
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700163 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700164 * Class for receiving notifications for FlowEntry state.
165 *
166 * The datagrid map is:
167 * - Key : FlowEntry ID (Long)
168 * - Value : Serialized FlowEntry (byte[])
169 */
170 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
171 /**
172 * Receive a notification that an entry is added.
173 *
174 * @param event the notification event for the entry.
175 */
176 public void entryAdded(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700177 //
178 // NOTE: Ignore Flow Entries Events originated by this instance
179 //
180 if (event.getMember().localMember())
181 return;
182
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700183 Long keyLong = (Long)event.getKey();
184 byte[] valueBytes = (byte[])event.getValue();
185
186 //
187 // Decode the value and deliver the notification
188 //
189 Kryo kryo = kryoFactory.newKryo();
190 Input input = new Input(valueBytes);
191 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
192 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700193 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700194 }
195
196 /**
197 * Receive a notification that an entry is removed.
198 *
199 * @param event the notification event for the entry.
200 */
201 public void entryRemoved(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700202 //
203 // NOTE: Ignore Flow Entries Events originated by this instance
204 //
205 if (event.getMember().localMember())
206 return;
207
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700208 Long keyLong = (Long)event.getKey();
209 byte[] valueBytes = (byte[])event.getValue();
210
211 //
212 // Decode the value and deliver the notification
213 //
214 Kryo kryo = kryoFactory.newKryo();
215 Input input = new Input(valueBytes);
216 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
217 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700218 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700219 }
220
221 /**
222 * Receive a notification that an entry is updated.
223 *
224 * @param event the notification event for the entry.
225 */
226 public void entryUpdated(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700227 //
228 // NOTE: Ignore Flow Entries Events originated by this instance
229 //
230 if (event.getMember().localMember())
231 return;
232
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700233 Long keyLong = (Long)event.getKey();
234 byte[] valueBytes = (byte[])event.getValue();
235
236 //
237 // Decode the value and deliver the notification
238 //
239 Kryo kryo = kryoFactory.newKryo();
240 Input input = new Input(valueBytes);
241 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
242 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700243 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700244 }
245
246 /**
247 * Receive a notification that an entry is evicted.
248 *
249 * @param event the notification event for the entry.
250 */
251 public void entryEvicted(EntryEvent event) {
252 // NOTE: We don't use eviction for this map
253 }
254 }
255
256 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700257 * Class for receiving notifications for Network Topology state.
258 *
259 * The datagrid map is:
260 * - Key: TopologyElement ID (String)
261 * - Value: Serialized TopologyElement (byte[])
262 */
263 class MapTopologyListener implements EntryListener<String, byte[]> {
264 /**
265 * Receive a notification that an entry is added.
266 *
267 * @param event the notification event for the entry.
268 */
269 public void entryAdded(EntryEvent event) {
270 String keyString = (String)event.getKey();
271 byte[] valueBytes = (byte[])event.getValue();
272
273 //
274 // Decode the value and deliver the notification
275 //
276 Kryo kryo = kryoFactory.newKryo();
277 Input input = new Input(valueBytes);
278 TopologyElement topologyElement =
279 kryo.readObject(input, TopologyElement.class);
280 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700281 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700282 }
283
284 /**
285 * Receive a notification that an entry is removed.
286 *
287 * @param event the notification event for the entry.
288 */
289 public void entryRemoved(EntryEvent event) {
290 String keyString = (String)event.getKey();
291 byte[] valueBytes = (byte[])event.getValue();
292
293 //
294 // Decode the value and deliver the notification
295 //
296 Kryo kryo = kryoFactory.newKryo();
297 Input input = new Input(valueBytes);
298 TopologyElement topologyElement =
299 kryo.readObject(input, TopologyElement.class);
300 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700301 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700302 }
303
304 /**
305 * Receive a notification that an entry is updated.
306 *
307 * @param event the notification event for the entry.
308 */
309 public void entryUpdated(EntryEvent event) {
310 String keyString = (String)event.getKey();
311 byte[] valueBytes = (byte[])event.getValue();
312
313 //
314 // Decode the value and deliver the notification
315 //
316 Kryo kryo = kryoFactory.newKryo();
317 Input input = new Input(valueBytes);
318 TopologyElement topologyElement =
319 kryo.readObject(input, TopologyElement.class);
320 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700321 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700322 }
323
324 /**
325 * Receive a notification that an entry is evicted.
326 *
327 * @param event the notification event for the entry.
328 */
329 public void entryEvicted(EntryEvent event) {
330 // NOTE: We don't use eviction for this map
331 }
332 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800333
334 /**
335 * Class for receiving notifications for ARP requests.
336 *
337 * The datagrid map is:
338 * - Key: Request ID (String)
339 * - Value: ARP request packet (byte[])
340 */
341 class ArpMapListener implements EntryListener<byte[], byte[]> {
342 /**
343 * Receive a notification that an entry is added.
344 *
345 * @param event the notification event for the entry.
346 */
347 public void entryAdded(EntryEvent<byte[], byte[]> event) {
348 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
349 arpEventHandler.arpRequestNotification(event.getKey());
350 }
351
352 //
353 // Decode the value and deliver the notification
354 //
355 /*
356 Kryo kryo = kryoFactory.newKryo();
357 Input input = new Input(valueBytes);
358 TopologyElement topologyElement =
359 kryo.readObject(input, TopologyElement.class);
360 kryoFactory.deleteKryo(kryo);
361 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
362 */
363 }
364
365 /**
366 * Receive a notification that an entry is removed.
367 *
368 * @param event the notification event for the entry.
369 */
370 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800371 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800372 }
373
374 /**
375 * Receive a notification that an entry is updated.
376 *
377 * @param event the notification event for the entry.
378 */
379 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800380 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800381 }
382
383 /**
384 * Receive a notification that an entry is evicted.
385 *
386 * @param event the notification event for the entry.
387 */
388 public void entryEvicted(EntryEvent<byte[], byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800389 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800390 }
391 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700392
393 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700394 * Initialize the Hazelcast Datagrid operation.
395 *
396 * @param conf the configuration filename.
397 */
398 public void init(String configFilename) {
399 /*
400 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
401 System.setProperty("hazelcast.socket.send.buffer.size", "32");
402 */
403 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
404
405 // Init from configuration file
406 try {
407 hazelcastConfig = new FileSystemXmlConfig(configFilename);
408 } catch (FileNotFoundException e) {
409 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
410 }
411 /*
412 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
413 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
414 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
415 */
416 //
417 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
418 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
419 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
420 }
421
422 /**
423 * Shutdown the Hazelcast Datagrid operation.
424 */
425 public void finalize() {
426 close();
427 }
428
429 /**
430 * Shutdown the Hazelcast Datagrid operation.
431 */
432 public void close() {
433 Hazelcast.shutdownAll();
434 }
435
436 /**
437 * Get the collection of offered module services.
438 *
439 * @return the collection of offered module services.
440 */
441 @Override
442 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
443 Collection<Class<? extends IFloodlightService>> l =
444 new ArrayList<Class<? extends IFloodlightService>>();
445 l.add(IDatagridService.class);
446 return l;
447 }
448
449 /**
450 * Get the collection of implemented services.
451 *
452 * @return the collection of implemented services.
453 */
454 @Override
455 public Map<Class<? extends IFloodlightService>, IFloodlightService>
456 getServiceImpls() {
457 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700458 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700459 new HashMap<Class<? extends IFloodlightService>,
460 IFloodlightService>();
461 m.put(IDatagridService.class, this);
462 return m;
463 }
464
465 /**
466 * Get the collection of modules this module depends on.
467 *
468 * @return the collection of modules this module depends on.
469 */
470 @Override
471 public Collection<Class<? extends IFloodlightService>>
472 getModuleDependencies() {
473 Collection<Class<? extends IFloodlightService>> l =
474 new ArrayList<Class<? extends IFloodlightService>>();
475 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700476 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700477 return l;
478 }
479
480 /**
481 * Initialize the module.
482 *
483 * @param context the module context to use for the initialization.
484 */
485 @Override
486 public void init(FloodlightModuleContext context)
487 throws FloodlightModuleException {
488 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700489 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700490
491 // Get the configuration file name and configure the Datagrid
492 Map<String, String> configMap = context.getConfigParams(this);
493 String configFilename = configMap.get(HazelcastConfigFile);
494 this.init(configFilename);
495 }
496
497 /**
498 * Startup module operation.
499 *
500 * @param context the module context to use for the startup.
501 */
502 @Override
503 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700504 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700505
506 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800507
508 arpMap = hazelcastInstance.getMap(arpMapName);
509 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700510 }
511
512 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700513 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700514 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700515 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700516 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700517 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700518 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700519 */
520 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700521 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
522 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700523
524 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700525 mapFlowListener = new MapFlowListener();
526 mapFlow = hazelcastInstance.getMap(mapFlowName);
527 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700528
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700529 // Initialize the FlowEntry-related map state
530 mapFlowEntryListener = new MapFlowEntryListener();
531 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
532 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
533
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700534 // Initialize the Topology-related map state
535 mapTopologyListener = new MapTopologyListener();
536 mapTopology = hazelcastInstance.getMap(mapTopologyName);
537 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700538 }
539
540 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700541 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700542 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700543 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700544 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700545 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700546 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700547 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700548 */
549 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700550 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700551 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700552 mapFlow.removeEntryListener(mapFlowListenerId);
553 mapFlow = null;
554 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700555
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700556 // Clear the FlowEntry-related map state
557 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
558 mapFlowEntry = null;
559 mapFlowEntryListener = null;
560
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700561 // Clear the Topology-related map state
562 mapTopology.removeEntryListener(mapTopologyListenerId);
563 mapTopology = null;
564 mapTopologyListener = null;
565
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700566 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700567 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800568
569 @Override
570 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
571 if (arpEventHandler != null) {
572 arpEventHandlers.add(arpEventHandler);
573 }
574 }
575
576 @Override
577 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
578 arpEventHandlers.remove(arpEventHandler);
579 }
580
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700581 /**
582 * Get all Flows that are currently in the datagrid.
583 *
584 * @return all Flows that are currently in the datagrid.
585 */
586 @Override
587 public Collection<FlowPath> getAllFlows() {
588 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
589
590 //
591 // Get all current entries
592 //
593 Collection<byte[]> values = mapFlow.values();
594 Kryo kryo = kryoFactory.newKryo();
595 for (byte[] valueBytes : values) {
596 //
597 // Decode the value
598 //
599 Input input = new Input(valueBytes);
600 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
601 allFlows.add(flowPath);
602 }
603 kryoFactory.deleteKryo(kryo);
604
605 return allFlows;
606 }
607
608 /**
609 * Send a notification that a Flow is added.
610 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700611 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700612 */
613 @Override
614 public void notificationSendFlowAdded(FlowPath flowPath) {
615 //
616 // Encode the value
617 //
618 byte[] buffer = new byte[MAX_BUFFER_SIZE];
619 Kryo kryo = kryoFactory.newKryo();
620 Output output = new Output(buffer, -1);
621 kryo.writeObject(output, flowPath);
622 byte[] valueBytes = output.toBytes();
623 kryoFactory.deleteKryo(kryo);
624
625 //
626 // Put the entry:
627 // - Key : Flow ID (Long)
628 // - Value : Serialized Flow (byte[])
629 //
630 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
631 }
632
633 /**
634 * Send a notification that a Flow is removed.
635 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700636 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700637 */
638 @Override
639 public void notificationSendFlowRemoved(FlowId flowId) {
640 //
641 // Remove the entry:
642 // - Key : Flow ID (Long)
643 // - Value : Serialized Flow (byte[])
644 //
645 mapFlow.removeAsync(flowId.value());
646 }
647
648 /**
649 * Send a notification that a Flow is updated.
650 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700651 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700652 */
653 @Override
654 public void notificationSendFlowUpdated(FlowPath flowPath) {
655 // NOTE: Adding an entry with an existing key automatically updates it
656 notificationSendFlowAdded(flowPath);
657 }
658
659 /**
660 * Send a notification that all Flows are removed.
661 */
662 @Override
663 public void notificationSendAllFlowsRemoved() {
664 //
665 // Remove all entries
666 // NOTE: We remove the entries one-by-one so the per-entry
667 // notifications will be delivered.
668 //
669 // mapFlow.clear();
670 Set<Long> keySet = mapFlow.keySet();
671 for (Long key : keySet) {
672 mapFlow.removeAsync(key);
673 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700674 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700675
676 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700677 * Get all Flow Entries that are currently in the datagrid.
678 *
679 * @return all Flow Entries that are currently in the datagrid.
680 */
681 @Override
682 public Collection<FlowEntry> getAllFlowEntries() {
683 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
684
685 //
686 // Get all current entries
687 //
688 Collection<byte[]> values = mapFlowEntry.values();
689 Kryo kryo = kryoFactory.newKryo();
690 for (byte[] valueBytes : values) {
691 //
692 // Decode the value
693 //
694 Input input = new Input(valueBytes);
695 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
696 allFlowEntries.add(flowEntry);
697 }
698 kryoFactory.deleteKryo(kryo);
699
700 return allFlowEntries;
701 }
702
703 /**
704 * Send a notification that a FlowEntry is added.
705 *
706 * @param flowEntry the FlowEntry that is added.
707 */
708 @Override
709 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
710 //
711 // Encode the value
712 //
713 byte[] buffer = new byte[MAX_BUFFER_SIZE];
714 Kryo kryo = kryoFactory.newKryo();
715 Output output = new Output(buffer, -1);
716 kryo.writeObject(output, flowEntry);
717 byte[] valueBytes = output.toBytes();
718 kryoFactory.deleteKryo(kryo);
719
720 //
721 // Put the entry:
722 // - Key : FlowEntry ID (Long)
723 // - Value : Serialized FlowEntry (byte[])
724 //
725 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
726 }
727
728 /**
729 * Send a notification that a FlowEntry is removed.
730 *
731 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
732 */
733 @Override
734 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
735 //
736 // Remove the entry:
737 // - Key : FlowEntry ID (Long)
738 // - Value : Serialized FlowEntry (byte[])
739 //
740 mapFlowEntry.removeAsync(flowEntryId.value());
741 }
742
743 /**
744 * Send a notification that a FlowEntry is updated.
745 *
746 * @param flowEntry the FlowEntry that is updated.
747 */
748 @Override
749 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
750 // NOTE: Adding an entry with an existing key automatically updates it
751 notificationSendFlowEntryAdded(flowEntry);
752 }
753
754 /**
755 * Send a notification that all Flow Entries are removed.
756 */
757 @Override
758 public void notificationSendAllFlowEntriesRemoved() {
759 //
760 // Remove all entries
761 // NOTE: We remove the entries one-by-one so the per-entry
762 // notifications will be delivered.
763 //
764 // mapFlowEntry.clear();
765 Set<Long> keySet = mapFlowEntry.keySet();
766 for (Long key : keySet) {
767 mapFlowEntry.removeAsync(key);
768 }
769 }
770
771 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700772 * Get all Topology Elements that are currently in the datagrid.
773 *
774 * @return all Topology Elements that are currently in the datagrid.
775 */
776 @Override
777 public Collection<TopologyElement> getAllTopologyElements() {
778 Collection<TopologyElement> allTopologyElements =
779 new LinkedList<TopologyElement>();
780
781 //
782 // Get all current entries
783 //
784 Collection<byte[]> values = mapTopology.values();
785 Kryo kryo = kryoFactory.newKryo();
786 for (byte[] valueBytes : values) {
787 //
788 // Decode the value
789 //
790 Input input = new Input(valueBytes);
791 TopologyElement topologyElement =
792 kryo.readObject(input, TopologyElement.class);
793 allTopologyElements.add(topologyElement);
794 }
795 kryoFactory.deleteKryo(kryo);
796
797 return allTopologyElements;
798 }
799
800 /**
801 * Send a notification that a Topology Element is added.
802 *
803 * @param topologyElement the Topology Element that is added.
804 */
805 @Override
806 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
807 //
808 // Encode the value
809 //
810 byte[] buffer = new byte[MAX_BUFFER_SIZE];
811 Kryo kryo = kryoFactory.newKryo();
812 Output output = new Output(buffer, -1);
813 kryo.writeObject(output, topologyElement);
814 byte[] valueBytes = output.toBytes();
815 kryoFactory.deleteKryo(kryo);
816
817 //
818 // Put the entry:
819 // - Key : TopologyElement ID (String)
820 // - Value : Serialized TopologyElement (byte[])
821 //
822 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
823 }
824
825 /**
826 * Send a notification that a Topology Element is removed.
827 *
828 * @param topologyElement the Topology Element that is removed.
829 */
830 @Override
831 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
832 //
833 // Remove the entry:
834 // - Key : TopologyElement ID (String)
835 // - Value : Serialized TopologyElement (byte[])
836 //
837 mapTopology.removeAsync(topologyElement.elementId());
838 }
839
840 /**
841 * Send a notification that a Topology Element is updated.
842 *
843 * @param topologyElement the Topology Element that is updated.
844 */
845 @Override
846 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
847 // NOTE: Adding an entry with an existing key automatically updates it
848 notificationSendTopologyElementAdded(topologyElement);
849 }
850
851 /**
852 * Send a notification that all Topology Elements are removed.
853 */
854 @Override
855 public void notificationSendAllTopologyElementsRemoved() {
856 //
857 // Remove all entries
858 // NOTE: We remove the entries one-by-one so the per-entry
859 // notifications will be delivered.
860 //
861 // mapTopology.clear();
862 Set<String> keySet = mapTopology.keySet();
863 for (String key : keySet) {
864 mapTopology.removeAsync(key);
865 }
866 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800867
868 @Override
869 public void sendArpRequest(byte[] arpRequest) {
870 log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
871 arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
872 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700873}