blob: 632cc38f608a04aa84bd30c7c6610c293a83f7da [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080024import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080029import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070030import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
/**
 * A datagrid service that uses Hazelcast as a datagrid.
 * The relevant data is stored in the Hazelcast datagrid and shared as
 * appropriate in a multi-node cluster.
 */
52public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private final static int MAX_BUFFER_SIZE = 64*1024;
54
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070055 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070057 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058
59 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070060 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061 private Config hazelcastConfig = null;
62
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070064 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065
66 // State related to the Flow map
67 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068 private IMap<Long, byte[]> mapFlow = null;
69 private MapFlowListener mapFlowListener = null;
70 private String mapFlowListenerId = null;
71
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070072 // State related to the Flow Entry map
73 protected static final String mapFlowEntryName = "mapFlowEntry";
74 private IMap<Long, byte[]> mapFlowEntry = null;
75 private MapFlowEntryListener mapFlowEntryListener = null;
76 private String mapFlowEntryListenerId = null;
77
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080078 // State related to the Flow ID map
79 protected static final String mapFlowIdName = "mapFlowId";
80 private IMap<Long, byte[]> mapFlowId = null;
81 private MapFlowIdListener mapFlowIdListener = null;
82 private String mapFlowIdListenerId = null;
83
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080084 // State related to the Flow Entry ID map
85 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
86 private IMap<Long, byte[]> mapFlowEntryId = null;
87 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
88 private String mapFlowEntryIdListenerId = null;
89
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070090 // State related to the Network Topology map
91 protected static final String mapTopologyName = "mapTopology";
92 private IMap<String, byte[]> mapTopology = null;
93 private MapTopologyListener mapTopologyListener = null;
94 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080095
Jonathan Hart18ad55c2013-11-11 22:49:55 -080096 // State related to the ARP map
97 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080098 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080099 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
100 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700101
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 /**
103 * Class for receiving notifications for Flow state.
104 *
105 * The datagrid map is:
106 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800107 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700108 */
109 class MapFlowListener implements EntryListener<Long, byte[]> {
110 /**
111 * Receive a notification that an entry is added.
112 *
113 * @param event the notification event for the entry.
114 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800115 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800116 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800117 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700118
119 //
120 // Decode the value and deliver the notification
121 //
122 Kryo kryo = kryoFactory.newKryo();
123 Input input = new Input(valueBytes);
124 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
125 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700126 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700127 }
128
129 /**
130 * Receive a notification that an entry is removed.
131 *
132 * @param event the notification event for the entry.
133 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800134 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800135 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800136 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700137
138 //
139 // Decode the value and deliver the notification
140 //
141 Kryo kryo = kryoFactory.newKryo();
142 Input input = new Input(valueBytes);
143 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
144 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700145 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700146 }
147
148 /**
149 * Receive a notification that an entry is updated.
150 *
151 * @param event the notification event for the entry.
152 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800153 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800154 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800155 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700156
157 //
158 // Decode the value and deliver the notification
159 //
160 Kryo kryo = kryoFactory.newKryo();
161 Input input = new Input(valueBytes);
162 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
163 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700164 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700165 }
166
167 /**
168 * Receive a notification that an entry is evicted.
169 *
170 * @param event the notification event for the entry.
171 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800172 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800173 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700174 // NOTE: We don't use eviction for this map
175 }
176 }
177
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700178 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700179 * Class for receiving notifications for FlowEntry state.
180 *
181 * The datagrid map is:
182 * - Key : FlowEntry ID (Long)
183 * - Value : Serialized FlowEntry (byte[])
184 */
185 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
186 /**
187 * Receive a notification that an entry is added.
188 *
189 * @param event the notification event for the entry.
190 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800191 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800192 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800193 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700194
195 //
196 // Decode the value and deliver the notification
197 //
198 Kryo kryo = kryoFactory.newKryo();
199 Input input = new Input(valueBytes);
200 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
201 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700202 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700203 }
204
205 /**
206 * Receive a notification that an entry is removed.
207 *
208 * @param event the notification event for the entry.
209 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800210 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800211 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800212 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700213
214 //
215 // Decode the value and deliver the notification
216 //
217 Kryo kryo = kryoFactory.newKryo();
218 Input input = new Input(valueBytes);
219 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
220 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700221 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700222 }
223
224 /**
225 * Receive a notification that an entry is updated.
226 *
227 * @param event the notification event for the entry.
228 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800229 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800230 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800231 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700232
233 //
234 // Decode the value and deliver the notification
235 //
236 Kryo kryo = kryoFactory.newKryo();
237 Input input = new Input(valueBytes);
238 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
239 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700240 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700241 }
242
243 /**
244 * Receive a notification that an entry is evicted.
245 *
246 * @param event the notification event for the entry.
247 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800248 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800249 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700250 // NOTE: We don't use eviction for this map
251 }
252 }
253
254 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800255 * Class for receiving notifications for FlowId state.
256 *
257 * The datagrid map is:
258 * - Key : FlowId (Long)
259 * - Value : Serialized FlowId (byte[])
260 */
261 class MapFlowIdListener implements EntryListener<Long, byte[]> {
262 /**
263 * Receive a notification that an entry is added.
264 *
265 * @param event the notification event for the entry.
266 */
267 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800268 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800269
270 //
271 // Decode the value and deliver the notification
272 //
273 Kryo kryo = kryoFactory.newKryo();
274 Input input = new Input(valueBytes);
275 FlowId flowId = kryo.readObject(input, FlowId.class);
276 kryoFactory.deleteKryo(kryo);
277 flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
278 }
279
280 /**
281 * Receive a notification that an entry is removed.
282 *
283 * @param event the notification event for the entry.
284 */
285 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800286 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800287
288 //
289 // Decode the value and deliver the notification
290 //
291 Kryo kryo = kryoFactory.newKryo();
292 Input input = new Input(valueBytes);
293 FlowId flowId = kryo.readObject(input, FlowId.class);
294 kryoFactory.deleteKryo(kryo);
295 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
296 }
297
298 /**
299 * Receive a notification that an entry is updated.
300 *
301 * @param event the notification event for the entry.
302 */
303 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800304 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800305
306 //
307 // Decode the value and deliver the notification
308 //
309 Kryo kryo = kryoFactory.newKryo();
310 Input input = new Input(valueBytes);
311 FlowId flowId = kryo.readObject(input, FlowId.class);
312 kryoFactory.deleteKryo(kryo);
313 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
314 }
315
316 /**
317 * Receive a notification that an entry is evicted.
318 *
319 * @param event the notification event for the entry.
320 */
321 public void entryEvicted(EntryEvent<Long, byte[]> event) {
322 // NOTE: We don't use eviction for this map
323 }
324 }
325
326 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800327 * Class for receiving notifications for FlowEntryId state.
328 *
329 * The datagrid map is:
330 * - Key : FlowEntryId (Long)
331 * - Value : Serialized Switch Dpid (byte[])
332 */
333 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
334 /**
335 * Receive a notification that an entry is added.
336 *
337 * @param event the notification event for the entry.
338 */
339 public void entryAdded(EntryEvent<Long, byte[]> event) {
340 Long keyLong = event.getKey();
341 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
342
343 byte[] valueBytes = event.getValue();
344
345 //
346 // Decode the value and deliver the notification
347 //
348 Kryo kryo = kryoFactory.newKryo();
349 Input input = new Input(valueBytes);
350 Dpid dpid = kryo.readObject(input, Dpid.class);
351 kryoFactory.deleteKryo(kryo);
352 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
353 }
354
355 /**
356 * Receive a notification that an entry is removed.
357 *
358 * @param event the notification event for the entry.
359 */
360 public void entryRemoved(EntryEvent<Long, byte[]> event) {
361 Long keyLong = event.getKey();
362 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
363
364 byte[] valueBytes = event.getValue();
365
366 //
367 // Decode the value and deliver the notification
368 //
369 Kryo kryo = kryoFactory.newKryo();
370 Input input = new Input(valueBytes);
371 Dpid dpid = kryo.readObject(input, Dpid.class);
372 kryoFactory.deleteKryo(kryo);
373 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
374 }
375
376 /**
377 * Receive a notification that an entry is updated.
378 *
379 * @param event the notification event for the entry.
380 */
381 public void entryUpdated(EntryEvent<Long, byte[]> event) {
382 Long keyLong = event.getKey();
383 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
384
385 byte[] valueBytes = event.getValue();
386
387 //
388 // Decode the value and deliver the notification
389 //
390 Kryo kryo = kryoFactory.newKryo();
391 Input input = new Input(valueBytes);
392 Dpid dpid = kryo.readObject(input, Dpid.class);
393 kryoFactory.deleteKryo(kryo);
394 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
395 }
396
397 /**
398 * Receive a notification that an entry is evicted.
399 *
400 * @param event the notification event for the entry.
401 */
402 public void entryEvicted(EntryEvent<Long, byte[]> event) {
403 // NOTE: We don't use eviction for this map
404 }
405 }
406
407 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700408 * Class for receiving notifications for Network Topology state.
409 *
410 * The datagrid map is:
411 * - Key: TopologyElement ID (String)
412 * - Value: Serialized TopologyElement (byte[])
413 */
414 class MapTopologyListener implements EntryListener<String, byte[]> {
415 /**
416 * Receive a notification that an entry is added.
417 *
418 * @param event the notification event for the entry.
419 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800420 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800421 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800422 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700423
424 //
425 // Decode the value and deliver the notification
426 //
427 Kryo kryo = kryoFactory.newKryo();
428 Input input = new Input(valueBytes);
429 TopologyElement topologyElement =
430 kryo.readObject(input, TopologyElement.class);
431 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700432 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700433 }
434
435 /**
436 * Receive a notification that an entry is removed.
437 *
438 * @param event the notification event for the entry.
439 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800440 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800441 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800442 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700443
444 //
445 // Decode the value and deliver the notification
446 //
447 Kryo kryo = kryoFactory.newKryo();
448 Input input = new Input(valueBytes);
449 TopologyElement topologyElement =
450 kryo.readObject(input, TopologyElement.class);
451 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700452 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700453 }
454
455 /**
456 * Receive a notification that an entry is updated.
457 *
458 * @param event the notification event for the entry.
459 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800460 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800461 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800462 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700463
464 //
465 // Decode the value and deliver the notification
466 //
467 Kryo kryo = kryoFactory.newKryo();
468 Input input = new Input(valueBytes);
469 TopologyElement topologyElement =
470 kryo.readObject(input, TopologyElement.class);
471 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700472 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700473 }
474
475 /**
476 * Receive a notification that an entry is evicted.
477 *
478 * @param event the notification event for the entry.
479 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800480 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800481 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700482 // NOTE: We don't use eviction for this map
483 }
484 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800485
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800486 /**
487 * Class for receiving notifications for ARP requests.
488 *
489 * The datagrid map is:
490 * - Key: Request ID (String)
491 * - Value: ARP request packet (byte[])
492 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800493 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800494 /**
495 * Receive a notification that an entry is added.
496 *
497 * @param event the notification event for the entry.
498 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800499 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800500 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800501 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
502 arpEventHandler.arpRequestNotification(event.getKey());
503 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800504
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800505 //
506 // Decode the value and deliver the notification
507 //
508 /*
509 Kryo kryo = kryoFactory.newKryo();
510 Input input = new Input(valueBytes);
511 TopologyElement topologyElement =
512 kryo.readObject(input, TopologyElement.class);
513 kryoFactory.deleteKryo(kryo);
514 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
515 */
516 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800517
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800518 /**
519 * Receive a notification that an entry is removed.
520 *
521 * @param event the notification event for the entry.
522 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800523 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800524 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800525 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800526 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800527
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800528 /**
529 * Receive a notification that an entry is updated.
530 *
531 * @param event the notification event for the entry.
532 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800533 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800534 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800535 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800536 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800537
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800538 /**
539 * Receive a notification that an entry is evicted.
540 *
541 * @param event the notification event for the entry.
542 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800543 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800544 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800545 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800546 }
547 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700548
549 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700550 * Initialize the Hazelcast Datagrid operation.
551 *
552 * @param conf the configuration filename.
553 */
554 public void init(String configFilename) {
555 /*
556 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
557 System.setProperty("hazelcast.socket.send.buffer.size", "32");
558 */
559 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800560
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700561 // Init from configuration file
562 try {
563 hazelcastConfig = new FileSystemXmlConfig(configFilename);
564 } catch (FileNotFoundException e) {
565 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
566 }
567 /*
568 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
569 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
570 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
571 */
572 //
573 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
574 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
575 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
576 }
577
578 /**
579 * Shutdown the Hazelcast Datagrid operation.
580 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800581 @Override
582 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700583 close();
584 }
585
586 /**
587 * Shutdown the Hazelcast Datagrid operation.
588 */
589 public void close() {
590 Hazelcast.shutdownAll();
591 }
592
593 /**
594 * Get the collection of offered module services.
595 *
596 * @return the collection of offered module services.
597 */
598 @Override
599 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800600 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700601 new ArrayList<Class<? extends IFloodlightService>>();
602 l.add(IDatagridService.class);
603 return l;
604 }
605
606 /**
607 * Get the collection of implemented services.
608 *
609 * @return the collection of implemented services.
610 */
611 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800612 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700613 getServiceImpls() {
614 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800615 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700616 new HashMap<Class<? extends IFloodlightService>,
617 IFloodlightService>();
618 m.put(IDatagridService.class, this);
619 return m;
620 }
621
622 /**
623 * Get the collection of modules this module depends on.
624 *
625 * @return the collection of modules this module depends on.
626 */
627 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800628 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700629 getModuleDependencies() {
630 Collection<Class<? extends IFloodlightService>> l =
631 new ArrayList<Class<? extends IFloodlightService>>();
632 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700633 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700634 return l;
635 }
636
637 /**
638 * Initialize the module.
639 *
640 * @param context the module context to use for the initialization.
641 */
642 @Override
643 public void init(FloodlightModuleContext context)
644 throws FloodlightModuleException {
645 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700646 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700647
648 // Get the configuration file name and configure the Datagrid
649 Map<String, String> configMap = context.getConfigParams(this);
650 String configFilename = configMap.get(HazelcastConfigFile);
651 this.init(configFilename);
652 }
653
654 /**
655 * Startup module operation.
656 *
657 * @param context the module context to use for the startup.
658 */
659 @Override
660 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700661 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700662
663 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800664
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800665 arpMap = hazelcastInstance.getMap(arpMapName);
666 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700667 }
668
669 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700670 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700671 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700672 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700673 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700674 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700675 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700676 */
677 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700678 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
679 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700680
681 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700682 mapFlowListener = new MapFlowListener();
683 mapFlow = hazelcastInstance.getMap(mapFlowName);
684 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700685
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700686 // Initialize the FlowEntry-related map state
687 mapFlowEntryListener = new MapFlowEntryListener();
688 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
689 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
690
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800691 // Initialize the FlowId-related map state
692 mapFlowIdListener = new MapFlowIdListener();
693 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
694 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
695
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800696 // Initialize the FlowEntryId-related map state
697 mapFlowEntryIdListener = new MapFlowEntryIdListener();
698 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
699 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
700
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700701 // Initialize the Topology-related map state
702 mapTopologyListener = new MapTopologyListener();
703 mapTopology = hazelcastInstance.getMap(mapTopologyName);
704 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700705 }
706
707 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700708 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700709 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700710 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700711 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700712 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700713 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700714 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700715 */
716 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700717 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700718 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700719 mapFlow.removeEntryListener(mapFlowListenerId);
720 mapFlow = null;
721 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700722
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700723 // Clear the FlowEntry-related map state
724 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
725 mapFlowEntry = null;
726 mapFlowEntryListener = null;
727
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800728 // Clear the FlowId-related map state
729 mapFlowId.removeEntryListener(mapFlowIdListenerId);
730 mapFlowId = null;
731 mapFlowIdListener = null;
732
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800733 // Clear the FlowEntryId-related map state
734 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
735 mapFlowEntryId = null;
736 mapFlowEntryIdListener = null;
737
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700738 // Clear the Topology-related map state
739 mapTopology.removeEntryListener(mapTopologyListenerId);
740 mapTopology = null;
741 mapTopologyListener = null;
742
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700743 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700744 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800745
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800746 @Override
747 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
748 if (arpEventHandler != null) {
749 arpEventHandlers.add(arpEventHandler);
750 }
751 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800752
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800753 @Override
754 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
755 arpEventHandlers.remove(arpEventHandler);
756 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800757
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700758 /**
759 * Get all Flows that are currently in the datagrid.
760 *
761 * @return all Flows that are currently in the datagrid.
762 */
763 @Override
764 public Collection<FlowPath> getAllFlows() {
765 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
766
767 //
768 // Get all current entries
769 //
770 Collection<byte[]> values = mapFlow.values();
771 Kryo kryo = kryoFactory.newKryo();
772 for (byte[] valueBytes : values) {
773 //
774 // Decode the value
775 //
776 Input input = new Input(valueBytes);
777 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
778 allFlows.add(flowPath);
779 }
780 kryoFactory.deleteKryo(kryo);
781
782 return allFlows;
783 }
784
785 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800786 * Get a Flow for a given Flow ID.
787 *
788 * @param flowId the Flow ID of the Flow to get.
789 * @return the Flow if found, otherwise null.
790 */
791 @Override
792 public FlowPath getFlow(FlowId flowId) {
793 byte[] valueBytes = mapFlow.get(flowId.value());
794 if (valueBytes == null)
795 return null;
796
797 Kryo kryo = kryoFactory.newKryo();
798 //
799 // Decode the value
800 //
801 Input input = new Input(valueBytes);
802 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
803 kryoFactory.deleteKryo(kryo);
804
805 return flowPath;
806 }
807
808 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700809 * Send a notification that a Flow is added.
810 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700811 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700812 */
813 @Override
814 public void notificationSendFlowAdded(FlowPath flowPath) {
815 //
816 // Encode the value
817 //
818 byte[] buffer = new byte[MAX_BUFFER_SIZE];
819 Kryo kryo = kryoFactory.newKryo();
820 Output output = new Output(buffer, -1);
821 kryo.writeObject(output, flowPath);
822 byte[] valueBytes = output.toBytes();
823 kryoFactory.deleteKryo(kryo);
824
825 //
826 // Put the entry:
827 // - Key : Flow ID (Long)
828 // - Value : Serialized Flow (byte[])
829 //
830 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
831 }
832
833 /**
834 * Send a notification that a Flow is removed.
835 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700836 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700837 */
838 @Override
839 public void notificationSendFlowRemoved(FlowId flowId) {
840 //
841 // Remove the entry:
842 // - Key : Flow ID (Long)
843 // - Value : Serialized Flow (byte[])
844 //
845 mapFlow.removeAsync(flowId.value());
846 }
847
848 /**
849 * Send a notification that a Flow is updated.
850 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700851 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700852 */
853 @Override
854 public void notificationSendFlowUpdated(FlowPath flowPath) {
855 // NOTE: Adding an entry with an existing key automatically updates it
856 notificationSendFlowAdded(flowPath);
857 }
858
859 /**
860 * Send a notification that all Flows are removed.
861 */
862 @Override
863 public void notificationSendAllFlowsRemoved() {
864 //
865 // Remove all entries
866 // NOTE: We remove the entries one-by-one so the per-entry
867 // notifications will be delivered.
868 //
869 // mapFlow.clear();
870 Set<Long> keySet = mapFlow.keySet();
871 for (Long key : keySet) {
872 mapFlow.removeAsync(key);
873 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700874 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700875
876 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700877 * Get all Flow Entries that are currently in the datagrid.
878 *
879 * @return all Flow Entries that are currently in the datagrid.
880 */
881 @Override
882 public Collection<FlowEntry> getAllFlowEntries() {
883 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
884
885 //
886 // Get all current entries
887 //
888 Collection<byte[]> values = mapFlowEntry.values();
889 Kryo kryo = kryoFactory.newKryo();
890 for (byte[] valueBytes : values) {
891 //
892 // Decode the value
893 //
894 Input input = new Input(valueBytes);
895 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
896 allFlowEntries.add(flowEntry);
897 }
898 kryoFactory.deleteKryo(kryo);
899
900 return allFlowEntries;
901 }
902
903 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800904 * Get a Flow Entry for a given Flow Entry ID.
905 *
906 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
907 * @return the Flow Entry if found, otherwise null.
908 */
909 @Override
910 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
911 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
912 if (valueBytes == null)
913 return null;
914
915 Kryo kryo = kryoFactory.newKryo();
916 //
917 // Decode the value
918 //
919 Input input = new Input(valueBytes);
920 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
921 kryoFactory.deleteKryo(kryo);
922
923 return flowEntry;
924 }
925
926 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700927 * Send a notification that a FlowEntry is added.
928 *
929 * @param flowEntry the FlowEntry that is added.
930 */
931 @Override
932 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
933 //
934 // Encode the value
935 //
936 byte[] buffer = new byte[MAX_BUFFER_SIZE];
937 Kryo kryo = kryoFactory.newKryo();
938 Output output = new Output(buffer, -1);
939 kryo.writeObject(output, flowEntry);
940 byte[] valueBytes = output.toBytes();
941 kryoFactory.deleteKryo(kryo);
942
943 //
944 // Put the entry:
945 // - Key : FlowEntry ID (Long)
946 // - Value : Serialized FlowEntry (byte[])
947 //
948 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
949 }
950
951 /**
952 * Send a notification that a FlowEntry is removed.
953 *
954 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
955 */
956 @Override
957 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
958 //
959 // Remove the entry:
960 // - Key : FlowEntry ID (Long)
961 // - Value : Serialized FlowEntry (byte[])
962 //
963 mapFlowEntry.removeAsync(flowEntryId.value());
964 }
965
966 /**
967 * Send a notification that a FlowEntry is updated.
968 *
969 * @param flowEntry the FlowEntry that is updated.
970 */
971 @Override
972 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
973 // NOTE: Adding an entry with an existing key automatically updates it
974 notificationSendFlowEntryAdded(flowEntry);
975 }
976
977 /**
978 * Send a notification that all Flow Entries are removed.
979 */
980 @Override
981 public void notificationSendAllFlowEntriesRemoved() {
982 //
983 // Remove all entries
984 // NOTE: We remove the entries one-by-one so the per-entry
985 // notifications will be delivered.
986 //
987 // mapFlowEntry.clear();
988 Set<Long> keySet = mapFlowEntry.keySet();
989 for (Long key : keySet) {
990 mapFlowEntry.removeAsync(key);
991 }
992 }
993
994 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800995 * Get all Flow IDs that are currently in the datagrid.
996 *
997 * @return all Flow IDs that are currently in the datagrid.
998 */
999 @Override
1000 public Collection<FlowId> getAllFlowIds() {
1001 Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
1002
1003 //
1004 // Get all current entries
1005 //
1006 Collection<byte[]> values = mapFlowId.values();
1007 Kryo kryo = kryoFactory.newKryo();
1008 for (byte[] valueBytes : values) {
1009 //
1010 // Decode the value
1011 //
1012 Input input = new Input(valueBytes);
1013 FlowId flowId = kryo.readObject(input, FlowId.class);
1014 allFlowIds.add(flowId);
1015 }
1016 kryoFactory.deleteKryo(kryo);
1017
1018 return allFlowIds;
1019 }
1020
1021 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001022 * Get all Flow Entry IDs that are currently in the datagrid.
1023 *
1024 * @return all Flow Entry IDs that ae currently in the datagrid.
1025 */
1026 @Override
1027 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1028 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1029 new LinkedList<Pair<FlowEntryId, Dpid>>();
1030
1031 //
1032 // Get all current entries
1033 //
1034 Kryo kryo = kryoFactory.newKryo();
1035 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1036 Long key = entry.getKey();
1037 byte[] valueBytes = entry.getValue();
1038
1039 FlowEntryId flowEntryId = new FlowEntryId(key);
1040
1041 //
1042 // Decode the value
1043 //
1044 Input input = new Input(valueBytes);
1045 Dpid dpid = kryo.readObject(input, Dpid.class);
1046
1047 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1048 allFlowEntryIds.add(pair);
1049 }
1050 kryoFactory.deleteKryo(kryo);
1051
1052 return allFlowEntryIds;
1053 }
1054
1055 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001056 * Send a notification that a FlowId is added.
1057 *
1058 * @param flowId the FlowId that is added.
1059 */
1060 @Override
1061 public void notificationSendFlowIdAdded(FlowId flowId) {
1062 //
1063 // Encode the value
1064 //
1065 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1066 Kryo kryo = kryoFactory.newKryo();
1067 Output output = new Output(buffer, -1);
1068 kryo.writeObject(output, flowId);
1069 byte[] valueBytes = output.toBytes();
1070 kryoFactory.deleteKryo(kryo);
1071
1072 //
1073 // Put the entry:
1074 // - Key : FlowId (Long)
1075 // - Value : Serialized FlowId (byte[])
1076 //
1077 mapFlowId.putAsync(flowId.value(), valueBytes);
1078 }
1079
1080 /**
1081 * Send a notification that a FlowId is removed.
1082 *
1083 * @param flowId the FlowId that is removed.
1084 */
1085 @Override
1086 public void notificationSendFlowIdRemoved(FlowId flowId) {
1087 //
1088 // Remove the entry:
1089 // - Key : FlowId (Long)
1090 // - Value : Serialized FlowId (byte[])
1091 //
1092 mapFlowId.removeAsync(flowId.value());
1093 }
1094
1095 /**
1096 * Send a notification that a FlowId is updated.
1097 *
1098 * @param flowId the FlowId that is updated.
1099 */
1100 @Override
1101 public void notificationSendFlowIdUpdated(FlowId flowId) {
1102 // NOTE: Adding an entry with an existing key automatically updates it
1103 notificationSendFlowIdAdded(flowId);
1104 }
1105
1106 /**
1107 * Send a notification that all Flow IDs are removed.
1108 */
1109 @Override
1110 public void notificationSendAllFlowIdsRemoved() {
1111 //
1112 // Remove all entries
1113 // NOTE: We remove the entries one-by-one so the per-entry
1114 // notifications will be delivered.
1115 //
1116 // mapFlowId.clear();
1117 Set<Long> keySet = mapFlowId.keySet();
1118 for (Long key : keySet) {
1119 mapFlowId.removeAsync(key);
1120 }
1121 }
1122
1123 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001124 * Send a notification that a FlowEntryId is added.
1125 *
1126 * @param flowEntryId the FlowEntryId that is added.
1127 * @param dpid the Switch Dpid.
1128 */
1129 @Override
1130 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1131 Dpid dpid) {
1132 //
1133 // Encode the value
1134 //
1135 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1136 Kryo kryo = kryoFactory.newKryo();
1137 Output output = new Output(buffer, -1);
1138 kryo.writeObject(output, dpid);
1139 byte[] valueBytes = output.toBytes();
1140 kryoFactory.deleteKryo(kryo);
1141
1142 //
1143 // Put the entry:
1144 // - Key : FlowEntryId (Long)
1145 // - Value : Serialized Switch Dpid (byte[])
1146 //
1147 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1148 }
1149
1150 /**
1151 * Send a notification that a FlowEntryId is removed.
1152 *
1153 * @param flowEntryId the FlowEntryId that is removed.
1154 */
1155 @Override
1156 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1157 //
1158 // Remove the entry:
1159 // - Key : FlowEntryId (Long)
1160 // - Value : Serialized Dpid (byte[])
1161 //
1162 mapFlowEntryId.removeAsync(flowEntryId.value());
1163 }
1164
1165 /**
1166 * Send a notification that a FlowEntryId is updated.
1167 *
1168 * @param flowEntryId the FlowEntryId that is updated.
1169 * @param dpid the Switch Dpid.
1170 */
1171 @Override
1172 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1173 Dpid dpid) {
1174 // NOTE: Adding an entry with an existing key automatically updates it
1175 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1176 }
1177
1178 /**
1179 * Send a notification that all Flow Entry IDs are removed.
1180 */
1181 @Override
1182 public void notificationSendAllFlowEntryIdsRemoved() {
1183 //
1184 // Remove all entries
1185 // NOTE: We remove the entries one-by-one so the per-entry
1186 // notifications will be delivered.
1187 //
1188 // mapFlowEntryId.clear();
1189 Set<Long> keySet = mapFlowEntryId.keySet();
1190 for (Long key : keySet) {
1191 mapFlowEntryId.removeAsync(key);
1192 }
1193 }
1194
1195 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001196 * Get all Topology Elements that are currently in the datagrid.
1197 *
1198 * @return all Topology Elements that are currently in the datagrid.
1199 */
1200 @Override
1201 public Collection<TopologyElement> getAllTopologyElements() {
1202 Collection<TopologyElement> allTopologyElements =
1203 new LinkedList<TopologyElement>();
1204
1205 //
1206 // Get all current entries
1207 //
1208 Collection<byte[]> values = mapTopology.values();
1209 Kryo kryo = kryoFactory.newKryo();
1210 for (byte[] valueBytes : values) {
1211 //
1212 // Decode the value
1213 //
1214 Input input = new Input(valueBytes);
1215 TopologyElement topologyElement =
1216 kryo.readObject(input, TopologyElement.class);
1217 allTopologyElements.add(topologyElement);
1218 }
1219 kryoFactory.deleteKryo(kryo);
1220
1221 return allTopologyElements;
1222 }
1223
1224 /**
1225 * Send a notification that a Topology Element is added.
1226 *
1227 * @param topologyElement the Topology Element that is added.
1228 */
1229 @Override
1230 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1231 //
1232 // Encode the value
1233 //
1234 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1235 Kryo kryo = kryoFactory.newKryo();
1236 Output output = new Output(buffer, -1);
1237 kryo.writeObject(output, topologyElement);
1238 byte[] valueBytes = output.toBytes();
1239 kryoFactory.deleteKryo(kryo);
1240
1241 //
1242 // Put the entry:
1243 // - Key : TopologyElement ID (String)
1244 // - Value : Serialized TopologyElement (byte[])
1245 //
1246 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1247 }
1248
1249 /**
1250 * Send a notification that a Topology Element is removed.
1251 *
1252 * @param topologyElement the Topology Element that is removed.
1253 */
1254 @Override
1255 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1256 //
1257 // Remove the entry:
1258 // - Key : TopologyElement ID (String)
1259 // - Value : Serialized TopologyElement (byte[])
1260 //
1261 mapTopology.removeAsync(topologyElement.elementId());
1262 }
1263
1264 /**
1265 * Send a notification that a Topology Element is updated.
1266 *
1267 * @param topologyElement the Topology Element that is updated.
1268 */
1269 @Override
1270 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1271 // NOTE: Adding an entry with an existing key automatically updates it
1272 notificationSendTopologyElementAdded(topologyElement);
1273 }
1274
1275 /**
1276 * Send a notification that all Topology Elements are removed.
1277 */
1278 @Override
1279 public void notificationSendAllTopologyElementsRemoved() {
1280 //
1281 // Remove all entries
1282 // NOTE: We remove the entries one-by-one so the per-entry
1283 // notifications will be delivered.
1284 //
1285 // mapTopology.clear();
1286 Set<String> keySet = mapTopology.keySet();
1287 for (String key : keySet) {
1288 mapTopology.removeAsync(key);
1289 }
1290 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001291
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001292 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001293 public void sendArpRequest(ArpMessage arpMessage) {
1294 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1295 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001296 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001297}