blob: d6c870dc1ac87f6a8b4a6f9ed2ed76e8b1ed93f0 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080076 // State related to the Flow ID map
77 protected static final String mapFlowIdName = "mapFlowId";
78 private IMap<Long, byte[]> mapFlowId = null;
79 private MapFlowIdListener mapFlowIdListener = null;
80 private String mapFlowIdListenerId = null;
81
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070082 // State related to the Network Topology map
83 protected static final String mapTopologyName = "mapTopology";
84 private IMap<String, byte[]> mapTopology = null;
85 private MapTopologyListener mapTopologyListener = null;
86 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080087
88 // State related to the ARP map
89 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080090 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080091 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
92 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070093
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 /**
95 * Class for receiving notifications for Flow state.
96 *
97 * The datagrid map is:
98 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080099 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700100 */
101 class MapFlowListener implements EntryListener<Long, byte[]> {
102 /**
103 * Receive a notification that an entry is added.
104 *
105 * @param event the notification event for the entry.
106 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800107 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700108 byte[] valueBytes = (byte[])event.getValue();
109
110 //
111 // Decode the value and deliver the notification
112 //
113 Kryo kryo = kryoFactory.newKryo();
114 Input input = new Input(valueBytes);
115 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
116 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700117 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700118 }
119
120 /**
121 * Receive a notification that an entry is removed.
122 *
123 * @param event the notification event for the entry.
124 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800125 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700126 byte[] valueBytes = (byte[])event.getValue();
127
128 //
129 // Decode the value and deliver the notification
130 //
131 Kryo kryo = kryoFactory.newKryo();
132 Input input = new Input(valueBytes);
133 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
134 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700135 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700136 }
137
138 /**
139 * Receive a notification that an entry is updated.
140 *
141 * @param event the notification event for the entry.
142 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800143 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700144 byte[] valueBytes = (byte[])event.getValue();
145
146 //
147 // Decode the value and deliver the notification
148 //
149 Kryo kryo = kryoFactory.newKryo();
150 Input input = new Input(valueBytes);
151 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
152 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700153 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700154 }
155
156 /**
157 * Receive a notification that an entry is evicted.
158 *
159 * @param event the notification event for the entry.
160 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800161 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700162 // NOTE: We don't use eviction for this map
163 }
164 }
165
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700166 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700167 * Class for receiving notifications for FlowEntry state.
168 *
169 * The datagrid map is:
170 * - Key : FlowEntry ID (Long)
171 * - Value : Serialized FlowEntry (byte[])
172 */
173 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
174 /**
175 * Receive a notification that an entry is added.
176 *
177 * @param event the notification event for the entry.
178 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800179 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700180 byte[] valueBytes = (byte[])event.getValue();
181
182 //
183 // Decode the value and deliver the notification
184 //
185 Kryo kryo = kryoFactory.newKryo();
186 Input input = new Input(valueBytes);
187 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
188 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700189 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700190 }
191
192 /**
193 * Receive a notification that an entry is removed.
194 *
195 * @param event the notification event for the entry.
196 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800197 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700198 byte[] valueBytes = (byte[])event.getValue();
199
200 //
201 // Decode the value and deliver the notification
202 //
203 Kryo kryo = kryoFactory.newKryo();
204 Input input = new Input(valueBytes);
205 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
206 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700207 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700208 }
209
210 /**
211 * Receive a notification that an entry is updated.
212 *
213 * @param event the notification event for the entry.
214 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800215 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700216 byte[] valueBytes = (byte[])event.getValue();
217
218 //
219 // Decode the value and deliver the notification
220 //
221 Kryo kryo = kryoFactory.newKryo();
222 Input input = new Input(valueBytes);
223 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
224 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700225 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700226 }
227
228 /**
229 * Receive a notification that an entry is evicted.
230 *
231 * @param event the notification event for the entry.
232 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800233 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700234 // NOTE: We don't use eviction for this map
235 }
236 }
237
238 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800239 * Class for receiving notifications for FlowId state.
240 *
241 * The datagrid map is:
242 * - Key : FlowId (Long)
243 * - Value : Serialized FlowId (byte[])
244 */
245 class MapFlowIdListener implements EntryListener<Long, byte[]> {
246 /**
247 * Receive a notification that an entry is added.
248 *
249 * @param event the notification event for the entry.
250 */
251 public void entryAdded(EntryEvent<Long, byte[]> event) {
252 byte[] valueBytes = (byte[])event.getValue();
253
254 //
255 // Decode the value and deliver the notification
256 //
257 Kryo kryo = kryoFactory.newKryo();
258 Input input = new Input(valueBytes);
259 FlowId flowId = kryo.readObject(input, FlowId.class);
260 kryoFactory.deleteKryo(kryo);
261 flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
262 }
263
264 /**
265 * Receive a notification that an entry is removed.
266 *
267 * @param event the notification event for the entry.
268 */
269 public void entryRemoved(EntryEvent<Long, byte[]> event) {
270 byte[] valueBytes = (byte[])event.getValue();
271
272 //
273 // Decode the value and deliver the notification
274 //
275 Kryo kryo = kryoFactory.newKryo();
276 Input input = new Input(valueBytes);
277 FlowId flowId = kryo.readObject(input, FlowId.class);
278 kryoFactory.deleteKryo(kryo);
279 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
280 }
281
282 /**
283 * Receive a notification that an entry is updated.
284 *
285 * @param event the notification event for the entry.
286 */
287 public void entryUpdated(EntryEvent<Long, byte[]> event) {
288 byte[] valueBytes = (byte[])event.getValue();
289
290 //
291 // Decode the value and deliver the notification
292 //
293 Kryo kryo = kryoFactory.newKryo();
294 Input input = new Input(valueBytes);
295 FlowId flowId = kryo.readObject(input, FlowId.class);
296 kryoFactory.deleteKryo(kryo);
297 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
298 }
299
300 /**
301 * Receive a notification that an entry is evicted.
302 *
303 * @param event the notification event for the entry.
304 */
305 public void entryEvicted(EntryEvent<Long, byte[]> event) {
306 // NOTE: We don't use eviction for this map
307 }
308 }
309
310 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700311 * Class for receiving notifications for Network Topology state.
312 *
313 * The datagrid map is:
314 * - Key: TopologyElement ID (String)
315 * - Value: Serialized TopologyElement (byte[])
316 */
317 class MapTopologyListener implements EntryListener<String, byte[]> {
318 /**
319 * Receive a notification that an entry is added.
320 *
321 * @param event the notification event for the entry.
322 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800323 public void entryAdded(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700324 byte[] valueBytes = (byte[])event.getValue();
325
326 //
327 // Decode the value and deliver the notification
328 //
329 Kryo kryo = kryoFactory.newKryo();
330 Input input = new Input(valueBytes);
331 TopologyElement topologyElement =
332 kryo.readObject(input, TopologyElement.class);
333 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700334 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700335 }
336
337 /**
338 * Receive a notification that an entry is removed.
339 *
340 * @param event the notification event for the entry.
341 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800342 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700343 byte[] valueBytes = (byte[])event.getValue();
344
345 //
346 // Decode the value and deliver the notification
347 //
348 Kryo kryo = kryoFactory.newKryo();
349 Input input = new Input(valueBytes);
350 TopologyElement topologyElement =
351 kryo.readObject(input, TopologyElement.class);
352 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700353 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700354 }
355
356 /**
357 * Receive a notification that an entry is updated.
358 *
359 * @param event the notification event for the entry.
360 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800361 public void entryUpdated(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700362 byte[] valueBytes = (byte[])event.getValue();
363
364 //
365 // Decode the value and deliver the notification
366 //
367 Kryo kryo = kryoFactory.newKryo();
368 Input input = new Input(valueBytes);
369 TopologyElement topologyElement =
370 kryo.readObject(input, TopologyElement.class);
371 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700372 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700373 }
374
375 /**
376 * Receive a notification that an entry is evicted.
377 *
378 * @param event the notification event for the entry.
379 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800380 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700381 // NOTE: We don't use eviction for this map
382 }
383 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800384
385 /**
386 * Class for receiving notifications for ARP requests.
387 *
388 * The datagrid map is:
389 * - Key: Request ID (String)
390 * - Value: ARP request packet (byte[])
391 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800392 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800393 /**
394 * Receive a notification that an entry is added.
395 *
396 * @param event the notification event for the entry.
397 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800398 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800399 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
400 arpEventHandler.arpRequestNotification(event.getKey());
401 }
402
403 //
404 // Decode the value and deliver the notification
405 //
406 /*
407 Kryo kryo = kryoFactory.newKryo();
408 Input input = new Input(valueBytes);
409 TopologyElement topologyElement =
410 kryo.readObject(input, TopologyElement.class);
411 kryoFactory.deleteKryo(kryo);
412 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
413 */
414 }
415
416 /**
417 * Receive a notification that an entry is removed.
418 *
419 * @param event the notification event for the entry.
420 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800421 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800422 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800423 }
424
425 /**
426 * Receive a notification that an entry is updated.
427 *
428 * @param event the notification event for the entry.
429 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800430 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800431 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800432 }
433
434 /**
435 * Receive a notification that an entry is evicted.
436 *
437 * @param event the notification event for the entry.
438 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800439 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800440 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800441 }
442 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700443
444 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700445 * Initialize the Hazelcast Datagrid operation.
446 *
447 * @param conf the configuration filename.
448 */
449 public void init(String configFilename) {
450 /*
451 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
452 System.setProperty("hazelcast.socket.send.buffer.size", "32");
453 */
454 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
455
456 // Init from configuration file
457 try {
458 hazelcastConfig = new FileSystemXmlConfig(configFilename);
459 } catch (FileNotFoundException e) {
460 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
461 }
462 /*
463 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
464 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
465 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
466 */
467 //
468 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
469 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
470 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
471 }
472
473 /**
474 * Shutdown the Hazelcast Datagrid operation.
475 */
476 public void finalize() {
477 close();
478 }
479
480 /**
481 * Shutdown the Hazelcast Datagrid operation.
482 */
483 public void close() {
484 Hazelcast.shutdownAll();
485 }
486
487 /**
488 * Get the collection of offered module services.
489 *
490 * @return the collection of offered module services.
491 */
492 @Override
493 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
494 Collection<Class<? extends IFloodlightService>> l =
495 new ArrayList<Class<? extends IFloodlightService>>();
496 l.add(IDatagridService.class);
497 return l;
498 }
499
500 /**
501 * Get the collection of implemented services.
502 *
503 * @return the collection of implemented services.
504 */
505 @Override
506 public Map<Class<? extends IFloodlightService>, IFloodlightService>
507 getServiceImpls() {
508 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700509 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700510 new HashMap<Class<? extends IFloodlightService>,
511 IFloodlightService>();
512 m.put(IDatagridService.class, this);
513 return m;
514 }
515
516 /**
517 * Get the collection of modules this module depends on.
518 *
519 * @return the collection of modules this module depends on.
520 */
521 @Override
522 public Collection<Class<? extends IFloodlightService>>
523 getModuleDependencies() {
524 Collection<Class<? extends IFloodlightService>> l =
525 new ArrayList<Class<? extends IFloodlightService>>();
526 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700527 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700528 return l;
529 }
530
531 /**
532 * Initialize the module.
533 *
534 * @param context the module context to use for the initialization.
535 */
536 @Override
537 public void init(FloodlightModuleContext context)
538 throws FloodlightModuleException {
539 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700540 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700541
542 // Get the configuration file name and configure the Datagrid
543 Map<String, String> configMap = context.getConfigParams(this);
544 String configFilename = configMap.get(HazelcastConfigFile);
545 this.init(configFilename);
546 }
547
548 /**
549 * Startup module operation.
550 *
551 * @param context the module context to use for the startup.
552 */
553 @Override
554 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700555 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700556
557 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800558
559 arpMap = hazelcastInstance.getMap(arpMapName);
560 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700561 }
562
563 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700564 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700565 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700566 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700567 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700568 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700569 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700570 */
571 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700572 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
573 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700574
575 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700576 mapFlowListener = new MapFlowListener();
577 mapFlow = hazelcastInstance.getMap(mapFlowName);
578 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700579
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700580 // Initialize the FlowEntry-related map state
581 mapFlowEntryListener = new MapFlowEntryListener();
582 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
583 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
584
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800585 // Initialize the FlowId-related map state
586 mapFlowIdListener = new MapFlowIdListener();
587 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
588 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
589
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700590 // Initialize the Topology-related map state
591 mapTopologyListener = new MapTopologyListener();
592 mapTopology = hazelcastInstance.getMap(mapTopologyName);
593 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700594 }
595
596 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700597 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700598 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700599 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700600 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700601 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700602 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700603 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700604 */
605 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700606 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700607 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700608 mapFlow.removeEntryListener(mapFlowListenerId);
609 mapFlow = null;
610 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700611
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700612 // Clear the FlowEntry-related map state
613 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
614 mapFlowEntry = null;
615 mapFlowEntryListener = null;
616
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800617 // Clear the FlowId-related map state
618 mapFlowId.removeEntryListener(mapFlowIdListenerId);
619 mapFlowId = null;
620 mapFlowIdListener = null;
621
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700622 // Clear the Topology-related map state
623 mapTopology.removeEntryListener(mapTopologyListenerId);
624 mapTopology = null;
625 mapTopologyListener = null;
626
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700627 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700628 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800629
630 @Override
631 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
632 if (arpEventHandler != null) {
633 arpEventHandlers.add(arpEventHandler);
634 }
635 }
636
637 @Override
638 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
639 arpEventHandlers.remove(arpEventHandler);
640 }
641
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700642 /**
643 * Get all Flows that are currently in the datagrid.
644 *
645 * @return all Flows that are currently in the datagrid.
646 */
647 @Override
648 public Collection<FlowPath> getAllFlows() {
649 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
650
651 //
652 // Get all current entries
653 //
654 Collection<byte[]> values = mapFlow.values();
655 Kryo kryo = kryoFactory.newKryo();
656 for (byte[] valueBytes : values) {
657 //
658 // Decode the value
659 //
660 Input input = new Input(valueBytes);
661 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
662 allFlows.add(flowPath);
663 }
664 kryoFactory.deleteKryo(kryo);
665
666 return allFlows;
667 }
668
669 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800670 * Get a Flow for a given Flow ID.
671 *
672 * @param flowId the Flow ID of the Flow to get.
673 * @return the Flow if found, otherwise null.
674 */
675 @Override
676 public FlowPath getFlow(FlowId flowId) {
677 byte[] valueBytes = mapFlow.get(flowId.value());
678 if (valueBytes == null)
679 return null;
680
681 Kryo kryo = kryoFactory.newKryo();
682 //
683 // Decode the value
684 //
685 Input input = new Input(valueBytes);
686 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
687 kryoFactory.deleteKryo(kryo);
688
689 return flowPath;
690 }
691
692 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700693 * Send a notification that a Flow is added.
694 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700695 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700696 */
697 @Override
698 public void notificationSendFlowAdded(FlowPath flowPath) {
699 //
700 // Encode the value
701 //
702 byte[] buffer = new byte[MAX_BUFFER_SIZE];
703 Kryo kryo = kryoFactory.newKryo();
704 Output output = new Output(buffer, -1);
705 kryo.writeObject(output, flowPath);
706 byte[] valueBytes = output.toBytes();
707 kryoFactory.deleteKryo(kryo);
708
709 //
710 // Put the entry:
711 // - Key : Flow ID (Long)
712 // - Value : Serialized Flow (byte[])
713 //
714 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
715 }
716
717 /**
718 * Send a notification that a Flow is removed.
719 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700720 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700721 */
722 @Override
723 public void notificationSendFlowRemoved(FlowId flowId) {
724 //
725 // Remove the entry:
726 // - Key : Flow ID (Long)
727 // - Value : Serialized Flow (byte[])
728 //
729 mapFlow.removeAsync(flowId.value());
730 }
731
732 /**
733 * Send a notification that a Flow is updated.
734 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700735 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700736 */
737 @Override
738 public void notificationSendFlowUpdated(FlowPath flowPath) {
739 // NOTE: Adding an entry with an existing key automatically updates it
740 notificationSendFlowAdded(flowPath);
741 }
742
743 /**
744 * Send a notification that all Flows are removed.
745 */
746 @Override
747 public void notificationSendAllFlowsRemoved() {
748 //
749 // Remove all entries
750 // NOTE: We remove the entries one-by-one so the per-entry
751 // notifications will be delivered.
752 //
753 // mapFlow.clear();
754 Set<Long> keySet = mapFlow.keySet();
755 for (Long key : keySet) {
756 mapFlow.removeAsync(key);
757 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700758 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700759
760 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700761 * Get all Flow Entries that are currently in the datagrid.
762 *
763 * @return all Flow Entries that are currently in the datagrid.
764 */
765 @Override
766 public Collection<FlowEntry> getAllFlowEntries() {
767 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
768
769 //
770 // Get all current entries
771 //
772 Collection<byte[]> values = mapFlowEntry.values();
773 Kryo kryo = kryoFactory.newKryo();
774 for (byte[] valueBytes : values) {
775 //
776 // Decode the value
777 //
778 Input input = new Input(valueBytes);
779 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
780 allFlowEntries.add(flowEntry);
781 }
782 kryoFactory.deleteKryo(kryo);
783
784 return allFlowEntries;
785 }
786
787 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800788 * Get a Flow Entry for a given Flow Entry ID.
789 *
790 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
791 * @return the Flow Entry if found, otherwise null.
792 */
793 @Override
794 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
795 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
796 if (valueBytes == null)
797 return null;
798
799 Kryo kryo = kryoFactory.newKryo();
800 //
801 // Decode the value
802 //
803 Input input = new Input(valueBytes);
804 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
805 kryoFactory.deleteKryo(kryo);
806
807 return flowEntry;
808 }
809
810 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700811 * Send a notification that a FlowEntry is added.
812 *
813 * @param flowEntry the FlowEntry that is added.
814 */
815 @Override
816 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
817 //
818 // Encode the value
819 //
820 byte[] buffer = new byte[MAX_BUFFER_SIZE];
821 Kryo kryo = kryoFactory.newKryo();
822 Output output = new Output(buffer, -1);
823 kryo.writeObject(output, flowEntry);
824 byte[] valueBytes = output.toBytes();
825 kryoFactory.deleteKryo(kryo);
826
827 //
828 // Put the entry:
829 // - Key : FlowEntry ID (Long)
830 // - Value : Serialized FlowEntry (byte[])
831 //
832 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
833 }
834
835 /**
836 * Send a notification that a FlowEntry is removed.
837 *
838 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
839 */
840 @Override
841 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
842 //
843 // Remove the entry:
844 // - Key : FlowEntry ID (Long)
845 // - Value : Serialized FlowEntry (byte[])
846 //
847 mapFlowEntry.removeAsync(flowEntryId.value());
848 }
849
850 /**
851 * Send a notification that a FlowEntry is updated.
852 *
853 * @param flowEntry the FlowEntry that is updated.
854 */
855 @Override
856 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
857 // NOTE: Adding an entry with an existing key automatically updates it
858 notificationSendFlowEntryAdded(flowEntry);
859 }
860
861 /**
862 * Send a notification that all Flow Entries are removed.
863 */
864 @Override
865 public void notificationSendAllFlowEntriesRemoved() {
866 //
867 // Remove all entries
868 // NOTE: We remove the entries one-by-one so the per-entry
869 // notifications will be delivered.
870 //
871 // mapFlowEntry.clear();
872 Set<Long> keySet = mapFlowEntry.keySet();
873 for (Long key : keySet) {
874 mapFlowEntry.removeAsync(key);
875 }
876 }
877
878 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800879 * Get all Flow IDs that are currently in the datagrid.
880 *
881 * @return all Flow IDs that are currently in the datagrid.
882 */
883 @Override
884 public Collection<FlowId> getAllFlowIds() {
885 Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
886
887 //
888 // Get all current entries
889 //
890 Collection<byte[]> values = mapFlowId.values();
891 Kryo kryo = kryoFactory.newKryo();
892 for (byte[] valueBytes : values) {
893 //
894 // Decode the value
895 //
896 Input input = new Input(valueBytes);
897 FlowId flowId = kryo.readObject(input, FlowId.class);
898 allFlowIds.add(flowId);
899 }
900 kryoFactory.deleteKryo(kryo);
901
902 return allFlowIds;
903 }
904
905 /**
906 * Send a notification that a FlowId is added.
907 *
908 * @param flowId the FlowId that is added.
909 */
910 @Override
911 public void notificationSendFlowIdAdded(FlowId flowId) {
912 //
913 // Encode the value
914 //
915 byte[] buffer = new byte[MAX_BUFFER_SIZE];
916 Kryo kryo = kryoFactory.newKryo();
917 Output output = new Output(buffer, -1);
918 kryo.writeObject(output, flowId);
919 byte[] valueBytes = output.toBytes();
920 kryoFactory.deleteKryo(kryo);
921
922 //
923 // Put the entry:
924 // - Key : FlowId (Long)
925 // - Value : Serialized FlowId (byte[])
926 //
927 mapFlowId.putAsync(flowId.value(), valueBytes);
928 }
929
930 /**
931 * Send a notification that a FlowId is removed.
932 *
933 * @param flowId the FlowId that is removed.
934 */
935 @Override
936 public void notificationSendFlowIdRemoved(FlowId flowId) {
937 //
938 // Remove the entry:
939 // - Key : FlowId (Long)
940 // - Value : Serialized FlowId (byte[])
941 //
942 mapFlowId.removeAsync(flowId.value());
943 }
944
945 /**
946 * Send a notification that a FlowId is updated.
947 *
948 * @param flowId the FlowId that is updated.
949 */
950 @Override
951 public void notificationSendFlowIdUpdated(FlowId flowId) {
952 // NOTE: Adding an entry with an existing key automatically updates it
953 notificationSendFlowIdAdded(flowId);
954 }
955
956 /**
957 * Send a notification that all Flow IDs are removed.
958 */
959 @Override
960 public void notificationSendAllFlowIdsRemoved() {
961 //
962 // Remove all entries
963 // NOTE: We remove the entries one-by-one so the per-entry
964 // notifications will be delivered.
965 //
966 // mapFlowId.clear();
967 Set<Long> keySet = mapFlowId.keySet();
968 for (Long key : keySet) {
969 mapFlowId.removeAsync(key);
970 }
971 }
972
973 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700974 * Get all Topology Elements that are currently in the datagrid.
975 *
976 * @return all Topology Elements that are currently in the datagrid.
977 */
978 @Override
979 public Collection<TopologyElement> getAllTopologyElements() {
980 Collection<TopologyElement> allTopologyElements =
981 new LinkedList<TopologyElement>();
982
983 //
984 // Get all current entries
985 //
986 Collection<byte[]> values = mapTopology.values();
987 Kryo kryo = kryoFactory.newKryo();
988 for (byte[] valueBytes : values) {
989 //
990 // Decode the value
991 //
992 Input input = new Input(valueBytes);
993 TopologyElement topologyElement =
994 kryo.readObject(input, TopologyElement.class);
995 allTopologyElements.add(topologyElement);
996 }
997 kryoFactory.deleteKryo(kryo);
998
999 return allTopologyElements;
1000 }
1001
1002 /**
1003 * Send a notification that a Topology Element is added.
1004 *
1005 * @param topologyElement the Topology Element that is added.
1006 */
1007 @Override
1008 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1009 //
1010 // Encode the value
1011 //
1012 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1013 Kryo kryo = kryoFactory.newKryo();
1014 Output output = new Output(buffer, -1);
1015 kryo.writeObject(output, topologyElement);
1016 byte[] valueBytes = output.toBytes();
1017 kryoFactory.deleteKryo(kryo);
1018
1019 //
1020 // Put the entry:
1021 // - Key : TopologyElement ID (String)
1022 // - Value : Serialized TopologyElement (byte[])
1023 //
1024 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1025 }
1026
1027 /**
1028 * Send a notification that a Topology Element is removed.
1029 *
1030 * @param topologyElement the Topology Element that is removed.
1031 */
1032 @Override
1033 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1034 //
1035 // Remove the entry:
1036 // - Key : TopologyElement ID (String)
1037 // - Value : Serialized TopologyElement (byte[])
1038 //
1039 mapTopology.removeAsync(topologyElement.elementId());
1040 }
1041
1042 /**
1043 * Send a notification that a Topology Element is updated.
1044 *
1045 * @param topologyElement the Topology Element that is updated.
1046 */
1047 @Override
1048 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1049 // NOTE: Adding an entry with an existing key automatically updates it
1050 notificationSendTopologyElementAdded(topologyElement);
1051 }
1052
1053 /**
1054 * Send a notification that all Topology Elements are removed.
1055 */
1056 @Override
1057 public void notificationSendAllTopologyElementsRemoved() {
1058 //
1059 // Remove all entries
1060 // NOTE: We remove the entries one-by-one so the per-entry
1061 // notifications will be delivered.
1062 //
1063 // mapTopology.clear();
1064 Set<String> keySet = mapTopology.keySet();
1065 for (String key : keySet) {
1066 mapTopology.removeAsync(key);
1067 }
1068 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001069
1070 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001071 public void sendArpRequest(ArpMessage arpMessage) {
1072 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1073 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001074 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001075}