blob: 45995bf9a94c09e2b28a219622eb3d107fb4d49b [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Brian O'Connor2daf7a92014-01-14 11:26:35 -080021import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor.Measurement;
Jonathan Hartd3003252013-11-15 09:44:46 -080022import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080023import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070024import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080025import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070026import net.onrc.onos.ofcontroller.util.FlowEntry;
27import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070028import net.onrc.onos.ofcontroller.util.FlowId;
29import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080030import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070031import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
32
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070033import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
Jonathan Hart18ad55c2013-11-11 22:49:55 -080036import com.esotericsoftware.kryo2.Kryo;
37import com.esotericsoftware.kryo2.io.Input;
38import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070039import com.hazelcast.config.Config;
40import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070041import com.hazelcast.core.EntryEvent;
42import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.core.Hazelcast;
44import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070045import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070046import com.hazelcast.instance.GroupProperties;
47
Pavlin Radoslavove2497672014-01-12 18:03:35 -080048import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
49
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070050/**
51 * A datagrid service that uses Hazelcast as a datagrid.
52 * The relevant data is stored in the Hazelcast datagrid and shared as
53 * appropriate in a multi-node cluster.
54 */
55public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070056 private final static int MAX_BUFFER_SIZE = 64*1024;
57
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070058 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070060 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061
62 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070064 private Config hazelcastConfig = null;
65
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070067 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068
69 // State related to the Flow map
70 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070071 private IMap<Long, byte[]> mapFlow = null;
72 private MapFlowListener mapFlowListener = null;
73 private String mapFlowListenerId = null;
74
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070075 // State related to the Flow Entry map
76 protected static final String mapFlowEntryName = "mapFlowEntry";
77 private IMap<Long, byte[]> mapFlowEntry = null;
78 private MapFlowEntryListener mapFlowEntryListener = null;
79 private String mapFlowEntryListenerId = null;
80
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080081 // State related to the Flow ID map
82 protected static final String mapFlowIdName = "mapFlowId";
83 private IMap<Long, byte[]> mapFlowId = null;
84 private MapFlowIdListener mapFlowIdListener = null;
85 private String mapFlowIdListenerId = null;
86
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080087 // State related to the Flow Entry ID map
88 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
89 private IMap<Long, byte[]> mapFlowEntryId = null;
90 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
91 private String mapFlowEntryIdListenerId = null;
92
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070093 // State related to the Network Topology map
94 protected static final String mapTopologyName = "mapTopology";
95 private IMap<String, byte[]> mapTopology = null;
96 private MapTopologyListener mapTopologyListener = null;
97 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080098
Jonathan Hart18ad55c2013-11-11 22:49:55 -080099 // State related to the ARP map
100 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -0800101 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800102 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
103 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700104
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700105 /**
106 * Class for receiving notifications for Flow state.
107 *
108 * The datagrid map is:
109 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800110 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700111 */
112 class MapFlowListener implements EntryListener<Long, byte[]> {
113 /**
114 * Receive a notification that an entry is added.
115 *
116 * @param event the notification event for the entry.
117 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800118 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800119 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800120 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700121
122 //
123 // Decode the value and deliver the notification
124 //
125 Kryo kryo = kryoFactory.newKryo();
126 Input input = new Input(valueBytes);
127 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
128 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700129 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700130 }
131
132 /**
133 * Receive a notification that an entry is removed.
134 *
135 * @param event the notification event for the entry.
136 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800137 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800138 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800139 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700140
141 //
142 // Decode the value and deliver the notification
143 //
144 Kryo kryo = kryoFactory.newKryo();
145 Input input = new Input(valueBytes);
146 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
147 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700148 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700149 }
150
151 /**
152 * Receive a notification that an entry is updated.
153 *
154 * @param event the notification event for the entry.
155 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800156 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800157 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800158 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700159
160 //
161 // Decode the value and deliver the notification
162 //
163 Kryo kryo = kryoFactory.newKryo();
164 Input input = new Input(valueBytes);
165 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
166 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700167 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700168 }
169
170 /**
171 * Receive a notification that an entry is evicted.
172 *
173 * @param event the notification event for the entry.
174 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800175 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800176 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700177 // NOTE: We don't use eviction for this map
178 }
179 }
180
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700181 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700182 * Class for receiving notifications for FlowEntry state.
183 *
184 * The datagrid map is:
185 * - Key : FlowEntry ID (Long)
186 * - Value : Serialized FlowEntry (byte[])
187 */
188 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
189 /**
190 * Receive a notification that an entry is added.
191 *
192 * @param event the notification event for the entry.
193 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800194 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800195 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800196 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700197
198 //
199 // Decode the value and deliver the notification
200 //
201 Kryo kryo = kryoFactory.newKryo();
202 Input input = new Input(valueBytes);
203 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
204 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700205 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700206 }
207
208 /**
209 * Receive a notification that an entry is removed.
210 *
211 * @param event the notification event for the entry.
212 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800213 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800214 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800215 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700216
217 //
218 // Decode the value and deliver the notification
219 //
220 Kryo kryo = kryoFactory.newKryo();
221 Input input = new Input(valueBytes);
222 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
223 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700224 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700225 }
226
227 /**
228 * Receive a notification that an entry is updated.
229 *
230 * @param event the notification event for the entry.
231 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800232 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800233 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800234 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700235
236 //
237 // Decode the value and deliver the notification
238 //
239 Kryo kryo = kryoFactory.newKryo();
240 Input input = new Input(valueBytes);
241 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
242 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700243 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700244 }
245
246 /**
247 * Receive a notification that an entry is evicted.
248 *
249 * @param event the notification event for the entry.
250 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800251 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800252 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700253 // NOTE: We don't use eviction for this map
254 }
255 }
256
257 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800258 * Class for receiving notifications for FlowId state.
259 *
260 * The datagrid map is:
261 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800262 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800263 */
264 class MapFlowIdListener implements EntryListener<Long, byte[]> {
265 /**
266 * Receive a notification that an entry is added.
267 *
268 * @param event the notification event for the entry.
269 */
270 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800271 Long keyLong = event.getKey();
272 FlowId flowId = new FlowId(keyLong);
273
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800274 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800275
276 //
277 // Decode the value and deliver the notification
278 //
279 Kryo kryo = kryoFactory.newKryo();
280 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800281 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800282 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800283 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800284 }
285
286 /**
287 * Receive a notification that an entry is removed.
288 *
289 * @param event the notification event for the entry.
290 */
291 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800292 Long keyLong = event.getKey();
293 FlowId flowId = new FlowId(keyLong);
294
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800295 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800296
297 //
298 // Decode the value and deliver the notification
299 //
300 Kryo kryo = kryoFactory.newKryo();
301 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800302 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800303 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800304 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800305 }
306
307 /**
308 * Receive a notification that an entry is updated.
309 *
310 * @param event the notification event for the entry.
311 */
312 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800313 Long keyLong = event.getKey();
314 FlowId flowId = new FlowId(keyLong);
315
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800316 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800317
318 //
319 // Decode the value and deliver the notification
320 //
321 Kryo kryo = kryoFactory.newKryo();
322 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800323 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800324 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800325 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800326 }
327
328 /**
329 * Receive a notification that an entry is evicted.
330 *
331 * @param event the notification event for the entry.
332 */
333 public void entryEvicted(EntryEvent<Long, byte[]> event) {
334 // NOTE: We don't use eviction for this map
335 }
336 }
337
338 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800339 * Class for receiving notifications for FlowEntryId state.
340 *
341 * The datagrid map is:
342 * - Key : FlowEntryId (Long)
343 * - Value : Serialized Switch Dpid (byte[])
344 */
345 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
346 /**
347 * Receive a notification that an entry is added.
348 *
349 * @param event the notification event for the entry.
350 */
351 public void entryAdded(EntryEvent<Long, byte[]> event) {
352 Long keyLong = event.getKey();
353 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
354
355 byte[] valueBytes = event.getValue();
356
357 //
358 // Decode the value and deliver the notification
359 //
360 Kryo kryo = kryoFactory.newKryo();
361 Input input = new Input(valueBytes);
362 Dpid dpid = kryo.readObject(input, Dpid.class);
363 kryoFactory.deleteKryo(kryo);
364 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
365 }
366
367 /**
368 * Receive a notification that an entry is removed.
369 *
370 * @param event the notification event for the entry.
371 */
372 public void entryRemoved(EntryEvent<Long, byte[]> event) {
373 Long keyLong = event.getKey();
374 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
375
376 byte[] valueBytes = event.getValue();
377
378 //
379 // Decode the value and deliver the notification
380 //
381 Kryo kryo = kryoFactory.newKryo();
382 Input input = new Input(valueBytes);
383 Dpid dpid = kryo.readObject(input, Dpid.class);
384 kryoFactory.deleteKryo(kryo);
385 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
386 }
387
388 /**
389 * Receive a notification that an entry is updated.
390 *
391 * @param event the notification event for the entry.
392 */
393 public void entryUpdated(EntryEvent<Long, byte[]> event) {
394 Long keyLong = event.getKey();
395 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
396
397 byte[] valueBytes = event.getValue();
398
399 //
400 // Decode the value and deliver the notification
401 //
402 Kryo kryo = kryoFactory.newKryo();
403 Input input = new Input(valueBytes);
404 Dpid dpid = kryo.readObject(input, Dpid.class);
405 kryoFactory.deleteKryo(kryo);
406 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
407 }
408
409 /**
410 * Receive a notification that an entry is evicted.
411 *
412 * @param event the notification event for the entry.
413 */
414 public void entryEvicted(EntryEvent<Long, byte[]> event) {
415 // NOTE: We don't use eviction for this map
416 }
417 }
418
419 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700420 * Class for receiving notifications for Network Topology state.
421 *
422 * The datagrid map is:
423 * - Key: TopologyElement ID (String)
424 * - Value: Serialized TopologyElement (byte[])
425 */
426 class MapTopologyListener implements EntryListener<String, byte[]> {
427 /**
428 * Receive a notification that an entry is added.
429 *
430 * @param event the notification event for the entry.
431 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800432 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800433 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800434 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700435
436 //
437 // Decode the value and deliver the notification
438 //
439 Kryo kryo = kryoFactory.newKryo();
440 Input input = new Input(valueBytes);
441 TopologyElement topologyElement =
442 kryo.readObject(input, TopologyElement.class);
443 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700444 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700445 }
446
447 /**
448 * Receive a notification that an entry is removed.
449 *
450 * @param event the notification event for the entry.
451 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800452 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800453 public void entryRemoved(EntryEvent<String, byte[]> event) {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800454// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
455 String tag = "TopologyEntryRemoved.NotificationReceived";
456 PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800457 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700458
459 //
460 // Decode the value and deliver the notification
461 //
462 Kryo kryo = kryoFactory.newKryo();
463 Input input = new Input(valueBytes);
464 TopologyElement topologyElement =
465 kryo.readObject(input, TopologyElement.class);
466 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700467 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800468// PerformanceMonitor.stop(tag);
469 m.stop();
470// PerformanceMonitor.report(tag);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700471 }
472
473 /**
474 * Receive a notification that an entry is updated.
475 *
476 * @param event the notification event for the entry.
477 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800478 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800479 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800480 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700481
482 //
483 // Decode the value and deliver the notification
484 //
485 Kryo kryo = kryoFactory.newKryo();
486 Input input = new Input(valueBytes);
487 TopologyElement topologyElement =
488 kryo.readObject(input, TopologyElement.class);
489 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700490 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700491 }
492
493 /**
494 * Receive a notification that an entry is evicted.
495 *
496 * @param event the notification event for the entry.
497 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800498 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800499 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700500 // NOTE: We don't use eviction for this map
501 }
502 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800503
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800504 /**
505 * Class for receiving notifications for ARP requests.
506 *
507 * The datagrid map is:
508 * - Key: Request ID (String)
509 * - Value: ARP request packet (byte[])
510 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800511 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800512 /**
513 * Receive a notification that an entry is added.
514 *
515 * @param event the notification event for the entry.
516 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800517 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800518 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800519 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
520 arpEventHandler.arpRequestNotification(event.getKey());
521 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800522
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800523 //
524 // Decode the value and deliver the notification
525 //
526 /*
527 Kryo kryo = kryoFactory.newKryo();
528 Input input = new Input(valueBytes);
529 TopologyElement topologyElement =
530 kryo.readObject(input, TopologyElement.class);
531 kryoFactory.deleteKryo(kryo);
532 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
533 */
534 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800535
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800536 /**
537 * Receive a notification that an entry is removed.
538 *
539 * @param event the notification event for the entry.
540 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800541 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800542 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800543 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800544 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800545
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800546 /**
547 * Receive a notification that an entry is updated.
548 *
549 * @param event the notification event for the entry.
550 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800551 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800552 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800553 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800554 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800555
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800556 /**
557 * Receive a notification that an entry is evicted.
558 *
559 * @param event the notification event for the entry.
560 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800561 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800562 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800563 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800564 }
565 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700566
567 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700568 * Initialize the Hazelcast Datagrid operation.
569 *
570 * @param conf the configuration filename.
571 */
572 public void init(String configFilename) {
573 /*
574 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
575 System.setProperty("hazelcast.socket.send.buffer.size", "32");
576 */
577 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800578
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700579 // Init from configuration file
580 try {
581 hazelcastConfig = new FileSystemXmlConfig(configFilename);
582 } catch (FileNotFoundException e) {
583 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
584 }
585 /*
586 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
587 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
588 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
589 */
590 //
591 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
592 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
593 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
594 }
595
596 /**
597 * Shutdown the Hazelcast Datagrid operation.
598 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800599 @Override
600 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700601 close();
602 }
603
604 /**
605 * Shutdown the Hazelcast Datagrid operation.
606 */
607 public void close() {
608 Hazelcast.shutdownAll();
609 }
610
611 /**
612 * Get the collection of offered module services.
613 *
614 * @return the collection of offered module services.
615 */
616 @Override
617 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800618 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700619 new ArrayList<Class<? extends IFloodlightService>>();
620 l.add(IDatagridService.class);
621 return l;
622 }
623
624 /**
625 * Get the collection of implemented services.
626 *
627 * @return the collection of implemented services.
628 */
629 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800630 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700631 getServiceImpls() {
632 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800633 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700634 new HashMap<Class<? extends IFloodlightService>,
635 IFloodlightService>();
636 m.put(IDatagridService.class, this);
637 return m;
638 }
639
640 /**
641 * Get the collection of modules this module depends on.
642 *
643 * @return the collection of modules this module depends on.
644 */
645 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800646 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700647 getModuleDependencies() {
648 Collection<Class<? extends IFloodlightService>> l =
649 new ArrayList<Class<? extends IFloodlightService>>();
650 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700651 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700652 return l;
653 }
654
655 /**
656 * Initialize the module.
657 *
658 * @param context the module context to use for the initialization.
659 */
660 @Override
661 public void init(FloodlightModuleContext context)
662 throws FloodlightModuleException {
663 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700664 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700665
666 // Get the configuration file name and configure the Datagrid
667 Map<String, String> configMap = context.getConfigParams(this);
668 String configFilename = configMap.get(HazelcastConfigFile);
669 this.init(configFilename);
670 }
671
672 /**
673 * Startup module operation.
674 *
675 * @param context the module context to use for the startup.
676 */
677 @Override
678 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700679 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700680
681 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800682
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800683 arpMap = hazelcastInstance.getMap(arpMapName);
684 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700685 }
686
687 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700688 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700689 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700690 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700691 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700692 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700693 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700694 */
695 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700696 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
697 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700698
699 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700700 mapFlowListener = new MapFlowListener();
701 mapFlow = hazelcastInstance.getMap(mapFlowName);
702 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700703
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700704 // Initialize the FlowEntry-related map state
705 mapFlowEntryListener = new MapFlowEntryListener();
706 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
707 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
708
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800709 // Initialize the FlowId-related map state
710 mapFlowIdListener = new MapFlowIdListener();
711 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
712 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
713
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800714 // Initialize the FlowEntryId-related map state
715 mapFlowEntryIdListener = new MapFlowEntryIdListener();
716 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
717 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
718
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700719 // Initialize the Topology-related map state
720 mapTopologyListener = new MapTopologyListener();
721 mapTopology = hazelcastInstance.getMap(mapTopologyName);
722 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700723 }
724
725 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700726 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700727 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700728 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700729 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700730 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700731 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700732 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700733 */
734 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700735 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700736 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700737 mapFlow.removeEntryListener(mapFlowListenerId);
738 mapFlow = null;
739 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700740
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700741 // Clear the FlowEntry-related map state
742 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
743 mapFlowEntry = null;
744 mapFlowEntryListener = null;
745
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800746 // Clear the FlowId-related map state
747 mapFlowId.removeEntryListener(mapFlowIdListenerId);
748 mapFlowId = null;
749 mapFlowIdListener = null;
750
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800751 // Clear the FlowEntryId-related map state
752 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
753 mapFlowEntryId = null;
754 mapFlowEntryIdListener = null;
755
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700756 // Clear the Topology-related map state
757 mapTopology.removeEntryListener(mapTopologyListenerId);
758 mapTopology = null;
759 mapTopologyListener = null;
760
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700761 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700762 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800763
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800764 @Override
765 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
766 if (arpEventHandler != null) {
767 arpEventHandlers.add(arpEventHandler);
768 }
769 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800770
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800771 @Override
772 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
773 arpEventHandlers.remove(arpEventHandler);
774 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800775
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700776 /**
777 * Get all Flows that are currently in the datagrid.
778 *
779 * @return all Flows that are currently in the datagrid.
780 */
781 @Override
782 public Collection<FlowPath> getAllFlows() {
783 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
784
785 //
786 // Get all current entries
787 //
788 Collection<byte[]> values = mapFlow.values();
789 Kryo kryo = kryoFactory.newKryo();
790 for (byte[] valueBytes : values) {
791 //
792 // Decode the value
793 //
794 Input input = new Input(valueBytes);
795 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
796 allFlows.add(flowPath);
797 }
798 kryoFactory.deleteKryo(kryo);
799
800 return allFlows;
801 }
802
803 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800804 * Get a Flow for a given Flow ID.
805 *
806 * @param flowId the Flow ID of the Flow to get.
807 * @return the Flow if found, otherwise null.
808 */
809 @Override
810 public FlowPath getFlow(FlowId flowId) {
811 byte[] valueBytes = mapFlow.get(flowId.value());
812 if (valueBytes == null)
813 return null;
814
815 Kryo kryo = kryoFactory.newKryo();
816 //
817 // Decode the value
818 //
819 Input input = new Input(valueBytes);
820 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
821 kryoFactory.deleteKryo(kryo);
822
823 return flowPath;
824 }
825
826 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700827 * Send a notification that a Flow is added.
828 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700829 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700830 */
831 @Override
832 public void notificationSendFlowAdded(FlowPath flowPath) {
833 //
834 // Encode the value
835 //
836 byte[] buffer = new byte[MAX_BUFFER_SIZE];
837 Kryo kryo = kryoFactory.newKryo();
838 Output output = new Output(buffer, -1);
839 kryo.writeObject(output, flowPath);
840 byte[] valueBytes = output.toBytes();
841 kryoFactory.deleteKryo(kryo);
842
843 //
844 // Put the entry:
845 // - Key : Flow ID (Long)
846 // - Value : Serialized Flow (byte[])
847 //
848 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
849 }
850
851 /**
852 * Send a notification that a Flow is removed.
853 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700854 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700855 */
856 @Override
857 public void notificationSendFlowRemoved(FlowId flowId) {
858 //
859 // Remove the entry:
860 // - Key : Flow ID (Long)
861 // - Value : Serialized Flow (byte[])
862 //
863 mapFlow.removeAsync(flowId.value());
864 }
865
866 /**
867 * Send a notification that a Flow is updated.
868 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700869 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700870 */
871 @Override
872 public void notificationSendFlowUpdated(FlowPath flowPath) {
873 // NOTE: Adding an entry with an existing key automatically updates it
874 notificationSendFlowAdded(flowPath);
875 }
876
877 /**
878 * Send a notification that all Flows are removed.
879 */
880 @Override
881 public void notificationSendAllFlowsRemoved() {
882 //
883 // Remove all entries
884 // NOTE: We remove the entries one-by-one so the per-entry
885 // notifications will be delivered.
886 //
887 // mapFlow.clear();
888 Set<Long> keySet = mapFlow.keySet();
889 for (Long key : keySet) {
890 mapFlow.removeAsync(key);
891 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700892 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700893
894 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700895 * Get all Flow Entries that are currently in the datagrid.
896 *
897 * @return all Flow Entries that are currently in the datagrid.
898 */
899 @Override
900 public Collection<FlowEntry> getAllFlowEntries() {
901 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
902
903 //
904 // Get all current entries
905 //
906 Collection<byte[]> values = mapFlowEntry.values();
907 Kryo kryo = kryoFactory.newKryo();
908 for (byte[] valueBytes : values) {
909 //
910 // Decode the value
911 //
912 Input input = new Input(valueBytes);
913 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
914 allFlowEntries.add(flowEntry);
915 }
916 kryoFactory.deleteKryo(kryo);
917
918 return allFlowEntries;
919 }
920
921 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800922 * Get a Flow Entry for a given Flow Entry ID.
923 *
924 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
925 * @return the Flow Entry if found, otherwise null.
926 */
927 @Override
928 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
929 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
930 if (valueBytes == null)
931 return null;
932
933 Kryo kryo = kryoFactory.newKryo();
934 //
935 // Decode the value
936 //
937 Input input = new Input(valueBytes);
938 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
939 kryoFactory.deleteKryo(kryo);
940
941 return flowEntry;
942 }
943
944 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700945 * Send a notification that a FlowEntry is added.
946 *
947 * @param flowEntry the FlowEntry that is added.
948 */
949 @Override
950 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
951 //
952 // Encode the value
953 //
954 byte[] buffer = new byte[MAX_BUFFER_SIZE];
955 Kryo kryo = kryoFactory.newKryo();
956 Output output = new Output(buffer, -1);
957 kryo.writeObject(output, flowEntry);
958 byte[] valueBytes = output.toBytes();
959 kryoFactory.deleteKryo(kryo);
960
961 //
962 // Put the entry:
963 // - Key : FlowEntry ID (Long)
964 // - Value : Serialized FlowEntry (byte[])
965 //
966 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
967 }
968
969 /**
970 * Send a notification that a FlowEntry is removed.
971 *
972 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
973 */
974 @Override
975 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
976 //
977 // Remove the entry:
978 // - Key : FlowEntry ID (Long)
979 // - Value : Serialized FlowEntry (byte[])
980 //
981 mapFlowEntry.removeAsync(flowEntryId.value());
982 }
983
984 /**
985 * Send a notification that a FlowEntry is updated.
986 *
987 * @param flowEntry the FlowEntry that is updated.
988 */
989 @Override
990 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
991 // NOTE: Adding an entry with an existing key automatically updates it
992 notificationSendFlowEntryAdded(flowEntry);
993 }
994
995 /**
996 * Send a notification that all Flow Entries are removed.
997 */
998 @Override
999 public void notificationSendAllFlowEntriesRemoved() {
1000 //
1001 // Remove all entries
1002 // NOTE: We remove the entries one-by-one so the per-entry
1003 // notifications will be delivered.
1004 //
1005 // mapFlowEntry.clear();
1006 Set<Long> keySet = mapFlowEntry.keySet();
1007 for (Long key : keySet) {
1008 mapFlowEntry.removeAsync(key);
1009 }
1010 }
1011
1012 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001013 * Get all Flow IDs that are currently in the datagrid.
1014 *
1015 * @return all Flow IDs that are currently in the datagrid.
1016 */
1017 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001018 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1019 Collection<Pair<FlowId, Dpid>> allFlowIds =
1020 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001021
1022 //
1023 // Get all current entries
1024 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001025 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001026 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1027 Long key = entry.getKey();
1028 byte[] valueBytes = entry.getValue();
1029
1030 FlowId flowId = new FlowId(key);
1031
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001032 //
1033 // Decode the value
1034 //
1035 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001036 Dpid dpid = kryo.readObject(input, Dpid.class);
1037
1038 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1039 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001040 }
1041 kryoFactory.deleteKryo(kryo);
1042
1043 return allFlowIds;
1044 }
1045
1046 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001047 * Get all Flow Entry IDs that are currently in the datagrid.
1048 *
1049 * @return all Flow Entry IDs that ae currently in the datagrid.
1050 */
1051 @Override
1052 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1053 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1054 new LinkedList<Pair<FlowEntryId, Dpid>>();
1055
1056 //
1057 // Get all current entries
1058 //
1059 Kryo kryo = kryoFactory.newKryo();
1060 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1061 Long key = entry.getKey();
1062 byte[] valueBytes = entry.getValue();
1063
1064 FlowEntryId flowEntryId = new FlowEntryId(key);
1065
1066 //
1067 // Decode the value
1068 //
1069 Input input = new Input(valueBytes);
1070 Dpid dpid = kryo.readObject(input, Dpid.class);
1071
1072 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1073 allFlowEntryIds.add(pair);
1074 }
1075 kryoFactory.deleteKryo(kryo);
1076
1077 return allFlowEntryIds;
1078 }
1079
1080 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001081 * Send a notification that a FlowId is added.
1082 *
1083 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001084 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001085 */
1086 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001087 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001088 //
1089 // Encode the value
1090 //
1091 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1092 Kryo kryo = kryoFactory.newKryo();
1093 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001094 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001095 byte[] valueBytes = output.toBytes();
1096 kryoFactory.deleteKryo(kryo);
1097
1098 //
1099 // Put the entry:
1100 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001101 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001102 //
1103 mapFlowId.putAsync(flowId.value(), valueBytes);
1104 }
1105
1106 /**
1107 * Send a notification that a FlowId is removed.
1108 *
1109 * @param flowId the FlowId that is removed.
1110 */
1111 @Override
1112 public void notificationSendFlowIdRemoved(FlowId flowId) {
1113 //
1114 // Remove the entry:
1115 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001116 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001117 //
1118 mapFlowId.removeAsync(flowId.value());
1119 }
1120
1121 /**
1122 * Send a notification that a FlowId is updated.
1123 *
1124 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001125 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001126 */
1127 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001128 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001129 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001130 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001131 }
1132
1133 /**
1134 * Send a notification that all Flow IDs are removed.
1135 */
1136 @Override
1137 public void notificationSendAllFlowIdsRemoved() {
1138 //
1139 // Remove all entries
1140 // NOTE: We remove the entries one-by-one so the per-entry
1141 // notifications will be delivered.
1142 //
1143 // mapFlowId.clear();
1144 Set<Long> keySet = mapFlowId.keySet();
1145 for (Long key : keySet) {
1146 mapFlowId.removeAsync(key);
1147 }
1148 }
1149
1150 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001151 * Send a notification that a FlowEntryId is added.
1152 *
1153 * @param flowEntryId the FlowEntryId that is added.
1154 * @param dpid the Switch Dpid.
1155 */
1156 @Override
1157 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1158 Dpid dpid) {
1159 //
1160 // Encode the value
1161 //
1162 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1163 Kryo kryo = kryoFactory.newKryo();
1164 Output output = new Output(buffer, -1);
1165 kryo.writeObject(output, dpid);
1166 byte[] valueBytes = output.toBytes();
1167 kryoFactory.deleteKryo(kryo);
1168
1169 //
1170 // Put the entry:
1171 // - Key : FlowEntryId (Long)
1172 // - Value : Serialized Switch Dpid (byte[])
1173 //
1174 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1175 }
1176
1177 /**
1178 * Send a notification that a FlowEntryId is removed.
1179 *
1180 * @param flowEntryId the FlowEntryId that is removed.
1181 */
1182 @Override
1183 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1184 //
1185 // Remove the entry:
1186 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001187 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001188 //
1189 mapFlowEntryId.removeAsync(flowEntryId.value());
1190 }
1191
1192 /**
1193 * Send a notification that a FlowEntryId is updated.
1194 *
1195 * @param flowEntryId the FlowEntryId that is updated.
1196 * @param dpid the Switch Dpid.
1197 */
1198 @Override
1199 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1200 Dpid dpid) {
1201 // NOTE: Adding an entry with an existing key automatically updates it
1202 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1203 }
1204
1205 /**
1206 * Send a notification that all Flow Entry IDs are removed.
1207 */
1208 @Override
1209 public void notificationSendAllFlowEntryIdsRemoved() {
1210 //
1211 // Remove all entries
1212 // NOTE: We remove the entries one-by-one so the per-entry
1213 // notifications will be delivered.
1214 //
1215 // mapFlowEntryId.clear();
1216 Set<Long> keySet = mapFlowEntryId.keySet();
1217 for (Long key : keySet) {
1218 mapFlowEntryId.removeAsync(key);
1219 }
1220 }
1221
1222 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001223 * Get all Topology Elements that are currently in the datagrid.
1224 *
1225 * @return all Topology Elements that are currently in the datagrid.
1226 */
1227 @Override
1228 public Collection<TopologyElement> getAllTopologyElements() {
1229 Collection<TopologyElement> allTopologyElements =
1230 new LinkedList<TopologyElement>();
1231
1232 //
1233 // Get all current entries
1234 //
1235 Collection<byte[]> values = mapTopology.values();
1236 Kryo kryo = kryoFactory.newKryo();
1237 for (byte[] valueBytes : values) {
1238 //
1239 // Decode the value
1240 //
1241 Input input = new Input(valueBytes);
1242 TopologyElement topologyElement =
1243 kryo.readObject(input, TopologyElement.class);
1244 allTopologyElements.add(topologyElement);
1245 }
1246 kryoFactory.deleteKryo(kryo);
1247
1248 return allTopologyElements;
1249 }
1250
1251 /**
1252 * Send a notification that a Topology Element is added.
1253 *
1254 * @param topologyElement the Topology Element that is added.
1255 */
1256 @Override
1257 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1258 //
1259 // Encode the value
1260 //
1261 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1262 Kryo kryo = kryoFactory.newKryo();
1263 Output output = new Output(buffer, -1);
1264 kryo.writeObject(output, topologyElement);
1265 byte[] valueBytes = output.toBytes();
1266 kryoFactory.deleteKryo(kryo);
1267
1268 //
1269 // Put the entry:
1270 // - Key : TopologyElement ID (String)
1271 // - Value : Serialized TopologyElement (byte[])
1272 //
1273 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1274 }
1275
1276 /**
1277 * Send a notification that a Topology Element is removed.
1278 *
1279 * @param topologyElement the Topology Element that is removed.
1280 */
1281 @Override
1282 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1283 //
1284 // Remove the entry:
1285 // - Key : TopologyElement ID (String)
1286 // - Value : Serialized TopologyElement (byte[])
1287 //
1288 mapTopology.removeAsync(topologyElement.elementId());
1289 }
1290
1291 /**
1292 * Send a notification that a Topology Element is updated.
1293 *
1294 * @param topologyElement the Topology Element that is updated.
1295 */
1296 @Override
1297 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1298 // NOTE: Adding an entry with an existing key automatically updates it
1299 notificationSendTopologyElementAdded(topologyElement);
1300 }
1301
1302 /**
1303 * Send a notification that all Topology Elements are removed.
1304 */
1305 @Override
1306 public void notificationSendAllTopologyElementsRemoved() {
1307 //
1308 // Remove all entries
1309 // NOTE: We remove the entries one-by-one so the per-entry
1310 // notifications will be delivered.
1311 //
1312 // mapTopology.clear();
1313 Set<String> keySet = mapTopology.keySet();
1314 for (String key : keySet) {
1315 mapTopology.removeAsync(key);
1316 }
1317 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001318
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001319 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001320 public void sendArpRequest(ArpMessage arpMessage) {
1321 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1322 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001323 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001324}