blob: 33091b95852d58190b8b972d420dec73af7de7a6 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070076 // State related to the Network Topology map
77 protected static final String mapTopologyName = "mapTopology";
78 private IMap<String, byte[]> mapTopology = null;
79 private MapTopologyListener mapTopologyListener = null;
80 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080081
Jonathan Hart18ad55c2013-11-11 22:49:55 -080082 // State related to the ARP map
83 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080084 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080085 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
86 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070087
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070088 /**
89 * Class for receiving notifications for Flow state.
90 *
91 * The datagrid map is:
92 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080093 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 */
95 class MapFlowListener implements EntryListener<Long, byte[]> {
96 /**
97 * Receive a notification that an entry is added.
98 *
99 * @param event the notification event for the entry.
100 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800101 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800102 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800103 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700104
105 //
106 // Decode the value and deliver the notification
107 //
108 Kryo kryo = kryoFactory.newKryo();
109 Input input = new Input(valueBytes);
110 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
111 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700112 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700113 }
114
115 /**
116 * Receive a notification that an entry is removed.
117 *
118 * @param event the notification event for the entry.
119 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800120 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800121 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800122 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700123
124 //
125 // Decode the value and deliver the notification
126 //
127 Kryo kryo = kryoFactory.newKryo();
128 Input input = new Input(valueBytes);
129 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
130 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700131 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700132 }
133
134 /**
135 * Receive a notification that an entry is updated.
136 *
137 * @param event the notification event for the entry.
138 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800139 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800140 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800141 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700142
143 //
144 // Decode the value and deliver the notification
145 //
146 Kryo kryo = kryoFactory.newKryo();
147 Input input = new Input(valueBytes);
148 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
149 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700150 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700151 }
152
153 /**
154 * Receive a notification that an entry is evicted.
155 *
156 * @param event the notification event for the entry.
157 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800158 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800159 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700160 // NOTE: We don't use eviction for this map
161 }
162 }
163
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700164 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700165 * Class for receiving notifications for FlowEntry state.
166 *
167 * The datagrid map is:
168 * - Key : FlowEntry ID (Long)
169 * - Value : Serialized FlowEntry (byte[])
170 */
171 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
172 /**
173 * Receive a notification that an entry is added.
174 *
175 * @param event the notification event for the entry.
176 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800177 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800178 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800179 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700180
181 //
182 // Decode the value and deliver the notification
183 //
184 Kryo kryo = kryoFactory.newKryo();
185 Input input = new Input(valueBytes);
186 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
187 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700188 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700189 }
190
191 /**
192 * Receive a notification that an entry is removed.
193 *
194 * @param event the notification event for the entry.
195 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800196 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800197 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800198 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700199
200 //
201 // Decode the value and deliver the notification
202 //
203 Kryo kryo = kryoFactory.newKryo();
204 Input input = new Input(valueBytes);
205 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
206 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700207 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700208 }
209
210 /**
211 * Receive a notification that an entry is updated.
212 *
213 * @param event the notification event for the entry.
214 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800215 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800216 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800217 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700218
219 //
220 // Decode the value and deliver the notification
221 //
222 Kryo kryo = kryoFactory.newKryo();
223 Input input = new Input(valueBytes);
224 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
225 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700226 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700227 }
228
229 /**
230 * Receive a notification that an entry is evicted.
231 *
232 * @param event the notification event for the entry.
233 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800234 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800235 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700236 // NOTE: We don't use eviction for this map
237 }
238 }
239
240 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700241 * Class for receiving notifications for Network Topology state.
242 *
243 * The datagrid map is:
244 * - Key: TopologyElement ID (String)
245 * - Value: Serialized TopologyElement (byte[])
246 */
247 class MapTopologyListener implements EntryListener<String, byte[]> {
248 /**
249 * Receive a notification that an entry is added.
250 *
251 * @param event the notification event for the entry.
252 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800253 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800254 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800255 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700256
257 //
258 // Decode the value and deliver the notification
259 //
260 Kryo kryo = kryoFactory.newKryo();
261 Input input = new Input(valueBytes);
262 TopologyElement topologyElement =
263 kryo.readObject(input, TopologyElement.class);
264 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700265 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700266 }
267
268 /**
269 * Receive a notification that an entry is removed.
270 *
271 * @param event the notification event for the entry.
272 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800273 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800274 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800275 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700276
277 //
278 // Decode the value and deliver the notification
279 //
280 Kryo kryo = kryoFactory.newKryo();
281 Input input = new Input(valueBytes);
282 TopologyElement topologyElement =
283 kryo.readObject(input, TopologyElement.class);
284 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700285 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700286 }
287
288 /**
289 * Receive a notification that an entry is updated.
290 *
291 * @param event the notification event for the entry.
292 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800293 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800294 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800295 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700296
297 //
298 // Decode the value and deliver the notification
299 //
300 Kryo kryo = kryoFactory.newKryo();
301 Input input = new Input(valueBytes);
302 TopologyElement topologyElement =
303 kryo.readObject(input, TopologyElement.class);
304 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700305 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700306 }
307
308 /**
309 * Receive a notification that an entry is evicted.
310 *
311 * @param event the notification event for the entry.
312 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800313 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800314 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700315 // NOTE: We don't use eviction for this map
316 }
317 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800318
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800319 /**
320 * Class for receiving notifications for ARP requests.
321 *
322 * The datagrid map is:
323 * - Key: Request ID (String)
324 * - Value: ARP request packet (byte[])
325 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800326 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800327 /**
328 * Receive a notification that an entry is added.
329 *
330 * @param event the notification event for the entry.
331 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800332 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800333 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800334 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
335 arpEventHandler.arpRequestNotification(event.getKey());
336 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800337
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800338 //
339 // Decode the value and deliver the notification
340 //
341 /*
342 Kryo kryo = kryoFactory.newKryo();
343 Input input = new Input(valueBytes);
344 TopologyElement topologyElement =
345 kryo.readObject(input, TopologyElement.class);
346 kryoFactory.deleteKryo(kryo);
347 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
348 */
349 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800350
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800351 /**
352 * Receive a notification that an entry is removed.
353 *
354 * @param event the notification event for the entry.
355 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800356 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800357 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800358 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800359 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800360
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800361 /**
362 * Receive a notification that an entry is updated.
363 *
364 * @param event the notification event for the entry.
365 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800366 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800367 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800368 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800369 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800370
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800371 /**
372 * Receive a notification that an entry is evicted.
373 *
374 * @param event the notification event for the entry.
375 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800376 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800377 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800378 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800379 }
380 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700381
382 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700383 * Initialize the Hazelcast Datagrid operation.
384 *
385 * @param conf the configuration filename.
386 */
387 public void init(String configFilename) {
388 /*
389 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
390 System.setProperty("hazelcast.socket.send.buffer.size", "32");
391 */
392 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800393
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700394 // Init from configuration file
395 try {
396 hazelcastConfig = new FileSystemXmlConfig(configFilename);
397 } catch (FileNotFoundException e) {
398 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
399 }
400 /*
401 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
402 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
403 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
404 */
405 //
406 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
407 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
408 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
409 }
410
411 /**
412 * Shutdown the Hazelcast Datagrid operation.
413 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800414 @Override
415 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700416 close();
417 }
418
419 /**
420 * Shutdown the Hazelcast Datagrid operation.
421 */
422 public void close() {
423 Hazelcast.shutdownAll();
424 }
425
426 /**
427 * Get the collection of offered module services.
428 *
429 * @return the collection of offered module services.
430 */
431 @Override
432 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800433 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700434 new ArrayList<Class<? extends IFloodlightService>>();
435 l.add(IDatagridService.class);
436 return l;
437 }
438
439 /**
440 * Get the collection of implemented services.
441 *
442 * @return the collection of implemented services.
443 */
444 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800445 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700446 getServiceImpls() {
447 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800448 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700449 new HashMap<Class<? extends IFloodlightService>,
450 IFloodlightService>();
451 m.put(IDatagridService.class, this);
452 return m;
453 }
454
455 /**
456 * Get the collection of modules this module depends on.
457 *
458 * @return the collection of modules this module depends on.
459 */
460 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800461 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700462 getModuleDependencies() {
463 Collection<Class<? extends IFloodlightService>> l =
464 new ArrayList<Class<? extends IFloodlightService>>();
465 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700466 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700467 return l;
468 }
469
470 /**
471 * Initialize the module.
472 *
473 * @param context the module context to use for the initialization.
474 */
475 @Override
476 public void init(FloodlightModuleContext context)
477 throws FloodlightModuleException {
478 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700479 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700480
481 // Get the configuration file name and configure the Datagrid
482 Map<String, String> configMap = context.getConfigParams(this);
483 String configFilename = configMap.get(HazelcastConfigFile);
484 this.init(configFilename);
485 }
486
487 /**
488 * Startup module operation.
489 *
490 * @param context the module context to use for the startup.
491 */
492 @Override
493 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700494 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700495
496 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800497
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800498 arpMap = hazelcastInstance.getMap(arpMapName);
499 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700500 }
501
502 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700503 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700504 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700505 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700506 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700507 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700508 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700509 */
510 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700511 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
512 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700513
514 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700515 mapFlowListener = new MapFlowListener();
516 mapFlow = hazelcastInstance.getMap(mapFlowName);
517 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700518
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700519 // Initialize the FlowEntry-related map state
520 mapFlowEntryListener = new MapFlowEntryListener();
521 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
522 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
523
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700524 // Initialize the Topology-related map state
525 mapTopologyListener = new MapTopologyListener();
526 mapTopology = hazelcastInstance.getMap(mapTopologyName);
527 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700528 }
529
530 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700531 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700532 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700533 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700534 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700535 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700536 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700537 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700538 */
539 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700540 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700541 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700542 mapFlow.removeEntryListener(mapFlowListenerId);
543 mapFlow = null;
544 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700545
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700546 // Clear the FlowEntry-related map state
547 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
548 mapFlowEntry = null;
549 mapFlowEntryListener = null;
550
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700551 // Clear the Topology-related map state
552 mapTopology.removeEntryListener(mapTopologyListenerId);
553 mapTopology = null;
554 mapTopologyListener = null;
555
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700556 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700557 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800558
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800559 @Override
560 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
561 if (arpEventHandler != null) {
562 arpEventHandlers.add(arpEventHandler);
563 }
564 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800565
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800566 @Override
567 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
568 arpEventHandlers.remove(arpEventHandler);
569 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800570
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700571 /**
572 * Get all Flows that are currently in the datagrid.
573 *
574 * @return all Flows that are currently in the datagrid.
575 */
576 @Override
577 public Collection<FlowPath> getAllFlows() {
578 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
579
580 //
581 // Get all current entries
582 //
583 Collection<byte[]> values = mapFlow.values();
584 Kryo kryo = kryoFactory.newKryo();
585 for (byte[] valueBytes : values) {
586 //
587 // Decode the value
588 //
589 Input input = new Input(valueBytes);
590 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
591 allFlows.add(flowPath);
592 }
593 kryoFactory.deleteKryo(kryo);
594
595 return allFlows;
596 }
597
598 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800599 * Get a Flow for a given Flow ID.
600 *
601 * @param flowId the Flow ID of the Flow to get.
602 * @return the Flow if found, otherwise null.
603 */
604 @Override
605 public FlowPath getFlow(FlowId flowId) {
606 byte[] valueBytes = mapFlow.get(flowId.value());
607 if (valueBytes == null)
608 return null;
609
610 Kryo kryo = kryoFactory.newKryo();
611 //
612 // Decode the value
613 //
614 Input input = new Input(valueBytes);
615 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
616 kryoFactory.deleteKryo(kryo);
617
618 return flowPath;
619 }
620
621 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700622 * Send a notification that a Flow is added.
623 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700624 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700625 */
626 @Override
627 public void notificationSendFlowAdded(FlowPath flowPath) {
628 //
629 // Encode the value
630 //
631 byte[] buffer = new byte[MAX_BUFFER_SIZE];
632 Kryo kryo = kryoFactory.newKryo();
633 Output output = new Output(buffer, -1);
634 kryo.writeObject(output, flowPath);
635 byte[] valueBytes = output.toBytes();
636 kryoFactory.deleteKryo(kryo);
637
638 //
639 // Put the entry:
640 // - Key : Flow ID (Long)
641 // - Value : Serialized Flow (byte[])
642 //
643 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
644 }
645
646 /**
647 * Send a notification that a Flow is removed.
648 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700649 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700650 */
651 @Override
652 public void notificationSendFlowRemoved(FlowId flowId) {
653 //
654 // Remove the entry:
655 // - Key : Flow ID (Long)
656 // - Value : Serialized Flow (byte[])
657 //
658 mapFlow.removeAsync(flowId.value());
659 }
660
661 /**
662 * Send a notification that a Flow is updated.
663 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700664 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700665 */
666 @Override
667 public void notificationSendFlowUpdated(FlowPath flowPath) {
668 // NOTE: Adding an entry with an existing key automatically updates it
669 notificationSendFlowAdded(flowPath);
670 }
671
672 /**
673 * Send a notification that all Flows are removed.
674 */
675 @Override
676 public void notificationSendAllFlowsRemoved() {
677 //
678 // Remove all entries
679 // NOTE: We remove the entries one-by-one so the per-entry
680 // notifications will be delivered.
681 //
682 // mapFlow.clear();
683 Set<Long> keySet = mapFlow.keySet();
684 for (Long key : keySet) {
685 mapFlow.removeAsync(key);
686 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700687 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700688
689 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700690 * Get all Flow Entries that are currently in the datagrid.
691 *
692 * @return all Flow Entries that are currently in the datagrid.
693 */
694 @Override
695 public Collection<FlowEntry> getAllFlowEntries() {
696 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
697
698 //
699 // Get all current entries
700 //
701 Collection<byte[]> values = mapFlowEntry.values();
702 Kryo kryo = kryoFactory.newKryo();
703 for (byte[] valueBytes : values) {
704 //
705 // Decode the value
706 //
707 Input input = new Input(valueBytes);
708 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
709 allFlowEntries.add(flowEntry);
710 }
711 kryoFactory.deleteKryo(kryo);
712
713 return allFlowEntries;
714 }
715
716 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800717 * Get a Flow Entry for a given Flow Entry ID.
718 *
719 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
720 * @return the Flow Entry if found, otherwise null.
721 */
722 @Override
723 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
724 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
725 if (valueBytes == null)
726 return null;
727
728 Kryo kryo = kryoFactory.newKryo();
729 //
730 // Decode the value
731 //
732 Input input = new Input(valueBytes);
733 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
734 kryoFactory.deleteKryo(kryo);
735
736 return flowEntry;
737 }
738
739 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700740 * Send a notification that a FlowEntry is added.
741 *
742 * @param flowEntry the FlowEntry that is added.
743 */
744 @Override
745 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
746 //
747 // Encode the value
748 //
749 byte[] buffer = new byte[MAX_BUFFER_SIZE];
750 Kryo kryo = kryoFactory.newKryo();
751 Output output = new Output(buffer, -1);
752 kryo.writeObject(output, flowEntry);
753 byte[] valueBytes = output.toBytes();
754 kryoFactory.deleteKryo(kryo);
755
756 //
757 // Put the entry:
758 // - Key : FlowEntry ID (Long)
759 // - Value : Serialized FlowEntry (byte[])
760 //
761 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
762 }
763
764 /**
765 * Send a notification that a FlowEntry is removed.
766 *
767 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
768 */
769 @Override
770 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
771 //
772 // Remove the entry:
773 // - Key : FlowEntry ID (Long)
774 // - Value : Serialized FlowEntry (byte[])
775 //
776 mapFlowEntry.removeAsync(flowEntryId.value());
777 }
778
779 /**
780 * Send a notification that a FlowEntry is updated.
781 *
782 * @param flowEntry the FlowEntry that is updated.
783 */
784 @Override
785 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
786 // NOTE: Adding an entry with an existing key automatically updates it
787 notificationSendFlowEntryAdded(flowEntry);
788 }
789
790 /**
791 * Send a notification that all Flow Entries are removed.
792 */
793 @Override
794 public void notificationSendAllFlowEntriesRemoved() {
795 //
796 // Remove all entries
797 // NOTE: We remove the entries one-by-one so the per-entry
798 // notifications will be delivered.
799 //
800 // mapFlowEntry.clear();
801 Set<Long> keySet = mapFlowEntry.keySet();
802 for (Long key : keySet) {
803 mapFlowEntry.removeAsync(key);
804 }
805 }
806
807 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700808 * Get all Topology Elements that are currently in the datagrid.
809 *
810 * @return all Topology Elements that are currently in the datagrid.
811 */
812 @Override
813 public Collection<TopologyElement> getAllTopologyElements() {
814 Collection<TopologyElement> allTopologyElements =
815 new LinkedList<TopologyElement>();
816
817 //
818 // Get all current entries
819 //
820 Collection<byte[]> values = mapTopology.values();
821 Kryo kryo = kryoFactory.newKryo();
822 for (byte[] valueBytes : values) {
823 //
824 // Decode the value
825 //
826 Input input = new Input(valueBytes);
827 TopologyElement topologyElement =
828 kryo.readObject(input, TopologyElement.class);
829 allTopologyElements.add(topologyElement);
830 }
831 kryoFactory.deleteKryo(kryo);
832
833 return allTopologyElements;
834 }
835
836 /**
837 * Send a notification that a Topology Element is added.
838 *
839 * @param topologyElement the Topology Element that is added.
840 */
841 @Override
842 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
843 //
844 // Encode the value
845 //
846 byte[] buffer = new byte[MAX_BUFFER_SIZE];
847 Kryo kryo = kryoFactory.newKryo();
848 Output output = new Output(buffer, -1);
849 kryo.writeObject(output, topologyElement);
850 byte[] valueBytes = output.toBytes();
851 kryoFactory.deleteKryo(kryo);
852
853 //
854 // Put the entry:
855 // - Key : TopologyElement ID (String)
856 // - Value : Serialized TopologyElement (byte[])
857 //
858 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
859 }
860
861 /**
862 * Send a notification that a Topology Element is removed.
863 *
864 * @param topologyElement the Topology Element that is removed.
865 */
866 @Override
867 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
868 //
869 // Remove the entry:
870 // - Key : TopologyElement ID (String)
871 // - Value : Serialized TopologyElement (byte[])
872 //
873 mapTopology.removeAsync(topologyElement.elementId());
874 }
875
876 /**
877 * Send a notification that a Topology Element is updated.
878 *
879 * @param topologyElement the Topology Element that is updated.
880 */
881 @Override
882 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
883 // NOTE: Adding an entry with an existing key automatically updates it
884 notificationSendTopologyElementAdded(topologyElement);
885 }
886
887 /**
888 * Send a notification that all Topology Elements are removed.
889 */
890 @Override
891 public void notificationSendAllTopologyElementsRemoved() {
892 //
893 // Remove all entries
894 // NOTE: We remove the entries one-by-one so the per-entry
895 // notifications will be delivered.
896 //
897 // mapTopology.clear();
898 Set<String> keySet = mapTopology.keySet();
899 for (String key : keySet) {
900 mapTopology.removeAsync(key);
901 }
902 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800903
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800904 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800905 public void sendArpRequest(ArpMessage arpMessage) {
906 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
907 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800908 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700909}