blob: 13a0157bc4fa9591e38140ee2e14b2b70dfa402e [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Brian O'Connor2daf7a92014-01-14 11:26:35 -080021import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor.Measurement;
Jonathan Hart7804bea2014-01-07 10:50:52 -080022import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
23import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
24import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
25import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070026import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080027import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070028import net.onrc.onos.ofcontroller.util.FlowEntry;
29import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070030import net.onrc.onos.ofcontroller.util.FlowId;
31import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080032import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070033import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
34
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -080038import com.esotericsoftware.kryo.Kryo;
39import com.esotericsoftware.kryo.io.Input;
40import com.esotericsoftware.kryo.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070041import com.hazelcast.config.Config;
42import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070043import com.hazelcast.core.EntryEvent;
44import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.core.Hazelcast;
46import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070047import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070048import com.hazelcast.instance.GroupProperties;
49
Pavlin Radoslavove2497672014-01-12 18:03:35 -080050import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
51
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070052/**
53 * A datagrid service that uses Hazelcast as a datagrid.
54 * The relevant data is stored in the Hazelcast datagrid and shared as
55 * appropriate in a multi-node cluster.
56 */
57public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private final static int MAX_BUFFER_SIZE = 64*1024;
59
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070060 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070062 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070063
64 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070066 private Config hazelcastConfig = null;
67
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070069 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070070
71 // State related to the Flow map
72 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070073 private IMap<Long, byte[]> mapFlow = null;
74 private MapFlowListener mapFlowListener = null;
75 private String mapFlowListenerId = null;
76
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070077 // State related to the Flow Entry map
78 protected static final String mapFlowEntryName = "mapFlowEntry";
79 private IMap<Long, byte[]> mapFlowEntry = null;
80 private MapFlowEntryListener mapFlowEntryListener = null;
81 private String mapFlowEntryListenerId = null;
82
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080083 // State related to the Flow ID map
84 protected static final String mapFlowIdName = "mapFlowId";
85 private IMap<Long, byte[]> mapFlowId = null;
86 private MapFlowIdListener mapFlowIdListener = null;
87 private String mapFlowIdListenerId = null;
88
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080089 // State related to the Flow Entry ID map
90 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
91 private IMap<Long, byte[]> mapFlowEntryId = null;
92 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
93 private String mapFlowEntryIdListenerId = null;
94
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070095 // State related to the Network Topology map
96 protected static final String mapTopologyName = "mapTopology";
97 private IMap<String, byte[]> mapTopology = null;
98 private MapTopologyListener mapTopologyListener = null;
99 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800100
Jonathan Hart7804bea2014-01-07 10:50:52 -0800101 // State related to the packet out map
102 protected static final String packetOutMapName = "packetOutMap";
103 private IMap<PacketOutNotification, byte[]> packetOutMap = null;
104 private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800105
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800106 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700107
Jonathan Hart7804bea2014-01-07 10:50:52 -0800108 // State related to the ARP reply map
109 protected static final String arpReplyMapName = "arpReplyMap";
110 private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
111 private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700112
113 /**
114 * Class for receiving notifications for Flow state.
115 *
116 * The datagrid map is:
117 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800118 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700119 */
120 class MapFlowListener implements EntryListener<Long, byte[]> {
121 /**
122 * Receive a notification that an entry is added.
123 *
124 * @param event the notification event for the entry.
125 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800126 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800127 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800128 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700129
130 //
131 // Decode the value and deliver the notification
132 //
133 Kryo kryo = kryoFactory.newKryo();
134 Input input = new Input(valueBytes);
135 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
136 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700137 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700138 }
139
140 /**
141 * Receive a notification that an entry is removed.
142 *
143 * @param event the notification event for the entry.
144 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800145 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800146 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800147 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148
149 //
150 // Decode the value and deliver the notification
151 //
152 Kryo kryo = kryoFactory.newKryo();
153 Input input = new Input(valueBytes);
154 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
155 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700156 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700157 }
158
159 /**
160 * Receive a notification that an entry is updated.
161 *
162 * @param event the notification event for the entry.
163 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800164 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800165 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800166 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700167
168 //
169 // Decode the value and deliver the notification
170 //
171 Kryo kryo = kryoFactory.newKryo();
172 Input input = new Input(valueBytes);
173 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
174 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700175 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700176 }
177
178 /**
179 * Receive a notification that an entry is evicted.
180 *
181 * @param event the notification event for the entry.
182 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800183 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800184 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700185 // NOTE: We don't use eviction for this map
186 }
187 }
188
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700189 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700190 * Class for receiving notifications for FlowEntry state.
191 *
192 * The datagrid map is:
193 * - Key : FlowEntry ID (Long)
194 * - Value : Serialized FlowEntry (byte[])
195 */
196 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
197 /**
198 * Receive a notification that an entry is added.
199 *
200 * @param event the notification event for the entry.
201 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800202 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800203 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800204 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700205
206 //
207 // Decode the value and deliver the notification
208 //
209 Kryo kryo = kryoFactory.newKryo();
210 Input input = new Input(valueBytes);
211 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
212 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700213 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700214 }
215
216 /**
217 * Receive a notification that an entry is removed.
218 *
219 * @param event the notification event for the entry.
220 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800221 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800222 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800223 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700224
225 //
226 // Decode the value and deliver the notification
227 //
228 Kryo kryo = kryoFactory.newKryo();
229 Input input = new Input(valueBytes);
230 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
231 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700232 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700233 }
234
235 /**
236 * Receive a notification that an entry is updated.
237 *
238 * @param event the notification event for the entry.
239 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800240 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800241 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800242 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700243
244 //
245 // Decode the value and deliver the notification
246 //
247 Kryo kryo = kryoFactory.newKryo();
248 Input input = new Input(valueBytes);
249 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
250 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700251 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700252 }
253
254 /**
255 * Receive a notification that an entry is evicted.
256 *
257 * @param event the notification event for the entry.
258 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800259 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800260 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700261 // NOTE: We don't use eviction for this map
262 }
263 }
264
265 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800266 * Class for receiving notifications for FlowId state.
267 *
268 * The datagrid map is:
269 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800270 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800271 */
272 class MapFlowIdListener implements EntryListener<Long, byte[]> {
273 /**
274 * Receive a notification that an entry is added.
275 *
276 * @param event the notification event for the entry.
277 */
278 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800279 Long keyLong = event.getKey();
280 FlowId flowId = new FlowId(keyLong);
281
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800282 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800283
284 //
285 // Decode the value and deliver the notification
286 //
287 Kryo kryo = kryoFactory.newKryo();
288 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800289 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800290 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800291 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800292 }
293
294 /**
295 * Receive a notification that an entry is removed.
296 *
297 * @param event the notification event for the entry.
298 */
299 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800300 Long keyLong = event.getKey();
301 FlowId flowId = new FlowId(keyLong);
302
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800303 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800304
305 //
306 // Decode the value and deliver the notification
307 //
308 Kryo kryo = kryoFactory.newKryo();
309 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800310 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800311 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800312 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800313 }
314
315 /**
316 * Receive a notification that an entry is updated.
317 *
318 * @param event the notification event for the entry.
319 */
320 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800321 Long keyLong = event.getKey();
322 FlowId flowId = new FlowId(keyLong);
323
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800324 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800325
326 //
327 // Decode the value and deliver the notification
328 //
329 Kryo kryo = kryoFactory.newKryo();
330 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800331 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800332 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800333 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800334 }
335
336 /**
337 * Receive a notification that an entry is evicted.
338 *
339 * @param event the notification event for the entry.
340 */
341 public void entryEvicted(EntryEvent<Long, byte[]> event) {
342 // NOTE: We don't use eviction for this map
343 }
344 }
345
346 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800347 * Class for receiving notifications for FlowEntryId state.
348 *
349 * The datagrid map is:
350 * - Key : FlowEntryId (Long)
351 * - Value : Serialized Switch Dpid (byte[])
352 */
353 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
354 /**
355 * Receive a notification that an entry is added.
356 *
357 * @param event the notification event for the entry.
358 */
359 public void entryAdded(EntryEvent<Long, byte[]> event) {
360 Long keyLong = event.getKey();
361 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
362
363 byte[] valueBytes = event.getValue();
364
365 //
366 // Decode the value and deliver the notification
367 //
368 Kryo kryo = kryoFactory.newKryo();
369 Input input = new Input(valueBytes);
370 Dpid dpid = kryo.readObject(input, Dpid.class);
371 kryoFactory.deleteKryo(kryo);
372 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
373 }
374
375 /**
376 * Receive a notification that an entry is removed.
377 *
378 * @param event the notification event for the entry.
379 */
380 public void entryRemoved(EntryEvent<Long, byte[]> event) {
381 Long keyLong = event.getKey();
382 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
383
384 byte[] valueBytes = event.getValue();
385
386 //
387 // Decode the value and deliver the notification
388 //
389 Kryo kryo = kryoFactory.newKryo();
390 Input input = new Input(valueBytes);
391 Dpid dpid = kryo.readObject(input, Dpid.class);
392 kryoFactory.deleteKryo(kryo);
393 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
394 }
395
396 /**
397 * Receive a notification that an entry is updated.
398 *
399 * @param event the notification event for the entry.
400 */
401 public void entryUpdated(EntryEvent<Long, byte[]> event) {
402 Long keyLong = event.getKey();
403 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
404
405 byte[] valueBytes = event.getValue();
406
407 //
408 // Decode the value and deliver the notification
409 //
410 Kryo kryo = kryoFactory.newKryo();
411 Input input = new Input(valueBytes);
412 Dpid dpid = kryo.readObject(input, Dpid.class);
413 kryoFactory.deleteKryo(kryo);
414 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
415 }
416
417 /**
418 * Receive a notification that an entry is evicted.
419 *
420 * @param event the notification event for the entry.
421 */
422 public void entryEvicted(EntryEvent<Long, byte[]> event) {
423 // NOTE: We don't use eviction for this map
424 }
425 }
426
427 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700428 * Class for receiving notifications for Network Topology state.
429 *
430 * The datagrid map is:
431 * - Key: TopologyElement ID (String)
432 * - Value: Serialized TopologyElement (byte[])
433 */
434 class MapTopologyListener implements EntryListener<String, byte[]> {
435 /**
436 * Receive a notification that an entry is added.
437 *
438 * @param event the notification event for the entry.
439 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800440 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800441 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800442 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700443
444 //
445 // Decode the value and deliver the notification
446 //
447 Kryo kryo = kryoFactory.newKryo();
448 Input input = new Input(valueBytes);
449 TopologyElement topologyElement =
450 kryo.readObject(input, TopologyElement.class);
451 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700452 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700453 }
454
455 /**
456 * Receive a notification that an entry is removed.
457 *
458 * @param event the notification event for the entry.
459 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800460 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800461 public void entryRemoved(EntryEvent<String, byte[]> event) {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800462// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
463 String tag = "TopologyEntryRemoved.NotificationReceived";
464 PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800465 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700466
467 //
468 // Decode the value and deliver the notification
469 //
470 Kryo kryo = kryoFactory.newKryo();
471 Input input = new Input(valueBytes);
472 TopologyElement topologyElement =
473 kryo.readObject(input, TopologyElement.class);
474 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700475 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800476// PerformanceMonitor.stop(tag);
477 m.stop();
478// PerformanceMonitor.report(tag);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700479 }
480
481 /**
482 * Receive a notification that an entry is updated.
483 *
484 * @param event the notification event for the entry.
485 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800486 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800487 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800488 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700489
490 //
491 // Decode the value and deliver the notification
492 //
493 Kryo kryo = kryoFactory.newKryo();
494 Input input = new Input(valueBytes);
495 TopologyElement topologyElement =
496 kryo.readObject(input, TopologyElement.class);
497 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700498 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700499 }
500
501 /**
502 * Receive a notification that an entry is evicted.
503 *
504 * @param event the notification event for the entry.
505 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800506 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800507 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700508 // NOTE: We don't use eviction for this map
509 }
510 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800511
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800512 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800513 * Class for receiving notifications for sending packet-outs.
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800514 *
515 * The datagrid map is:
Jonathan Hart7804bea2014-01-07 10:50:52 -0800516 * - Key: Packet-out to send (PacketOutNotification)
517 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800518 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800519 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800520 /**
521 * Receive a notification that an entry is added.
522 *
523 * @param event the notification event for the entry.
524 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800525 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800526 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
527 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
528 packetOutEventHandler.packetOutNotification(event.getKey());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800529 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800530 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800531
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800532 /**
533 * Receive a notification that an entry is removed.
534 *
535 * @param event the notification event for the entry.
536 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800537 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800538 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800539 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800540 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800541
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800542 /**
543 * Receive a notification that an entry is updated.
544 *
545 * @param event the notification event for the entry.
546 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800547 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800548 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800549 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800550 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800551
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800552 /**
553 * Receive a notification that an entry is evicted.
554 *
555 * @param event the notification event for the entry.
556 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800557 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800558 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800559 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800560 }
561 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700562
563 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800564 * Class for receiving notifications for sending packet-outs.
565 *
566 * The datagrid map is:
567 * - Key: Packet-out to send (PacketOutNotification)
568 * - Value: dummy value (we only need the key) (byte[])
569 */
570 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
571 /**
572 * Receive a notification that an entry is added.
573 *
574 * @param event the notification event for the entry.
575 */
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800576 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800577 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
578 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
579 arpReplyEventHandler.arpReplyEvent(event.getKey());
580 }
581 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800582
Jonathan Hart7804bea2014-01-07 10:50:52 -0800583 // These methods aren't used for ARP replies
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800584 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800585 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800586 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800587 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800588 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800589 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
590 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700591
592 /**
593 * Initialize the Hazelcast Datagrid operation.
594 *
595 * @param conf the configuration filename.
596 */
597 public void init(String configFilename) {
598 /*
599 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
600 System.setProperty("hazelcast.socket.send.buffer.size", "32");
601 */
602 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800603
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700604 // Init from configuration file
605 try {
606 hazelcastConfig = new FileSystemXmlConfig(configFilename);
607 } catch (FileNotFoundException e) {
608 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
609 }
610 /*
611 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
612 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
613 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
614 */
615 //
616 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
617 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
618 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
619 }
620
621 /**
622 * Shutdown the Hazelcast Datagrid operation.
623 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800624 @Override
625 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700626 close();
627 }
628
629 /**
630 * Shutdown the Hazelcast Datagrid operation.
631 */
632 public void close() {
633 Hazelcast.shutdownAll();
634 }
635
636 /**
637 * Get the collection of offered module services.
638 *
639 * @return the collection of offered module services.
640 */
641 @Override
642 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800643 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700644 new ArrayList<Class<? extends IFloodlightService>>();
645 l.add(IDatagridService.class);
646 return l;
647 }
648
649 /**
650 * Get the collection of implemented services.
651 *
652 * @return the collection of implemented services.
653 */
654 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800655 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700656 getServiceImpls() {
657 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800658 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700659 new HashMap<Class<? extends IFloodlightService>,
660 IFloodlightService>();
661 m.put(IDatagridService.class, this);
662 return m;
663 }
664
665 /**
666 * Get the collection of modules this module depends on.
667 *
668 * @return the collection of modules this module depends on.
669 */
670 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800671 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700672 getModuleDependencies() {
673 Collection<Class<? extends IFloodlightService>> l =
674 new ArrayList<Class<? extends IFloodlightService>>();
675 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700676 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700677 return l;
678 }
679
680 /**
681 * Initialize the module.
682 *
683 * @param context the module context to use for the initialization.
684 */
685 @Override
686 public void init(FloodlightModuleContext context)
687 throws FloodlightModuleException {
688 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700689 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700690
691 // Get the configuration file name and configure the Datagrid
692 Map<String, String> configMap = context.getConfigParams(this);
693 String configFilename = configMap.get(HazelcastConfigFile);
694 this.init(configFilename);
695 }
696
697 /**
698 * Startup module operation.
699 *
700 * @param context the module context to use for the startup.
701 */
702 @Override
703 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700704 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700705
706 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800707
Jonathan Hart7804bea2014-01-07 10:50:52 -0800708 packetOutMap = hazelcastInstance.getMap(packetOutMapName);
709 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800710
Jonathan Hart7804bea2014-01-07 10:50:52 -0800711 arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
712 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700713 }
714
715 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700716 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700717 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700718 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700719 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700720 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700721 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700722 */
723 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700724 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
725 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700726
727 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700728 mapFlowListener = new MapFlowListener();
729 mapFlow = hazelcastInstance.getMap(mapFlowName);
730 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700731
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700732 // Initialize the FlowEntry-related map state
733 mapFlowEntryListener = new MapFlowEntryListener();
734 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
735 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
736
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800737 // Initialize the FlowId-related map state
738 mapFlowIdListener = new MapFlowIdListener();
739 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
740 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
741
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800742 // Initialize the FlowEntryId-related map state
743 mapFlowEntryIdListener = new MapFlowEntryIdListener();
744 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
745 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
746
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700747 // Initialize the Topology-related map state
748 mapTopologyListener = new MapTopologyListener();
749 mapTopology = hazelcastInstance.getMap(mapTopologyName);
750 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700751 }
752
753 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700754 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700755 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700756 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700757 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700758 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700759 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700760 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700761 */
762 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700763 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700764 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700765 mapFlow.removeEntryListener(mapFlowListenerId);
766 mapFlow = null;
767 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700768
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700769 // Clear the FlowEntry-related map state
770 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
771 mapFlowEntry = null;
772 mapFlowEntryListener = null;
773
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800774 // Clear the FlowId-related map state
775 mapFlowId.removeEntryListener(mapFlowIdListenerId);
776 mapFlowId = null;
777 mapFlowIdListener = null;
778
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800779 // Clear the FlowEntryId-related map state
780 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
781 mapFlowEntryId = null;
782 mapFlowEntryIdListener = null;
783
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700784 // Clear the Topology-related map state
785 mapTopology.removeEntryListener(mapTopologyListenerId);
786 mapTopology = null;
787 mapTopologyListener = null;
788
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700789 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700790 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800791
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800792 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800793 public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
794 if (packetOutEventHandler != null) {
795 packetOutEventHandlers.add(packetOutEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800796 }
797 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800798
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800799 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800800 public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
801 packetOutEventHandlers.remove(packetOutEventHandler);
Jonathan Hart7804bea2014-01-07 10:50:52 -0800802 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800803
Jonathan Hart7804bea2014-01-07 10:50:52 -0800804 @Override
805 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
806 if (arpReplyEventHandler != null) {
807 arpReplyEventHandlers.add(arpReplyEventHandler);
808 }
809 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800810
Jonathan Hart7804bea2014-01-07 10:50:52 -0800811 @Override
812 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
813 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800814 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800815
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700816 /**
817 * Get all Flows that are currently in the datagrid.
818 *
819 * @return all Flows that are currently in the datagrid.
820 */
821 @Override
822 public Collection<FlowPath> getAllFlows() {
823 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
824
825 //
826 // Get all current entries
827 //
828 Collection<byte[]> values = mapFlow.values();
829 Kryo kryo = kryoFactory.newKryo();
830 for (byte[] valueBytes : values) {
831 //
832 // Decode the value
833 //
834 Input input = new Input(valueBytes);
835 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
836 allFlows.add(flowPath);
837 }
838 kryoFactory.deleteKryo(kryo);
839
840 return allFlows;
841 }
842
843 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800844 * Get a Flow for a given Flow ID.
845 *
846 * @param flowId the Flow ID of the Flow to get.
847 * @return the Flow if found, otherwise null.
848 */
849 @Override
850 public FlowPath getFlow(FlowId flowId) {
851 byte[] valueBytes = mapFlow.get(flowId.value());
852 if (valueBytes == null)
853 return null;
854
855 Kryo kryo = kryoFactory.newKryo();
856 //
857 // Decode the value
858 //
859 Input input = new Input(valueBytes);
860 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
861 kryoFactory.deleteKryo(kryo);
862
863 return flowPath;
864 }
865
866 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700867 * Send a notification that a Flow is added.
868 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700869 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700870 */
871 @Override
872 public void notificationSendFlowAdded(FlowPath flowPath) {
873 //
874 // Encode the value
875 //
876 byte[] buffer = new byte[MAX_BUFFER_SIZE];
877 Kryo kryo = kryoFactory.newKryo();
878 Output output = new Output(buffer, -1);
879 kryo.writeObject(output, flowPath);
880 byte[] valueBytes = output.toBytes();
881 kryoFactory.deleteKryo(kryo);
882
883 //
884 // Put the entry:
885 // - Key : Flow ID (Long)
886 // - Value : Serialized Flow (byte[])
887 //
888 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
889 }
890
891 /**
892 * Send a notification that a Flow is removed.
893 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700894 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700895 */
896 @Override
897 public void notificationSendFlowRemoved(FlowId flowId) {
898 //
899 // Remove the entry:
900 // - Key : Flow ID (Long)
901 // - Value : Serialized Flow (byte[])
902 //
903 mapFlow.removeAsync(flowId.value());
904 }
905
906 /**
907 * Send a notification that a Flow is updated.
908 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700909 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700910 */
911 @Override
912 public void notificationSendFlowUpdated(FlowPath flowPath) {
913 // NOTE: Adding an entry with an existing key automatically updates it
914 notificationSendFlowAdded(flowPath);
915 }
916
917 /**
918 * Send a notification that all Flows are removed.
919 */
920 @Override
921 public void notificationSendAllFlowsRemoved() {
922 //
923 // Remove all entries
924 // NOTE: We remove the entries one-by-one so the per-entry
925 // notifications will be delivered.
926 //
927 // mapFlow.clear();
928 Set<Long> keySet = mapFlow.keySet();
929 for (Long key : keySet) {
930 mapFlow.removeAsync(key);
931 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700932 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700933
934 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700935 * Get all Flow Entries that are currently in the datagrid.
936 *
937 * @return all Flow Entries that are currently in the datagrid.
938 */
939 @Override
940 public Collection<FlowEntry> getAllFlowEntries() {
941 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
942
943 //
944 // Get all current entries
945 //
946 Collection<byte[]> values = mapFlowEntry.values();
947 Kryo kryo = kryoFactory.newKryo();
948 for (byte[] valueBytes : values) {
949 //
950 // Decode the value
951 //
952 Input input = new Input(valueBytes);
953 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
954 allFlowEntries.add(flowEntry);
955 }
956 kryoFactory.deleteKryo(kryo);
957
958 return allFlowEntries;
959 }
960
961 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800962 * Get a Flow Entry for a given Flow Entry ID.
963 *
964 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
965 * @return the Flow Entry if found, otherwise null.
966 */
967 @Override
968 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
969 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
970 if (valueBytes == null)
971 return null;
972
973 Kryo kryo = kryoFactory.newKryo();
974 //
975 // Decode the value
976 //
977 Input input = new Input(valueBytes);
978 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
979 kryoFactory.deleteKryo(kryo);
980
981 return flowEntry;
982 }
983
984 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700985 * Send a notification that a FlowEntry is added.
986 *
987 * @param flowEntry the FlowEntry that is added.
988 */
989 @Override
990 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
991 //
992 // Encode the value
993 //
994 byte[] buffer = new byte[MAX_BUFFER_SIZE];
995 Kryo kryo = kryoFactory.newKryo();
996 Output output = new Output(buffer, -1);
997 kryo.writeObject(output, flowEntry);
998 byte[] valueBytes = output.toBytes();
999 kryoFactory.deleteKryo(kryo);
1000
1001 //
1002 // Put the entry:
1003 // - Key : FlowEntry ID (Long)
1004 // - Value : Serialized FlowEntry (byte[])
1005 //
1006 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
1007 }
1008
1009 /**
1010 * Send a notification that a FlowEntry is removed.
1011 *
1012 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
1013 */
1014 @Override
1015 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
1016 //
1017 // Remove the entry:
1018 // - Key : FlowEntry ID (Long)
1019 // - Value : Serialized FlowEntry (byte[])
1020 //
1021 mapFlowEntry.removeAsync(flowEntryId.value());
1022 }
1023
1024 /**
1025 * Send a notification that a FlowEntry is updated.
1026 *
1027 * @param flowEntry the FlowEntry that is updated.
1028 */
1029 @Override
1030 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
1031 // NOTE: Adding an entry with an existing key automatically updates it
1032 notificationSendFlowEntryAdded(flowEntry);
1033 }
1034
1035 /**
1036 * Send a notification that all Flow Entries are removed.
1037 */
1038 @Override
1039 public void notificationSendAllFlowEntriesRemoved() {
1040 //
1041 // Remove all entries
1042 // NOTE: We remove the entries one-by-one so the per-entry
1043 // notifications will be delivered.
1044 //
1045 // mapFlowEntry.clear();
1046 Set<Long> keySet = mapFlowEntry.keySet();
1047 for (Long key : keySet) {
1048 mapFlowEntry.removeAsync(key);
1049 }
1050 }
1051
1052 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001053 * Get all Flow IDs that are currently in the datagrid.
1054 *
1055 * @return all Flow IDs that are currently in the datagrid.
1056 */
1057 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001058 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1059 Collection<Pair<FlowId, Dpid>> allFlowIds =
1060 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001061
1062 //
1063 // Get all current entries
1064 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001065 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001066 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1067 Long key = entry.getKey();
1068 byte[] valueBytes = entry.getValue();
1069
1070 FlowId flowId = new FlowId(key);
1071
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001072 //
1073 // Decode the value
1074 //
1075 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001076 Dpid dpid = kryo.readObject(input, Dpid.class);
1077
1078 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1079 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001080 }
1081 kryoFactory.deleteKryo(kryo);
1082
1083 return allFlowIds;
1084 }
1085
1086 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001087 * Get all Flow Entry IDs that are currently in the datagrid.
1088 *
1089 * @return all Flow Entry IDs that ae currently in the datagrid.
1090 */
1091 @Override
1092 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1093 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1094 new LinkedList<Pair<FlowEntryId, Dpid>>();
1095
1096 //
1097 // Get all current entries
1098 //
1099 Kryo kryo = kryoFactory.newKryo();
1100 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1101 Long key = entry.getKey();
1102 byte[] valueBytes = entry.getValue();
1103
1104 FlowEntryId flowEntryId = new FlowEntryId(key);
1105
1106 //
1107 // Decode the value
1108 //
1109 Input input = new Input(valueBytes);
1110 Dpid dpid = kryo.readObject(input, Dpid.class);
1111
1112 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1113 allFlowEntryIds.add(pair);
1114 }
1115 kryoFactory.deleteKryo(kryo);
1116
1117 return allFlowEntryIds;
1118 }
1119
1120 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001121 * Send a notification that a FlowId is added.
1122 *
1123 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001124 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001125 */
1126 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001127 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001128 //
1129 // Encode the value
1130 //
1131 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1132 Kryo kryo = kryoFactory.newKryo();
1133 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001134 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001135 byte[] valueBytes = output.toBytes();
1136 kryoFactory.deleteKryo(kryo);
1137
1138 //
1139 // Put the entry:
1140 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001141 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001142 //
1143 mapFlowId.putAsync(flowId.value(), valueBytes);
1144 }
1145
1146 /**
1147 * Send a notification that a FlowId is removed.
1148 *
1149 * @param flowId the FlowId that is removed.
1150 */
1151 @Override
1152 public void notificationSendFlowIdRemoved(FlowId flowId) {
1153 //
1154 // Remove the entry:
1155 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001156 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001157 //
1158 mapFlowId.removeAsync(flowId.value());
1159 }
1160
1161 /**
1162 * Send a notification that a FlowId is updated.
1163 *
1164 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001165 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001166 */
1167 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001168 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001169 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001170 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001171 }
1172
1173 /**
1174 * Send a notification that all Flow IDs are removed.
1175 */
1176 @Override
1177 public void notificationSendAllFlowIdsRemoved() {
1178 //
1179 // Remove all entries
1180 // NOTE: We remove the entries one-by-one so the per-entry
1181 // notifications will be delivered.
1182 //
1183 // mapFlowId.clear();
1184 Set<Long> keySet = mapFlowId.keySet();
1185 for (Long key : keySet) {
1186 mapFlowId.removeAsync(key);
1187 }
1188 }
1189
1190 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001191 * Send a notification that a FlowEntryId is added.
1192 *
1193 * @param flowEntryId the FlowEntryId that is added.
1194 * @param dpid the Switch Dpid.
1195 */
1196 @Override
1197 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1198 Dpid dpid) {
1199 //
1200 // Encode the value
1201 //
1202 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1203 Kryo kryo = kryoFactory.newKryo();
1204 Output output = new Output(buffer, -1);
1205 kryo.writeObject(output, dpid);
1206 byte[] valueBytes = output.toBytes();
1207 kryoFactory.deleteKryo(kryo);
1208
1209 //
1210 // Put the entry:
1211 // - Key : FlowEntryId (Long)
1212 // - Value : Serialized Switch Dpid (byte[])
1213 //
1214 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1215 }
1216
1217 /**
1218 * Send a notification that a FlowEntryId is removed.
1219 *
1220 * @param flowEntryId the FlowEntryId that is removed.
1221 */
1222 @Override
1223 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1224 //
1225 // Remove the entry:
1226 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001227 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001228 //
1229 mapFlowEntryId.removeAsync(flowEntryId.value());
1230 }
1231
1232 /**
1233 * Send a notification that a FlowEntryId is updated.
1234 *
1235 * @param flowEntryId the FlowEntryId that is updated.
1236 * @param dpid the Switch Dpid.
1237 */
1238 @Override
1239 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1240 Dpid dpid) {
1241 // NOTE: Adding an entry with an existing key automatically updates it
1242 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1243 }
1244
1245 /**
1246 * Send a notification that all Flow Entry IDs are removed.
1247 */
1248 @Override
1249 public void notificationSendAllFlowEntryIdsRemoved() {
1250 //
1251 // Remove all entries
1252 // NOTE: We remove the entries one-by-one so the per-entry
1253 // notifications will be delivered.
1254 //
1255 // mapFlowEntryId.clear();
1256 Set<Long> keySet = mapFlowEntryId.keySet();
1257 for (Long key : keySet) {
1258 mapFlowEntryId.removeAsync(key);
1259 }
1260 }
1261
1262 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001263 * Get all Topology Elements that are currently in the datagrid.
1264 *
1265 * @return all Topology Elements that are currently in the datagrid.
1266 */
1267 @Override
1268 public Collection<TopologyElement> getAllTopologyElements() {
1269 Collection<TopologyElement> allTopologyElements =
1270 new LinkedList<TopologyElement>();
1271
1272 //
1273 // Get all current entries
1274 //
1275 Collection<byte[]> values = mapTopology.values();
1276 Kryo kryo = kryoFactory.newKryo();
1277 for (byte[] valueBytes : values) {
1278 //
1279 // Decode the value
1280 //
1281 Input input = new Input(valueBytes);
1282 TopologyElement topologyElement =
1283 kryo.readObject(input, TopologyElement.class);
1284 allTopologyElements.add(topologyElement);
1285 }
1286 kryoFactory.deleteKryo(kryo);
1287
1288 return allTopologyElements;
1289 }
1290
1291 /**
1292 * Send a notification that a Topology Element is added.
1293 *
1294 * @param topologyElement the Topology Element that is added.
1295 */
1296 @Override
1297 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1298 //
1299 // Encode the value
1300 //
1301 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1302 Kryo kryo = kryoFactory.newKryo();
1303 Output output = new Output(buffer, -1);
1304 kryo.writeObject(output, topologyElement);
1305 byte[] valueBytes = output.toBytes();
1306 kryoFactory.deleteKryo(kryo);
1307
1308 //
1309 // Put the entry:
1310 // - Key : TopologyElement ID (String)
1311 // - Value : Serialized TopologyElement (byte[])
1312 //
1313 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1314 }
1315
1316 /**
1317 * Send a notification that a Topology Element is removed.
1318 *
1319 * @param topologyElement the Topology Element that is removed.
1320 */
1321 @Override
1322 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1323 //
1324 // Remove the entry:
1325 // - Key : TopologyElement ID (String)
1326 // - Value : Serialized TopologyElement (byte[])
1327 //
1328 mapTopology.removeAsync(topologyElement.elementId());
1329 }
1330
1331 /**
1332 * Send a notification that a Topology Element is updated.
1333 *
1334 * @param topologyElement the Topology Element that is updated.
1335 */
1336 @Override
1337 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1338 // NOTE: Adding an entry with an existing key automatically updates it
1339 notificationSendTopologyElementAdded(topologyElement);
1340 }
1341
1342 /**
1343 * Send a notification that all Topology Elements are removed.
1344 */
1345 @Override
1346 public void notificationSendAllTopologyElementsRemoved() {
1347 //
1348 // Remove all entries
1349 // NOTE: We remove the entries one-by-one so the per-entry
1350 // notifications will be delivered.
1351 //
1352 // mapTopology.clear();
1353 Set<String> keySet = mapTopology.keySet();
1354 for (String key : keySet) {
1355 mapTopology.removeAsync(key);
1356 }
1357 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001358
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001359 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -08001360 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
1361 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001362 }
Jonathan Hart7804bea2014-01-07 10:50:52 -08001363
1364 @Override
1365 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
1366 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
1367 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001368}