blob: b76aa580387079d9e358cf8792b72932f68f49c8 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080024import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080029import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070030import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
Pavlin Radoslavove2497672014-01-12 18:03:35 -080047import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
48
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070049/**
50 * A datagrid service that uses Hazelcast as a datagrid.
51 * The relevant data is stored in the Hazelcast datagrid and shared as
52 * appropriate in a multi-node cluster.
53 */
54public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070055 private final static int MAX_BUFFER_SIZE = 64*1024;
56
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070057 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070059 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070060
61 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070062 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070063 private Config hazelcastConfig = null;
64
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070066 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070067
68 // State related to the Flow map
69 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070070 private IMap<Long, byte[]> mapFlow = null;
71 private MapFlowListener mapFlowListener = null;
72 private String mapFlowListenerId = null;
73
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070074 // State related to the Flow Entry map
75 protected static final String mapFlowEntryName = "mapFlowEntry";
76 private IMap<Long, byte[]> mapFlowEntry = null;
77 private MapFlowEntryListener mapFlowEntryListener = null;
78 private String mapFlowEntryListenerId = null;
79
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080080 // State related to the Flow ID map
81 protected static final String mapFlowIdName = "mapFlowId";
82 private IMap<Long, byte[]> mapFlowId = null;
83 private MapFlowIdListener mapFlowIdListener = null;
84 private String mapFlowIdListenerId = null;
85
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080086 // State related to the Flow Entry ID map
87 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
88 private IMap<Long, byte[]> mapFlowEntryId = null;
89 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
90 private String mapFlowEntryIdListenerId = null;
91
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070092 // State related to the Network Topology map
93 protected static final String mapTopologyName = "mapTopology";
94 private IMap<String, byte[]> mapTopology = null;
95 private MapTopologyListener mapTopologyListener = null;
96 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080097
Jonathan Hart18ad55c2013-11-11 22:49:55 -080098 // State related to the ARP map
99 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -0800100 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800101 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
102 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700103
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700104 /**
105 * Class for receiving notifications for Flow state.
106 *
107 * The datagrid map is:
108 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800109 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700110 */
111 class MapFlowListener implements EntryListener<Long, byte[]> {
112 /**
113 * Receive a notification that an entry is added.
114 *
115 * @param event the notification event for the entry.
116 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800117 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800118 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800119 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700120
121 //
122 // Decode the value and deliver the notification
123 //
124 Kryo kryo = kryoFactory.newKryo();
125 Input input = new Input(valueBytes);
126 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
127 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700128 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700129 }
130
131 /**
132 * Receive a notification that an entry is removed.
133 *
134 * @param event the notification event for the entry.
135 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800136 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800137 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800138 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700139
140 //
141 // Decode the value and deliver the notification
142 //
143 Kryo kryo = kryoFactory.newKryo();
144 Input input = new Input(valueBytes);
145 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
146 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700147 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148 }
149
150 /**
151 * Receive a notification that an entry is updated.
152 *
153 * @param event the notification event for the entry.
154 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800155 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800156 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800157 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700158
159 //
160 // Decode the value and deliver the notification
161 //
162 Kryo kryo = kryoFactory.newKryo();
163 Input input = new Input(valueBytes);
164 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
165 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700166 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700167 }
168
169 /**
170 * Receive a notification that an entry is evicted.
171 *
172 * @param event the notification event for the entry.
173 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800174 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800175 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700176 // NOTE: We don't use eviction for this map
177 }
178 }
179
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700180 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700181 * Class for receiving notifications for FlowEntry state.
182 *
183 * The datagrid map is:
184 * - Key : FlowEntry ID (Long)
185 * - Value : Serialized FlowEntry (byte[])
186 */
187 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
188 /**
189 * Receive a notification that an entry is added.
190 *
191 * @param event the notification event for the entry.
192 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800193 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800194 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800195 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700196
197 //
198 // Decode the value and deliver the notification
199 //
200 Kryo kryo = kryoFactory.newKryo();
201 Input input = new Input(valueBytes);
202 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
203 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700204 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700205 }
206
207 /**
208 * Receive a notification that an entry is removed.
209 *
210 * @param event the notification event for the entry.
211 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800212 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800213 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800214 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700215
216 //
217 // Decode the value and deliver the notification
218 //
219 Kryo kryo = kryoFactory.newKryo();
220 Input input = new Input(valueBytes);
221 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
222 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700223 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700224 }
225
226 /**
227 * Receive a notification that an entry is updated.
228 *
229 * @param event the notification event for the entry.
230 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800231 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800232 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800233 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700234
235 //
236 // Decode the value and deliver the notification
237 //
238 Kryo kryo = kryoFactory.newKryo();
239 Input input = new Input(valueBytes);
240 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
241 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700242 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700243 }
244
245 /**
246 * Receive a notification that an entry is evicted.
247 *
248 * @param event the notification event for the entry.
249 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800250 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800251 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700252 // NOTE: We don't use eviction for this map
253 }
254 }
255
256 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800257 * Class for receiving notifications for FlowId state.
258 *
259 * The datagrid map is:
260 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800261 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800262 */
263 class MapFlowIdListener implements EntryListener<Long, byte[]> {
264 /**
265 * Receive a notification that an entry is added.
266 *
267 * @param event the notification event for the entry.
268 */
269 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800270 Long keyLong = event.getKey();
271 FlowId flowId = new FlowId(keyLong);
272
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800273 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800274
275 //
276 // Decode the value and deliver the notification
277 //
278 Kryo kryo = kryoFactory.newKryo();
279 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800280 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800281 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800282 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800283 }
284
285 /**
286 * Receive a notification that an entry is removed.
287 *
288 * @param event the notification event for the entry.
289 */
290 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800291 Long keyLong = event.getKey();
292 FlowId flowId = new FlowId(keyLong);
293
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800294 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800295
296 //
297 // Decode the value and deliver the notification
298 //
299 Kryo kryo = kryoFactory.newKryo();
300 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800301 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800302 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800303 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800304 }
305
306 /**
307 * Receive a notification that an entry is updated.
308 *
309 * @param event the notification event for the entry.
310 */
311 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800312 Long keyLong = event.getKey();
313 FlowId flowId = new FlowId(keyLong);
314
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800315 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800316
317 //
318 // Decode the value and deliver the notification
319 //
320 Kryo kryo = kryoFactory.newKryo();
321 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800322 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800323 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800324 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800325 }
326
327 /**
328 * Receive a notification that an entry is evicted.
329 *
330 * @param event the notification event for the entry.
331 */
332 public void entryEvicted(EntryEvent<Long, byte[]> event) {
333 // NOTE: We don't use eviction for this map
334 }
335 }
336
337 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800338 * Class for receiving notifications for FlowEntryId state.
339 *
340 * The datagrid map is:
341 * - Key : FlowEntryId (Long)
342 * - Value : Serialized Switch Dpid (byte[])
343 */
344 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
345 /**
346 * Receive a notification that an entry is added.
347 *
348 * @param event the notification event for the entry.
349 */
350 public void entryAdded(EntryEvent<Long, byte[]> event) {
351 Long keyLong = event.getKey();
352 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
353
354 byte[] valueBytes = event.getValue();
355
356 //
357 // Decode the value and deliver the notification
358 //
359 Kryo kryo = kryoFactory.newKryo();
360 Input input = new Input(valueBytes);
361 Dpid dpid = kryo.readObject(input, Dpid.class);
362 kryoFactory.deleteKryo(kryo);
363 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
364 }
365
366 /**
367 * Receive a notification that an entry is removed.
368 *
369 * @param event the notification event for the entry.
370 */
371 public void entryRemoved(EntryEvent<Long, byte[]> event) {
372 Long keyLong = event.getKey();
373 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
374
375 byte[] valueBytes = event.getValue();
376
377 //
378 // Decode the value and deliver the notification
379 //
380 Kryo kryo = kryoFactory.newKryo();
381 Input input = new Input(valueBytes);
382 Dpid dpid = kryo.readObject(input, Dpid.class);
383 kryoFactory.deleteKryo(kryo);
384 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
385 }
386
387 /**
388 * Receive a notification that an entry is updated.
389 *
390 * @param event the notification event for the entry.
391 */
392 public void entryUpdated(EntryEvent<Long, byte[]> event) {
393 Long keyLong = event.getKey();
394 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
395
396 byte[] valueBytes = event.getValue();
397
398 //
399 // Decode the value and deliver the notification
400 //
401 Kryo kryo = kryoFactory.newKryo();
402 Input input = new Input(valueBytes);
403 Dpid dpid = kryo.readObject(input, Dpid.class);
404 kryoFactory.deleteKryo(kryo);
405 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
406 }
407
408 /**
409 * Receive a notification that an entry is evicted.
410 *
411 * @param event the notification event for the entry.
412 */
413 public void entryEvicted(EntryEvent<Long, byte[]> event) {
414 // NOTE: We don't use eviction for this map
415 }
416 }
417
418 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700419 * Class for receiving notifications for Network Topology state.
420 *
421 * The datagrid map is:
422 * - Key: TopologyElement ID (String)
423 * - Value: Serialized TopologyElement (byte[])
424 */
425 class MapTopologyListener implements EntryListener<String, byte[]> {
426 /**
427 * Receive a notification that an entry is added.
428 *
429 * @param event the notification event for the entry.
430 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800431 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800432 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800433 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700434
435 //
436 // Decode the value and deliver the notification
437 //
438 Kryo kryo = kryoFactory.newKryo();
439 Input input = new Input(valueBytes);
440 TopologyElement topologyElement =
441 kryo.readObject(input, TopologyElement.class);
442 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700443 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700444 }
445
446 /**
447 * Receive a notification that an entry is removed.
448 *
449 * @param event the notification event for the entry.
450 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800451 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800452 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavov3fe9e8b2014-01-12 21:33:20 -0800453 String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
454 PerformanceMonitor.start(tag);
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800455 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700456
457 //
458 // Decode the value and deliver the notification
459 //
460 Kryo kryo = kryoFactory.newKryo();
461 Input input = new Input(valueBytes);
462 TopologyElement topologyElement =
463 kryo.readObject(input, TopologyElement.class);
464 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700465 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavov3fe9e8b2014-01-12 21:33:20 -0800466 PerformanceMonitor.stop(tag);
467 PerformanceMonitor.report(tag);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700468 }
469
470 /**
471 * Receive a notification that an entry is updated.
472 *
473 * @param event the notification event for the entry.
474 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800475 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800476 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800477 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700478
479 //
480 // Decode the value and deliver the notification
481 //
482 Kryo kryo = kryoFactory.newKryo();
483 Input input = new Input(valueBytes);
484 TopologyElement topologyElement =
485 kryo.readObject(input, TopologyElement.class);
486 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700487 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700488 }
489
490 /**
491 * Receive a notification that an entry is evicted.
492 *
493 * @param event the notification event for the entry.
494 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800495 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800496 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700497 // NOTE: We don't use eviction for this map
498 }
499 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800500
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800501 /**
502 * Class for receiving notifications for ARP requests.
503 *
504 * The datagrid map is:
505 * - Key: Request ID (String)
506 * - Value: ARP request packet (byte[])
507 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800508 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800509 /**
510 * Receive a notification that an entry is added.
511 *
512 * @param event the notification event for the entry.
513 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800514 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800515 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800516 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
517 arpEventHandler.arpRequestNotification(event.getKey());
518 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800519
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800520 //
521 // Decode the value and deliver the notification
522 //
523 /*
524 Kryo kryo = kryoFactory.newKryo();
525 Input input = new Input(valueBytes);
526 TopologyElement topologyElement =
527 kryo.readObject(input, TopologyElement.class);
528 kryoFactory.deleteKryo(kryo);
529 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
530 */
531 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800532
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800533 /**
534 * Receive a notification that an entry is removed.
535 *
536 * @param event the notification event for the entry.
537 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800538 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800539 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800540 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800541 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800542
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800543 /**
544 * Receive a notification that an entry is updated.
545 *
546 * @param event the notification event for the entry.
547 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800548 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800549 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800550 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800551 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800552
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800553 /**
554 * Receive a notification that an entry is evicted.
555 *
556 * @param event the notification event for the entry.
557 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800558 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800559 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800560 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800561 }
562 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700563
564 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700565 * Initialize the Hazelcast Datagrid operation.
566 *
567 * @param conf the configuration filename.
568 */
569 public void init(String configFilename) {
570 /*
571 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
572 System.setProperty("hazelcast.socket.send.buffer.size", "32");
573 */
574 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800575
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700576 // Init from configuration file
577 try {
578 hazelcastConfig = new FileSystemXmlConfig(configFilename);
579 } catch (FileNotFoundException e) {
580 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
581 }
582 /*
583 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
584 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
585 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
586 */
587 //
588 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
589 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
590 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
591 }
592
593 /**
594 * Shutdown the Hazelcast Datagrid operation.
595 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800596 @Override
597 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700598 close();
599 }
600
601 /**
602 * Shutdown the Hazelcast Datagrid operation.
603 */
604 public void close() {
605 Hazelcast.shutdownAll();
606 }
607
608 /**
609 * Get the collection of offered module services.
610 *
611 * @return the collection of offered module services.
612 */
613 @Override
614 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800615 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700616 new ArrayList<Class<? extends IFloodlightService>>();
617 l.add(IDatagridService.class);
618 return l;
619 }
620
621 /**
622 * Get the collection of implemented services.
623 *
624 * @return the collection of implemented services.
625 */
626 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800627 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700628 getServiceImpls() {
629 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800630 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700631 new HashMap<Class<? extends IFloodlightService>,
632 IFloodlightService>();
633 m.put(IDatagridService.class, this);
634 return m;
635 }
636
637 /**
638 * Get the collection of modules this module depends on.
639 *
640 * @return the collection of modules this module depends on.
641 */
642 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800643 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700644 getModuleDependencies() {
645 Collection<Class<? extends IFloodlightService>> l =
646 new ArrayList<Class<? extends IFloodlightService>>();
647 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700648 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700649 return l;
650 }
651
652 /**
653 * Initialize the module.
654 *
655 * @param context the module context to use for the initialization.
656 */
657 @Override
658 public void init(FloodlightModuleContext context)
659 throws FloodlightModuleException {
660 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700661 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700662
663 // Get the configuration file name and configure the Datagrid
664 Map<String, String> configMap = context.getConfigParams(this);
665 String configFilename = configMap.get(HazelcastConfigFile);
666 this.init(configFilename);
667 }
668
669 /**
670 * Startup module operation.
671 *
672 * @param context the module context to use for the startup.
673 */
674 @Override
675 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700676 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700677
678 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800679
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800680 arpMap = hazelcastInstance.getMap(arpMapName);
681 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700682 }
683
684 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700685 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700686 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700687 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700688 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700689 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700690 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700691 */
692 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700693 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
694 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700695
696 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700697 mapFlowListener = new MapFlowListener();
698 mapFlow = hazelcastInstance.getMap(mapFlowName);
699 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700700
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700701 // Initialize the FlowEntry-related map state
702 mapFlowEntryListener = new MapFlowEntryListener();
703 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
704 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
705
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800706 // Initialize the FlowId-related map state
707 mapFlowIdListener = new MapFlowIdListener();
708 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
709 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
710
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800711 // Initialize the FlowEntryId-related map state
712 mapFlowEntryIdListener = new MapFlowEntryIdListener();
713 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
714 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
715
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700716 // Initialize the Topology-related map state
717 mapTopologyListener = new MapTopologyListener();
718 mapTopology = hazelcastInstance.getMap(mapTopologyName);
719 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700720 }
721
722 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700723 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700724 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700725 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700726 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700727 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700728 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700729 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700730 */
731 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700732 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700733 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700734 mapFlow.removeEntryListener(mapFlowListenerId);
735 mapFlow = null;
736 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700737
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700738 // Clear the FlowEntry-related map state
739 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
740 mapFlowEntry = null;
741 mapFlowEntryListener = null;
742
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800743 // Clear the FlowId-related map state
744 mapFlowId.removeEntryListener(mapFlowIdListenerId);
745 mapFlowId = null;
746 mapFlowIdListener = null;
747
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800748 // Clear the FlowEntryId-related map state
749 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
750 mapFlowEntryId = null;
751 mapFlowEntryIdListener = null;
752
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700753 // Clear the Topology-related map state
754 mapTopology.removeEntryListener(mapTopologyListenerId);
755 mapTopology = null;
756 mapTopologyListener = null;
757
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700758 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700759 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800760
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800761 @Override
762 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
763 if (arpEventHandler != null) {
764 arpEventHandlers.add(arpEventHandler);
765 }
766 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800767
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800768 @Override
769 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
770 arpEventHandlers.remove(arpEventHandler);
771 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800772
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700773 /**
774 * Get all Flows that are currently in the datagrid.
775 *
776 * @return all Flows that are currently in the datagrid.
777 */
778 @Override
779 public Collection<FlowPath> getAllFlows() {
780 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
781
782 //
783 // Get all current entries
784 //
785 Collection<byte[]> values = mapFlow.values();
786 Kryo kryo = kryoFactory.newKryo();
787 for (byte[] valueBytes : values) {
788 //
789 // Decode the value
790 //
791 Input input = new Input(valueBytes);
792 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
793 allFlows.add(flowPath);
794 }
795 kryoFactory.deleteKryo(kryo);
796
797 return allFlows;
798 }
799
800 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800801 * Get a Flow for a given Flow ID.
802 *
803 * @param flowId the Flow ID of the Flow to get.
804 * @return the Flow if found, otherwise null.
805 */
806 @Override
807 public FlowPath getFlow(FlowId flowId) {
808 byte[] valueBytes = mapFlow.get(flowId.value());
809 if (valueBytes == null)
810 return null;
811
812 Kryo kryo = kryoFactory.newKryo();
813 //
814 // Decode the value
815 //
816 Input input = new Input(valueBytes);
817 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
818 kryoFactory.deleteKryo(kryo);
819
820 return flowPath;
821 }
822
823 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700824 * Send a notification that a Flow is added.
825 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700826 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700827 */
828 @Override
829 public void notificationSendFlowAdded(FlowPath flowPath) {
830 //
831 // Encode the value
832 //
833 byte[] buffer = new byte[MAX_BUFFER_SIZE];
834 Kryo kryo = kryoFactory.newKryo();
835 Output output = new Output(buffer, -1);
836 kryo.writeObject(output, flowPath);
837 byte[] valueBytes = output.toBytes();
838 kryoFactory.deleteKryo(kryo);
839
840 //
841 // Put the entry:
842 // - Key : Flow ID (Long)
843 // - Value : Serialized Flow (byte[])
844 //
845 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
846 }
847
848 /**
849 * Send a notification that a Flow is removed.
850 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700851 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700852 */
853 @Override
854 public void notificationSendFlowRemoved(FlowId flowId) {
855 //
856 // Remove the entry:
857 // - Key : Flow ID (Long)
858 // - Value : Serialized Flow (byte[])
859 //
860 mapFlow.removeAsync(flowId.value());
861 }
862
863 /**
864 * Send a notification that a Flow is updated.
865 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700866 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700867 */
868 @Override
869 public void notificationSendFlowUpdated(FlowPath flowPath) {
870 // NOTE: Adding an entry with an existing key automatically updates it
871 notificationSendFlowAdded(flowPath);
872 }
873
874 /**
875 * Send a notification that all Flows are removed.
876 */
877 @Override
878 public void notificationSendAllFlowsRemoved() {
879 //
880 // Remove all entries
881 // NOTE: We remove the entries one-by-one so the per-entry
882 // notifications will be delivered.
883 //
884 // mapFlow.clear();
885 Set<Long> keySet = mapFlow.keySet();
886 for (Long key : keySet) {
887 mapFlow.removeAsync(key);
888 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700889 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700890
891 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700892 * Get all Flow Entries that are currently in the datagrid.
893 *
894 * @return all Flow Entries that are currently in the datagrid.
895 */
896 @Override
897 public Collection<FlowEntry> getAllFlowEntries() {
898 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
899
900 //
901 // Get all current entries
902 //
903 Collection<byte[]> values = mapFlowEntry.values();
904 Kryo kryo = kryoFactory.newKryo();
905 for (byte[] valueBytes : values) {
906 //
907 // Decode the value
908 //
909 Input input = new Input(valueBytes);
910 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
911 allFlowEntries.add(flowEntry);
912 }
913 kryoFactory.deleteKryo(kryo);
914
915 return allFlowEntries;
916 }
917
918 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800919 * Get a Flow Entry for a given Flow Entry ID.
920 *
921 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
922 * @return the Flow Entry if found, otherwise null.
923 */
924 @Override
925 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
926 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
927 if (valueBytes == null)
928 return null;
929
930 Kryo kryo = kryoFactory.newKryo();
931 //
932 // Decode the value
933 //
934 Input input = new Input(valueBytes);
935 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
936 kryoFactory.deleteKryo(kryo);
937
938 return flowEntry;
939 }
940
941 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700942 * Send a notification that a FlowEntry is added.
943 *
944 * @param flowEntry the FlowEntry that is added.
945 */
946 @Override
947 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
948 //
949 // Encode the value
950 //
951 byte[] buffer = new byte[MAX_BUFFER_SIZE];
952 Kryo kryo = kryoFactory.newKryo();
953 Output output = new Output(buffer, -1);
954 kryo.writeObject(output, flowEntry);
955 byte[] valueBytes = output.toBytes();
956 kryoFactory.deleteKryo(kryo);
957
958 //
959 // Put the entry:
960 // - Key : FlowEntry ID (Long)
961 // - Value : Serialized FlowEntry (byte[])
962 //
963 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
964 }
965
966 /**
967 * Send a notification that a FlowEntry is removed.
968 *
969 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
970 */
971 @Override
972 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
973 //
974 // Remove the entry:
975 // - Key : FlowEntry ID (Long)
976 // - Value : Serialized FlowEntry (byte[])
977 //
978 mapFlowEntry.removeAsync(flowEntryId.value());
979 }
980
981 /**
982 * Send a notification that a FlowEntry is updated.
983 *
984 * @param flowEntry the FlowEntry that is updated.
985 */
986 @Override
987 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
988 // NOTE: Adding an entry with an existing key automatically updates it
989 notificationSendFlowEntryAdded(flowEntry);
990 }
991
992 /**
993 * Send a notification that all Flow Entries are removed.
994 */
995 @Override
996 public void notificationSendAllFlowEntriesRemoved() {
997 //
998 // Remove all entries
999 // NOTE: We remove the entries one-by-one so the per-entry
1000 // notifications will be delivered.
1001 //
1002 // mapFlowEntry.clear();
1003 Set<Long> keySet = mapFlowEntry.keySet();
1004 for (Long key : keySet) {
1005 mapFlowEntry.removeAsync(key);
1006 }
1007 }
1008
1009 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001010 * Get all Flow IDs that are currently in the datagrid.
1011 *
1012 * @return all Flow IDs that are currently in the datagrid.
1013 */
1014 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001015 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1016 Collection<Pair<FlowId, Dpid>> allFlowIds =
1017 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001018
1019 //
1020 // Get all current entries
1021 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001022 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001023 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1024 Long key = entry.getKey();
1025 byte[] valueBytes = entry.getValue();
1026
1027 FlowId flowId = new FlowId(key);
1028
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001029 //
1030 // Decode the value
1031 //
1032 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001033 Dpid dpid = kryo.readObject(input, Dpid.class);
1034
1035 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1036 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001037 }
1038 kryoFactory.deleteKryo(kryo);
1039
1040 return allFlowIds;
1041 }
1042
1043 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001044 * Get all Flow Entry IDs that are currently in the datagrid.
1045 *
1046 * @return all Flow Entry IDs that ae currently in the datagrid.
1047 */
1048 @Override
1049 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1050 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1051 new LinkedList<Pair<FlowEntryId, Dpid>>();
1052
1053 //
1054 // Get all current entries
1055 //
1056 Kryo kryo = kryoFactory.newKryo();
1057 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1058 Long key = entry.getKey();
1059 byte[] valueBytes = entry.getValue();
1060
1061 FlowEntryId flowEntryId = new FlowEntryId(key);
1062
1063 //
1064 // Decode the value
1065 //
1066 Input input = new Input(valueBytes);
1067 Dpid dpid = kryo.readObject(input, Dpid.class);
1068
1069 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1070 allFlowEntryIds.add(pair);
1071 }
1072 kryoFactory.deleteKryo(kryo);
1073
1074 return allFlowEntryIds;
1075 }
1076
1077 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001078 * Send a notification that a FlowId is added.
1079 *
1080 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001081 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001082 */
1083 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001084 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001085 //
1086 // Encode the value
1087 //
1088 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1089 Kryo kryo = kryoFactory.newKryo();
1090 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001091 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001092 byte[] valueBytes = output.toBytes();
1093 kryoFactory.deleteKryo(kryo);
1094
1095 //
1096 // Put the entry:
1097 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001098 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001099 //
1100 mapFlowId.putAsync(flowId.value(), valueBytes);
1101 }
1102
1103 /**
1104 * Send a notification that a FlowId is removed.
1105 *
1106 * @param flowId the FlowId that is removed.
1107 */
1108 @Override
1109 public void notificationSendFlowIdRemoved(FlowId flowId) {
1110 //
1111 // Remove the entry:
1112 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001113 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001114 //
1115 mapFlowId.removeAsync(flowId.value());
1116 }
1117
1118 /**
1119 * Send a notification that a FlowId is updated.
1120 *
1121 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001122 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001123 */
1124 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001125 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001126 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001127 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001128 }
1129
1130 /**
1131 * Send a notification that all Flow IDs are removed.
1132 */
1133 @Override
1134 public void notificationSendAllFlowIdsRemoved() {
1135 //
1136 // Remove all entries
1137 // NOTE: We remove the entries one-by-one so the per-entry
1138 // notifications will be delivered.
1139 //
1140 // mapFlowId.clear();
1141 Set<Long> keySet = mapFlowId.keySet();
1142 for (Long key : keySet) {
1143 mapFlowId.removeAsync(key);
1144 }
1145 }
1146
1147 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001148 * Send a notification that a FlowEntryId is added.
1149 *
1150 * @param flowEntryId the FlowEntryId that is added.
1151 * @param dpid the Switch Dpid.
1152 */
1153 @Override
1154 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1155 Dpid dpid) {
1156 //
1157 // Encode the value
1158 //
1159 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1160 Kryo kryo = kryoFactory.newKryo();
1161 Output output = new Output(buffer, -1);
1162 kryo.writeObject(output, dpid);
1163 byte[] valueBytes = output.toBytes();
1164 kryoFactory.deleteKryo(kryo);
1165
1166 //
1167 // Put the entry:
1168 // - Key : FlowEntryId (Long)
1169 // - Value : Serialized Switch Dpid (byte[])
1170 //
1171 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1172 }
1173
1174 /**
1175 * Send a notification that a FlowEntryId is removed.
1176 *
1177 * @param flowEntryId the FlowEntryId that is removed.
1178 */
1179 @Override
1180 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1181 //
1182 // Remove the entry:
1183 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001184 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001185 //
1186 mapFlowEntryId.removeAsync(flowEntryId.value());
1187 }
1188
1189 /**
1190 * Send a notification that a FlowEntryId is updated.
1191 *
1192 * @param flowEntryId the FlowEntryId that is updated.
1193 * @param dpid the Switch Dpid.
1194 */
1195 @Override
1196 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1197 Dpid dpid) {
1198 // NOTE: Adding an entry with an existing key automatically updates it
1199 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1200 }
1201
1202 /**
1203 * Send a notification that all Flow Entry IDs are removed.
1204 */
1205 @Override
1206 public void notificationSendAllFlowEntryIdsRemoved() {
1207 //
1208 // Remove all entries
1209 // NOTE: We remove the entries one-by-one so the per-entry
1210 // notifications will be delivered.
1211 //
1212 // mapFlowEntryId.clear();
1213 Set<Long> keySet = mapFlowEntryId.keySet();
1214 for (Long key : keySet) {
1215 mapFlowEntryId.removeAsync(key);
1216 }
1217 }
1218
1219 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001220 * Get all Topology Elements that are currently in the datagrid.
1221 *
1222 * @return all Topology Elements that are currently in the datagrid.
1223 */
1224 @Override
1225 public Collection<TopologyElement> getAllTopologyElements() {
1226 Collection<TopologyElement> allTopologyElements =
1227 new LinkedList<TopologyElement>();
1228
1229 //
1230 // Get all current entries
1231 //
1232 Collection<byte[]> values = mapTopology.values();
1233 Kryo kryo = kryoFactory.newKryo();
1234 for (byte[] valueBytes : values) {
1235 //
1236 // Decode the value
1237 //
1238 Input input = new Input(valueBytes);
1239 TopologyElement topologyElement =
1240 kryo.readObject(input, TopologyElement.class);
1241 allTopologyElements.add(topologyElement);
1242 }
1243 kryoFactory.deleteKryo(kryo);
1244
1245 return allTopologyElements;
1246 }
1247
1248 /**
1249 * Send a notification that a Topology Element is added.
1250 *
1251 * @param topologyElement the Topology Element that is added.
1252 */
1253 @Override
1254 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1255 //
1256 // Encode the value
1257 //
1258 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1259 Kryo kryo = kryoFactory.newKryo();
1260 Output output = new Output(buffer, -1);
1261 kryo.writeObject(output, topologyElement);
1262 byte[] valueBytes = output.toBytes();
1263 kryoFactory.deleteKryo(kryo);
1264
1265 //
1266 // Put the entry:
1267 // - Key : TopologyElement ID (String)
1268 // - Value : Serialized TopologyElement (byte[])
1269 //
1270 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1271 }
1272
1273 /**
1274 * Send a notification that a Topology Element is removed.
1275 *
1276 * @param topologyElement the Topology Element that is removed.
1277 */
1278 @Override
1279 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1280 //
1281 // Remove the entry:
1282 // - Key : TopologyElement ID (String)
1283 // - Value : Serialized TopologyElement (byte[])
1284 //
1285 mapTopology.removeAsync(topologyElement.elementId());
1286 }
1287
1288 /**
1289 * Send a notification that a Topology Element is updated.
1290 *
1291 * @param topologyElement the Topology Element that is updated.
1292 */
1293 @Override
1294 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1295 // NOTE: Adding an entry with an existing key automatically updates it
1296 notificationSendTopologyElementAdded(topologyElement);
1297 }
1298
1299 /**
1300 * Send a notification that all Topology Elements are removed.
1301 */
1302 @Override
1303 public void notificationSendAllTopologyElementsRemoved() {
1304 //
1305 // Remove all entries
1306 // NOTE: We remove the entries one-by-one so the per-entry
1307 // notifications will be delivered.
1308 //
1309 // mapTopology.clear();
1310 Set<String> keySet = mapTopology.keySet();
1311 for (String key : keySet) {
1312 mapTopology.removeAsync(key);
1313 }
1314 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001315
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001316 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001317 public void sendArpRequest(ArpMessage arpMessage) {
1318 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1319 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001320 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001321}