blob: 6483121a95cb55e7ece86a9b176e60c6e062c1c9 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070076 // State related to the Network Topology map
77 protected static final String mapTopologyName = "mapTopology";
78 private IMap<String, byte[]> mapTopology = null;
79 private MapTopologyListener mapTopologyListener = null;
80 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080081
82 // State related to the ARP map
83 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080084 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080085 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
86 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070087
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070088 /**
89 * Class for receiving notifications for Flow state.
90 *
91 * The datagrid map is:
92 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080093 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 */
95 class MapFlowListener implements EntryListener<Long, byte[]> {
96 /**
97 * Receive a notification that an entry is added.
98 *
99 * @param event the notification event for the entry.
100 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800101 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 byte[] valueBytes = (byte[])event.getValue();
103
104 //
105 // Decode the value and deliver the notification
106 //
107 Kryo kryo = kryoFactory.newKryo();
108 Input input = new Input(valueBytes);
109 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
110 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700111 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700112 }
113
114 /**
115 * Receive a notification that an entry is removed.
116 *
117 * @param event the notification event for the entry.
118 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800119 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700120 byte[] valueBytes = (byte[])event.getValue();
121
122 //
123 // Decode the value and deliver the notification
124 //
125 Kryo kryo = kryoFactory.newKryo();
126 Input input = new Input(valueBytes);
127 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
128 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700129 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700130 }
131
132 /**
133 * Receive a notification that an entry is updated.
134 *
135 * @param event the notification event for the entry.
136 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800137 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700138 byte[] valueBytes = (byte[])event.getValue();
139
140 //
141 // Decode the value and deliver the notification
142 //
143 Kryo kryo = kryoFactory.newKryo();
144 Input input = new Input(valueBytes);
145 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
146 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700147 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148 }
149
150 /**
151 * Receive a notification that an entry is evicted.
152 *
153 * @param event the notification event for the entry.
154 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800155 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700156 // NOTE: We don't use eviction for this map
157 }
158 }
159
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700160 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700161 * Class for receiving notifications for FlowEntry state.
162 *
163 * The datagrid map is:
164 * - Key : FlowEntry ID (Long)
165 * - Value : Serialized FlowEntry (byte[])
166 */
167 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
168 /**
169 * Receive a notification that an entry is added.
170 *
171 * @param event the notification event for the entry.
172 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800173 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700174 byte[] valueBytes = (byte[])event.getValue();
175
176 //
177 // Decode the value and deliver the notification
178 //
179 Kryo kryo = kryoFactory.newKryo();
180 Input input = new Input(valueBytes);
181 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
182 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700183 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700184 }
185
186 /**
187 * Receive a notification that an entry is removed.
188 *
189 * @param event the notification event for the entry.
190 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800191 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700192 byte[] valueBytes = (byte[])event.getValue();
193
194 //
195 // Decode the value and deliver the notification
196 //
197 Kryo kryo = kryoFactory.newKryo();
198 Input input = new Input(valueBytes);
199 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
200 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700201 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700202 }
203
204 /**
205 * Receive a notification that an entry is updated.
206 *
207 * @param event the notification event for the entry.
208 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800209 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700210 byte[] valueBytes = (byte[])event.getValue();
211
212 //
213 // Decode the value and deliver the notification
214 //
215 Kryo kryo = kryoFactory.newKryo();
216 Input input = new Input(valueBytes);
217 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
218 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700219 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700220 }
221
222 /**
223 * Receive a notification that an entry is evicted.
224 *
225 * @param event the notification event for the entry.
226 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800227 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700228 // NOTE: We don't use eviction for this map
229 }
230 }
231
232 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700233 * Class for receiving notifications for Network Topology state.
234 *
235 * The datagrid map is:
236 * - Key: TopologyElement ID (String)
237 * - Value: Serialized TopologyElement (byte[])
238 */
239 class MapTopologyListener implements EntryListener<String, byte[]> {
240 /**
241 * Receive a notification that an entry is added.
242 *
243 * @param event the notification event for the entry.
244 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800245 public void entryAdded(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700246 byte[] valueBytes = (byte[])event.getValue();
247
248 //
249 // Decode the value and deliver the notification
250 //
251 Kryo kryo = kryoFactory.newKryo();
252 Input input = new Input(valueBytes);
253 TopologyElement topologyElement =
254 kryo.readObject(input, TopologyElement.class);
255 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700256 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700257 }
258
259 /**
260 * Receive a notification that an entry is removed.
261 *
262 * @param event the notification event for the entry.
263 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800264 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700265 byte[] valueBytes = (byte[])event.getValue();
266
267 //
268 // Decode the value and deliver the notification
269 //
270 Kryo kryo = kryoFactory.newKryo();
271 Input input = new Input(valueBytes);
272 TopologyElement topologyElement =
273 kryo.readObject(input, TopologyElement.class);
274 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700275 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700276 }
277
278 /**
279 * Receive a notification that an entry is updated.
280 *
281 * @param event the notification event for the entry.
282 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800283 public void entryUpdated(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700284 byte[] valueBytes = (byte[])event.getValue();
285
286 //
287 // Decode the value and deliver the notification
288 //
289 Kryo kryo = kryoFactory.newKryo();
290 Input input = new Input(valueBytes);
291 TopologyElement topologyElement =
292 kryo.readObject(input, TopologyElement.class);
293 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700294 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700295 }
296
297 /**
298 * Receive a notification that an entry is evicted.
299 *
300 * @param event the notification event for the entry.
301 */
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800302 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700303 // NOTE: We don't use eviction for this map
304 }
305 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800306
307 /**
308 * Class for receiving notifications for ARP requests.
309 *
310 * The datagrid map is:
311 * - Key: Request ID (String)
312 * - Value: ARP request packet (byte[])
313 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800314 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800315 /**
316 * Receive a notification that an entry is added.
317 *
318 * @param event the notification event for the entry.
319 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800320 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800321 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
322 arpEventHandler.arpRequestNotification(event.getKey());
323 }
324
325 //
326 // Decode the value and deliver the notification
327 //
328 /*
329 Kryo kryo = kryoFactory.newKryo();
330 Input input = new Input(valueBytes);
331 TopologyElement topologyElement =
332 kryo.readObject(input, TopologyElement.class);
333 kryoFactory.deleteKryo(kryo);
334 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
335 */
336 }
337
338 /**
339 * Receive a notification that an entry is removed.
340 *
341 * @param event the notification event for the entry.
342 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800343 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800344 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800345 }
346
347 /**
348 * Receive a notification that an entry is updated.
349 *
350 * @param event the notification event for the entry.
351 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800352 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800353 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800354 }
355
356 /**
357 * Receive a notification that an entry is evicted.
358 *
359 * @param event the notification event for the entry.
360 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800361 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800362 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800363 }
364 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700365
366 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700367 * Initialize the Hazelcast Datagrid operation.
368 *
369 * @param conf the configuration filename.
370 */
371 public void init(String configFilename) {
372 /*
373 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
374 System.setProperty("hazelcast.socket.send.buffer.size", "32");
375 */
376 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
377
378 // Init from configuration file
379 try {
380 hazelcastConfig = new FileSystemXmlConfig(configFilename);
381 } catch (FileNotFoundException e) {
382 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
383 }
384 /*
385 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
386 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
387 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
388 */
389 //
390 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
391 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
392 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
393 }
394
395 /**
396 * Shutdown the Hazelcast Datagrid operation.
397 */
398 public void finalize() {
399 close();
400 }
401
402 /**
403 * Shutdown the Hazelcast Datagrid operation.
404 */
405 public void close() {
406 Hazelcast.shutdownAll();
407 }
408
409 /**
410 * Get the collection of offered module services.
411 *
412 * @return the collection of offered module services.
413 */
414 @Override
415 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
416 Collection<Class<? extends IFloodlightService>> l =
417 new ArrayList<Class<? extends IFloodlightService>>();
418 l.add(IDatagridService.class);
419 return l;
420 }
421
422 /**
423 * Get the collection of implemented services.
424 *
425 * @return the collection of implemented services.
426 */
427 @Override
428 public Map<Class<? extends IFloodlightService>, IFloodlightService>
429 getServiceImpls() {
430 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700431 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700432 new HashMap<Class<? extends IFloodlightService>,
433 IFloodlightService>();
434 m.put(IDatagridService.class, this);
435 return m;
436 }
437
438 /**
439 * Get the collection of modules this module depends on.
440 *
441 * @return the collection of modules this module depends on.
442 */
443 @Override
444 public Collection<Class<? extends IFloodlightService>>
445 getModuleDependencies() {
446 Collection<Class<? extends IFloodlightService>> l =
447 new ArrayList<Class<? extends IFloodlightService>>();
448 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700449 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700450 return l;
451 }
452
453 /**
454 * Initialize the module.
455 *
456 * @param context the module context to use for the initialization.
457 */
458 @Override
459 public void init(FloodlightModuleContext context)
460 throws FloodlightModuleException {
461 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700462 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700463
464 // Get the configuration file name and configure the Datagrid
465 Map<String, String> configMap = context.getConfigParams(this);
466 String configFilename = configMap.get(HazelcastConfigFile);
467 this.init(configFilename);
468 }
469
470 /**
471 * Startup module operation.
472 *
473 * @param context the module context to use for the startup.
474 */
475 @Override
476 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700477 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700478
479 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800480
481 arpMap = hazelcastInstance.getMap(arpMapName);
482 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700483 }
484
485 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700486 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700487 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700488 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700489 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700490 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700491 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700492 */
493 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700494 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
495 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700496
497 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700498 mapFlowListener = new MapFlowListener();
499 mapFlow = hazelcastInstance.getMap(mapFlowName);
500 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700501
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700502 // Initialize the FlowEntry-related map state
503 mapFlowEntryListener = new MapFlowEntryListener();
504 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
505 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
506
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700507 // Initialize the Topology-related map state
508 mapTopologyListener = new MapTopologyListener();
509 mapTopology = hazelcastInstance.getMap(mapTopologyName);
510 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700511 }
512
513 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700514 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700515 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700516 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700517 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700518 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700519 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700520 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700521 */
522 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700523 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700524 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700525 mapFlow.removeEntryListener(mapFlowListenerId);
526 mapFlow = null;
527 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700528
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700529 // Clear the FlowEntry-related map state
530 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
531 mapFlowEntry = null;
532 mapFlowEntryListener = null;
533
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700534 // Clear the Topology-related map state
535 mapTopology.removeEntryListener(mapTopologyListenerId);
536 mapTopology = null;
537 mapTopologyListener = null;
538
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700539 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700540 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800541
542 @Override
543 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
544 if (arpEventHandler != null) {
545 arpEventHandlers.add(arpEventHandler);
546 }
547 }
548
549 @Override
550 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
551 arpEventHandlers.remove(arpEventHandler);
552 }
553
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700554 /**
555 * Get all Flows that are currently in the datagrid.
556 *
557 * @return all Flows that are currently in the datagrid.
558 */
559 @Override
560 public Collection<FlowPath> getAllFlows() {
561 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
562
563 //
564 // Get all current entries
565 //
566 Collection<byte[]> values = mapFlow.values();
567 Kryo kryo = kryoFactory.newKryo();
568 for (byte[] valueBytes : values) {
569 //
570 // Decode the value
571 //
572 Input input = new Input(valueBytes);
573 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
574 allFlows.add(flowPath);
575 }
576 kryoFactory.deleteKryo(kryo);
577
578 return allFlows;
579 }
580
581 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800582 * Get a Flow for a given Flow ID.
583 *
584 * @param flowId the Flow ID of the Flow to get.
585 * @return the Flow if found, otherwise null.
586 */
587 @Override
588 public FlowPath getFlow(FlowId flowId) {
589 byte[] valueBytes = mapFlow.get(flowId.value());
590 if (valueBytes == null)
591 return null;
592
593 Kryo kryo = kryoFactory.newKryo();
594 //
595 // Decode the value
596 //
597 Input input = new Input(valueBytes);
598 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
599 kryoFactory.deleteKryo(kryo);
600
601 return flowPath;
602 }
603
604 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700605 * Send a notification that a Flow is added.
606 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700607 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700608 */
609 @Override
610 public void notificationSendFlowAdded(FlowPath flowPath) {
611 //
612 // Encode the value
613 //
614 byte[] buffer = new byte[MAX_BUFFER_SIZE];
615 Kryo kryo = kryoFactory.newKryo();
616 Output output = new Output(buffer, -1);
617 kryo.writeObject(output, flowPath);
618 byte[] valueBytes = output.toBytes();
619 kryoFactory.deleteKryo(kryo);
620
621 //
622 // Put the entry:
623 // - Key : Flow ID (Long)
624 // - Value : Serialized Flow (byte[])
625 //
626 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
627 }
628
629 /**
630 * Send a notification that a Flow is removed.
631 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700632 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700633 */
634 @Override
635 public void notificationSendFlowRemoved(FlowId flowId) {
636 //
637 // Remove the entry:
638 // - Key : Flow ID (Long)
639 // - Value : Serialized Flow (byte[])
640 //
641 mapFlow.removeAsync(flowId.value());
642 }
643
644 /**
645 * Send a notification that a Flow is updated.
646 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700647 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700648 */
649 @Override
650 public void notificationSendFlowUpdated(FlowPath flowPath) {
651 // NOTE: Adding an entry with an existing key automatically updates it
652 notificationSendFlowAdded(flowPath);
653 }
654
655 /**
656 * Send a notification that all Flows are removed.
657 */
658 @Override
659 public void notificationSendAllFlowsRemoved() {
660 //
661 // Remove all entries
662 // NOTE: We remove the entries one-by-one so the per-entry
663 // notifications will be delivered.
664 //
665 // mapFlow.clear();
666 Set<Long> keySet = mapFlow.keySet();
667 for (Long key : keySet) {
668 mapFlow.removeAsync(key);
669 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700670 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700671
672 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700673 * Get all Flow Entries that are currently in the datagrid.
674 *
675 * @return all Flow Entries that are currently in the datagrid.
676 */
677 @Override
678 public Collection<FlowEntry> getAllFlowEntries() {
679 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
680
681 //
682 // Get all current entries
683 //
684 Collection<byte[]> values = mapFlowEntry.values();
685 Kryo kryo = kryoFactory.newKryo();
686 for (byte[] valueBytes : values) {
687 //
688 // Decode the value
689 //
690 Input input = new Input(valueBytes);
691 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
692 allFlowEntries.add(flowEntry);
693 }
694 kryoFactory.deleteKryo(kryo);
695
696 return allFlowEntries;
697 }
698
699 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800700 * Get a Flow Entry for a given Flow Entry ID.
701 *
702 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
703 * @return the Flow Entry if found, otherwise null.
704 */
705 @Override
706 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
707 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
708 if (valueBytes == null)
709 return null;
710
711 Kryo kryo = kryoFactory.newKryo();
712 //
713 // Decode the value
714 //
715 Input input = new Input(valueBytes);
716 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
717 kryoFactory.deleteKryo(kryo);
718
719 return flowEntry;
720 }
721
722 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700723 * Send a notification that a FlowEntry is added.
724 *
725 * @param flowEntry the FlowEntry that is added.
726 */
727 @Override
728 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
729 //
730 // Encode the value
731 //
732 byte[] buffer = new byte[MAX_BUFFER_SIZE];
733 Kryo kryo = kryoFactory.newKryo();
734 Output output = new Output(buffer, -1);
735 kryo.writeObject(output, flowEntry);
736 byte[] valueBytes = output.toBytes();
737 kryoFactory.deleteKryo(kryo);
738
739 //
740 // Put the entry:
741 // - Key : FlowEntry ID (Long)
742 // - Value : Serialized FlowEntry (byte[])
743 //
744 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
745 }
746
747 /**
748 * Send a notification that a FlowEntry is removed.
749 *
750 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
751 */
752 @Override
753 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
754 //
755 // Remove the entry:
756 // - Key : FlowEntry ID (Long)
757 // - Value : Serialized FlowEntry (byte[])
758 //
759 mapFlowEntry.removeAsync(flowEntryId.value());
760 }
761
762 /**
763 * Send a notification that a FlowEntry is updated.
764 *
765 * @param flowEntry the FlowEntry that is updated.
766 */
767 @Override
768 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
769 // NOTE: Adding an entry with an existing key automatically updates it
770 notificationSendFlowEntryAdded(flowEntry);
771 }
772
773 /**
774 * Send a notification that all Flow Entries are removed.
775 */
776 @Override
777 public void notificationSendAllFlowEntriesRemoved() {
778 //
779 // Remove all entries
780 // NOTE: We remove the entries one-by-one so the per-entry
781 // notifications will be delivered.
782 //
783 // mapFlowEntry.clear();
784 Set<Long> keySet = mapFlowEntry.keySet();
785 for (Long key : keySet) {
786 mapFlowEntry.removeAsync(key);
787 }
788 }
789
790 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700791 * Get all Topology Elements that are currently in the datagrid.
792 *
793 * @return all Topology Elements that are currently in the datagrid.
794 */
795 @Override
796 public Collection<TopologyElement> getAllTopologyElements() {
797 Collection<TopologyElement> allTopologyElements =
798 new LinkedList<TopologyElement>();
799
800 //
801 // Get all current entries
802 //
803 Collection<byte[]> values = mapTopology.values();
804 Kryo kryo = kryoFactory.newKryo();
805 for (byte[] valueBytes : values) {
806 //
807 // Decode the value
808 //
809 Input input = new Input(valueBytes);
810 TopologyElement topologyElement =
811 kryo.readObject(input, TopologyElement.class);
812 allTopologyElements.add(topologyElement);
813 }
814 kryoFactory.deleteKryo(kryo);
815
816 return allTopologyElements;
817 }
818
819 /**
820 * Send a notification that a Topology Element is added.
821 *
822 * @param topologyElement the Topology Element that is added.
823 */
824 @Override
825 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
826 //
827 // Encode the value
828 //
829 byte[] buffer = new byte[MAX_BUFFER_SIZE];
830 Kryo kryo = kryoFactory.newKryo();
831 Output output = new Output(buffer, -1);
832 kryo.writeObject(output, topologyElement);
833 byte[] valueBytes = output.toBytes();
834 kryoFactory.deleteKryo(kryo);
835
836 //
837 // Put the entry:
838 // - Key : TopologyElement ID (String)
839 // - Value : Serialized TopologyElement (byte[])
840 //
841 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
842 }
843
844 /**
845 * Send a notification that a Topology Element is removed.
846 *
847 * @param topologyElement the Topology Element that is removed.
848 */
849 @Override
850 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
851 //
852 // Remove the entry:
853 // - Key : TopologyElement ID (String)
854 // - Value : Serialized TopologyElement (byte[])
855 //
856 mapTopology.removeAsync(topologyElement.elementId());
857 }
858
859 /**
860 * Send a notification that a Topology Element is updated.
861 *
862 * @param topologyElement the Topology Element that is updated.
863 */
864 @Override
865 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
866 // NOTE: Adding an entry with an existing key automatically updates it
867 notificationSendTopologyElementAdded(topologyElement);
868 }
869
870 /**
871 * Send a notification that all Topology Elements are removed.
872 */
873 @Override
874 public void notificationSendAllTopologyElementsRemoved() {
875 //
876 // Remove all entries
877 // NOTE: We remove the entries one-by-one so the per-entry
878 // notifications will be delivered.
879 //
880 // mapTopology.clear();
881 Set<String> keySet = mapTopology.keySet();
882 for (String key : keySet) {
883 mapTopology.removeAsync(key);
884 }
885 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800886
887 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800888 public void sendArpRequest(ArpMessage arpMessage) {
889 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
890 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800891 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700892}