blob: d48e896ded30840c31fff45f67a9456cc347deb9 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070076 // State related to the Network Topology map
77 protected static final String mapTopologyName = "mapTopology";
78 private IMap<String, byte[]> mapTopology = null;
79 private MapTopologyListener mapTopologyListener = null;
80 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080081
82 // State related to the ARP map
83 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080084 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080085 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
86 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070087
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070088 /**
89 * Class for receiving notifications for Flow state.
90 *
91 * The datagrid map is:
92 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080093 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 */
95 class MapFlowListener implements EntryListener<Long, byte[]> {
96 /**
97 * Receive a notification that an entry is added.
98 *
99 * @param event the notification event for the entry.
100 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800101 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 byte[] valueBytes = (byte[])event.getValue();
103
104 //
105 // Decode the value and deliver the notification
106 //
107 Kryo kryo = kryoFactory.newKryo();
108 Input input = new Input(valueBytes);
109 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
110 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700111 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700112 }
113
114 /**
115 * Receive a notification that an entry is removed.
116 *
117 * @param event the notification event for the entry.
118 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800119 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700120 byte[] valueBytes = (byte[])event.getValue();
121
122 //
123 // Decode the value and deliver the notification
124 //
125 Kryo kryo = kryoFactory.newKryo();
126 Input input = new Input(valueBytes);
127 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
128 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700129 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700130 }
131
132 /**
133 * Receive a notification that an entry is updated.
134 *
135 * @param event the notification event for the entry.
136 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800137 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700138 byte[] valueBytes = (byte[])event.getValue();
139
140 //
141 // Decode the value and deliver the notification
142 //
143 Kryo kryo = kryoFactory.newKryo();
144 Input input = new Input(valueBytes);
145 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
146 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700147 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148 }
149
150 /**
151 * Receive a notification that an entry is evicted.
152 *
153 * @param event the notification event for the entry.
154 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800155 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700156 // NOTE: We don't use eviction for this map
157 }
158 }
159
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700160 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700161 * Class for receiving notifications for FlowEntry state.
162 *
163 * The datagrid map is:
164 * - Key : FlowEntry ID (Long)
165 * - Value : Serialized FlowEntry (byte[])
166 */
167 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
168 /**
169 * Receive a notification that an entry is added.
170 *
171 * @param event the notification event for the entry.
172 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800173 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700174 //
175 // NOTE: Ignore Flow Entries Events originated by this instance
176 //
177 if (event.getMember().localMember())
178 return;
179
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700180 byte[] valueBytes = (byte[])event.getValue();
181
182 //
183 // Decode the value and deliver the notification
184 //
185 Kryo kryo = kryoFactory.newKryo();
186 Input input = new Input(valueBytes);
187 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
188 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700189 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700190 }
191
192 /**
193 * Receive a notification that an entry is removed.
194 *
195 * @param event the notification event for the entry.
196 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800197 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700198 //
199 // NOTE: Ignore Flow Entries Events originated by this instance
200 //
201 if (event.getMember().localMember())
202 return;
203
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700204 byte[] valueBytes = (byte[])event.getValue();
205
206 //
207 // Decode the value and deliver the notification
208 //
209 Kryo kryo = kryoFactory.newKryo();
210 Input input = new Input(valueBytes);
211 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
212 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700213 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700214 }
215
216 /**
217 * Receive a notification that an entry is updated.
218 *
219 * @param event the notification event for the entry.
220 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800221 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov4cd65772013-10-31 12:50:31 -0700222 //
223 // NOTE: Ignore Flow Entries Events originated by this instance
224 //
225 if (event.getMember().localMember())
226 return;
227
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700228 byte[] valueBytes = (byte[])event.getValue();
229
230 //
231 // Decode the value and deliver the notification
232 //
233 Kryo kryo = kryoFactory.newKryo();
234 Input input = new Input(valueBytes);
235 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
236 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700237 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700238 }
239
240 /**
241 * Receive a notification that an entry is evicted.
242 *
243 * @param event the notification event for the entry.
244 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800245 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700246 // NOTE: We don't use eviction for this map
247 }
248 }
249
250 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700251 * Class for receiving notifications for Network Topology state.
252 *
253 * The datagrid map is:
254 * - Key: TopologyElement ID (String)
255 * - Value: Serialized TopologyElement (byte[])
256 */
257 class MapTopologyListener implements EntryListener<String, byte[]> {
258 /**
259 * Receive a notification that an entry is added.
260 *
261 * @param event the notification event for the entry.
262 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800263 public void entryAdded(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700264 byte[] valueBytes = (byte[])event.getValue();
265
266 //
267 // Decode the value and deliver the notification
268 //
269 Kryo kryo = kryoFactory.newKryo();
270 Input input = new Input(valueBytes);
271 TopologyElement topologyElement =
272 kryo.readObject(input, TopologyElement.class);
273 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700274 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700275 }
276
277 /**
278 * Receive a notification that an entry is removed.
279 *
280 * @param event the notification event for the entry.
281 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800282 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700283 byte[] valueBytes = (byte[])event.getValue();
284
285 //
286 // Decode the value and deliver the notification
287 //
288 Kryo kryo = kryoFactory.newKryo();
289 Input input = new Input(valueBytes);
290 TopologyElement topologyElement =
291 kryo.readObject(input, TopologyElement.class);
292 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700293 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700294 }
295
296 /**
297 * Receive a notification that an entry is updated.
298 *
299 * @param event the notification event for the entry.
300 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800301 public void entryUpdated(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700302 byte[] valueBytes = (byte[])event.getValue();
303
304 //
305 // Decode the value and deliver the notification
306 //
307 Kryo kryo = kryoFactory.newKryo();
308 Input input = new Input(valueBytes);
309 TopologyElement topologyElement =
310 kryo.readObject(input, TopologyElement.class);
311 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700312 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700313 }
314
315 /**
316 * Receive a notification that an entry is evicted.
317 *
318 * @param event the notification event for the entry.
319 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800320 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700321 // NOTE: We don't use eviction for this map
322 }
323 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800324
325 /**
326 * Class for receiving notifications for ARP requests.
327 *
328 * The datagrid map is:
329 * - Key: Request ID (String)
330 * - Value: ARP request packet (byte[])
331 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800332 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800333 /**
334 * Receive a notification that an entry is added.
335 *
336 * @param event the notification event for the entry.
337 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800338 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800339 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
340 arpEventHandler.arpRequestNotification(event.getKey());
341 }
342
343 //
344 // Decode the value and deliver the notification
345 //
346 /*
347 Kryo kryo = kryoFactory.newKryo();
348 Input input = new Input(valueBytes);
349 TopologyElement topologyElement =
350 kryo.readObject(input, TopologyElement.class);
351 kryoFactory.deleteKryo(kryo);
352 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
353 */
354 }
355
356 /**
357 * Receive a notification that an entry is removed.
358 *
359 * @param event the notification event for the entry.
360 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800361 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800362 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800363 }
364
365 /**
366 * Receive a notification that an entry is updated.
367 *
368 * @param event the notification event for the entry.
369 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800370 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800371 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800372 }
373
374 /**
375 * Receive a notification that an entry is evicted.
376 *
377 * @param event the notification event for the entry.
378 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800379 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800380 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800381 }
382 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700383
384 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700385 * Initialize the Hazelcast Datagrid operation.
386 *
387 * @param conf the configuration filename.
388 */
389 public void init(String configFilename) {
390 /*
391 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
392 System.setProperty("hazelcast.socket.send.buffer.size", "32");
393 */
394 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
395
396 // Init from configuration file
397 try {
398 hazelcastConfig = new FileSystemXmlConfig(configFilename);
399 } catch (FileNotFoundException e) {
400 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
401 }
402 /*
403 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
404 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
405 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
406 */
407 //
408 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
409 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
410 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
411 }
412
413 /**
414 * Shutdown the Hazelcast Datagrid operation.
415 */
416 public void finalize() {
417 close();
418 }
419
420 /**
421 * Shutdown the Hazelcast Datagrid operation.
422 */
423 public void close() {
424 Hazelcast.shutdownAll();
425 }
426
427 /**
428 * Get the collection of offered module services.
429 *
430 * @return the collection of offered module services.
431 */
432 @Override
433 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
434 Collection<Class<? extends IFloodlightService>> l =
435 new ArrayList<Class<? extends IFloodlightService>>();
436 l.add(IDatagridService.class);
437 return l;
438 }
439
440 /**
441 * Get the collection of implemented services.
442 *
443 * @return the collection of implemented services.
444 */
445 @Override
446 public Map<Class<? extends IFloodlightService>, IFloodlightService>
447 getServiceImpls() {
448 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700449 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700450 new HashMap<Class<? extends IFloodlightService>,
451 IFloodlightService>();
452 m.put(IDatagridService.class, this);
453 return m;
454 }
455
456 /**
457 * Get the collection of modules this module depends on.
458 *
459 * @return the collection of modules this module depends on.
460 */
461 @Override
462 public Collection<Class<? extends IFloodlightService>>
463 getModuleDependencies() {
464 Collection<Class<? extends IFloodlightService>> l =
465 new ArrayList<Class<? extends IFloodlightService>>();
466 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700467 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700468 return l;
469 }
470
471 /**
472 * Initialize the module.
473 *
474 * @param context the module context to use for the initialization.
475 */
476 @Override
477 public void init(FloodlightModuleContext context)
478 throws FloodlightModuleException {
479 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700480 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700481
482 // Get the configuration file name and configure the Datagrid
483 Map<String, String> configMap = context.getConfigParams(this);
484 String configFilename = configMap.get(HazelcastConfigFile);
485 this.init(configFilename);
486 }
487
488 /**
489 * Startup module operation.
490 *
491 * @param context the module context to use for the startup.
492 */
493 @Override
494 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700495 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700496
497 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800498
499 arpMap = hazelcastInstance.getMap(arpMapName);
500 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700501 }
502
503 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700504 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700505 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700506 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700507 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700508 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700509 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700510 */
511 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700512 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
513 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700514
515 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700516 mapFlowListener = new MapFlowListener();
517 mapFlow = hazelcastInstance.getMap(mapFlowName);
518 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700519
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700520 // Initialize the FlowEntry-related map state
521 mapFlowEntryListener = new MapFlowEntryListener();
522 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
523 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
524
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700525 // Initialize the Topology-related map state
526 mapTopologyListener = new MapTopologyListener();
527 mapTopology = hazelcastInstance.getMap(mapTopologyName);
528 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700529 }
530
531 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700532 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700533 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700534 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700535 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700536 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700537 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700538 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700539 */
540 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700541 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700542 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700543 mapFlow.removeEntryListener(mapFlowListenerId);
544 mapFlow = null;
545 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700546
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700547 // Clear the FlowEntry-related map state
548 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
549 mapFlowEntry = null;
550 mapFlowEntryListener = null;
551
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700552 // Clear the Topology-related map state
553 mapTopology.removeEntryListener(mapTopologyListenerId);
554 mapTopology = null;
555 mapTopologyListener = null;
556
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700557 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700558 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800559
560 @Override
561 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
562 if (arpEventHandler != null) {
563 arpEventHandlers.add(arpEventHandler);
564 }
565 }
566
567 @Override
568 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
569 arpEventHandlers.remove(arpEventHandler);
570 }
571
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700572 /**
573 * Get all Flows that are currently in the datagrid.
574 *
575 * @return all Flows that are currently in the datagrid.
576 */
577 @Override
578 public Collection<FlowPath> getAllFlows() {
579 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
580
581 //
582 // Get all current entries
583 //
584 Collection<byte[]> values = mapFlow.values();
585 Kryo kryo = kryoFactory.newKryo();
586 for (byte[] valueBytes : values) {
587 //
588 // Decode the value
589 //
590 Input input = new Input(valueBytes);
591 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
592 allFlows.add(flowPath);
593 }
594 kryoFactory.deleteKryo(kryo);
595
596 return allFlows;
597 }
598
599 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800600 * Get a Flow for a given Flow ID.
601 *
602 * @param flowId the Flow ID of the Flow to get.
603 * @return the Flow if found, otherwise null.
604 */
605 @Override
606 public FlowPath getFlow(FlowId flowId) {
607 byte[] valueBytes = mapFlow.get(flowId.value());
608 if (valueBytes == null)
609 return null;
610
611 Kryo kryo = kryoFactory.newKryo();
612 //
613 // Decode the value
614 //
615 Input input = new Input(valueBytes);
616 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
617 kryoFactory.deleteKryo(kryo);
618
619 return flowPath;
620 }
621
622 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700623 * Send a notification that a Flow is added.
624 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700625 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700626 */
627 @Override
628 public void notificationSendFlowAdded(FlowPath flowPath) {
629 //
630 // Encode the value
631 //
632 byte[] buffer = new byte[MAX_BUFFER_SIZE];
633 Kryo kryo = kryoFactory.newKryo();
634 Output output = new Output(buffer, -1);
635 kryo.writeObject(output, flowPath);
636 byte[] valueBytes = output.toBytes();
637 kryoFactory.deleteKryo(kryo);
638
639 //
640 // Put the entry:
641 // - Key : Flow ID (Long)
642 // - Value : Serialized Flow (byte[])
643 //
644 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
645 }
646
647 /**
648 * Send a notification that a Flow is removed.
649 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700650 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700651 */
652 @Override
653 public void notificationSendFlowRemoved(FlowId flowId) {
654 //
655 // Remove the entry:
656 // - Key : Flow ID (Long)
657 // - Value : Serialized Flow (byte[])
658 //
659 mapFlow.removeAsync(flowId.value());
660 }
661
662 /**
663 * Send a notification that a Flow is updated.
664 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700665 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700666 */
667 @Override
668 public void notificationSendFlowUpdated(FlowPath flowPath) {
669 // NOTE: Adding an entry with an existing key automatically updates it
670 notificationSendFlowAdded(flowPath);
671 }
672
673 /**
674 * Send a notification that all Flows are removed.
675 */
676 @Override
677 public void notificationSendAllFlowsRemoved() {
678 //
679 // Remove all entries
680 // NOTE: We remove the entries one-by-one so the per-entry
681 // notifications will be delivered.
682 //
683 // mapFlow.clear();
684 Set<Long> keySet = mapFlow.keySet();
685 for (Long key : keySet) {
686 mapFlow.removeAsync(key);
687 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700688 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700689
690 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700691 * Get all Flow Entries that are currently in the datagrid.
692 *
693 * @return all Flow Entries that are currently in the datagrid.
694 */
695 @Override
696 public Collection<FlowEntry> getAllFlowEntries() {
697 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
698
699 //
700 // Get all current entries
701 //
702 Collection<byte[]> values = mapFlowEntry.values();
703 Kryo kryo = kryoFactory.newKryo();
704 for (byte[] valueBytes : values) {
705 //
706 // Decode the value
707 //
708 Input input = new Input(valueBytes);
709 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
710 allFlowEntries.add(flowEntry);
711 }
712 kryoFactory.deleteKryo(kryo);
713
714 return allFlowEntries;
715 }
716
717 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800718 * Get a Flow Entry for a given Flow Entry ID.
719 *
720 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
721 * @return the Flow Entry if found, otherwise null.
722 */
723 @Override
724 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
725 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
726 if (valueBytes == null)
727 return null;
728
729 Kryo kryo = kryoFactory.newKryo();
730 //
731 // Decode the value
732 //
733 Input input = new Input(valueBytes);
734 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
735 kryoFactory.deleteKryo(kryo);
736
737 return flowEntry;
738 }
739
740 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700741 * Send a notification that a FlowEntry is added.
742 *
743 * @param flowEntry the FlowEntry that is added.
744 */
745 @Override
746 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
747 //
748 // Encode the value
749 //
750 byte[] buffer = new byte[MAX_BUFFER_SIZE];
751 Kryo kryo = kryoFactory.newKryo();
752 Output output = new Output(buffer, -1);
753 kryo.writeObject(output, flowEntry);
754 byte[] valueBytes = output.toBytes();
755 kryoFactory.deleteKryo(kryo);
756
757 //
758 // Put the entry:
759 // - Key : FlowEntry ID (Long)
760 // - Value : Serialized FlowEntry (byte[])
761 //
762 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
763 }
764
765 /**
766 * Send a notification that a FlowEntry is removed.
767 *
768 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
769 */
770 @Override
771 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
772 //
773 // Remove the entry:
774 // - Key : FlowEntry ID (Long)
775 // - Value : Serialized FlowEntry (byte[])
776 //
777 mapFlowEntry.removeAsync(flowEntryId.value());
778 }
779
780 /**
781 * Send a notification that a FlowEntry is updated.
782 *
783 * @param flowEntry the FlowEntry that is updated.
784 */
785 @Override
786 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
787 // NOTE: Adding an entry with an existing key automatically updates it
788 notificationSendFlowEntryAdded(flowEntry);
789 }
790
791 /**
792 * Send a notification that all Flow Entries are removed.
793 */
794 @Override
795 public void notificationSendAllFlowEntriesRemoved() {
796 //
797 // Remove all entries
798 // NOTE: We remove the entries one-by-one so the per-entry
799 // notifications will be delivered.
800 //
801 // mapFlowEntry.clear();
802 Set<Long> keySet = mapFlowEntry.keySet();
803 for (Long key : keySet) {
804 mapFlowEntry.removeAsync(key);
805 }
806 }
807
808 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700809 * Get all Topology Elements that are currently in the datagrid.
810 *
811 * @return all Topology Elements that are currently in the datagrid.
812 */
813 @Override
814 public Collection<TopologyElement> getAllTopologyElements() {
815 Collection<TopologyElement> allTopologyElements =
816 new LinkedList<TopologyElement>();
817
818 //
819 // Get all current entries
820 //
821 Collection<byte[]> values = mapTopology.values();
822 Kryo kryo = kryoFactory.newKryo();
823 for (byte[] valueBytes : values) {
824 //
825 // Decode the value
826 //
827 Input input = new Input(valueBytes);
828 TopologyElement topologyElement =
829 kryo.readObject(input, TopologyElement.class);
830 allTopologyElements.add(topologyElement);
831 }
832 kryoFactory.deleteKryo(kryo);
833
834 return allTopologyElements;
835 }
836
837 /**
838 * Send a notification that a Topology Element is added.
839 *
840 * @param topologyElement the Topology Element that is added.
841 */
842 @Override
843 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
844 //
845 // Encode the value
846 //
847 byte[] buffer = new byte[MAX_BUFFER_SIZE];
848 Kryo kryo = kryoFactory.newKryo();
849 Output output = new Output(buffer, -1);
850 kryo.writeObject(output, topologyElement);
851 byte[] valueBytes = output.toBytes();
852 kryoFactory.deleteKryo(kryo);
853
854 //
855 // Put the entry:
856 // - Key : TopologyElement ID (String)
857 // - Value : Serialized TopologyElement (byte[])
858 //
859 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
860 }
861
862 /**
863 * Send a notification that a Topology Element is removed.
864 *
865 * @param topologyElement the Topology Element that is removed.
866 */
867 @Override
868 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
869 //
870 // Remove the entry:
871 // - Key : TopologyElement ID (String)
872 // - Value : Serialized TopologyElement (byte[])
873 //
874 mapTopology.removeAsync(topologyElement.elementId());
875 }
876
877 /**
878 * Send a notification that a Topology Element is updated.
879 *
880 * @param topologyElement the Topology Element that is updated.
881 */
882 @Override
883 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
884 // NOTE: Adding an entry with an existing key automatically updates it
885 notificationSendTopologyElementAdded(topologyElement);
886 }
887
888 /**
889 * Send a notification that all Topology Elements are removed.
890 */
891 @Override
892 public void notificationSendAllTopologyElementsRemoved() {
893 //
894 // Remove all entries
895 // NOTE: We remove the entries one-by-one so the per-entry
896 // notifications will be delivered.
897 //
898 // mapTopology.clear();
899 Set<String> keySet = mapTopology.keySet();
900 for (String key : keySet) {
901 mapTopology.removeAsync(key);
902 }
903 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800904
905 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800906 public void sendArpRequest(ArpMessage arpMessage) {
907 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
908 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800909 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700910}