blob: c195f82fbdaae8f1e936f66de4baee6e9fd4ec24 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080024import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080029import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070030import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
47/**
48 * A datagrid service that uses Hazelcast as a datagrid.
49 * The relevant data is stored in the Hazelcast datagrid and shared as
50 * appropriate in a multi-node cluster.
51 */
52public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private final static int MAX_BUFFER_SIZE = 64*1024;
54
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070055 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070057 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058
59 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070060 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061 private Config hazelcastConfig = null;
62
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070064 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065
66 // State related to the Flow map
67 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068 private IMap<Long, byte[]> mapFlow = null;
69 private MapFlowListener mapFlowListener = null;
70 private String mapFlowListenerId = null;
71
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070072 // State related to the Flow Entry map
73 protected static final String mapFlowEntryName = "mapFlowEntry";
74 private IMap<Long, byte[]> mapFlowEntry = null;
75 private MapFlowEntryListener mapFlowEntryListener = null;
76 private String mapFlowEntryListenerId = null;
77
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080078 // State related to the Flow ID map
79 protected static final String mapFlowIdName = "mapFlowId";
80 private IMap<Long, byte[]> mapFlowId = null;
81 private MapFlowIdListener mapFlowIdListener = null;
82 private String mapFlowIdListenerId = null;
83
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080084 // State related to the Flow Entry ID map
85 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
86 private IMap<Long, byte[]> mapFlowEntryId = null;
87 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
88 private String mapFlowEntryIdListenerId = null;
89
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070090 // State related to the Network Topology map
91 protected static final String mapTopologyName = "mapTopology";
92 private IMap<String, byte[]> mapTopology = null;
93 private MapTopologyListener mapTopologyListener = null;
94 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080095
Jonathan Hart18ad55c2013-11-11 22:49:55 -080096 // State related to the ARP map
97 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080098 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080099 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
100 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700101
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 /**
103 * Class for receiving notifications for Flow state.
104 *
105 * The datagrid map is:
106 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800107 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700108 */
109 class MapFlowListener implements EntryListener<Long, byte[]> {
110 /**
111 * Receive a notification that an entry is added.
112 *
113 * @param event the notification event for the entry.
114 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800115 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800116 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800117 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700118
119 //
120 // Decode the value and deliver the notification
121 //
122 Kryo kryo = kryoFactory.newKryo();
123 Input input = new Input(valueBytes);
124 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
125 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700126 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700127 }
128
129 /**
130 * Receive a notification that an entry is removed.
131 *
132 * @param event the notification event for the entry.
133 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800134 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800135 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800136 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700137
138 //
139 // Decode the value and deliver the notification
140 //
141 Kryo kryo = kryoFactory.newKryo();
142 Input input = new Input(valueBytes);
143 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
144 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700145 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700146 }
147
148 /**
149 * Receive a notification that an entry is updated.
150 *
151 * @param event the notification event for the entry.
152 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800153 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800154 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800155 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700156
157 //
158 // Decode the value and deliver the notification
159 //
160 Kryo kryo = kryoFactory.newKryo();
161 Input input = new Input(valueBytes);
162 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
163 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700164 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700165 }
166
167 /**
168 * Receive a notification that an entry is evicted.
169 *
170 * @param event the notification event for the entry.
171 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800172 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800173 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700174 // NOTE: We don't use eviction for this map
175 }
176 }
177
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700178 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700179 * Class for receiving notifications for FlowEntry state.
180 *
181 * The datagrid map is:
182 * - Key : FlowEntry ID (Long)
183 * - Value : Serialized FlowEntry (byte[])
184 */
185 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
186 /**
187 * Receive a notification that an entry is added.
188 *
189 * @param event the notification event for the entry.
190 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800191 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800192 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800193 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700194
195 //
196 // Decode the value and deliver the notification
197 //
198 Kryo kryo = kryoFactory.newKryo();
199 Input input = new Input(valueBytes);
200 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
201 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700202 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700203 }
204
205 /**
206 * Receive a notification that an entry is removed.
207 *
208 * @param event the notification event for the entry.
209 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800210 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800211 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800212 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700213
214 //
215 // Decode the value and deliver the notification
216 //
217 Kryo kryo = kryoFactory.newKryo();
218 Input input = new Input(valueBytes);
219 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
220 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700221 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700222 }
223
224 /**
225 * Receive a notification that an entry is updated.
226 *
227 * @param event the notification event for the entry.
228 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800229 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800230 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800231 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700232
233 //
234 // Decode the value and deliver the notification
235 //
236 Kryo kryo = kryoFactory.newKryo();
237 Input input = new Input(valueBytes);
238 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
239 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700240 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700241 }
242
243 /**
244 * Receive a notification that an entry is evicted.
245 *
246 * @param event the notification event for the entry.
247 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800248 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800249 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700250 // NOTE: We don't use eviction for this map
251 }
252 }
253
254 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800255 * Class for receiving notifications for FlowId state.
256 *
257 * The datagrid map is:
258 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800259 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800260 */
261 class MapFlowIdListener implements EntryListener<Long, byte[]> {
262 /**
263 * Receive a notification that an entry is added.
264 *
265 * @param event the notification event for the entry.
266 */
267 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800268 Long keyLong = event.getKey();
269 FlowId flowId = new FlowId(keyLong);
270
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800271 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800272
273 //
274 // Decode the value and deliver the notification
275 //
276 Kryo kryo = kryoFactory.newKryo();
277 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800278 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800279 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800280 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800281 }
282
283 /**
284 * Receive a notification that an entry is removed.
285 *
286 * @param event the notification event for the entry.
287 */
288 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800289 Long keyLong = event.getKey();
290 FlowId flowId = new FlowId(keyLong);
291
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800292 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800293
294 //
295 // Decode the value and deliver the notification
296 //
297 Kryo kryo = kryoFactory.newKryo();
298 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800299 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800300 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800301 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800302 }
303
304 /**
305 * Receive a notification that an entry is updated.
306 *
307 * @param event the notification event for the entry.
308 */
309 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800310 Long keyLong = event.getKey();
311 FlowId flowId = new FlowId(keyLong);
312
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800313 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800314
315 //
316 // Decode the value and deliver the notification
317 //
318 Kryo kryo = kryoFactory.newKryo();
319 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800320 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800321 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800322 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800323 }
324
325 /**
326 * Receive a notification that an entry is evicted.
327 *
328 * @param event the notification event for the entry.
329 */
330 public void entryEvicted(EntryEvent<Long, byte[]> event) {
331 // NOTE: We don't use eviction for this map
332 }
333 }
334
335 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800336 * Class for receiving notifications for FlowEntryId state.
337 *
338 * The datagrid map is:
339 * - Key : FlowEntryId (Long)
340 * - Value : Serialized Switch Dpid (byte[])
341 */
342 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
343 /**
344 * Receive a notification that an entry is added.
345 *
346 * @param event the notification event for the entry.
347 */
348 public void entryAdded(EntryEvent<Long, byte[]> event) {
349 Long keyLong = event.getKey();
350 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
351
352 byte[] valueBytes = event.getValue();
353
354 //
355 // Decode the value and deliver the notification
356 //
357 Kryo kryo = kryoFactory.newKryo();
358 Input input = new Input(valueBytes);
359 Dpid dpid = kryo.readObject(input, Dpid.class);
360 kryoFactory.deleteKryo(kryo);
361 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
362 }
363
364 /**
365 * Receive a notification that an entry is removed.
366 *
367 * @param event the notification event for the entry.
368 */
369 public void entryRemoved(EntryEvent<Long, byte[]> event) {
370 Long keyLong = event.getKey();
371 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
372
373 byte[] valueBytes = event.getValue();
374
375 //
376 // Decode the value and deliver the notification
377 //
378 Kryo kryo = kryoFactory.newKryo();
379 Input input = new Input(valueBytes);
380 Dpid dpid = kryo.readObject(input, Dpid.class);
381 kryoFactory.deleteKryo(kryo);
382 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
383 }
384
385 /**
386 * Receive a notification that an entry is updated.
387 *
388 * @param event the notification event for the entry.
389 */
390 public void entryUpdated(EntryEvent<Long, byte[]> event) {
391 Long keyLong = event.getKey();
392 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
393
394 byte[] valueBytes = event.getValue();
395
396 //
397 // Decode the value and deliver the notification
398 //
399 Kryo kryo = kryoFactory.newKryo();
400 Input input = new Input(valueBytes);
401 Dpid dpid = kryo.readObject(input, Dpid.class);
402 kryoFactory.deleteKryo(kryo);
403 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
404 }
405
406 /**
407 * Receive a notification that an entry is evicted.
408 *
409 * @param event the notification event for the entry.
410 */
411 public void entryEvicted(EntryEvent<Long, byte[]> event) {
412 // NOTE: We don't use eviction for this map
413 }
414 }
415
416 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700417 * Class for receiving notifications for Network Topology state.
418 *
419 * The datagrid map is:
420 * - Key: TopologyElement ID (String)
421 * - Value: Serialized TopologyElement (byte[])
422 */
423 class MapTopologyListener implements EntryListener<String, byte[]> {
424 /**
425 * Receive a notification that an entry is added.
426 *
427 * @param event the notification event for the entry.
428 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800429 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800430 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800431 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700432
433 //
434 // Decode the value and deliver the notification
435 //
436 Kryo kryo = kryoFactory.newKryo();
437 Input input = new Input(valueBytes);
438 TopologyElement topologyElement =
439 kryo.readObject(input, TopologyElement.class);
440 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700441 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700442 }
443
444 /**
445 * Receive a notification that an entry is removed.
446 *
447 * @param event the notification event for the entry.
448 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800449 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800450 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800451 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700452
453 //
454 // Decode the value and deliver the notification
455 //
456 Kryo kryo = kryoFactory.newKryo();
457 Input input = new Input(valueBytes);
458 TopologyElement topologyElement =
459 kryo.readObject(input, TopologyElement.class);
460 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700461 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700462 }
463
464 /**
465 * Receive a notification that an entry is updated.
466 *
467 * @param event the notification event for the entry.
468 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800469 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800470 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800471 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700472
473 //
474 // Decode the value and deliver the notification
475 //
476 Kryo kryo = kryoFactory.newKryo();
477 Input input = new Input(valueBytes);
478 TopologyElement topologyElement =
479 kryo.readObject(input, TopologyElement.class);
480 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700481 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700482 }
483
484 /**
485 * Receive a notification that an entry is evicted.
486 *
487 * @param event the notification event for the entry.
488 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800489 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800490 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700491 // NOTE: We don't use eviction for this map
492 }
493 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800494
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800495 /**
496 * Class for receiving notifications for ARP requests.
497 *
498 * The datagrid map is:
499 * - Key: Request ID (String)
500 * - Value: ARP request packet (byte[])
501 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800502 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800503 /**
504 * Receive a notification that an entry is added.
505 *
506 * @param event the notification event for the entry.
507 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800508 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800509 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800510 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
511 arpEventHandler.arpRequestNotification(event.getKey());
512 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800513
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800514 //
515 // Decode the value and deliver the notification
516 //
517 /*
518 Kryo kryo = kryoFactory.newKryo();
519 Input input = new Input(valueBytes);
520 TopologyElement topologyElement =
521 kryo.readObject(input, TopologyElement.class);
522 kryoFactory.deleteKryo(kryo);
523 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
524 */
525 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800526
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800527 /**
528 * Receive a notification that an entry is removed.
529 *
530 * @param event the notification event for the entry.
531 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800532 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800533 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800534 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800535 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800536
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800537 /**
538 * Receive a notification that an entry is updated.
539 *
540 * @param event the notification event for the entry.
541 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800542 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800543 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800544 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800545 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800546
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800547 /**
548 * Receive a notification that an entry is evicted.
549 *
550 * @param event the notification event for the entry.
551 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800552 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800553 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800554 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800555 }
556 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700557
558 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700559 * Initialize the Hazelcast Datagrid operation.
560 *
561 * @param conf the configuration filename.
562 */
563 public void init(String configFilename) {
564 /*
565 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
566 System.setProperty("hazelcast.socket.send.buffer.size", "32");
567 */
568 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800569
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700570 // Init from configuration file
571 try {
572 hazelcastConfig = new FileSystemXmlConfig(configFilename);
573 } catch (FileNotFoundException e) {
574 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
575 }
576 /*
577 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
578 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
579 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
580 */
581 //
582 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
583 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
584 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
585 }
586
587 /**
588 * Shutdown the Hazelcast Datagrid operation.
589 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800590 @Override
591 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700592 close();
593 }
594
595 /**
596 * Shutdown the Hazelcast Datagrid operation.
597 */
598 public void close() {
599 Hazelcast.shutdownAll();
600 }
601
602 /**
603 * Get the collection of offered module services.
604 *
605 * @return the collection of offered module services.
606 */
607 @Override
608 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800609 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700610 new ArrayList<Class<? extends IFloodlightService>>();
611 l.add(IDatagridService.class);
612 return l;
613 }
614
615 /**
616 * Get the collection of implemented services.
617 *
618 * @return the collection of implemented services.
619 */
620 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800621 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700622 getServiceImpls() {
623 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800624 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700625 new HashMap<Class<? extends IFloodlightService>,
626 IFloodlightService>();
627 m.put(IDatagridService.class, this);
628 return m;
629 }
630
631 /**
632 * Get the collection of modules this module depends on.
633 *
634 * @return the collection of modules this module depends on.
635 */
636 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800637 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700638 getModuleDependencies() {
639 Collection<Class<? extends IFloodlightService>> l =
640 new ArrayList<Class<? extends IFloodlightService>>();
641 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700642 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700643 return l;
644 }
645
646 /**
647 * Initialize the module.
648 *
649 * @param context the module context to use for the initialization.
650 */
651 @Override
652 public void init(FloodlightModuleContext context)
653 throws FloodlightModuleException {
654 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700655 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700656
657 // Get the configuration file name and configure the Datagrid
658 Map<String, String> configMap = context.getConfigParams(this);
659 String configFilename = configMap.get(HazelcastConfigFile);
660 this.init(configFilename);
661 }
662
663 /**
664 * Startup module operation.
665 *
666 * @param context the module context to use for the startup.
667 */
668 @Override
669 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700670 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700671
672 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800673
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800674 arpMap = hazelcastInstance.getMap(arpMapName);
675 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700676 }
677
678 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700679 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700680 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700681 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700682 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700683 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700684 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700685 */
686 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700687 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
688 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700689
690 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700691 mapFlowListener = new MapFlowListener();
692 mapFlow = hazelcastInstance.getMap(mapFlowName);
693 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700694
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700695 // Initialize the FlowEntry-related map state
696 mapFlowEntryListener = new MapFlowEntryListener();
697 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
698 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
699
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800700 // Initialize the FlowId-related map state
701 mapFlowIdListener = new MapFlowIdListener();
702 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
703 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
704
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800705 // Initialize the FlowEntryId-related map state
706 mapFlowEntryIdListener = new MapFlowEntryIdListener();
707 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
708 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
709
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700710 // Initialize the Topology-related map state
711 mapTopologyListener = new MapTopologyListener();
712 mapTopology = hazelcastInstance.getMap(mapTopologyName);
713 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700714 }
715
716 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700717 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700718 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700719 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700720 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700721 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700722 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700723 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700724 */
725 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700726 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700727 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700728 mapFlow.removeEntryListener(mapFlowListenerId);
729 mapFlow = null;
730 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700731
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700732 // Clear the FlowEntry-related map state
733 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
734 mapFlowEntry = null;
735 mapFlowEntryListener = null;
736
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800737 // Clear the FlowId-related map state
738 mapFlowId.removeEntryListener(mapFlowIdListenerId);
739 mapFlowId = null;
740 mapFlowIdListener = null;
741
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800742 // Clear the FlowEntryId-related map state
743 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
744 mapFlowEntryId = null;
745 mapFlowEntryIdListener = null;
746
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700747 // Clear the Topology-related map state
748 mapTopology.removeEntryListener(mapTopologyListenerId);
749 mapTopology = null;
750 mapTopologyListener = null;
751
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700752 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700753 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800754
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800755 @Override
756 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
757 if (arpEventHandler != null) {
758 arpEventHandlers.add(arpEventHandler);
759 }
760 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800761
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800762 @Override
763 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
764 arpEventHandlers.remove(arpEventHandler);
765 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800766
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700767 /**
768 * Get all Flows that are currently in the datagrid.
769 *
770 * @return all Flows that are currently in the datagrid.
771 */
772 @Override
773 public Collection<FlowPath> getAllFlows() {
774 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
775
776 //
777 // Get all current entries
778 //
779 Collection<byte[]> values = mapFlow.values();
780 Kryo kryo = kryoFactory.newKryo();
781 for (byte[] valueBytes : values) {
782 //
783 // Decode the value
784 //
785 Input input = new Input(valueBytes);
786 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
787 allFlows.add(flowPath);
788 }
789 kryoFactory.deleteKryo(kryo);
790
791 return allFlows;
792 }
793
794 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800795 * Get a Flow for a given Flow ID.
796 *
797 * @param flowId the Flow ID of the Flow to get.
798 * @return the Flow if found, otherwise null.
799 */
800 @Override
801 public FlowPath getFlow(FlowId flowId) {
802 byte[] valueBytes = mapFlow.get(flowId.value());
803 if (valueBytes == null)
804 return null;
805
806 Kryo kryo = kryoFactory.newKryo();
807 //
808 // Decode the value
809 //
810 Input input = new Input(valueBytes);
811 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
812 kryoFactory.deleteKryo(kryo);
813
814 return flowPath;
815 }
816
817 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700818 * Send a notification that a Flow is added.
819 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700820 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700821 */
822 @Override
823 public void notificationSendFlowAdded(FlowPath flowPath) {
824 //
825 // Encode the value
826 //
827 byte[] buffer = new byte[MAX_BUFFER_SIZE];
828 Kryo kryo = kryoFactory.newKryo();
829 Output output = new Output(buffer, -1);
830 kryo.writeObject(output, flowPath);
831 byte[] valueBytes = output.toBytes();
832 kryoFactory.deleteKryo(kryo);
833
834 //
835 // Put the entry:
836 // - Key : Flow ID (Long)
837 // - Value : Serialized Flow (byte[])
838 //
839 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
840 }
841
842 /**
843 * Send a notification that a Flow is removed.
844 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700845 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700846 */
847 @Override
848 public void notificationSendFlowRemoved(FlowId flowId) {
849 //
850 // Remove the entry:
851 // - Key : Flow ID (Long)
852 // - Value : Serialized Flow (byte[])
853 //
854 mapFlow.removeAsync(flowId.value());
855 }
856
857 /**
858 * Send a notification that a Flow is updated.
859 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700860 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700861 */
862 @Override
863 public void notificationSendFlowUpdated(FlowPath flowPath) {
864 // NOTE: Adding an entry with an existing key automatically updates it
865 notificationSendFlowAdded(flowPath);
866 }
867
868 /**
869 * Send a notification that all Flows are removed.
870 */
871 @Override
872 public void notificationSendAllFlowsRemoved() {
873 //
874 // Remove all entries
875 // NOTE: We remove the entries one-by-one so the per-entry
876 // notifications will be delivered.
877 //
878 // mapFlow.clear();
879 Set<Long> keySet = mapFlow.keySet();
880 for (Long key : keySet) {
881 mapFlow.removeAsync(key);
882 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700883 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700884
885 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700886 * Get all Flow Entries that are currently in the datagrid.
887 *
888 * @return all Flow Entries that are currently in the datagrid.
889 */
890 @Override
891 public Collection<FlowEntry> getAllFlowEntries() {
892 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
893
894 //
895 // Get all current entries
896 //
897 Collection<byte[]> values = mapFlowEntry.values();
898 Kryo kryo = kryoFactory.newKryo();
899 for (byte[] valueBytes : values) {
900 //
901 // Decode the value
902 //
903 Input input = new Input(valueBytes);
904 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
905 allFlowEntries.add(flowEntry);
906 }
907 kryoFactory.deleteKryo(kryo);
908
909 return allFlowEntries;
910 }
911
912 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800913 * Get a Flow Entry for a given Flow Entry ID.
914 *
915 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
916 * @return the Flow Entry if found, otherwise null.
917 */
918 @Override
919 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
920 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
921 if (valueBytes == null)
922 return null;
923
924 Kryo kryo = kryoFactory.newKryo();
925 //
926 // Decode the value
927 //
928 Input input = new Input(valueBytes);
929 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
930 kryoFactory.deleteKryo(kryo);
931
932 return flowEntry;
933 }
934
935 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700936 * Send a notification that a FlowEntry is added.
937 *
938 * @param flowEntry the FlowEntry that is added.
939 */
940 @Override
941 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
942 //
943 // Encode the value
944 //
945 byte[] buffer = new byte[MAX_BUFFER_SIZE];
946 Kryo kryo = kryoFactory.newKryo();
947 Output output = new Output(buffer, -1);
948 kryo.writeObject(output, flowEntry);
949 byte[] valueBytes = output.toBytes();
950 kryoFactory.deleteKryo(kryo);
951
952 //
953 // Put the entry:
954 // - Key : FlowEntry ID (Long)
955 // - Value : Serialized FlowEntry (byte[])
956 //
957 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
958 }
959
960 /**
961 * Send a notification that a FlowEntry is removed.
962 *
963 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
964 */
965 @Override
966 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
967 //
968 // Remove the entry:
969 // - Key : FlowEntry ID (Long)
970 // - Value : Serialized FlowEntry (byte[])
971 //
972 mapFlowEntry.removeAsync(flowEntryId.value());
973 }
974
975 /**
976 * Send a notification that a FlowEntry is updated.
977 *
978 * @param flowEntry the FlowEntry that is updated.
979 */
980 @Override
981 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
982 // NOTE: Adding an entry with an existing key automatically updates it
983 notificationSendFlowEntryAdded(flowEntry);
984 }
985
986 /**
987 * Send a notification that all Flow Entries are removed.
988 */
989 @Override
990 public void notificationSendAllFlowEntriesRemoved() {
991 //
992 // Remove all entries
993 // NOTE: We remove the entries one-by-one so the per-entry
994 // notifications will be delivered.
995 //
996 // mapFlowEntry.clear();
997 Set<Long> keySet = mapFlowEntry.keySet();
998 for (Long key : keySet) {
999 mapFlowEntry.removeAsync(key);
1000 }
1001 }
1002
1003 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001004 * Get all Flow IDs that are currently in the datagrid.
1005 *
1006 * @return all Flow IDs that are currently in the datagrid.
1007 */
1008 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001009 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1010 Collection<Pair<FlowId, Dpid>> allFlowIds =
1011 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001012
1013 //
1014 // Get all current entries
1015 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001016 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001017 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1018 Long key = entry.getKey();
1019 byte[] valueBytes = entry.getValue();
1020
1021 FlowId flowId = new FlowId(key);
1022
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001023 //
1024 // Decode the value
1025 //
1026 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001027 Dpid dpid = kryo.readObject(input, Dpid.class);
1028
1029 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1030 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001031 }
1032 kryoFactory.deleteKryo(kryo);
1033
1034 return allFlowIds;
1035 }
1036
1037 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001038 * Get all Flow Entry IDs that are currently in the datagrid.
1039 *
1040 * @return all Flow Entry IDs that ae currently in the datagrid.
1041 */
1042 @Override
1043 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1044 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1045 new LinkedList<Pair<FlowEntryId, Dpid>>();
1046
1047 //
1048 // Get all current entries
1049 //
1050 Kryo kryo = kryoFactory.newKryo();
1051 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1052 Long key = entry.getKey();
1053 byte[] valueBytes = entry.getValue();
1054
1055 FlowEntryId flowEntryId = new FlowEntryId(key);
1056
1057 //
1058 // Decode the value
1059 //
1060 Input input = new Input(valueBytes);
1061 Dpid dpid = kryo.readObject(input, Dpid.class);
1062
1063 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1064 allFlowEntryIds.add(pair);
1065 }
1066 kryoFactory.deleteKryo(kryo);
1067
1068 return allFlowEntryIds;
1069 }
1070
1071 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001072 * Send a notification that a FlowId is added.
1073 *
1074 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001075 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001076 */
1077 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001078 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001079 //
1080 // Encode the value
1081 //
1082 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1083 Kryo kryo = kryoFactory.newKryo();
1084 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001085 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001086 byte[] valueBytes = output.toBytes();
1087 kryoFactory.deleteKryo(kryo);
1088
1089 //
1090 // Put the entry:
1091 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001092 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001093 //
1094 mapFlowId.putAsync(flowId.value(), valueBytes);
1095 }
1096
1097 /**
1098 * Send a notification that a FlowId is removed.
1099 *
1100 * @param flowId the FlowId that is removed.
1101 */
1102 @Override
1103 public void notificationSendFlowIdRemoved(FlowId flowId) {
1104 //
1105 // Remove the entry:
1106 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001107 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001108 //
1109 mapFlowId.removeAsync(flowId.value());
1110 }
1111
1112 /**
1113 * Send a notification that a FlowId is updated.
1114 *
1115 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001116 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001117 */
1118 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001119 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001120 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001121 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001122 }
1123
1124 /**
1125 * Send a notification that all Flow IDs are removed.
1126 */
1127 @Override
1128 public void notificationSendAllFlowIdsRemoved() {
1129 //
1130 // Remove all entries
1131 // NOTE: We remove the entries one-by-one so the per-entry
1132 // notifications will be delivered.
1133 //
1134 // mapFlowId.clear();
1135 Set<Long> keySet = mapFlowId.keySet();
1136 for (Long key : keySet) {
1137 mapFlowId.removeAsync(key);
1138 }
1139 }
1140
1141 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001142 * Send a notification that a FlowEntryId is added.
1143 *
1144 * @param flowEntryId the FlowEntryId that is added.
1145 * @param dpid the Switch Dpid.
1146 */
1147 @Override
1148 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1149 Dpid dpid) {
1150 //
1151 // Encode the value
1152 //
1153 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1154 Kryo kryo = kryoFactory.newKryo();
1155 Output output = new Output(buffer, -1);
1156 kryo.writeObject(output, dpid);
1157 byte[] valueBytes = output.toBytes();
1158 kryoFactory.deleteKryo(kryo);
1159
1160 //
1161 // Put the entry:
1162 // - Key : FlowEntryId (Long)
1163 // - Value : Serialized Switch Dpid (byte[])
1164 //
1165 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1166 }
1167
1168 /**
1169 * Send a notification that a FlowEntryId is removed.
1170 *
1171 * @param flowEntryId the FlowEntryId that is removed.
1172 */
1173 @Override
1174 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1175 //
1176 // Remove the entry:
1177 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001178 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001179 //
1180 mapFlowEntryId.removeAsync(flowEntryId.value());
1181 }
1182
1183 /**
1184 * Send a notification that a FlowEntryId is updated.
1185 *
1186 * @param flowEntryId the FlowEntryId that is updated.
1187 * @param dpid the Switch Dpid.
1188 */
1189 @Override
1190 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1191 Dpid dpid) {
1192 // NOTE: Adding an entry with an existing key automatically updates it
1193 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1194 }
1195
1196 /**
1197 * Send a notification that all Flow Entry IDs are removed.
1198 */
1199 @Override
1200 public void notificationSendAllFlowEntryIdsRemoved() {
1201 //
1202 // Remove all entries
1203 // NOTE: We remove the entries one-by-one so the per-entry
1204 // notifications will be delivered.
1205 //
1206 // mapFlowEntryId.clear();
1207 Set<Long> keySet = mapFlowEntryId.keySet();
1208 for (Long key : keySet) {
1209 mapFlowEntryId.removeAsync(key);
1210 }
1211 }
1212
1213 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001214 * Get all Topology Elements that are currently in the datagrid.
1215 *
1216 * @return all Topology Elements that are currently in the datagrid.
1217 */
1218 @Override
1219 public Collection<TopologyElement> getAllTopologyElements() {
1220 Collection<TopologyElement> allTopologyElements =
1221 new LinkedList<TopologyElement>();
1222
1223 //
1224 // Get all current entries
1225 //
1226 Collection<byte[]> values = mapTopology.values();
1227 Kryo kryo = kryoFactory.newKryo();
1228 for (byte[] valueBytes : values) {
1229 //
1230 // Decode the value
1231 //
1232 Input input = new Input(valueBytes);
1233 TopologyElement topologyElement =
1234 kryo.readObject(input, TopologyElement.class);
1235 allTopologyElements.add(topologyElement);
1236 }
1237 kryoFactory.deleteKryo(kryo);
1238
1239 return allTopologyElements;
1240 }
1241
1242 /**
1243 * Send a notification that a Topology Element is added.
1244 *
1245 * @param topologyElement the Topology Element that is added.
1246 */
1247 @Override
1248 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1249 //
1250 // Encode the value
1251 //
1252 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1253 Kryo kryo = kryoFactory.newKryo();
1254 Output output = new Output(buffer, -1);
1255 kryo.writeObject(output, topologyElement);
1256 byte[] valueBytes = output.toBytes();
1257 kryoFactory.deleteKryo(kryo);
1258
1259 //
1260 // Put the entry:
1261 // - Key : TopologyElement ID (String)
1262 // - Value : Serialized TopologyElement (byte[])
1263 //
1264 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1265 }
1266
1267 /**
1268 * Send a notification that a Topology Element is removed.
1269 *
1270 * @param topologyElement the Topology Element that is removed.
1271 */
1272 @Override
1273 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1274 //
1275 // Remove the entry:
1276 // - Key : TopologyElement ID (String)
1277 // - Value : Serialized TopologyElement (byte[])
1278 //
1279 mapTopology.removeAsync(topologyElement.elementId());
1280 }
1281
1282 /**
1283 * Send a notification that a Topology Element is updated.
1284 *
1285 * @param topologyElement the Topology Element that is updated.
1286 */
1287 @Override
1288 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1289 // NOTE: Adding an entry with an existing key automatically updates it
1290 notificationSendTopologyElementAdded(topologyElement);
1291 }
1292
1293 /**
1294 * Send a notification that all Topology Elements are removed.
1295 */
1296 @Override
1297 public void notificationSendAllTopologyElementsRemoved() {
1298 //
1299 // Remove all entries
1300 // NOTE: We remove the entries one-by-one so the per-entry
1301 // notifications will be delivered.
1302 //
1303 // mapTopology.clear();
1304 Set<String> keySet = mapTopology.keySet();
1305 for (String key : keySet) {
1306 mapTopology.removeAsync(key);
1307 }
1308 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001309
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001310 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001311 public void sendArpRequest(ArpMessage arpMessage) {
1312 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1313 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001314 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001315}