blob: 775f9528713dbaaec42a12ff5cb96111c7864eb0 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Jonathan Hart18ad55c2013-11-11 22:49:55 -080030import org.openflow.util.HexString;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
Jonathan Hart18ad55c2013-11-11 22:49:55 -080034import com.esotericsoftware.kryo2.Kryo;
35import com.esotericsoftware.kryo2.io.Input;
36import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070037import com.hazelcast.config.Config;
38import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070039import com.hazelcast.core.EntryEvent;
40import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070041import com.hazelcast.core.Hazelcast;
42import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070043import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070044import com.hazelcast.instance.GroupProperties;
45
46/**
47 * A datagrid service that uses Hazelcast as a datagrid.
48 * The relevant data is stored in the Hazelcast datagrid and shared as
49 * appropriate in a multi-node cluster.
50 */
51public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070052 private final static int MAX_BUFFER_SIZE = 64*1024;
53
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070054 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070055 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070056 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070057
58 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070060 private Config hazelcastConfig = null;
61
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070062 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070063 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070064
65 // State related to the Flow map
66 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070067 private IMap<Long, byte[]> mapFlow = null;
68 private MapFlowListener mapFlowListener = null;
69 private String mapFlowListenerId = null;
70
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070071 // State related to the Flow Entry map
72 protected static final String mapFlowEntryName = "mapFlowEntry";
73 private IMap<Long, byte[]> mapFlowEntry = null;
74 private MapFlowEntryListener mapFlowEntryListener = null;
75 private String mapFlowEntryListenerId = null;
76
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070077 // State related to the Network Topology map
78 protected static final String mapTopologyName = "mapTopology";
79 private IMap<String, byte[]> mapTopology = null;
80 private MapTopologyListener mapTopologyListener = null;
81 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080082
83 // State related to the ARP map
84 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080085 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080086 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
87 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070088
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070089 /**
90 * Class for receiving notifications for Flow state.
91 *
92 * The datagrid map is:
93 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080094 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070095 */
96 class MapFlowListener implements EntryListener<Long, byte[]> {
97 /**
98 * Receive a notification that an entry is added.
99 *
100 * @param event the notification event for the entry.
101 */
102 public void entryAdded(EntryEvent event) {
103 Long keyLong = (Long)event.getKey();
104 byte[] valueBytes = (byte[])event.getValue();
105
106 //
107 // Decode the value and deliver the notification
108 //
109 Kryo kryo = kryoFactory.newKryo();
110 Input input = new Input(valueBytes);
111 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
112 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700113 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700114 }
115
116 /**
117 * Receive a notification that an entry is removed.
118 *
119 * @param event the notification event for the entry.
120 */
121 public void entryRemoved(EntryEvent event) {
122 Long keyLong = (Long)event.getKey();
123 byte[] valueBytes = (byte[])event.getValue();
124
125 //
126 // Decode the value and deliver the notification
127 //
128 Kryo kryo = kryoFactory.newKryo();
129 Input input = new Input(valueBytes);
130 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
131 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700132 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700133 }
134
135 /**
136 * Receive a notification that an entry is updated.
137 *
138 * @param event the notification event for the entry.
139 */
140 public void entryUpdated(EntryEvent event) {
141 Long keyLong = (Long)event.getKey();
142 byte[] valueBytes = (byte[])event.getValue();
143
144 //
145 // Decode the value and deliver the notification
146 //
147 Kryo kryo = kryoFactory.newKryo();
148 Input input = new Input(valueBytes);
149 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
150 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700151 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700152 }
153
154 /**
155 * Receive a notification that an entry is evicted.
156 *
157 * @param event the notification event for the entry.
158 */
159 public void entryEvicted(EntryEvent event) {
160 // NOTE: We don't use eviction for this map
161 }
162 }
163
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700164 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700165 * Class for receiving notifications for FlowEntry state.
166 *
167 * The datagrid map is:
168 * - Key : FlowEntry ID (Long)
169 * - Value : Serialized FlowEntry (byte[])
170 */
171 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
172 /**
173 * Receive a notification that an entry is added.
174 *
175 * @param event the notification event for the entry.
176 */
177 public void entryAdded(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700178 //
179 // NOTE: Ignore Flow Entries Events originated by this instance
180 //
181 if (event.getMember().localMember())
182 return;
183
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700184 Long keyLong = (Long)event.getKey();
185 byte[] valueBytes = (byte[])event.getValue();
186
187 //
188 // Decode the value and deliver the notification
189 //
190 Kryo kryo = kryoFactory.newKryo();
191 Input input = new Input(valueBytes);
192 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
193 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700194 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700195 }
196
197 /**
198 * Receive a notification that an entry is removed.
199 *
200 * @param event the notification event for the entry.
201 */
202 public void entryRemoved(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700203 //
204 // NOTE: Ignore Flow Entries Events originated by this instance
205 //
206 if (event.getMember().localMember())
207 return;
208
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700209 Long keyLong = (Long)event.getKey();
210 byte[] valueBytes = (byte[])event.getValue();
211
212 //
213 // Decode the value and deliver the notification
214 //
215 Kryo kryo = kryoFactory.newKryo();
216 Input input = new Input(valueBytes);
217 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
218 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700219 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700220 }
221
222 /**
223 * Receive a notification that an entry is updated.
224 *
225 * @param event the notification event for the entry.
226 */
227 public void entryUpdated(EntryEvent event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700228 //
229 // NOTE: Ignore Flow Entries Events originated by this instance
230 //
231 if (event.getMember().localMember())
232 return;
233
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700234 Long keyLong = (Long)event.getKey();
235 byte[] valueBytes = (byte[])event.getValue();
236
237 //
238 // Decode the value and deliver the notification
239 //
240 Kryo kryo = kryoFactory.newKryo();
241 Input input = new Input(valueBytes);
242 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
243 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700244 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700245 }
246
247 /**
248 * Receive a notification that an entry is evicted.
249 *
250 * @param event the notification event for the entry.
251 */
252 public void entryEvicted(EntryEvent event) {
253 // NOTE: We don't use eviction for this map
254 }
255 }
256
257 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700258 * Class for receiving notifications for Network Topology state.
259 *
260 * The datagrid map is:
261 * - Key: TopologyElement ID (String)
262 * - Value: Serialized TopologyElement (byte[])
263 */
264 class MapTopologyListener implements EntryListener<String, byte[]> {
265 /**
266 * Receive a notification that an entry is added.
267 *
268 * @param event the notification event for the entry.
269 */
270 public void entryAdded(EntryEvent event) {
271 String keyString = (String)event.getKey();
272 byte[] valueBytes = (byte[])event.getValue();
273
274 //
275 // Decode the value and deliver the notification
276 //
277 Kryo kryo = kryoFactory.newKryo();
278 Input input = new Input(valueBytes);
279 TopologyElement topologyElement =
280 kryo.readObject(input, TopologyElement.class);
281 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700282 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700283 }
284
285 /**
286 * Receive a notification that an entry is removed.
287 *
288 * @param event the notification event for the entry.
289 */
290 public void entryRemoved(EntryEvent event) {
291 String keyString = (String)event.getKey();
292 byte[] valueBytes = (byte[])event.getValue();
293
294 //
295 // Decode the value and deliver the notification
296 //
297 Kryo kryo = kryoFactory.newKryo();
298 Input input = new Input(valueBytes);
299 TopologyElement topologyElement =
300 kryo.readObject(input, TopologyElement.class);
301 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700302 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700303 }
304
305 /**
306 * Receive a notification that an entry is updated.
307 *
308 * @param event the notification event for the entry.
309 */
310 public void entryUpdated(EntryEvent event) {
311 String keyString = (String)event.getKey();
312 byte[] valueBytes = (byte[])event.getValue();
313
314 //
315 // Decode the value and deliver the notification
316 //
317 Kryo kryo = kryoFactory.newKryo();
318 Input input = new Input(valueBytes);
319 TopologyElement topologyElement =
320 kryo.readObject(input, TopologyElement.class);
321 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700322 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700323 }
324
325 /**
326 * Receive a notification that an entry is evicted.
327 *
328 * @param event the notification event for the entry.
329 */
330 public void entryEvicted(EntryEvent event) {
331 // NOTE: We don't use eviction for this map
332 }
333 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800334
335 /**
336 * Class for receiving notifications for ARP requests.
337 *
338 * The datagrid map is:
339 * - Key: Request ID (String)
340 * - Value: ARP request packet (byte[])
341 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800342 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800343 /**
344 * Receive a notification that an entry is added.
345 *
346 * @param event the notification event for the entry.
347 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800348 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800349 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
350 arpEventHandler.arpRequestNotification(event.getKey());
351 }
352
353 //
354 // Decode the value and deliver the notification
355 //
356 /*
357 Kryo kryo = kryoFactory.newKryo();
358 Input input = new Input(valueBytes);
359 TopologyElement topologyElement =
360 kryo.readObject(input, TopologyElement.class);
361 kryoFactory.deleteKryo(kryo);
362 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
363 */
364 }
365
366 /**
367 * Receive a notification that an entry is removed.
368 *
369 * @param event the notification event for the entry.
370 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800371 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800372 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800373 }
374
375 /**
376 * Receive a notification that an entry is updated.
377 *
378 * @param event the notification event for the entry.
379 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800380 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800381 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800382 }
383
384 /**
385 * Receive a notification that an entry is evicted.
386 *
387 * @param event the notification event for the entry.
388 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800389 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800390 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800391 }
392 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700393
394 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700395 * Initialize the Hazelcast Datagrid operation.
396 *
397 * @param conf the configuration filename.
398 */
399 public void init(String configFilename) {
400 /*
401 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
402 System.setProperty("hazelcast.socket.send.buffer.size", "32");
403 */
404 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
405
406 // Init from configuration file
407 try {
408 hazelcastConfig = new FileSystemXmlConfig(configFilename);
409 } catch (FileNotFoundException e) {
410 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
411 }
412 /*
413 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
414 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
415 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
416 */
417 //
418 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
419 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
420 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
421 }
422
423 /**
424 * Shutdown the Hazelcast Datagrid operation.
425 */
426 public void finalize() {
427 close();
428 }
429
430 /**
431 * Shutdown the Hazelcast Datagrid operation.
432 */
433 public void close() {
434 Hazelcast.shutdownAll();
435 }
436
437 /**
438 * Get the collection of offered module services.
439 *
440 * @return the collection of offered module services.
441 */
442 @Override
443 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
444 Collection<Class<? extends IFloodlightService>> l =
445 new ArrayList<Class<? extends IFloodlightService>>();
446 l.add(IDatagridService.class);
447 return l;
448 }
449
450 /**
451 * Get the collection of implemented services.
452 *
453 * @return the collection of implemented services.
454 */
455 @Override
456 public Map<Class<? extends IFloodlightService>, IFloodlightService>
457 getServiceImpls() {
458 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700459 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700460 new HashMap<Class<? extends IFloodlightService>,
461 IFloodlightService>();
462 m.put(IDatagridService.class, this);
463 return m;
464 }
465
466 /**
467 * Get the collection of modules this module depends on.
468 *
469 * @return the collection of modules this module depends on.
470 */
471 @Override
472 public Collection<Class<? extends IFloodlightService>>
473 getModuleDependencies() {
474 Collection<Class<? extends IFloodlightService>> l =
475 new ArrayList<Class<? extends IFloodlightService>>();
476 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700477 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700478 return l;
479 }
480
481 /**
482 * Initialize the module.
483 *
484 * @param context the module context to use for the initialization.
485 */
486 @Override
487 public void init(FloodlightModuleContext context)
488 throws FloodlightModuleException {
489 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700490 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700491
492 // Get the configuration file name and configure the Datagrid
493 Map<String, String> configMap = context.getConfigParams(this);
494 String configFilename = configMap.get(HazelcastConfigFile);
495 this.init(configFilename);
496 }
497
498 /**
499 * Startup module operation.
500 *
501 * @param context the module context to use for the startup.
502 */
503 @Override
504 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700505 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700506
507 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800508
509 arpMap = hazelcastInstance.getMap(arpMapName);
510 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700511 }
512
513 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700514 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700515 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700516 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700517 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700518 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700519 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700520 */
521 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700522 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
523 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700524
525 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700526 mapFlowListener = new MapFlowListener();
527 mapFlow = hazelcastInstance.getMap(mapFlowName);
528 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700529
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700530 // Initialize the FlowEntry-related map state
531 mapFlowEntryListener = new MapFlowEntryListener();
532 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
533 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
534
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700535 // Initialize the Topology-related map state
536 mapTopologyListener = new MapTopologyListener();
537 mapTopology = hazelcastInstance.getMap(mapTopologyName);
538 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700539 }
540
541 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700542 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700543 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700544 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700545 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700546 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700547 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700548 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700549 */
550 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700551 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700552 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700553 mapFlow.removeEntryListener(mapFlowListenerId);
554 mapFlow = null;
555 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700556
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700557 // Clear the FlowEntry-related map state
558 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
559 mapFlowEntry = null;
560 mapFlowEntryListener = null;
561
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700562 // Clear the Topology-related map state
563 mapTopology.removeEntryListener(mapTopologyListenerId);
564 mapTopology = null;
565 mapTopologyListener = null;
566
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700567 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700568 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800569
570 @Override
571 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
572 if (arpEventHandler != null) {
573 arpEventHandlers.add(arpEventHandler);
574 }
575 }
576
577 @Override
578 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
579 arpEventHandlers.remove(arpEventHandler);
580 }
581
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700582 /**
583 * Get all Flows that are currently in the datagrid.
584 *
585 * @return all Flows that are currently in the datagrid.
586 */
587 @Override
588 public Collection<FlowPath> getAllFlows() {
589 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
590
591 //
592 // Get all current entries
593 //
594 Collection<byte[]> values = mapFlow.values();
595 Kryo kryo = kryoFactory.newKryo();
596 for (byte[] valueBytes : values) {
597 //
598 // Decode the value
599 //
600 Input input = new Input(valueBytes);
601 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
602 allFlows.add(flowPath);
603 }
604 kryoFactory.deleteKryo(kryo);
605
606 return allFlows;
607 }
608
609 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800610 * Get a Flow for a given Flow ID.
611 *
612 * @param flowId the Flow ID of the Flow to get.
613 * @return the Flow if found, otherwise null.
614 */
615 @Override
616 public FlowPath getFlow(FlowId flowId) {
617 byte[] valueBytes = mapFlow.get(flowId.value());
618 if (valueBytes == null)
619 return null;
620
621 Kryo kryo = kryoFactory.newKryo();
622 //
623 // Decode the value
624 //
625 Input input = new Input(valueBytes);
626 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
627 kryoFactory.deleteKryo(kryo);
628
629 return flowPath;
630 }
631
632 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700633 * Send a notification that a Flow is added.
634 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700635 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700636 */
637 @Override
638 public void notificationSendFlowAdded(FlowPath flowPath) {
639 //
640 // Encode the value
641 //
642 byte[] buffer = new byte[MAX_BUFFER_SIZE];
643 Kryo kryo = kryoFactory.newKryo();
644 Output output = new Output(buffer, -1);
645 kryo.writeObject(output, flowPath);
646 byte[] valueBytes = output.toBytes();
647 kryoFactory.deleteKryo(kryo);
648
649 //
650 // Put the entry:
651 // - Key : Flow ID (Long)
652 // - Value : Serialized Flow (byte[])
653 //
654 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
655 }
656
657 /**
658 * Send a notification that a Flow is removed.
659 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700660 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700661 */
662 @Override
663 public void notificationSendFlowRemoved(FlowId flowId) {
664 //
665 // Remove the entry:
666 // - Key : Flow ID (Long)
667 // - Value : Serialized Flow (byte[])
668 //
669 mapFlow.removeAsync(flowId.value());
670 }
671
672 /**
673 * Send a notification that a Flow is updated.
674 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700675 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700676 */
677 @Override
678 public void notificationSendFlowUpdated(FlowPath flowPath) {
679 // NOTE: Adding an entry with an existing key automatically updates it
680 notificationSendFlowAdded(flowPath);
681 }
682
683 /**
684 * Send a notification that all Flows are removed.
685 */
686 @Override
687 public void notificationSendAllFlowsRemoved() {
688 //
689 // Remove all entries
690 // NOTE: We remove the entries one-by-one so the per-entry
691 // notifications will be delivered.
692 //
693 // mapFlow.clear();
694 Set<Long> keySet = mapFlow.keySet();
695 for (Long key : keySet) {
696 mapFlow.removeAsync(key);
697 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700698 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700699
700 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700701 * Get all Flow Entries that are currently in the datagrid.
702 *
703 * @return all Flow Entries that are currently in the datagrid.
704 */
705 @Override
706 public Collection<FlowEntry> getAllFlowEntries() {
707 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
708
709 //
710 // Get all current entries
711 //
712 Collection<byte[]> values = mapFlowEntry.values();
713 Kryo kryo = kryoFactory.newKryo();
714 for (byte[] valueBytes : values) {
715 //
716 // Decode the value
717 //
718 Input input = new Input(valueBytes);
719 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
720 allFlowEntries.add(flowEntry);
721 }
722 kryoFactory.deleteKryo(kryo);
723
724 return allFlowEntries;
725 }
726
727 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800728 * Get a Flow Entry for a given Flow Entry ID.
729 *
730 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
731 * @return the Flow Entry if found, otherwise null.
732 */
733 @Override
734 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
735 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
736 if (valueBytes == null)
737 return null;
738
739 Kryo kryo = kryoFactory.newKryo();
740 //
741 // Decode the value
742 //
743 Input input = new Input(valueBytes);
744 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
745 kryoFactory.deleteKryo(kryo);
746
747 return flowEntry;
748 }
749
750 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700751 * Send a notification that a FlowEntry is added.
752 *
753 * @param flowEntry the FlowEntry that is added.
754 */
755 @Override
756 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
757 //
758 // Encode the value
759 //
760 byte[] buffer = new byte[MAX_BUFFER_SIZE];
761 Kryo kryo = kryoFactory.newKryo();
762 Output output = new Output(buffer, -1);
763 kryo.writeObject(output, flowEntry);
764 byte[] valueBytes = output.toBytes();
765 kryoFactory.deleteKryo(kryo);
766
767 //
768 // Put the entry:
769 // - Key : FlowEntry ID (Long)
770 // - Value : Serialized FlowEntry (byte[])
771 //
772 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
773 }
774
775 /**
776 * Send a notification that a FlowEntry is removed.
777 *
778 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
779 */
780 @Override
781 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
782 //
783 // Remove the entry:
784 // - Key : FlowEntry ID (Long)
785 // - Value : Serialized FlowEntry (byte[])
786 //
787 mapFlowEntry.removeAsync(flowEntryId.value());
788 }
789
790 /**
791 * Send a notification that a FlowEntry is updated.
792 *
793 * @param flowEntry the FlowEntry that is updated.
794 */
795 @Override
796 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
797 // NOTE: Adding an entry with an existing key automatically updates it
798 notificationSendFlowEntryAdded(flowEntry);
799 }
800
801 /**
802 * Send a notification that all Flow Entries are removed.
803 */
804 @Override
805 public void notificationSendAllFlowEntriesRemoved() {
806 //
807 // Remove all entries
808 // NOTE: We remove the entries one-by-one so the per-entry
809 // notifications will be delivered.
810 //
811 // mapFlowEntry.clear();
812 Set<Long> keySet = mapFlowEntry.keySet();
813 for (Long key : keySet) {
814 mapFlowEntry.removeAsync(key);
815 }
816 }
817
818 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700819 * Get all Topology Elements that are currently in the datagrid.
820 *
821 * @return all Topology Elements that are currently in the datagrid.
822 */
823 @Override
824 public Collection<TopologyElement> getAllTopologyElements() {
825 Collection<TopologyElement> allTopologyElements =
826 new LinkedList<TopologyElement>();
827
828 //
829 // Get all current entries
830 //
831 Collection<byte[]> values = mapTopology.values();
832 Kryo kryo = kryoFactory.newKryo();
833 for (byte[] valueBytes : values) {
834 //
835 // Decode the value
836 //
837 Input input = new Input(valueBytes);
838 TopologyElement topologyElement =
839 kryo.readObject(input, TopologyElement.class);
840 allTopologyElements.add(topologyElement);
841 }
842 kryoFactory.deleteKryo(kryo);
843
844 return allTopologyElements;
845 }
846
847 /**
848 * Send a notification that a Topology Element is added.
849 *
850 * @param topologyElement the Topology Element that is added.
851 */
852 @Override
853 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
854 //
855 // Encode the value
856 //
857 byte[] buffer = new byte[MAX_BUFFER_SIZE];
858 Kryo kryo = kryoFactory.newKryo();
859 Output output = new Output(buffer, -1);
860 kryo.writeObject(output, topologyElement);
861 byte[] valueBytes = output.toBytes();
862 kryoFactory.deleteKryo(kryo);
863
864 //
865 // Put the entry:
866 // - Key : TopologyElement ID (String)
867 // - Value : Serialized TopologyElement (byte[])
868 //
869 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
870 }
871
872 /**
873 * Send a notification that a Topology Element is removed.
874 *
875 * @param topologyElement the Topology Element that is removed.
876 */
877 @Override
878 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
879 //
880 // Remove the entry:
881 // - Key : TopologyElement ID (String)
882 // - Value : Serialized TopologyElement (byte[])
883 //
884 mapTopology.removeAsync(topologyElement.elementId());
885 }
886
887 /**
888 * Send a notification that a Topology Element is updated.
889 *
890 * @param topologyElement the Topology Element that is updated.
891 */
892 @Override
893 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
894 // NOTE: Adding an entry with an existing key automatically updates it
895 notificationSendTopologyElementAdded(topologyElement);
896 }
897
898 /**
899 * Send a notification that all Topology Elements are removed.
900 */
901 @Override
902 public void notificationSendAllTopologyElementsRemoved() {
903 //
904 // Remove all entries
905 // NOTE: We remove the entries one-by-one so the per-entry
906 // notifications will be delivered.
907 //
908 // mapTopology.clear();
909 Set<String> keySet = mapTopology.keySet();
910 for (String key : keySet) {
911 mapTopology.removeAsync(key);
912 }
913 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800914
915 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800916 public void sendArpRequest(ArpMessage arpMessage) {
917 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
918 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800919 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700920}