blob: 0abd854869761fe8b5cfcbbe486a9e6424ad1ad3 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080024import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080029import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070030import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
Pavlin Radoslavove2497672014-01-12 18:03:35 -080047import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
48
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070049/**
50 * A datagrid service that uses Hazelcast as a datagrid.
51 * The relevant data is stored in the Hazelcast datagrid and shared as
52 * appropriate in a multi-node cluster.
53 */
54public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070055 private final static int MAX_BUFFER_SIZE = 64*1024;
56
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070057 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070059 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070060
61 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070062 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070063 private Config hazelcastConfig = null;
64
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070066 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070067
68 // State related to the Flow map
69 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070070 private IMap<Long, byte[]> mapFlow = null;
71 private MapFlowListener mapFlowListener = null;
72 private String mapFlowListenerId = null;
73
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070074 // State related to the Flow Entry map
75 protected static final String mapFlowEntryName = "mapFlowEntry";
76 private IMap<Long, byte[]> mapFlowEntry = null;
77 private MapFlowEntryListener mapFlowEntryListener = null;
78 private String mapFlowEntryListenerId = null;
79
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080080 // State related to the Flow ID map
81 protected static final String mapFlowIdName = "mapFlowId";
82 private IMap<Long, byte[]> mapFlowId = null;
83 private MapFlowIdListener mapFlowIdListener = null;
84 private String mapFlowIdListenerId = null;
85
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080086 // State related to the Flow Entry ID map
87 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
88 private IMap<Long, byte[]> mapFlowEntryId = null;
89 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
90 private String mapFlowEntryIdListenerId = null;
91
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070092 // State related to the Network Topology map
93 protected static final String mapTopologyName = "mapTopology";
94 private IMap<String, byte[]> mapTopology = null;
95 private MapTopologyListener mapTopologyListener = null;
96 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080097
Jonathan Hart18ad55c2013-11-11 22:49:55 -080098 // State related to the ARP map
99 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -0800100 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800101 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
102 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700103
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700104 /**
105 * Class for receiving notifications for Flow state.
106 *
107 * The datagrid map is:
108 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800109 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700110 */
111 class MapFlowListener implements EntryListener<Long, byte[]> {
112 /**
113 * Receive a notification that an entry is added.
114 *
115 * @param event the notification event for the entry.
116 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800117 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800118 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800119 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700120
121 //
122 // Decode the value and deliver the notification
123 //
124 Kryo kryo = kryoFactory.newKryo();
125 Input input = new Input(valueBytes);
126 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
127 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700128 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700129 }
130
131 /**
132 * Receive a notification that an entry is removed.
133 *
134 * @param event the notification event for the entry.
135 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800136 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800137 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800138 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700139
140 //
141 // Decode the value and deliver the notification
142 //
143 Kryo kryo = kryoFactory.newKryo();
144 Input input = new Input(valueBytes);
145 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
146 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700147 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148 }
149
150 /**
151 * Receive a notification that an entry is updated.
152 *
153 * @param event the notification event for the entry.
154 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800155 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800156 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800157 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700158
159 //
160 // Decode the value and deliver the notification
161 //
162 Kryo kryo = kryoFactory.newKryo();
163 Input input = new Input(valueBytes);
164 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
165 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700166 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700167 }
168
169 /**
170 * Receive a notification that an entry is evicted.
171 *
172 * @param event the notification event for the entry.
173 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800174 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800175 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700176 // NOTE: We don't use eviction for this map
177 }
178 }
179
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700180 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700181 * Class for receiving notifications for FlowEntry state.
182 *
183 * The datagrid map is:
184 * - Key : FlowEntry ID (Long)
185 * - Value : Serialized FlowEntry (byte[])
186 */
187 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
188 /**
189 * Receive a notification that an entry is added.
190 *
191 * @param event the notification event for the entry.
192 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800193 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800194 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800195 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700196
197 //
198 // Decode the value and deliver the notification
199 //
200 Kryo kryo = kryoFactory.newKryo();
201 Input input = new Input(valueBytes);
202 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
203 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700204 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700205 }
206
207 /**
208 * Receive a notification that an entry is removed.
209 *
210 * @param event the notification event for the entry.
211 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800212 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800213 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800214 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700215
216 //
217 // Decode the value and deliver the notification
218 //
219 Kryo kryo = kryoFactory.newKryo();
220 Input input = new Input(valueBytes);
221 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
222 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700223 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700224 }
225
226 /**
227 * Receive a notification that an entry is updated.
228 *
229 * @param event the notification event for the entry.
230 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800231 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800232 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800233 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700234
235 //
236 // Decode the value and deliver the notification
237 //
238 Kryo kryo = kryoFactory.newKryo();
239 Input input = new Input(valueBytes);
240 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
241 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700242 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700243 }
244
245 /**
246 * Receive a notification that an entry is evicted.
247 *
248 * @param event the notification event for the entry.
249 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800250 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800251 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700252 // NOTE: We don't use eviction for this map
253 }
254 }
255
256 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800257 * Class for receiving notifications for FlowId state.
258 *
259 * The datagrid map is:
260 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800261 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800262 */
263 class MapFlowIdListener implements EntryListener<Long, byte[]> {
264 /**
265 * Receive a notification that an entry is added.
266 *
267 * @param event the notification event for the entry.
268 */
269 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800270 Long keyLong = event.getKey();
271 FlowId flowId = new FlowId(keyLong);
272
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800273 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800274
275 //
276 // Decode the value and deliver the notification
277 //
278 Kryo kryo = kryoFactory.newKryo();
279 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800280 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800281 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800282 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800283 }
284
285 /**
286 * Receive a notification that an entry is removed.
287 *
288 * @param event the notification event for the entry.
289 */
290 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800291 Long keyLong = event.getKey();
292 FlowId flowId = new FlowId(keyLong);
293
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800294 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800295
296 //
297 // Decode the value and deliver the notification
298 //
299 Kryo kryo = kryoFactory.newKryo();
300 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800301 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800302 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800303 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800304 }
305
306 /**
307 * Receive a notification that an entry is updated.
308 *
309 * @param event the notification event for the entry.
310 */
311 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800312 Long keyLong = event.getKey();
313 FlowId flowId = new FlowId(keyLong);
314
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800315 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800316
317 //
318 // Decode the value and deliver the notification
319 //
320 Kryo kryo = kryoFactory.newKryo();
321 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800322 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800323 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800324 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800325 }
326
327 /**
328 * Receive a notification that an entry is evicted.
329 *
330 * @param event the notification event for the entry.
331 */
332 public void entryEvicted(EntryEvent<Long, byte[]> event) {
333 // NOTE: We don't use eviction for this map
334 }
335 }
336
337 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800338 * Class for receiving notifications for FlowEntryId state.
339 *
340 * The datagrid map is:
341 * - Key : FlowEntryId (Long)
342 * - Value : Serialized Switch Dpid (byte[])
343 */
344 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
345 /**
346 * Receive a notification that an entry is added.
347 *
348 * @param event the notification event for the entry.
349 */
350 public void entryAdded(EntryEvent<Long, byte[]> event) {
351 Long keyLong = event.getKey();
352 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
353
354 byte[] valueBytes = event.getValue();
355
356 //
357 // Decode the value and deliver the notification
358 //
359 Kryo kryo = kryoFactory.newKryo();
360 Input input = new Input(valueBytes);
361 Dpid dpid = kryo.readObject(input, Dpid.class);
362 kryoFactory.deleteKryo(kryo);
363 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
364 }
365
366 /**
367 * Receive a notification that an entry is removed.
368 *
369 * @param event the notification event for the entry.
370 */
371 public void entryRemoved(EntryEvent<Long, byte[]> event) {
372 Long keyLong = event.getKey();
373 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
374
375 byte[] valueBytes = event.getValue();
376
377 //
378 // Decode the value and deliver the notification
379 //
380 Kryo kryo = kryoFactory.newKryo();
381 Input input = new Input(valueBytes);
382 Dpid dpid = kryo.readObject(input, Dpid.class);
383 kryoFactory.deleteKryo(kryo);
384 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
385 }
386
387 /**
388 * Receive a notification that an entry is updated.
389 *
390 * @param event the notification event for the entry.
391 */
392 public void entryUpdated(EntryEvent<Long, byte[]> event) {
393 Long keyLong = event.getKey();
394 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
395
396 byte[] valueBytes = event.getValue();
397
398 //
399 // Decode the value and deliver the notification
400 //
401 Kryo kryo = kryoFactory.newKryo();
402 Input input = new Input(valueBytes);
403 Dpid dpid = kryo.readObject(input, Dpid.class);
404 kryoFactory.deleteKryo(kryo);
405 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
406 }
407
408 /**
409 * Receive a notification that an entry is evicted.
410 *
411 * @param event the notification event for the entry.
412 */
413 public void entryEvicted(EntryEvent<Long, byte[]> event) {
414 // NOTE: We don't use eviction for this map
415 }
416 }
417
418 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700419 * Class for receiving notifications for Network Topology state.
420 *
421 * The datagrid map is:
422 * - Key: TopologyElement ID (String)
423 * - Value: Serialized TopologyElement (byte[])
424 */
425 class MapTopologyListener implements EntryListener<String, byte[]> {
426 /**
427 * Receive a notification that an entry is added.
428 *
429 * @param event the notification event for the entry.
430 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800431 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800432 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800433 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700434
435 //
436 // Decode the value and deliver the notification
437 //
438 Kryo kryo = kryoFactory.newKryo();
439 Input input = new Input(valueBytes);
440 TopologyElement topologyElement =
441 kryo.readObject(input, TopologyElement.class);
442 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700443 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700444 }
445
446 /**
447 * Receive a notification that an entry is removed.
448 *
449 * @param event the notification event for the entry.
450 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800451 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800452 public void entryRemoved(EntryEvent<String, byte[]> event) {
Pavlin Radoslavove2497672014-01-12 18:03:35 -0800453 PerformanceMonitor.start("TopologyEntryRemoved.NotificationReceived");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800454 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700455
456 //
457 // Decode the value and deliver the notification
458 //
459 Kryo kryo = kryoFactory.newKryo();
460 Input input = new Input(valueBytes);
461 TopologyElement topologyElement =
462 kryo.readObject(input, TopologyElement.class);
463 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700464 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavove2497672014-01-12 18:03:35 -0800465 PerformanceMonitor.stop("TopologyEntryRemoved.NotificationReceived");
Pavlin Radoslavov8bd6d112014-01-12 20:12:37 -0800466 PerformanceMonitor.report("TopologyEntryRemoved.NotificationReceived");
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700467 }
468
469 /**
470 * Receive a notification that an entry is updated.
471 *
472 * @param event the notification event for the entry.
473 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800474 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800475 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800476 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700477
478 //
479 // Decode the value and deliver the notification
480 //
481 Kryo kryo = kryoFactory.newKryo();
482 Input input = new Input(valueBytes);
483 TopologyElement topologyElement =
484 kryo.readObject(input, TopologyElement.class);
485 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700486 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700487 }
488
489 /**
490 * Receive a notification that an entry is evicted.
491 *
492 * @param event the notification event for the entry.
493 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800494 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800495 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700496 // NOTE: We don't use eviction for this map
497 }
498 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800499
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800500 /**
501 * Class for receiving notifications for ARP requests.
502 *
503 * The datagrid map is:
504 * - Key: Request ID (String)
505 * - Value: ARP request packet (byte[])
506 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800507 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800508 /**
509 * Receive a notification that an entry is added.
510 *
511 * @param event the notification event for the entry.
512 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800513 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800514 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800515 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
516 arpEventHandler.arpRequestNotification(event.getKey());
517 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800518
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800519 //
520 // Decode the value and deliver the notification
521 //
522 /*
523 Kryo kryo = kryoFactory.newKryo();
524 Input input = new Input(valueBytes);
525 TopologyElement topologyElement =
526 kryo.readObject(input, TopologyElement.class);
527 kryoFactory.deleteKryo(kryo);
528 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
529 */
530 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800531
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800532 /**
533 * Receive a notification that an entry is removed.
534 *
535 * @param event the notification event for the entry.
536 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800537 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800538 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800539 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800540 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800541
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800542 /**
543 * Receive a notification that an entry is updated.
544 *
545 * @param event the notification event for the entry.
546 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800547 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800548 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800549 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800550 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800551
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800552 /**
553 * Receive a notification that an entry is evicted.
554 *
555 * @param event the notification event for the entry.
556 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800557 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800558 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800559 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800560 }
561 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700562
563 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700564 * Initialize the Hazelcast Datagrid operation.
565 *
566 * @param conf the configuration filename.
567 */
568 public void init(String configFilename) {
569 /*
570 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
571 System.setProperty("hazelcast.socket.send.buffer.size", "32");
572 */
573 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800574
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700575 // Init from configuration file
576 try {
577 hazelcastConfig = new FileSystemXmlConfig(configFilename);
578 } catch (FileNotFoundException e) {
579 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
580 }
581 /*
582 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
583 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
584 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
585 */
586 //
587 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
588 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
589 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
590 }
591
592 /**
593 * Shutdown the Hazelcast Datagrid operation.
594 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800595 @Override
596 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700597 close();
598 }
599
600 /**
601 * Shutdown the Hazelcast Datagrid operation.
602 */
603 public void close() {
604 Hazelcast.shutdownAll();
605 }
606
607 /**
608 * Get the collection of offered module services.
609 *
610 * @return the collection of offered module services.
611 */
612 @Override
613 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800614 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700615 new ArrayList<Class<? extends IFloodlightService>>();
616 l.add(IDatagridService.class);
617 return l;
618 }
619
620 /**
621 * Get the collection of implemented services.
622 *
623 * @return the collection of implemented services.
624 */
625 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800626 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700627 getServiceImpls() {
628 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800629 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700630 new HashMap<Class<? extends IFloodlightService>,
631 IFloodlightService>();
632 m.put(IDatagridService.class, this);
633 return m;
634 }
635
636 /**
637 * Get the collection of modules this module depends on.
638 *
639 * @return the collection of modules this module depends on.
640 */
641 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800642 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700643 getModuleDependencies() {
644 Collection<Class<? extends IFloodlightService>> l =
645 new ArrayList<Class<? extends IFloodlightService>>();
646 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700647 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700648 return l;
649 }
650
651 /**
652 * Initialize the module.
653 *
654 * @param context the module context to use for the initialization.
655 */
656 @Override
657 public void init(FloodlightModuleContext context)
658 throws FloodlightModuleException {
659 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700660 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700661
662 // Get the configuration file name and configure the Datagrid
663 Map<String, String> configMap = context.getConfigParams(this);
664 String configFilename = configMap.get(HazelcastConfigFile);
665 this.init(configFilename);
666 }
667
668 /**
669 * Startup module operation.
670 *
671 * @param context the module context to use for the startup.
672 */
673 @Override
674 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700675 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700676
677 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800678
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800679 arpMap = hazelcastInstance.getMap(arpMapName);
680 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700681 }
682
683 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700684 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700685 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700686 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700687 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700688 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700689 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700690 */
691 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700692 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
693 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700694
695 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700696 mapFlowListener = new MapFlowListener();
697 mapFlow = hazelcastInstance.getMap(mapFlowName);
698 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700699
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700700 // Initialize the FlowEntry-related map state
701 mapFlowEntryListener = new MapFlowEntryListener();
702 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
703 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
704
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800705 // Initialize the FlowId-related map state
706 mapFlowIdListener = new MapFlowIdListener();
707 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
708 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
709
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800710 // Initialize the FlowEntryId-related map state
711 mapFlowEntryIdListener = new MapFlowEntryIdListener();
712 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
713 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
714
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700715 // Initialize the Topology-related map state
716 mapTopologyListener = new MapTopologyListener();
717 mapTopology = hazelcastInstance.getMap(mapTopologyName);
718 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700719 }
720
721 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700722 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700723 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700724 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700725 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700726 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700727 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700728 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700729 */
730 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700731 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700732 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700733 mapFlow.removeEntryListener(mapFlowListenerId);
734 mapFlow = null;
735 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700736
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700737 // Clear the FlowEntry-related map state
738 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
739 mapFlowEntry = null;
740 mapFlowEntryListener = null;
741
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800742 // Clear the FlowId-related map state
743 mapFlowId.removeEntryListener(mapFlowIdListenerId);
744 mapFlowId = null;
745 mapFlowIdListener = null;
746
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800747 // Clear the FlowEntryId-related map state
748 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
749 mapFlowEntryId = null;
750 mapFlowEntryIdListener = null;
751
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700752 // Clear the Topology-related map state
753 mapTopology.removeEntryListener(mapTopologyListenerId);
754 mapTopology = null;
755 mapTopologyListener = null;
756
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700757 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700758 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800759
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800760 @Override
761 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
762 if (arpEventHandler != null) {
763 arpEventHandlers.add(arpEventHandler);
764 }
765 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800766
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800767 @Override
768 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
769 arpEventHandlers.remove(arpEventHandler);
770 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800771
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700772 /**
773 * Get all Flows that are currently in the datagrid.
774 *
775 * @return all Flows that are currently in the datagrid.
776 */
777 @Override
778 public Collection<FlowPath> getAllFlows() {
779 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
780
781 //
782 // Get all current entries
783 //
784 Collection<byte[]> values = mapFlow.values();
785 Kryo kryo = kryoFactory.newKryo();
786 for (byte[] valueBytes : values) {
787 //
788 // Decode the value
789 //
790 Input input = new Input(valueBytes);
791 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
792 allFlows.add(flowPath);
793 }
794 kryoFactory.deleteKryo(kryo);
795
796 return allFlows;
797 }
798
799 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800800 * Get a Flow for a given Flow ID.
801 *
802 * @param flowId the Flow ID of the Flow to get.
803 * @return the Flow if found, otherwise null.
804 */
805 @Override
806 public FlowPath getFlow(FlowId flowId) {
807 byte[] valueBytes = mapFlow.get(flowId.value());
808 if (valueBytes == null)
809 return null;
810
811 Kryo kryo = kryoFactory.newKryo();
812 //
813 // Decode the value
814 //
815 Input input = new Input(valueBytes);
816 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
817 kryoFactory.deleteKryo(kryo);
818
819 return flowPath;
820 }
821
822 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700823 * Send a notification that a Flow is added.
824 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700825 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700826 */
827 @Override
828 public void notificationSendFlowAdded(FlowPath flowPath) {
829 //
830 // Encode the value
831 //
832 byte[] buffer = new byte[MAX_BUFFER_SIZE];
833 Kryo kryo = kryoFactory.newKryo();
834 Output output = new Output(buffer, -1);
835 kryo.writeObject(output, flowPath);
836 byte[] valueBytes = output.toBytes();
837 kryoFactory.deleteKryo(kryo);
838
839 //
840 // Put the entry:
841 // - Key : Flow ID (Long)
842 // - Value : Serialized Flow (byte[])
843 //
844 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
845 }
846
847 /**
848 * Send a notification that a Flow is removed.
849 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700850 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700851 */
852 @Override
853 public void notificationSendFlowRemoved(FlowId flowId) {
854 //
855 // Remove the entry:
856 // - Key : Flow ID (Long)
857 // - Value : Serialized Flow (byte[])
858 //
859 mapFlow.removeAsync(flowId.value());
860 }
861
862 /**
863 * Send a notification that a Flow is updated.
864 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700865 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700866 */
867 @Override
868 public void notificationSendFlowUpdated(FlowPath flowPath) {
869 // NOTE: Adding an entry with an existing key automatically updates it
870 notificationSendFlowAdded(flowPath);
871 }
872
873 /**
874 * Send a notification that all Flows are removed.
875 */
876 @Override
877 public void notificationSendAllFlowsRemoved() {
878 //
879 // Remove all entries
880 // NOTE: We remove the entries one-by-one so the per-entry
881 // notifications will be delivered.
882 //
883 // mapFlow.clear();
884 Set<Long> keySet = mapFlow.keySet();
885 for (Long key : keySet) {
886 mapFlow.removeAsync(key);
887 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700888 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700889
890 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700891 * Get all Flow Entries that are currently in the datagrid.
892 *
893 * @return all Flow Entries that are currently in the datagrid.
894 */
895 @Override
896 public Collection<FlowEntry> getAllFlowEntries() {
897 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
898
899 //
900 // Get all current entries
901 //
902 Collection<byte[]> values = mapFlowEntry.values();
903 Kryo kryo = kryoFactory.newKryo();
904 for (byte[] valueBytes : values) {
905 //
906 // Decode the value
907 //
908 Input input = new Input(valueBytes);
909 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
910 allFlowEntries.add(flowEntry);
911 }
912 kryoFactory.deleteKryo(kryo);
913
914 return allFlowEntries;
915 }
916
917 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800918 * Get a Flow Entry for a given Flow Entry ID.
919 *
920 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
921 * @return the Flow Entry if found, otherwise null.
922 */
923 @Override
924 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
925 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
926 if (valueBytes == null)
927 return null;
928
929 Kryo kryo = kryoFactory.newKryo();
930 //
931 // Decode the value
932 //
933 Input input = new Input(valueBytes);
934 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
935 kryoFactory.deleteKryo(kryo);
936
937 return flowEntry;
938 }
939
940 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700941 * Send a notification that a FlowEntry is added.
942 *
943 * @param flowEntry the FlowEntry that is added.
944 */
945 @Override
946 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
947 //
948 // Encode the value
949 //
950 byte[] buffer = new byte[MAX_BUFFER_SIZE];
951 Kryo kryo = kryoFactory.newKryo();
952 Output output = new Output(buffer, -1);
953 kryo.writeObject(output, flowEntry);
954 byte[] valueBytes = output.toBytes();
955 kryoFactory.deleteKryo(kryo);
956
957 //
958 // Put the entry:
959 // - Key : FlowEntry ID (Long)
960 // - Value : Serialized FlowEntry (byte[])
961 //
962 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
963 }
964
965 /**
966 * Send a notification that a FlowEntry is removed.
967 *
968 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
969 */
970 @Override
971 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
972 //
973 // Remove the entry:
974 // - Key : FlowEntry ID (Long)
975 // - Value : Serialized FlowEntry (byte[])
976 //
977 mapFlowEntry.removeAsync(flowEntryId.value());
978 }
979
980 /**
981 * Send a notification that a FlowEntry is updated.
982 *
983 * @param flowEntry the FlowEntry that is updated.
984 */
985 @Override
986 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
987 // NOTE: Adding an entry with an existing key automatically updates it
988 notificationSendFlowEntryAdded(flowEntry);
989 }
990
991 /**
992 * Send a notification that all Flow Entries are removed.
993 */
994 @Override
995 public void notificationSendAllFlowEntriesRemoved() {
996 //
997 // Remove all entries
998 // NOTE: We remove the entries one-by-one so the per-entry
999 // notifications will be delivered.
1000 //
1001 // mapFlowEntry.clear();
1002 Set<Long> keySet = mapFlowEntry.keySet();
1003 for (Long key : keySet) {
1004 mapFlowEntry.removeAsync(key);
1005 }
1006 }
1007
1008 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001009 * Get all Flow IDs that are currently in the datagrid.
1010 *
1011 * @return all Flow IDs that are currently in the datagrid.
1012 */
1013 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001014 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1015 Collection<Pair<FlowId, Dpid>> allFlowIds =
1016 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001017
1018 //
1019 // Get all current entries
1020 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001021 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001022 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1023 Long key = entry.getKey();
1024 byte[] valueBytes = entry.getValue();
1025
1026 FlowId flowId = new FlowId(key);
1027
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001028 //
1029 // Decode the value
1030 //
1031 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001032 Dpid dpid = kryo.readObject(input, Dpid.class);
1033
1034 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1035 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001036 }
1037 kryoFactory.deleteKryo(kryo);
1038
1039 return allFlowIds;
1040 }
1041
1042 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001043 * Get all Flow Entry IDs that are currently in the datagrid.
1044 *
1045 * @return all Flow Entry IDs that ae currently in the datagrid.
1046 */
1047 @Override
1048 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1049 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1050 new LinkedList<Pair<FlowEntryId, Dpid>>();
1051
1052 //
1053 // Get all current entries
1054 //
1055 Kryo kryo = kryoFactory.newKryo();
1056 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1057 Long key = entry.getKey();
1058 byte[] valueBytes = entry.getValue();
1059
1060 FlowEntryId flowEntryId = new FlowEntryId(key);
1061
1062 //
1063 // Decode the value
1064 //
1065 Input input = new Input(valueBytes);
1066 Dpid dpid = kryo.readObject(input, Dpid.class);
1067
1068 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1069 allFlowEntryIds.add(pair);
1070 }
1071 kryoFactory.deleteKryo(kryo);
1072
1073 return allFlowEntryIds;
1074 }
1075
1076 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001077 * Send a notification that a FlowId is added.
1078 *
1079 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001080 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001081 */
1082 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001083 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001084 //
1085 // Encode the value
1086 //
1087 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1088 Kryo kryo = kryoFactory.newKryo();
1089 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001090 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001091 byte[] valueBytes = output.toBytes();
1092 kryoFactory.deleteKryo(kryo);
1093
1094 //
1095 // Put the entry:
1096 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001097 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001098 //
1099 mapFlowId.putAsync(flowId.value(), valueBytes);
1100 }
1101
1102 /**
1103 * Send a notification that a FlowId is removed.
1104 *
1105 * @param flowId the FlowId that is removed.
1106 */
1107 @Override
1108 public void notificationSendFlowIdRemoved(FlowId flowId) {
1109 //
1110 // Remove the entry:
1111 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001112 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001113 //
1114 mapFlowId.removeAsync(flowId.value());
1115 }
1116
1117 /**
1118 * Send a notification that a FlowId is updated.
1119 *
1120 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001121 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001122 */
1123 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001124 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001125 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001126 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001127 }
1128
1129 /**
1130 * Send a notification that all Flow IDs are removed.
1131 */
1132 @Override
1133 public void notificationSendAllFlowIdsRemoved() {
1134 //
1135 // Remove all entries
1136 // NOTE: We remove the entries one-by-one so the per-entry
1137 // notifications will be delivered.
1138 //
1139 // mapFlowId.clear();
1140 Set<Long> keySet = mapFlowId.keySet();
1141 for (Long key : keySet) {
1142 mapFlowId.removeAsync(key);
1143 }
1144 }
1145
1146 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001147 * Send a notification that a FlowEntryId is added.
1148 *
1149 * @param flowEntryId the FlowEntryId that is added.
1150 * @param dpid the Switch Dpid.
1151 */
1152 @Override
1153 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1154 Dpid dpid) {
1155 //
1156 // Encode the value
1157 //
1158 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1159 Kryo kryo = kryoFactory.newKryo();
1160 Output output = new Output(buffer, -1);
1161 kryo.writeObject(output, dpid);
1162 byte[] valueBytes = output.toBytes();
1163 kryoFactory.deleteKryo(kryo);
1164
1165 //
1166 // Put the entry:
1167 // - Key : FlowEntryId (Long)
1168 // - Value : Serialized Switch Dpid (byte[])
1169 //
1170 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1171 }
1172
1173 /**
1174 * Send a notification that a FlowEntryId is removed.
1175 *
1176 * @param flowEntryId the FlowEntryId that is removed.
1177 */
1178 @Override
1179 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1180 //
1181 // Remove the entry:
1182 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001183 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001184 //
1185 mapFlowEntryId.removeAsync(flowEntryId.value());
1186 }
1187
1188 /**
1189 * Send a notification that a FlowEntryId is updated.
1190 *
1191 * @param flowEntryId the FlowEntryId that is updated.
1192 * @param dpid the Switch Dpid.
1193 */
1194 @Override
1195 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1196 Dpid dpid) {
1197 // NOTE: Adding an entry with an existing key automatically updates it
1198 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1199 }
1200
1201 /**
1202 * Send a notification that all Flow Entry IDs are removed.
1203 */
1204 @Override
1205 public void notificationSendAllFlowEntryIdsRemoved() {
1206 //
1207 // Remove all entries
1208 // NOTE: We remove the entries one-by-one so the per-entry
1209 // notifications will be delivered.
1210 //
1211 // mapFlowEntryId.clear();
1212 Set<Long> keySet = mapFlowEntryId.keySet();
1213 for (Long key : keySet) {
1214 mapFlowEntryId.removeAsync(key);
1215 }
1216 }
1217
1218 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001219 * Get all Topology Elements that are currently in the datagrid.
1220 *
1221 * @return all Topology Elements that are currently in the datagrid.
1222 */
1223 @Override
1224 public Collection<TopologyElement> getAllTopologyElements() {
1225 Collection<TopologyElement> allTopologyElements =
1226 new LinkedList<TopologyElement>();
1227
1228 //
1229 // Get all current entries
1230 //
1231 Collection<byte[]> values = mapTopology.values();
1232 Kryo kryo = kryoFactory.newKryo();
1233 for (byte[] valueBytes : values) {
1234 //
1235 // Decode the value
1236 //
1237 Input input = new Input(valueBytes);
1238 TopologyElement topologyElement =
1239 kryo.readObject(input, TopologyElement.class);
1240 allTopologyElements.add(topologyElement);
1241 }
1242 kryoFactory.deleteKryo(kryo);
1243
1244 return allTopologyElements;
1245 }
1246
1247 /**
1248 * Send a notification that a Topology Element is added.
1249 *
1250 * @param topologyElement the Topology Element that is added.
1251 */
1252 @Override
1253 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1254 //
1255 // Encode the value
1256 //
1257 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1258 Kryo kryo = kryoFactory.newKryo();
1259 Output output = new Output(buffer, -1);
1260 kryo.writeObject(output, topologyElement);
1261 byte[] valueBytes = output.toBytes();
1262 kryoFactory.deleteKryo(kryo);
1263
1264 //
1265 // Put the entry:
1266 // - Key : TopologyElement ID (String)
1267 // - Value : Serialized TopologyElement (byte[])
1268 //
1269 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1270 }
1271
1272 /**
1273 * Send a notification that a Topology Element is removed.
1274 *
1275 * @param topologyElement the Topology Element that is removed.
1276 */
1277 @Override
1278 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1279 //
1280 // Remove the entry:
1281 // - Key : TopologyElement ID (String)
1282 // - Value : Serialized TopologyElement (byte[])
1283 //
1284 mapTopology.removeAsync(topologyElement.elementId());
1285 }
1286
1287 /**
1288 * Send a notification that a Topology Element is updated.
1289 *
1290 * @param topologyElement the Topology Element that is updated.
1291 */
1292 @Override
1293 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1294 // NOTE: Adding an entry with an existing key automatically updates it
1295 notificationSendTopologyElementAdded(topologyElement);
1296 }
1297
1298 /**
1299 * Send a notification that all Topology Elements are removed.
1300 */
1301 @Override
1302 public void notificationSendAllTopologyElementsRemoved() {
1303 //
1304 // Remove all entries
1305 // NOTE: We remove the entries one-by-one so the per-entry
1306 // notifications will be delivered.
1307 //
1308 // mapTopology.clear();
1309 Set<String> keySet = mapTopology.keySet();
1310 for (String key : keySet) {
1311 mapTopology.removeAsync(key);
1312 }
1313 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001314
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001315 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001316 public void sendArpRequest(ArpMessage arpMessage) {
1317 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1318 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001319 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001320}