blob: 4a48acf064e19aee7b51a3a174d4e8e606efb4cf [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart7804bea2014-01-07 10:50:52 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
22import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
23import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
24import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070025import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070026import net.onrc.onos.ofcontroller.util.FlowEntry;
27import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070028import net.onrc.onos.ofcontroller.util.FlowId;
29import net.onrc.onos.ofcontroller.util.FlowPath;
30import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
47/**
48 * A datagrid service that uses Hazelcast as a datagrid.
49 * The relevant data is stored in the Hazelcast datagrid and shared as
50 * appropriate in a multi-node cluster.
51 */
52public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private final static int MAX_BUFFER_SIZE = 64*1024;
54
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070055 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070057 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058
59 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070060 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061 private Config hazelcastConfig = null;
62
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070064 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065
66 // State related to the Flow map
67 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068 private IMap<Long, byte[]> mapFlow = null;
69 private MapFlowListener mapFlowListener = null;
70 private String mapFlowListenerId = null;
71
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070072 // State related to the Flow Entry map
73 protected static final String mapFlowEntryName = "mapFlowEntry";
74 private IMap<Long, byte[]> mapFlowEntry = null;
75 private MapFlowEntryListener mapFlowEntryListener = null;
76 private String mapFlowEntryListenerId = null;
77
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070078 // State related to the Network Topology map
79 protected static final String mapTopologyName = "mapTopology";
80 private IMap<String, byte[]> mapTopology = null;
81 private MapTopologyListener mapTopologyListener = null;
82 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080083
Jonathan Hart7804bea2014-01-07 10:50:52 -080084 // State related to the packet out map
85 protected static final String packetOutMapName = "packetOutMap";
86 private IMap<PacketOutNotification, byte[]> packetOutMap = null;
87 private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
Jonathan Hart18ad55c2013-11-11 22:49:55 -080088 private final byte[] dummyByte = {0};
Jonathan Hart7804bea2014-01-07 10:50:52 -080089
90 // State related to the ARP reply map
91 protected static final String arpReplyMapName = "arpReplyMap";
92 private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
93 private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070094
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070095 /**
96 * Class for receiving notifications for Flow state.
97 *
98 * The datagrid map is:
99 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800100 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700101 */
102 class MapFlowListener implements EntryListener<Long, byte[]> {
103 /**
104 * Receive a notification that an entry is added.
105 *
106 * @param event the notification event for the entry.
107 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800108 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700109 byte[] valueBytes = (byte[])event.getValue();
110
111 //
112 // Decode the value and deliver the notification
113 //
114 Kryo kryo = kryoFactory.newKryo();
115 Input input = new Input(valueBytes);
116 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
117 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700118 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700119 }
120
121 /**
122 * Receive a notification that an entry is removed.
123 *
124 * @param event the notification event for the entry.
125 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800126 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700127 byte[] valueBytes = (byte[])event.getValue();
128
129 //
130 // Decode the value and deliver the notification
131 //
132 Kryo kryo = kryoFactory.newKryo();
133 Input input = new Input(valueBytes);
134 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
135 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700136 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700137 }
138
139 /**
140 * Receive a notification that an entry is updated.
141 *
142 * @param event the notification event for the entry.
143 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800144 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700145 byte[] valueBytes = (byte[])event.getValue();
146
147 //
148 // Decode the value and deliver the notification
149 //
150 Kryo kryo = kryoFactory.newKryo();
151 Input input = new Input(valueBytes);
152 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
153 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700154 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700155 }
156
157 /**
158 * Receive a notification that an entry is evicted.
159 *
160 * @param event the notification event for the entry.
161 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800162 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700163 // NOTE: We don't use eviction for this map
164 }
165 }
166
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700167 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700168 * Class for receiving notifications for FlowEntry state.
169 *
170 * The datagrid map is:
171 * - Key : FlowEntry ID (Long)
172 * - Value : Serialized FlowEntry (byte[])
173 */
174 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
175 /**
176 * Receive a notification that an entry is added.
177 *
178 * @param event the notification event for the entry.
179 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800180 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700181 byte[] valueBytes = (byte[])event.getValue();
182
183 //
184 // Decode the value and deliver the notification
185 //
186 Kryo kryo = kryoFactory.newKryo();
187 Input input = new Input(valueBytes);
188 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
189 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700190 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700191 }
192
193 /**
194 * Receive a notification that an entry is removed.
195 *
196 * @param event the notification event for the entry.
197 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800198 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700199 byte[] valueBytes = (byte[])event.getValue();
200
201 //
202 // Decode the value and deliver the notification
203 //
204 Kryo kryo = kryoFactory.newKryo();
205 Input input = new Input(valueBytes);
206 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
207 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700208 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700209 }
210
211 /**
212 * Receive a notification that an entry is updated.
213 *
214 * @param event the notification event for the entry.
215 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800216 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700217 byte[] valueBytes = (byte[])event.getValue();
218
219 //
220 // Decode the value and deliver the notification
221 //
222 Kryo kryo = kryoFactory.newKryo();
223 Input input = new Input(valueBytes);
224 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
225 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700226 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700227 }
228
229 /**
230 * Receive a notification that an entry is evicted.
231 *
232 * @param event the notification event for the entry.
233 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800234 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700235 // NOTE: We don't use eviction for this map
236 }
237 }
238
239 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700240 * Class for receiving notifications for Network Topology state.
241 *
242 * The datagrid map is:
243 * - Key: TopologyElement ID (String)
244 * - Value: Serialized TopologyElement (byte[])
245 */
246 class MapTopologyListener implements EntryListener<String, byte[]> {
247 /**
248 * Receive a notification that an entry is added.
249 *
250 * @param event the notification event for the entry.
251 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800252 public void entryAdded(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700253 byte[] valueBytes = (byte[])event.getValue();
254
255 //
256 // Decode the value and deliver the notification
257 //
258 Kryo kryo = kryoFactory.newKryo();
259 Input input = new Input(valueBytes);
260 TopologyElement topologyElement =
261 kryo.readObject(input, TopologyElement.class);
262 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700263 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700264 }
265
266 /**
267 * Receive a notification that an entry is removed.
268 *
269 * @param event the notification event for the entry.
270 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800271 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700272 byte[] valueBytes = (byte[])event.getValue();
273
274 //
275 // Decode the value and deliver the notification
276 //
277 Kryo kryo = kryoFactory.newKryo();
278 Input input = new Input(valueBytes);
279 TopologyElement topologyElement =
280 kryo.readObject(input, TopologyElement.class);
281 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700282 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700283 }
284
285 /**
286 * Receive a notification that an entry is updated.
287 *
288 * @param event the notification event for the entry.
289 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800290 public void entryUpdated(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700291 byte[] valueBytes = (byte[])event.getValue();
292
293 //
294 // Decode the value and deliver the notification
295 //
296 Kryo kryo = kryoFactory.newKryo();
297 Input input = new Input(valueBytes);
298 TopologyElement topologyElement =
299 kryo.readObject(input, TopologyElement.class);
300 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700301 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700302 }
303
304 /**
305 * Receive a notification that an entry is evicted.
306 *
307 * @param event the notification event for the entry.
308 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800309 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700310 // NOTE: We don't use eviction for this map
311 }
312 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800313
314 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800315 * Class for receiving notifications for sending packet-outs.
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800316 *
317 * The datagrid map is:
Jonathan Hart7804bea2014-01-07 10:50:52 -0800318 * - Key: Packet-out to send (PacketOutNotification)
319 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800320 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800321 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800322 /**
323 * Receive a notification that an entry is added.
324 *
325 * @param event the notification event for the entry.
326 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800327 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
328 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
329 packetOutEventHandler.packetOutNotification(event.getKey());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800330 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800331 }
332
333 /**
334 * Receive a notification that an entry is removed.
335 *
336 * @param event the notification event for the entry.
337 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800338 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800339 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800340 }
341
342 /**
343 * Receive a notification that an entry is updated.
344 *
345 * @param event the notification event for the entry.
346 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800347 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800348 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800349 }
350
351 /**
352 * Receive a notification that an entry is evicted.
353 *
354 * @param event the notification event for the entry.
355 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800356 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800357 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800358 }
359 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800360
361 /**
362 * Class for receiving notifications for sending packet-outs.
363 *
364 * The datagrid map is:
365 * - Key: Packet-out to send (PacketOutNotification)
366 * - Value: dummy value (we only need the key) (byte[])
367 */
368 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
369 /**
370 * Receive a notification that an entry is added.
371 *
372 * @param event the notification event for the entry.
373 */
374 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
375 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
376 arpReplyEventHandler.arpReplyEvent(event.getKey());
377 }
378 }
379
380 // These methods aren't used for ARP replies
381 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
382 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
383 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
384 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700385
386 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700387 * Initialize the Hazelcast Datagrid operation.
388 *
389 * @param conf the configuration filename.
390 */
391 public void init(String configFilename) {
392 /*
393 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
394 System.setProperty("hazelcast.socket.send.buffer.size", "32");
395 */
396 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
397
398 // Init from configuration file
399 try {
400 hazelcastConfig = new FileSystemXmlConfig(configFilename);
401 } catch (FileNotFoundException e) {
402 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
403 }
404 /*
405 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
406 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
407 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
408 */
409 //
410 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
411 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
412 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
413 }
414
415 /**
416 * Shutdown the Hazelcast Datagrid operation.
417 */
418 public void finalize() {
419 close();
420 }
421
422 /**
423 * Shutdown the Hazelcast Datagrid operation.
424 */
425 public void close() {
426 Hazelcast.shutdownAll();
427 }
428
429 /**
430 * Get the collection of offered module services.
431 *
432 * @return the collection of offered module services.
433 */
434 @Override
435 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
436 Collection<Class<? extends IFloodlightService>> l =
437 new ArrayList<Class<? extends IFloodlightService>>();
438 l.add(IDatagridService.class);
439 return l;
440 }
441
442 /**
443 * Get the collection of implemented services.
444 *
445 * @return the collection of implemented services.
446 */
447 @Override
448 public Map<Class<? extends IFloodlightService>, IFloodlightService>
449 getServiceImpls() {
450 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700451 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700452 new HashMap<Class<? extends IFloodlightService>,
453 IFloodlightService>();
454 m.put(IDatagridService.class, this);
455 return m;
456 }
457
458 /**
459 * Get the collection of modules this module depends on.
460 *
461 * @return the collection of modules this module depends on.
462 */
463 @Override
464 public Collection<Class<? extends IFloodlightService>>
465 getModuleDependencies() {
466 Collection<Class<? extends IFloodlightService>> l =
467 new ArrayList<Class<? extends IFloodlightService>>();
468 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700469 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700470 return l;
471 }
472
473 /**
474 * Initialize the module.
475 *
476 * @param context the module context to use for the initialization.
477 */
478 @Override
479 public void init(FloodlightModuleContext context)
480 throws FloodlightModuleException {
481 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700482 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700483
484 // Get the configuration file name and configure the Datagrid
485 Map<String, String> configMap = context.getConfigParams(this);
486 String configFilename = configMap.get(HazelcastConfigFile);
487 this.init(configFilename);
488 }
489
490 /**
491 * Startup module operation.
492 *
493 * @param context the module context to use for the startup.
494 */
495 @Override
496 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700497 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700498
499 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800500
Jonathan Hart7804bea2014-01-07 10:50:52 -0800501 packetOutMap = hazelcastInstance.getMap(packetOutMapName);
502 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
503
504 arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
505 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700506 }
507
508 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700509 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700510 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700511 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700512 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700513 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700514 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700515 */
516 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700517 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
518 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700519
520 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700521 mapFlowListener = new MapFlowListener();
522 mapFlow = hazelcastInstance.getMap(mapFlowName);
523 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700524
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700525 // Initialize the FlowEntry-related map state
526 mapFlowEntryListener = new MapFlowEntryListener();
527 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
528 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
529
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700530 // Initialize the Topology-related map state
531 mapTopologyListener = new MapTopologyListener();
532 mapTopology = hazelcastInstance.getMap(mapTopologyName);
533 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700534 }
535
536 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700537 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700538 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700539 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700540 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700541 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700542 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700543 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700544 */
545 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700546 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700547 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700548 mapFlow.removeEntryListener(mapFlowListenerId);
549 mapFlow = null;
550 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700551
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700552 // Clear the FlowEntry-related map state
553 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
554 mapFlowEntry = null;
555 mapFlowEntryListener = null;
556
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700557 // Clear the Topology-related map state
558 mapTopology.removeEntryListener(mapTopologyListenerId);
559 mapTopology = null;
560 mapTopologyListener = null;
561
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700562 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700563 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800564
565 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800566 public void registerPacketOutEventHandler(IPacketOutEventHandler arpEventHandler) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800567 if (arpEventHandler != null) {
Jonathan Hart7804bea2014-01-07 10:50:52 -0800568 packetOutEventHandlers.add(arpEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800569 }
570 }
571
572 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800573 public void deregisterPacketOutEventHandler(IPacketOutEventHandler arpEventHandler) {
574 packetOutEventHandlers.remove(arpEventHandler);
575 }
576
577 @Override
578 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
579 if (arpReplyEventHandler != null) {
580 arpReplyEventHandlers.add(arpReplyEventHandler);
581 }
582 }
583
584 @Override
585 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
586 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800587 }
588
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700589 /**
590 * Get all Flows that are currently in the datagrid.
591 *
592 * @return all Flows that are currently in the datagrid.
593 */
594 @Override
595 public Collection<FlowPath> getAllFlows() {
596 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
597
598 //
599 // Get all current entries
600 //
601 Collection<byte[]> values = mapFlow.values();
602 Kryo kryo = kryoFactory.newKryo();
603 for (byte[] valueBytes : values) {
604 //
605 // Decode the value
606 //
607 Input input = new Input(valueBytes);
608 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
609 allFlows.add(flowPath);
610 }
611 kryoFactory.deleteKryo(kryo);
612
613 return allFlows;
614 }
615
616 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800617 * Get a Flow for a given Flow ID.
618 *
619 * @param flowId the Flow ID of the Flow to get.
620 * @return the Flow if found, otherwise null.
621 */
622 @Override
623 public FlowPath getFlow(FlowId flowId) {
624 byte[] valueBytes = mapFlow.get(flowId.value());
625 if (valueBytes == null)
626 return null;
627
628 Kryo kryo = kryoFactory.newKryo();
629 //
630 // Decode the value
631 //
632 Input input = new Input(valueBytes);
633 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
634 kryoFactory.deleteKryo(kryo);
635
636 return flowPath;
637 }
638
639 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700640 * Send a notification that a Flow is added.
641 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700642 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700643 */
644 @Override
645 public void notificationSendFlowAdded(FlowPath flowPath) {
646 //
647 // Encode the value
648 //
649 byte[] buffer = new byte[MAX_BUFFER_SIZE];
650 Kryo kryo = kryoFactory.newKryo();
651 Output output = new Output(buffer, -1);
652 kryo.writeObject(output, flowPath);
653 byte[] valueBytes = output.toBytes();
654 kryoFactory.deleteKryo(kryo);
655
656 //
657 // Put the entry:
658 // - Key : Flow ID (Long)
659 // - Value : Serialized Flow (byte[])
660 //
661 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
662 }
663
664 /**
665 * Send a notification that a Flow is removed.
666 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700667 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700668 */
669 @Override
670 public void notificationSendFlowRemoved(FlowId flowId) {
671 //
672 // Remove the entry:
673 // - Key : Flow ID (Long)
674 // - Value : Serialized Flow (byte[])
675 //
676 mapFlow.removeAsync(flowId.value());
677 }
678
679 /**
680 * Send a notification that a Flow is updated.
681 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700682 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700683 */
684 @Override
685 public void notificationSendFlowUpdated(FlowPath flowPath) {
686 // NOTE: Adding an entry with an existing key automatically updates it
687 notificationSendFlowAdded(flowPath);
688 }
689
690 /**
691 * Send a notification that all Flows are removed.
692 */
693 @Override
694 public void notificationSendAllFlowsRemoved() {
695 //
696 // Remove all entries
697 // NOTE: We remove the entries one-by-one so the per-entry
698 // notifications will be delivered.
699 //
700 // mapFlow.clear();
701 Set<Long> keySet = mapFlow.keySet();
702 for (Long key : keySet) {
703 mapFlow.removeAsync(key);
704 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700705 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700706
707 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700708 * Get all Flow Entries that are currently in the datagrid.
709 *
710 * @return all Flow Entries that are currently in the datagrid.
711 */
712 @Override
713 public Collection<FlowEntry> getAllFlowEntries() {
714 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
715
716 //
717 // Get all current entries
718 //
719 Collection<byte[]> values = mapFlowEntry.values();
720 Kryo kryo = kryoFactory.newKryo();
721 for (byte[] valueBytes : values) {
722 //
723 // Decode the value
724 //
725 Input input = new Input(valueBytes);
726 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
727 allFlowEntries.add(flowEntry);
728 }
729 kryoFactory.deleteKryo(kryo);
730
731 return allFlowEntries;
732 }
733
734 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800735 * Get a Flow Entry for a given Flow Entry ID.
736 *
737 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
738 * @return the Flow Entry if found, otherwise null.
739 */
740 @Override
741 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
742 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
743 if (valueBytes == null)
744 return null;
745
746 Kryo kryo = kryoFactory.newKryo();
747 //
748 // Decode the value
749 //
750 Input input = new Input(valueBytes);
751 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
752 kryoFactory.deleteKryo(kryo);
753
754 return flowEntry;
755 }
756
757 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700758 * Send a notification that a FlowEntry is added.
759 *
760 * @param flowEntry the FlowEntry that is added.
761 */
762 @Override
763 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
764 //
765 // Encode the value
766 //
767 byte[] buffer = new byte[MAX_BUFFER_SIZE];
768 Kryo kryo = kryoFactory.newKryo();
769 Output output = new Output(buffer, -1);
770 kryo.writeObject(output, flowEntry);
771 byte[] valueBytes = output.toBytes();
772 kryoFactory.deleteKryo(kryo);
773
774 //
775 // Put the entry:
776 // - Key : FlowEntry ID (Long)
777 // - Value : Serialized FlowEntry (byte[])
778 //
779 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
780 }
781
782 /**
783 * Send a notification that a FlowEntry is removed.
784 *
785 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
786 */
787 @Override
788 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
789 //
790 // Remove the entry:
791 // - Key : FlowEntry ID (Long)
792 // - Value : Serialized FlowEntry (byte[])
793 //
794 mapFlowEntry.removeAsync(flowEntryId.value());
795 }
796
797 /**
798 * Send a notification that a FlowEntry is updated.
799 *
800 * @param flowEntry the FlowEntry that is updated.
801 */
802 @Override
803 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
804 // NOTE: Adding an entry with an existing key automatically updates it
805 notificationSendFlowEntryAdded(flowEntry);
806 }
807
808 /**
809 * Send a notification that all Flow Entries are removed.
810 */
811 @Override
812 public void notificationSendAllFlowEntriesRemoved() {
813 //
814 // Remove all entries
815 // NOTE: We remove the entries one-by-one so the per-entry
816 // notifications will be delivered.
817 //
818 // mapFlowEntry.clear();
819 Set<Long> keySet = mapFlowEntry.keySet();
820 for (Long key : keySet) {
821 mapFlowEntry.removeAsync(key);
822 }
823 }
824
825 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700826 * Get all Topology Elements that are currently in the datagrid.
827 *
828 * @return all Topology Elements that are currently in the datagrid.
829 */
830 @Override
831 public Collection<TopologyElement> getAllTopologyElements() {
832 Collection<TopologyElement> allTopologyElements =
833 new LinkedList<TopologyElement>();
834
835 //
836 // Get all current entries
837 //
838 Collection<byte[]> values = mapTopology.values();
839 Kryo kryo = kryoFactory.newKryo();
840 for (byte[] valueBytes : values) {
841 //
842 // Decode the value
843 //
844 Input input = new Input(valueBytes);
845 TopologyElement topologyElement =
846 kryo.readObject(input, TopologyElement.class);
847 allTopologyElements.add(topologyElement);
848 }
849 kryoFactory.deleteKryo(kryo);
850
851 return allTopologyElements;
852 }
853
854 /**
855 * Send a notification that a Topology Element is added.
856 *
857 * @param topologyElement the Topology Element that is added.
858 */
859 @Override
860 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
861 //
862 // Encode the value
863 //
864 byte[] buffer = new byte[MAX_BUFFER_SIZE];
865 Kryo kryo = kryoFactory.newKryo();
866 Output output = new Output(buffer, -1);
867 kryo.writeObject(output, topologyElement);
868 byte[] valueBytes = output.toBytes();
869 kryoFactory.deleteKryo(kryo);
870
871 //
872 // Put the entry:
873 // - Key : TopologyElement ID (String)
874 // - Value : Serialized TopologyElement (byte[])
875 //
876 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
877 }
878
879 /**
880 * Send a notification that a Topology Element is removed.
881 *
882 * @param topologyElement the Topology Element that is removed.
883 */
884 @Override
885 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
886 //
887 // Remove the entry:
888 // - Key : TopologyElement ID (String)
889 // - Value : Serialized TopologyElement (byte[])
890 //
891 mapTopology.removeAsync(topologyElement.elementId());
892 }
893
894 /**
895 * Send a notification that a Topology Element is updated.
896 *
897 * @param topologyElement the Topology Element that is updated.
898 */
899 @Override
900 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
901 // NOTE: Adding an entry with an existing key automatically updates it
902 notificationSendTopologyElementAdded(topologyElement);
903 }
904
905 /**
906 * Send a notification that all Topology Elements are removed.
907 */
908 @Override
909 public void notificationSendAllTopologyElementsRemoved() {
910 //
911 // Remove all entries
912 // NOTE: We remove the entries one-by-one so the per-entry
913 // notifications will be delivered.
914 //
915 // mapTopology.clear();
916 Set<String> keySet = mapTopology.keySet();
917 for (String key : keySet) {
918 mapTopology.removeAsync(key);
919 }
920 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800921
922 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800923 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
924 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800925 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800926
927 @Override
928 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
929 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
930 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700931}