blob: e54dfc514dd325e18422c4b1673b97cee77c76a9 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080024import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
29import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
30
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
Jonathan Hart18ad55c2013-11-11 22:49:55 -080034import com.esotericsoftware.kryo2.Kryo;
35import com.esotericsoftware.kryo2.io.Input;
36import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070037import com.hazelcast.config.Config;
38import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070039import com.hazelcast.core.EntryEvent;
40import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070041import com.hazelcast.core.Hazelcast;
42import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070043import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070044import com.hazelcast.instance.GroupProperties;
45
46/**
47 * A datagrid service that uses Hazelcast as a datagrid.
48 * The relevant data is stored in the Hazelcast datagrid and shared as
49 * appropriate in a multi-node cluster.
50 */
51public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070052 private final static int MAX_BUFFER_SIZE = 64*1024;
53
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070054 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070055 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070056 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070057
58 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070060 private Config hazelcastConfig = null;
61
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070062 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070063 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070064
65 // State related to the Flow map
66 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070067 private IMap<Long, byte[]> mapFlow = null;
68 private MapFlowListener mapFlowListener = null;
69 private String mapFlowListenerId = null;
70
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070071 // State related to the Flow Entry map
72 protected static final String mapFlowEntryName = "mapFlowEntry";
73 private IMap<Long, byte[]> mapFlowEntry = null;
74 private MapFlowEntryListener mapFlowEntryListener = null;
75 private String mapFlowEntryListenerId = null;
76
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080077 // State related to the Flow ID map
78 protected static final String mapFlowIdName = "mapFlowId";
79 private IMap<Long, byte[]> mapFlowId = null;
80 private MapFlowIdListener mapFlowIdListener = null;
81 private String mapFlowIdListenerId = null;
82
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080083 // State related to the Flow Entry ID map
84 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
85 private IMap<Long, byte[]> mapFlowEntryId = null;
86 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
87 private String mapFlowEntryIdListenerId = null;
88
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070089 // State related to the Network Topology map
90 protected static final String mapTopologyName = "mapTopology";
91 private IMap<String, byte[]> mapTopology = null;
92 private MapTopologyListener mapTopologyListener = null;
93 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080094
Jonathan Hart18ad55c2013-11-11 22:49:55 -080095 // State related to the ARP map
96 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080097 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080098 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
99 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700100
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700101 /**
102 * Class for receiving notifications for Flow state.
103 *
104 * The datagrid map is:
105 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800106 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700107 */
108 class MapFlowListener implements EntryListener<Long, byte[]> {
109 /**
110 * Receive a notification that an entry is added.
111 *
112 * @param event the notification event for the entry.
113 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800114 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800115 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800116 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700117
118 //
119 // Decode the value and deliver the notification
120 //
121 Kryo kryo = kryoFactory.newKryo();
122 Input input = new Input(valueBytes);
123 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
124 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700125 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700126 }
127
128 /**
129 * Receive a notification that an entry is removed.
130 *
131 * @param event the notification event for the entry.
132 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800133 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800134 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800135 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700136
137 //
138 // Decode the value and deliver the notification
139 //
140 Kryo kryo = kryoFactory.newKryo();
141 Input input = new Input(valueBytes);
142 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
143 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700144 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700145 }
146
147 /**
148 * Receive a notification that an entry is updated.
149 *
150 * @param event the notification event for the entry.
151 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800152 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800153 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800154 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700155
156 //
157 // Decode the value and deliver the notification
158 //
159 Kryo kryo = kryoFactory.newKryo();
160 Input input = new Input(valueBytes);
161 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
162 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700163 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700164 }
165
166 /**
167 * Receive a notification that an entry is evicted.
168 *
169 * @param event the notification event for the entry.
170 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800171 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800172 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700173 // NOTE: We don't use eviction for this map
174 }
175 }
176
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700177 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700178 * Class for receiving notifications for FlowEntry state.
179 *
180 * The datagrid map is:
181 * - Key : FlowEntry ID (Long)
182 * - Value : Serialized FlowEntry (byte[])
183 */
184 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
185 /**
186 * Receive a notification that an entry is added.
187 *
188 * @param event the notification event for the entry.
189 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800190 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800191 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800192 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700193
194 //
195 // Decode the value and deliver the notification
196 //
197 Kryo kryo = kryoFactory.newKryo();
198 Input input = new Input(valueBytes);
199 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
200 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700201 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700202 }
203
204 /**
205 * Receive a notification that an entry is removed.
206 *
207 * @param event the notification event for the entry.
208 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800209 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800210 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800211 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700212
213 //
214 // Decode the value and deliver the notification
215 //
216 Kryo kryo = kryoFactory.newKryo();
217 Input input = new Input(valueBytes);
218 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
219 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700220 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700221 }
222
223 /**
224 * Receive a notification that an entry is updated.
225 *
226 * @param event the notification event for the entry.
227 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800228 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800229 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800230 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700231
232 //
233 // Decode the value and deliver the notification
234 //
235 Kryo kryo = kryoFactory.newKryo();
236 Input input = new Input(valueBytes);
237 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
238 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700239 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700240 }
241
242 /**
243 * Receive a notification that an entry is evicted.
244 *
245 * @param event the notification event for the entry.
246 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800247 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800248 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700249 // NOTE: We don't use eviction for this map
250 }
251 }
252
253 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800254 * Class for receiving notifications for FlowId state.
255 *
256 * The datagrid map is:
257 * - Key : FlowId (Long)
258 * - Value : Serialized FlowId (byte[])
259 */
260 class MapFlowIdListener implements EntryListener<Long, byte[]> {
261 /**
262 * Receive a notification that an entry is added.
263 *
264 * @param event the notification event for the entry.
265 */
266 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800267 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800268
269 //
270 // Decode the value and deliver the notification
271 //
272 Kryo kryo = kryoFactory.newKryo();
273 Input input = new Input(valueBytes);
274 FlowId flowId = kryo.readObject(input, FlowId.class);
275 kryoFactory.deleteKryo(kryo);
276 flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
277 }
278
279 /**
280 * Receive a notification that an entry is removed.
281 *
282 * @param event the notification event for the entry.
283 */
284 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800285 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800286
287 //
288 // Decode the value and deliver the notification
289 //
290 Kryo kryo = kryoFactory.newKryo();
291 Input input = new Input(valueBytes);
292 FlowId flowId = kryo.readObject(input, FlowId.class);
293 kryoFactory.deleteKryo(kryo);
294 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
295 }
296
297 /**
298 * Receive a notification that an entry is updated.
299 *
300 * @param event the notification event for the entry.
301 */
302 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800303 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800304
305 //
306 // Decode the value and deliver the notification
307 //
308 Kryo kryo = kryoFactory.newKryo();
309 Input input = new Input(valueBytes);
310 FlowId flowId = kryo.readObject(input, FlowId.class);
311 kryoFactory.deleteKryo(kryo);
312 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
313 }
314
315 /**
316 * Receive a notification that an entry is evicted.
317 *
318 * @param event the notification event for the entry.
319 */
320 public void entryEvicted(EntryEvent<Long, byte[]> event) {
321 // NOTE: We don't use eviction for this map
322 }
323 }
324
325 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800326 * Class for receiving notifications for FlowEntryId state.
327 *
328 * The datagrid map is:
329 * - Key : FlowEntryId (Long)
330 * - Value : Serialized Switch Dpid (byte[])
331 */
332 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
333 /**
334 * Receive a notification that an entry is added.
335 *
336 * @param event the notification event for the entry.
337 */
338 public void entryAdded(EntryEvent<Long, byte[]> event) {
339 Long keyLong = event.getKey();
340 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
341
342 byte[] valueBytes = event.getValue();
343
344 //
345 // Decode the value and deliver the notification
346 //
347 Kryo kryo = kryoFactory.newKryo();
348 Input input = new Input(valueBytes);
349 Dpid dpid = kryo.readObject(input, Dpid.class);
350 kryoFactory.deleteKryo(kryo);
351 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
352 }
353
354 /**
355 * Receive a notification that an entry is removed.
356 *
357 * @param event the notification event for the entry.
358 */
359 public void entryRemoved(EntryEvent<Long, byte[]> event) {
360 Long keyLong = event.getKey();
361 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
362
363 byte[] valueBytes = event.getValue();
364
365 //
366 // Decode the value and deliver the notification
367 //
368 Kryo kryo = kryoFactory.newKryo();
369 Input input = new Input(valueBytes);
370 Dpid dpid = kryo.readObject(input, Dpid.class);
371 kryoFactory.deleteKryo(kryo);
372 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
373 }
374
375 /**
376 * Receive a notification that an entry is updated.
377 *
378 * @param event the notification event for the entry.
379 */
380 public void entryUpdated(EntryEvent<Long, byte[]> event) {
381 Long keyLong = event.getKey();
382 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
383
384 byte[] valueBytes = event.getValue();
385
386 //
387 // Decode the value and deliver the notification
388 //
389 Kryo kryo = kryoFactory.newKryo();
390 Input input = new Input(valueBytes);
391 Dpid dpid = kryo.readObject(input, Dpid.class);
392 kryoFactory.deleteKryo(kryo);
393 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
394 }
395
396 /**
397 * Receive a notification that an entry is evicted.
398 *
399 * @param event the notification event for the entry.
400 */
401 public void entryEvicted(EntryEvent<Long, byte[]> event) {
402 // NOTE: We don't use eviction for this map
403 }
404 }
405
406 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700407 * Class for receiving notifications for Network Topology state.
408 *
409 * The datagrid map is:
410 * - Key: TopologyElement ID (String)
411 * - Value: Serialized TopologyElement (byte[])
412 */
413 class MapTopologyListener implements EntryListener<String, byte[]> {
414 /**
415 * Receive a notification that an entry is added.
416 *
417 * @param event the notification event for the entry.
418 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800419 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800420 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800421 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700422
423 //
424 // Decode the value and deliver the notification
425 //
426 Kryo kryo = kryoFactory.newKryo();
427 Input input = new Input(valueBytes);
428 TopologyElement topologyElement =
429 kryo.readObject(input, TopologyElement.class);
430 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700431 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700432 }
433
434 /**
435 * Receive a notification that an entry is removed.
436 *
437 * @param event the notification event for the entry.
438 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800439 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800440 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800441 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700442
443 //
444 // Decode the value and deliver the notification
445 //
446 Kryo kryo = kryoFactory.newKryo();
447 Input input = new Input(valueBytes);
448 TopologyElement topologyElement =
449 kryo.readObject(input, TopologyElement.class);
450 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700451 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700452 }
453
454 /**
455 * Receive a notification that an entry is updated.
456 *
457 * @param event the notification event for the entry.
458 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800459 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800460 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800461 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700462
463 //
464 // Decode the value and deliver the notification
465 //
466 Kryo kryo = kryoFactory.newKryo();
467 Input input = new Input(valueBytes);
468 TopologyElement topologyElement =
469 kryo.readObject(input, TopologyElement.class);
470 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700471 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700472 }
473
474 /**
475 * Receive a notification that an entry is evicted.
476 *
477 * @param event the notification event for the entry.
478 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800479 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800480 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700481 // NOTE: We don't use eviction for this map
482 }
483 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800484
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800485 /**
486 * Class for receiving notifications for ARP requests.
487 *
488 * The datagrid map is:
489 * - Key: Request ID (String)
490 * - Value: ARP request packet (byte[])
491 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800492 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800493 /**
494 * Receive a notification that an entry is added.
495 *
496 * @param event the notification event for the entry.
497 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800498 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800499 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800500 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
501 arpEventHandler.arpRequestNotification(event.getKey());
502 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800503
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800504 //
505 // Decode the value and deliver the notification
506 //
507 /*
508 Kryo kryo = kryoFactory.newKryo();
509 Input input = new Input(valueBytes);
510 TopologyElement topologyElement =
511 kryo.readObject(input, TopologyElement.class);
512 kryoFactory.deleteKryo(kryo);
513 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
514 */
515 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800516
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800517 /**
518 * Receive a notification that an entry is removed.
519 *
520 * @param event the notification event for the entry.
521 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800522 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800523 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800524 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800525 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800526
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800527 /**
528 * Receive a notification that an entry is updated.
529 *
530 * @param event the notification event for the entry.
531 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800532 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800533 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800534 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800535 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800536
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800537 /**
538 * Receive a notification that an entry is evicted.
539 *
540 * @param event the notification event for the entry.
541 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800542 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800543 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800544 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800545 }
546 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700547
548 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700549 * Initialize the Hazelcast Datagrid operation.
550 *
551 * @param conf the configuration filename.
552 */
553 public void init(String configFilename) {
554 /*
555 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
556 System.setProperty("hazelcast.socket.send.buffer.size", "32");
557 */
558 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800559
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700560 // Init from configuration file
561 try {
562 hazelcastConfig = new FileSystemXmlConfig(configFilename);
563 } catch (FileNotFoundException e) {
564 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
565 }
566 /*
567 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
568 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
569 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
570 */
571 //
572 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
573 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
574 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
575 }
576
577 /**
578 * Shutdown the Hazelcast Datagrid operation.
579 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800580 @Override
581 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700582 close();
583 }
584
585 /**
586 * Shutdown the Hazelcast Datagrid operation.
587 */
588 public void close() {
589 Hazelcast.shutdownAll();
590 }
591
592 /**
593 * Get the collection of offered module services.
594 *
595 * @return the collection of offered module services.
596 */
597 @Override
598 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800599 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700600 new ArrayList<Class<? extends IFloodlightService>>();
601 l.add(IDatagridService.class);
602 return l;
603 }
604
605 /**
606 * Get the collection of implemented services.
607 *
608 * @return the collection of implemented services.
609 */
610 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800611 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700612 getServiceImpls() {
613 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800614 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700615 new HashMap<Class<? extends IFloodlightService>,
616 IFloodlightService>();
617 m.put(IDatagridService.class, this);
618 return m;
619 }
620
621 /**
622 * Get the collection of modules this module depends on.
623 *
624 * @return the collection of modules this module depends on.
625 */
626 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800627 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700628 getModuleDependencies() {
629 Collection<Class<? extends IFloodlightService>> l =
630 new ArrayList<Class<? extends IFloodlightService>>();
631 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700632 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700633 return l;
634 }
635
636 /**
637 * Initialize the module.
638 *
639 * @param context the module context to use for the initialization.
640 */
641 @Override
642 public void init(FloodlightModuleContext context)
643 throws FloodlightModuleException {
644 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700645 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700646
647 // Get the configuration file name and configure the Datagrid
648 Map<String, String> configMap = context.getConfigParams(this);
649 String configFilename = configMap.get(HazelcastConfigFile);
650 this.init(configFilename);
651 }
652
653 /**
654 * Startup module operation.
655 *
656 * @param context the module context to use for the startup.
657 */
658 @Override
659 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700660 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700661
662 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800663
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800664 arpMap = hazelcastInstance.getMap(arpMapName);
665 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700666 }
667
668 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700669 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700670 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700671 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700672 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700673 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700674 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700675 */
676 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700677 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
678 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700679
680 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700681 mapFlowListener = new MapFlowListener();
682 mapFlow = hazelcastInstance.getMap(mapFlowName);
683 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700684
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700685 // Initialize the FlowEntry-related map state
686 mapFlowEntryListener = new MapFlowEntryListener();
687 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
688 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
689
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800690 // Initialize the FlowId-related map state
691 mapFlowIdListener = new MapFlowIdListener();
692 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
693 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
694
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800695 // Initialize the FlowEntryId-related map state
696 mapFlowEntryIdListener = new MapFlowEntryIdListener();
697 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
698 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
699
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700700 // Initialize the Topology-related map state
701 mapTopologyListener = new MapTopologyListener();
702 mapTopology = hazelcastInstance.getMap(mapTopologyName);
703 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700704 }
705
706 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700707 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700708 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700709 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700710 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700711 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700712 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700713 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700714 */
715 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700716 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700717 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700718 mapFlow.removeEntryListener(mapFlowListenerId);
719 mapFlow = null;
720 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700721
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700722 // Clear the FlowEntry-related map state
723 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
724 mapFlowEntry = null;
725 mapFlowEntryListener = null;
726
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800727 // Clear the FlowId-related map state
728 mapFlowId.removeEntryListener(mapFlowIdListenerId);
729 mapFlowId = null;
730 mapFlowIdListener = null;
731
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800732 // Clear the FlowEntryId-related map state
733 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
734 mapFlowEntryId = null;
735 mapFlowEntryIdListener = null;
736
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700737 // Clear the Topology-related map state
738 mapTopology.removeEntryListener(mapTopologyListenerId);
739 mapTopology = null;
740 mapTopologyListener = null;
741
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700742 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700743 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800744
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800745 @Override
746 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
747 if (arpEventHandler != null) {
748 arpEventHandlers.add(arpEventHandler);
749 }
750 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800751
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800752 @Override
753 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
754 arpEventHandlers.remove(arpEventHandler);
755 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800756
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700757 /**
758 * Get all Flows that are currently in the datagrid.
759 *
760 * @return all Flows that are currently in the datagrid.
761 */
762 @Override
763 public Collection<FlowPath> getAllFlows() {
764 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
765
766 //
767 // Get all current entries
768 //
769 Collection<byte[]> values = mapFlow.values();
770 Kryo kryo = kryoFactory.newKryo();
771 for (byte[] valueBytes : values) {
772 //
773 // Decode the value
774 //
775 Input input = new Input(valueBytes);
776 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
777 allFlows.add(flowPath);
778 }
779 kryoFactory.deleteKryo(kryo);
780
781 return allFlows;
782 }
783
784 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800785 * Get a Flow for a given Flow ID.
786 *
787 * @param flowId the Flow ID of the Flow to get.
788 * @return the Flow if found, otherwise null.
789 */
790 @Override
791 public FlowPath getFlow(FlowId flowId) {
792 byte[] valueBytes = mapFlow.get(flowId.value());
793 if (valueBytes == null)
794 return null;
795
796 Kryo kryo = kryoFactory.newKryo();
797 //
798 // Decode the value
799 //
800 Input input = new Input(valueBytes);
801 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
802 kryoFactory.deleteKryo(kryo);
803
804 return flowPath;
805 }
806
807 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700808 * Send a notification that a Flow is added.
809 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700810 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700811 */
812 @Override
813 public void notificationSendFlowAdded(FlowPath flowPath) {
814 //
815 // Encode the value
816 //
817 byte[] buffer = new byte[MAX_BUFFER_SIZE];
818 Kryo kryo = kryoFactory.newKryo();
819 Output output = new Output(buffer, -1);
820 kryo.writeObject(output, flowPath);
821 byte[] valueBytes = output.toBytes();
822 kryoFactory.deleteKryo(kryo);
823
824 //
825 // Put the entry:
826 // - Key : Flow ID (Long)
827 // - Value : Serialized Flow (byte[])
828 //
829 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
830 }
831
832 /**
833 * Send a notification that a Flow is removed.
834 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700835 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700836 */
837 @Override
838 public void notificationSendFlowRemoved(FlowId flowId) {
839 //
840 // Remove the entry:
841 // - Key : Flow ID (Long)
842 // - Value : Serialized Flow (byte[])
843 //
844 mapFlow.removeAsync(flowId.value());
845 }
846
847 /**
848 * Send a notification that a Flow is updated.
849 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700850 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700851 */
852 @Override
853 public void notificationSendFlowUpdated(FlowPath flowPath) {
854 // NOTE: Adding an entry with an existing key automatically updates it
855 notificationSendFlowAdded(flowPath);
856 }
857
858 /**
859 * Send a notification that all Flows are removed.
860 */
861 @Override
862 public void notificationSendAllFlowsRemoved() {
863 //
864 // Remove all entries
865 // NOTE: We remove the entries one-by-one so the per-entry
866 // notifications will be delivered.
867 //
868 // mapFlow.clear();
869 Set<Long> keySet = mapFlow.keySet();
870 for (Long key : keySet) {
871 mapFlow.removeAsync(key);
872 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700873 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700874
875 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700876 * Get all Flow Entries that are currently in the datagrid.
877 *
878 * @return all Flow Entries that are currently in the datagrid.
879 */
880 @Override
881 public Collection<FlowEntry> getAllFlowEntries() {
882 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
883
884 //
885 // Get all current entries
886 //
887 Collection<byte[]> values = mapFlowEntry.values();
888 Kryo kryo = kryoFactory.newKryo();
889 for (byte[] valueBytes : values) {
890 //
891 // Decode the value
892 //
893 Input input = new Input(valueBytes);
894 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
895 allFlowEntries.add(flowEntry);
896 }
897 kryoFactory.deleteKryo(kryo);
898
899 return allFlowEntries;
900 }
901
902 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800903 * Get a Flow Entry for a given Flow Entry ID.
904 *
905 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
906 * @return the Flow Entry if found, otherwise null.
907 */
908 @Override
909 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
910 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
911 if (valueBytes == null)
912 return null;
913
914 Kryo kryo = kryoFactory.newKryo();
915 //
916 // Decode the value
917 //
918 Input input = new Input(valueBytes);
919 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
920 kryoFactory.deleteKryo(kryo);
921
922 return flowEntry;
923 }
924
925 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700926 * Send a notification that a FlowEntry is added.
927 *
928 * @param flowEntry the FlowEntry that is added.
929 */
930 @Override
931 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
932 //
933 // Encode the value
934 //
935 byte[] buffer = new byte[MAX_BUFFER_SIZE];
936 Kryo kryo = kryoFactory.newKryo();
937 Output output = new Output(buffer, -1);
938 kryo.writeObject(output, flowEntry);
939 byte[] valueBytes = output.toBytes();
940 kryoFactory.deleteKryo(kryo);
941
942 //
943 // Put the entry:
944 // - Key : FlowEntry ID (Long)
945 // - Value : Serialized FlowEntry (byte[])
946 //
947 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
948 }
949
950 /**
951 * Send a notification that a FlowEntry is removed.
952 *
953 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
954 */
955 @Override
956 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
957 //
958 // Remove the entry:
959 // - Key : FlowEntry ID (Long)
960 // - Value : Serialized FlowEntry (byte[])
961 //
962 mapFlowEntry.removeAsync(flowEntryId.value());
963 }
964
965 /**
966 * Send a notification that a FlowEntry is updated.
967 *
968 * @param flowEntry the FlowEntry that is updated.
969 */
970 @Override
971 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
972 // NOTE: Adding an entry with an existing key automatically updates it
973 notificationSendFlowEntryAdded(flowEntry);
974 }
975
976 /**
977 * Send a notification that all Flow Entries are removed.
978 */
979 @Override
980 public void notificationSendAllFlowEntriesRemoved() {
981 //
982 // Remove all entries
983 // NOTE: We remove the entries one-by-one so the per-entry
984 // notifications will be delivered.
985 //
986 // mapFlowEntry.clear();
987 Set<Long> keySet = mapFlowEntry.keySet();
988 for (Long key : keySet) {
989 mapFlowEntry.removeAsync(key);
990 }
991 }
992
993 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800994 * Get all Flow IDs that are currently in the datagrid.
995 *
996 * @return all Flow IDs that are currently in the datagrid.
997 */
998 @Override
999 public Collection<FlowId> getAllFlowIds() {
1000 Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
1001
1002 //
1003 // Get all current entries
1004 //
1005 Collection<byte[]> values = mapFlowId.values();
1006 Kryo kryo = kryoFactory.newKryo();
1007 for (byte[] valueBytes : values) {
1008 //
1009 // Decode the value
1010 //
1011 Input input = new Input(valueBytes);
1012 FlowId flowId = kryo.readObject(input, FlowId.class);
1013 allFlowIds.add(flowId);
1014 }
1015 kryoFactory.deleteKryo(kryo);
1016
1017 return allFlowIds;
1018 }
1019
1020 /**
1021 * Send a notification that a FlowId is added.
1022 *
1023 * @param flowId the FlowId that is added.
1024 */
1025 @Override
1026 public void notificationSendFlowIdAdded(FlowId flowId) {
1027 //
1028 // Encode the value
1029 //
1030 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1031 Kryo kryo = kryoFactory.newKryo();
1032 Output output = new Output(buffer, -1);
1033 kryo.writeObject(output, flowId);
1034 byte[] valueBytes = output.toBytes();
1035 kryoFactory.deleteKryo(kryo);
1036
1037 //
1038 // Put the entry:
1039 // - Key : FlowId (Long)
1040 // - Value : Serialized FlowId (byte[])
1041 //
1042 mapFlowId.putAsync(flowId.value(), valueBytes);
1043 }
1044
1045 /**
1046 * Send a notification that a FlowId is removed.
1047 *
1048 * @param flowId the FlowId that is removed.
1049 */
1050 @Override
1051 public void notificationSendFlowIdRemoved(FlowId flowId) {
1052 //
1053 // Remove the entry:
1054 // - Key : FlowId (Long)
1055 // - Value : Serialized FlowId (byte[])
1056 //
1057 mapFlowId.removeAsync(flowId.value());
1058 }
1059
1060 /**
1061 * Send a notification that a FlowId is updated.
1062 *
1063 * @param flowId the FlowId that is updated.
1064 */
1065 @Override
1066 public void notificationSendFlowIdUpdated(FlowId flowId) {
1067 // NOTE: Adding an entry with an existing key automatically updates it
1068 notificationSendFlowIdAdded(flowId);
1069 }
1070
1071 /**
1072 * Send a notification that all Flow IDs are removed.
1073 */
1074 @Override
1075 public void notificationSendAllFlowIdsRemoved() {
1076 //
1077 // Remove all entries
1078 // NOTE: We remove the entries one-by-one so the per-entry
1079 // notifications will be delivered.
1080 //
1081 // mapFlowId.clear();
1082 Set<Long> keySet = mapFlowId.keySet();
1083 for (Long key : keySet) {
1084 mapFlowId.removeAsync(key);
1085 }
1086 }
1087
1088 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001089 * Send a notification that a FlowEntryId is added.
1090 *
1091 * @param flowEntryId the FlowEntryId that is added.
1092 * @param dpid the Switch Dpid.
1093 */
1094 @Override
1095 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1096 Dpid dpid) {
1097 //
1098 // Encode the value
1099 //
1100 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1101 Kryo kryo = kryoFactory.newKryo();
1102 Output output = new Output(buffer, -1);
1103 kryo.writeObject(output, dpid);
1104 byte[] valueBytes = output.toBytes();
1105 kryoFactory.deleteKryo(kryo);
1106
1107 //
1108 // Put the entry:
1109 // - Key : FlowEntryId (Long)
1110 // - Value : Serialized Switch Dpid (byte[])
1111 //
1112 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1113 }
1114
1115 /**
1116 * Send a notification that a FlowEntryId is removed.
1117 *
1118 * @param flowEntryId the FlowEntryId that is removed.
1119 */
1120 @Override
1121 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1122 //
1123 // Remove the entry:
1124 // - Key : FlowEntryId (Long)
1125 // - Value : Serialized Dpid (byte[])
1126 //
1127 mapFlowEntryId.removeAsync(flowEntryId.value());
1128 }
1129
1130 /**
1131 * Send a notification that a FlowEntryId is updated.
1132 *
1133 * @param flowEntryId the FlowEntryId that is updated.
1134 * @param dpid the Switch Dpid.
1135 */
1136 @Override
1137 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1138 Dpid dpid) {
1139 // NOTE: Adding an entry with an existing key automatically updates it
1140 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1141 }
1142
1143 /**
1144 * Send a notification that all Flow Entry IDs are removed.
1145 */
1146 @Override
1147 public void notificationSendAllFlowEntryIdsRemoved() {
1148 //
1149 // Remove all entries
1150 // NOTE: We remove the entries one-by-one so the per-entry
1151 // notifications will be delivered.
1152 //
1153 // mapFlowEntryId.clear();
1154 Set<Long> keySet = mapFlowEntryId.keySet();
1155 for (Long key : keySet) {
1156 mapFlowEntryId.removeAsync(key);
1157 }
1158 }
1159
1160 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001161 * Get all Topology Elements that are currently in the datagrid.
1162 *
1163 * @return all Topology Elements that are currently in the datagrid.
1164 */
1165 @Override
1166 public Collection<TopologyElement> getAllTopologyElements() {
1167 Collection<TopologyElement> allTopologyElements =
1168 new LinkedList<TopologyElement>();
1169
1170 //
1171 // Get all current entries
1172 //
1173 Collection<byte[]> values = mapTopology.values();
1174 Kryo kryo = kryoFactory.newKryo();
1175 for (byte[] valueBytes : values) {
1176 //
1177 // Decode the value
1178 //
1179 Input input = new Input(valueBytes);
1180 TopologyElement topologyElement =
1181 kryo.readObject(input, TopologyElement.class);
1182 allTopologyElements.add(topologyElement);
1183 }
1184 kryoFactory.deleteKryo(kryo);
1185
1186 return allTopologyElements;
1187 }
1188
1189 /**
1190 * Send a notification that a Topology Element is added.
1191 *
1192 * @param topologyElement the Topology Element that is added.
1193 */
1194 @Override
1195 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1196 //
1197 // Encode the value
1198 //
1199 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1200 Kryo kryo = kryoFactory.newKryo();
1201 Output output = new Output(buffer, -1);
1202 kryo.writeObject(output, topologyElement);
1203 byte[] valueBytes = output.toBytes();
1204 kryoFactory.deleteKryo(kryo);
1205
1206 //
1207 // Put the entry:
1208 // - Key : TopologyElement ID (String)
1209 // - Value : Serialized TopologyElement (byte[])
1210 //
1211 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1212 }
1213
1214 /**
1215 * Send a notification that a Topology Element is removed.
1216 *
1217 * @param topologyElement the Topology Element that is removed.
1218 */
1219 @Override
1220 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1221 //
1222 // Remove the entry:
1223 // - Key : TopologyElement ID (String)
1224 // - Value : Serialized TopologyElement (byte[])
1225 //
1226 mapTopology.removeAsync(topologyElement.elementId());
1227 }
1228
1229 /**
1230 * Send a notification that a Topology Element is updated.
1231 *
1232 * @param topologyElement the Topology Element that is updated.
1233 */
1234 @Override
1235 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1236 // NOTE: Adding an entry with an existing key automatically updates it
1237 notificationSendTopologyElementAdded(topologyElement);
1238 }
1239
1240 /**
1241 * Send a notification that all Topology Elements are removed.
1242 */
1243 @Override
1244 public void notificationSendAllTopologyElementsRemoved() {
1245 //
1246 // Remove all entries
1247 // NOTE: We remove the entries one-by-one so the per-entry
1248 // notifications will be delivered.
1249 //
1250 // mapTopology.clear();
1251 Set<String> keySet = mapTopology.keySet();
1252 for (String key : keySet) {
1253 mapTopology.removeAsync(key);
1254 }
1255 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001256
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001257 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001258 public void sendArpRequest(ArpMessage arpMessage) {
1259 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1260 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001261 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001262}