blob: b24029972df55c579b50a450cdae6b80ea6b560e [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart7804bea2014-01-07 10:50:52 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
22import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
23import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
24import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070025import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080026import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070027import net.onrc.onos.ofcontroller.util.FlowEntry;
28import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070029import net.onrc.onos.ofcontroller.util.FlowId;
30import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080031import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070032import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
33
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -080037import com.esotericsoftware.kryo.Kryo;
38import com.esotericsoftware.kryo.io.Input;
39import com.esotericsoftware.kryo.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.config.Config;
41import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.EntryEvent;
43import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070044import com.hazelcast.core.Hazelcast;
45import com.hazelcast.core.HazelcastInstance;
Toshio Koide3738ee52014-02-12 14:57:39 -080046import com.hazelcast.core.IList;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070047import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070048import com.hazelcast.instance.GroupProperties;
Toshio Koide3738ee52014-02-12 14:57:39 -080049import net.onrc.onos.intent.Intent;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070050
Pavlin Radoslavove2497672014-01-12 18:03:35 -080051import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
52
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070053/**
54 * A datagrid service that uses Hazelcast as a datagrid.
55 * The relevant data is stored in the Hazelcast datagrid and shared as
56 * appropriate in a multi-node cluster.
57 */
58public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private final static int MAX_BUFFER_SIZE = 64*1024;
60
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070061 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070062 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070063 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070064
65 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070067 private Config hazelcastConfig = null;
68
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070069 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070070 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070071
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080072 private Map<String, IEventChannel<?, ?>> eventChannels =
73 new HashMap<String, IEventChannel<?, ?>>();
74
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070075 // State related to the Flow map
76 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070077 private IMap<Long, byte[]> mapFlow = null;
78 private MapFlowListener mapFlowListener = null;
79 private String mapFlowListenerId = null;
80
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070081 // State related to the Flow Entry map
82 protected static final String mapFlowEntryName = "mapFlowEntry";
83 private IMap<Long, byte[]> mapFlowEntry = null;
84 private MapFlowEntryListener mapFlowEntryListener = null;
85 private String mapFlowEntryListenerId = null;
86
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080087 // State related to the Flow ID map
88 protected static final String mapFlowIdName = "mapFlowId";
89 private IMap<Long, byte[]> mapFlowId = null;
90 private MapFlowIdListener mapFlowIdListener = null;
91 private String mapFlowIdListenerId = null;
92
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080093 // State related to the Flow Entry ID map
94 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
95 private IMap<Long, byte[]> mapFlowEntryId = null;
96 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
97 private String mapFlowEntryIdListenerId = null;
98
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070099 // State related to the Network Topology map
100 protected static final String mapTopologyName = "mapTopology";
101 private IMap<String, byte[]> mapTopology = null;
102 private MapTopologyListener mapTopologyListener = null;
103 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800104
Jonathan Hart7804bea2014-01-07 10:50:52 -0800105 // State related to the packet out map
106 protected static final String packetOutMapName = "packetOutMap";
107 private IMap<PacketOutNotification, byte[]> packetOutMap = null;
108 private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800109
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800110 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700111
Jonathan Hart7804bea2014-01-07 10:50:52 -0800112 // State related to the ARP reply map
113 protected static final String arpReplyMapName = "arpReplyMap";
114 private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
115 private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
Toshio Koide3738ee52014-02-12 14:57:39 -0800116
117
118 protected static final String intentListName = "intentList";
119 private IList<Intent> intentList = null;
120
121 @Override
122 public void registerIntent(Collection<Intent> intents) {
123 intentList.addAll(intents);
124 }
125
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700126
127 /**
128 * Class for receiving notifications for Flow state.
129 *
130 * The datagrid map is:
131 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800132 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700133 */
134 class MapFlowListener implements EntryListener<Long, byte[]> {
135 /**
136 * Receive a notification that an entry is added.
137 *
138 * @param event the notification event for the entry.
139 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800140 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800141 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800142 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700143
144 //
145 // Decode the value and deliver the notification
146 //
147 Kryo kryo = kryoFactory.newKryo();
148 Input input = new Input(valueBytes);
149 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
150 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700151 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700152 }
153
154 /**
155 * Receive a notification that an entry is removed.
156 *
157 * @param event the notification event for the entry.
158 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800159 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800160 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800161 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700162
163 //
164 // Decode the value and deliver the notification
165 //
166 Kryo kryo = kryoFactory.newKryo();
167 Input input = new Input(valueBytes);
168 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
169 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700170 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700171 }
172
173 /**
174 * Receive a notification that an entry is updated.
175 *
176 * @param event the notification event for the entry.
177 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800178 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800179 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800180 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700181
182 //
183 // Decode the value and deliver the notification
184 //
185 Kryo kryo = kryoFactory.newKryo();
186 Input input = new Input(valueBytes);
187 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
188 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700189 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700190 }
191
192 /**
193 * Receive a notification that an entry is evicted.
194 *
195 * @param event the notification event for the entry.
196 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800197 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800198 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700199 // NOTE: We don't use eviction for this map
200 }
201 }
202
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700203 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700204 * Class for receiving notifications for FlowEntry state.
205 *
206 * The datagrid map is:
207 * - Key : FlowEntry ID (Long)
208 * - Value : Serialized FlowEntry (byte[])
209 */
210 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
211 /**
212 * Receive a notification that an entry is added.
213 *
214 * @param event the notification event for the entry.
215 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800216 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800217 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800218 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700219
220 //
221 // Decode the value and deliver the notification
222 //
223 Kryo kryo = kryoFactory.newKryo();
224 Input input = new Input(valueBytes);
225 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
226 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700227 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700228 }
229
230 /**
231 * Receive a notification that an entry is removed.
232 *
233 * @param event the notification event for the entry.
234 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800235 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800236 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800237 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700238
239 //
240 // Decode the value and deliver the notification
241 //
242 Kryo kryo = kryoFactory.newKryo();
243 Input input = new Input(valueBytes);
244 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
245 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700246 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700247 }
248
249 /**
250 * Receive a notification that an entry is updated.
251 *
252 * @param event the notification event for the entry.
253 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800254 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800255 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800256 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700257
258 //
259 // Decode the value and deliver the notification
260 //
261 Kryo kryo = kryoFactory.newKryo();
262 Input input = new Input(valueBytes);
263 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
264 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700265 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700266 }
267
268 /**
269 * Receive a notification that an entry is evicted.
270 *
271 * @param event the notification event for the entry.
272 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800273 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800274 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700275 // NOTE: We don't use eviction for this map
276 }
277 }
278
279 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800280 * Class for receiving notifications for FlowId state.
281 *
282 * The datagrid map is:
283 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800284 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800285 */
286 class MapFlowIdListener implements EntryListener<Long, byte[]> {
287 /**
288 * Receive a notification that an entry is added.
289 *
290 * @param event the notification event for the entry.
291 */
292 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800293 Long keyLong = event.getKey();
294 FlowId flowId = new FlowId(keyLong);
295
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800296 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800297
298 //
299 // Decode the value and deliver the notification
300 //
301 Kryo kryo = kryoFactory.newKryo();
302 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800303 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800304 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800305 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800306 }
307
308 /**
309 * Receive a notification that an entry is removed.
310 *
311 * @param event the notification event for the entry.
312 */
313 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800314 Long keyLong = event.getKey();
315 FlowId flowId = new FlowId(keyLong);
316
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800317 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800318
319 //
320 // Decode the value and deliver the notification
321 //
322 Kryo kryo = kryoFactory.newKryo();
323 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800324 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800325 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800326 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800327 }
328
329 /**
330 * Receive a notification that an entry is updated.
331 *
332 * @param event the notification event for the entry.
333 */
334 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800335 Long keyLong = event.getKey();
336 FlowId flowId = new FlowId(keyLong);
337
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800338 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800339
340 //
341 // Decode the value and deliver the notification
342 //
343 Kryo kryo = kryoFactory.newKryo();
344 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800345 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800346 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800347 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800348 }
349
350 /**
351 * Receive a notification that an entry is evicted.
352 *
353 * @param event the notification event for the entry.
354 */
355 public void entryEvicted(EntryEvent<Long, byte[]> event) {
356 // NOTE: We don't use eviction for this map
357 }
358 }
359
360 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800361 * Class for receiving notifications for FlowEntryId state.
362 *
363 * The datagrid map is:
364 * - Key : FlowEntryId (Long)
365 * - Value : Serialized Switch Dpid (byte[])
366 */
367 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
368 /**
369 * Receive a notification that an entry is added.
370 *
371 * @param event the notification event for the entry.
372 */
373 public void entryAdded(EntryEvent<Long, byte[]> event) {
374 Long keyLong = event.getKey();
375 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
376
377 byte[] valueBytes = event.getValue();
378
379 //
380 // Decode the value and deliver the notification
381 //
382 Kryo kryo = kryoFactory.newKryo();
383 Input input = new Input(valueBytes);
384 Dpid dpid = kryo.readObject(input, Dpid.class);
385 kryoFactory.deleteKryo(kryo);
386 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
387 }
388
389 /**
390 * Receive a notification that an entry is removed.
391 *
392 * @param event the notification event for the entry.
393 */
394 public void entryRemoved(EntryEvent<Long, byte[]> event) {
395 Long keyLong = event.getKey();
396 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
397
398 byte[] valueBytes = event.getValue();
399
400 //
401 // Decode the value and deliver the notification
402 //
403 Kryo kryo = kryoFactory.newKryo();
404 Input input = new Input(valueBytes);
405 Dpid dpid = kryo.readObject(input, Dpid.class);
406 kryoFactory.deleteKryo(kryo);
407 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
408 }
409
410 /**
411 * Receive a notification that an entry is updated.
412 *
413 * @param event the notification event for the entry.
414 */
415 public void entryUpdated(EntryEvent<Long, byte[]> event) {
416 Long keyLong = event.getKey();
417 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
418
419 byte[] valueBytes = event.getValue();
420
421 //
422 // Decode the value and deliver the notification
423 //
424 Kryo kryo = kryoFactory.newKryo();
425 Input input = new Input(valueBytes);
426 Dpid dpid = kryo.readObject(input, Dpid.class);
427 kryoFactory.deleteKryo(kryo);
428 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
429 }
430
431 /**
432 * Receive a notification that an entry is evicted.
433 *
434 * @param event the notification event for the entry.
435 */
436 public void entryEvicted(EntryEvent<Long, byte[]> event) {
437 // NOTE: We don't use eviction for this map
438 }
439 }
440
441 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700442 * Class for receiving notifications for Network Topology state.
443 *
444 * The datagrid map is:
445 * - Key: TopologyElement ID (String)
446 * - Value: Serialized TopologyElement (byte[])
447 */
448 class MapTopologyListener implements EntryListener<String, byte[]> {
449 /**
450 * Receive a notification that an entry is added.
451 *
452 * @param event the notification event for the entry.
453 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800454 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800455 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800456 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700457
458 //
459 // Decode the value and deliver the notification
460 //
461 Kryo kryo = kryoFactory.newKryo();
462 Input input = new Input(valueBytes);
463 TopologyElement topologyElement =
464 kryo.readObject(input, TopologyElement.class);
465 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700466 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700467 }
468
469 /**
470 * Receive a notification that an entry is removed.
471 *
472 * @param event the notification event for the entry.
473 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800474 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800475 public void entryRemoved(EntryEvent<String, byte[]> event) {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800476// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
477 String tag = "TopologyEntryRemoved.NotificationReceived";
478 PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800479 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700480
481 //
482 // Decode the value and deliver the notification
483 //
484 Kryo kryo = kryoFactory.newKryo();
485 Input input = new Input(valueBytes);
486 TopologyElement topologyElement =
487 kryo.readObject(input, TopologyElement.class);
488 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700489 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800490// PerformanceMonitor.stop(tag);
491 m.stop();
492// PerformanceMonitor.report(tag);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700493 }
494
495 /**
496 * Receive a notification that an entry is updated.
497 *
498 * @param event the notification event for the entry.
499 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800500 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800501 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800502 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700503
504 //
505 // Decode the value and deliver the notification
506 //
507 Kryo kryo = kryoFactory.newKryo();
508 Input input = new Input(valueBytes);
509 TopologyElement topologyElement =
510 kryo.readObject(input, TopologyElement.class);
511 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700512 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700513 }
514
515 /**
516 * Receive a notification that an entry is evicted.
517 *
518 * @param event the notification event for the entry.
519 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800520 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800521 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700522 // NOTE: We don't use eviction for this map
523 }
524 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800525
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800526 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800527 * Class for receiving notifications for sending packet-outs.
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800528 *
529 * The datagrid map is:
Jonathan Hart7804bea2014-01-07 10:50:52 -0800530 * - Key: Packet-out to send (PacketOutNotification)
531 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800532 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800533 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800534 /**
535 * Receive a notification that an entry is added.
536 *
537 * @param event the notification event for the entry.
538 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800539 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800540 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
541 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
542 packetOutEventHandler.packetOutNotification(event.getKey());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800543 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800544 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800545
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800546 /**
547 * Receive a notification that an entry is removed.
548 *
549 * @param event the notification event for the entry.
550 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800551 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800552 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800553 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800554 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800555
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800556 /**
557 * Receive a notification that an entry is updated.
558 *
559 * @param event the notification event for the entry.
560 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800561 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800562 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800563 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800564 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800565
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800566 /**
567 * Receive a notification that an entry is evicted.
568 *
569 * @param event the notification event for the entry.
570 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800571 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800572 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800573 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800574 }
575 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700576
577 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800578 * Class for receiving notifications for sending packet-outs.
579 *
580 * The datagrid map is:
581 * - Key: Packet-out to send (PacketOutNotification)
582 * - Value: dummy value (we only need the key) (byte[])
583 */
584 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
585 /**
586 * Receive a notification that an entry is added.
587 *
588 * @param event the notification event for the entry.
589 */
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800590 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800591 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
592 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
593 arpReplyEventHandler.arpReplyEvent(event.getKey());
594 }
595 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800596
Jonathan Hart7804bea2014-01-07 10:50:52 -0800597 // These methods aren't used for ARP replies
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800598 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800599 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800600 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800601 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800602 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800603 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
604 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700605
606 /**
607 * Initialize the Hazelcast Datagrid operation.
608 *
609 * @param conf the configuration filename.
610 */
611 public void init(String configFilename) {
612 /*
613 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
614 System.setProperty("hazelcast.socket.send.buffer.size", "32");
615 */
616 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800617
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700618 // Init from configuration file
619 try {
620 hazelcastConfig = new FileSystemXmlConfig(configFilename);
621 } catch (FileNotFoundException e) {
622 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
623 }
624 /*
625 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
626 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
627 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
628 */
629 //
630 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
631 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
632 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
633 }
634
635 /**
636 * Shutdown the Hazelcast Datagrid operation.
637 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800638 @Override
639 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700640 close();
641 }
642
643 /**
644 * Shutdown the Hazelcast Datagrid operation.
645 */
646 public void close() {
647 Hazelcast.shutdownAll();
648 }
649
650 /**
651 * Get the collection of offered module services.
652 *
653 * @return the collection of offered module services.
654 */
655 @Override
656 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800657 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700658 new ArrayList<Class<? extends IFloodlightService>>();
659 l.add(IDatagridService.class);
660 return l;
661 }
662
663 /**
664 * Get the collection of implemented services.
665 *
666 * @return the collection of implemented services.
667 */
668 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800669 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700670 getServiceImpls() {
671 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800672 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700673 new HashMap<Class<? extends IFloodlightService>,
674 IFloodlightService>();
675 m.put(IDatagridService.class, this);
676 return m;
677 }
678
679 /**
680 * Get the collection of modules this module depends on.
681 *
682 * @return the collection of modules this module depends on.
683 */
684 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800685 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700686 getModuleDependencies() {
687 Collection<Class<? extends IFloodlightService>> l =
688 new ArrayList<Class<? extends IFloodlightService>>();
689 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700690 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700691 return l;
692 }
693
694 /**
695 * Initialize the module.
696 *
697 * @param context the module context to use for the initialization.
698 */
699 @Override
700 public void init(FloodlightModuleContext context)
701 throws FloodlightModuleException {
702 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700703 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700704
705 // Get the configuration file name and configure the Datagrid
706 Map<String, String> configMap = context.getConfigParams(this);
707 String configFilename = configMap.get(HazelcastConfigFile);
708 this.init(configFilename);
709 }
710
711 /**
712 * Startup module operation.
713 *
714 * @param context the module context to use for the startup.
715 */
716 @Override
717 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700718 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700719
720 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800721
Jonathan Hart7804bea2014-01-07 10:50:52 -0800722 packetOutMap = hazelcastInstance.getMap(packetOutMapName);
723 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800724
Jonathan Hart7804bea2014-01-07 10:50:52 -0800725 arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
726 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
Toshio Koide3738ee52014-02-12 14:57:39 -0800727 intentList = hazelcastInstance.getList(intentListName);
728
729
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700730 }
731
732 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800733 * Create an event channel.
734 *
735 * If the channel already exists, just return it.
736 * NOTE: The channel is started automatically.
737 *
738 * @param channelName the event channel name.
739 * @param typeK the type of the Key in the Key-Value store.
740 * @param typeV the type of the Value in the Key-Value store.
741 * @return the event channel for the channel name.
742 */
743 @Override
744 public <K, V> IEventChannel<K, V> createChannel(String channelName,
745 Class<K> typeK, Class<V> typeV) {
746 IEventChannel<K, V> eventChannel =
747 createChannelImpl(channelName, typeK, typeV);
748 eventChannel.startup();
749
750 return eventChannel;
751 }
752
753 /**
754 * Create an event channel implementation.
755 *
756 * If the channel already exists, just return it.
757 * NOTE: The caller must call IEventChannel.startup() to startup the
758 * channel operation.
759 *
760 * @param channelName the event channel name.
761 * @param typeK the type of the Key in the Key-Value store.
762 * @param typeV the type of the Value in the Key-Value store.
763 * @return the event channel for the channel name.
764 */
765 private <K, V> IEventChannel<K, V> createChannelImpl(String channelName,
766 Class<K> typeK, Class<V> typeV) {
767 IEventChannel<K, V> castedEventChannel;
768 IEventChannel<?, ?> genericEventChannel =
769 eventChannels.get(channelName);
770
771 // Add the channel if the first listener
772 if (genericEventChannel == null) {
773 castedEventChannel =
774 new HazelcastEventChannel<K, V>(hazelcastInstance,
775 channelName, typeK, typeV);
776 eventChannels.put(channelName, castedEventChannel);
777 } else {
778 //
779 // TODO: Find if we can use Java internal support to check for
780 // type mismatch.
781 //
782 if (! genericEventChannel.verifyKeyValueTypes(typeK, typeV)) {
783 throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
784 }
785 castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
786 }
787
788 return castedEventChannel;
789 }
790
791 /**
792 * Add event channel listener.
793 *
794 * NOTE: The channel is started automatically right after the listener
795 * is added.
796 *
797 * @param channelName the event channel name.
798 * @param listener the listener to add.
799 * @param typeK the type of the Key in the Key-Value store.
800 * @param typeV the type of the Value in the Key-Value store.
801 * @return the event channel for the channel name.
802 */
803 @Override
804 public <K, V> IEventChannel<K, V> addListener(String channelName,
805 IEventChannelListener<K, V> listener,
806 Class<K> typeK, Class<V> typeV) {
807 IEventChannel<K, V> eventChannel =
808 createChannelImpl(channelName, typeK, typeV);
809 eventChannel.addListener(listener);
810 eventChannel.startup();
811
812 return eventChannel;
813 }
814
815 /**
816 * Remove event channel listener.
817 *
818 * @param channelName the event channel name.
819 * @param listener the listener to remove.
820 */
821 @Override
822 public <K, V> void removeListener(String channelName,
823 IEventChannelListener<K, V> listener) {
824 IEventChannel<K, V> castedEventChannel;
825 IEventChannel<?, ?> genericEventChannel =
826 eventChannels.get(channelName);
827
828 if (genericEventChannel != null) {
829 //
830 // TODO: Find if we can use Java internal support to check for
831 // type mismatch.
832 // NOTE: Using "ClassCastException" exception below doesn't work.
833 //
834 castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
835 castedEventChannel.removeListener(listener);
836 }
837 }
838
839 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700840 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700841 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700842 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700843 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700844 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700845 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700846 */
847 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700848 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
849 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700850
851 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700852 mapFlowListener = new MapFlowListener();
853 mapFlow = hazelcastInstance.getMap(mapFlowName);
854 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700855
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700856 // Initialize the FlowEntry-related map state
857 mapFlowEntryListener = new MapFlowEntryListener();
858 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
859 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
860
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800861 // Initialize the FlowId-related map state
862 mapFlowIdListener = new MapFlowIdListener();
863 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
864 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
865
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800866 // Initialize the FlowEntryId-related map state
867 mapFlowEntryIdListener = new MapFlowEntryIdListener();
868 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
869 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
870
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700871 // Initialize the Topology-related map state
872 mapTopologyListener = new MapTopologyListener();
873 mapTopology = hazelcastInstance.getMap(mapTopologyName);
874 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700875 }
876
877 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700878 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700879 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700880 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700881 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700882 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700883 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700884 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700885 */
886 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700887 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700888 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700889 mapFlow.removeEntryListener(mapFlowListenerId);
890 mapFlow = null;
891 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700892
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700893 // Clear the FlowEntry-related map state
894 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
895 mapFlowEntry = null;
896 mapFlowEntryListener = null;
897
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800898 // Clear the FlowId-related map state
899 mapFlowId.removeEntryListener(mapFlowIdListenerId);
900 mapFlowId = null;
901 mapFlowIdListener = null;
902
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800903 // Clear the FlowEntryId-related map state
904 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
905 mapFlowEntryId = null;
906 mapFlowEntryIdListener = null;
907
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700908 // Clear the Topology-related map state
909 mapTopology.removeEntryListener(mapTopologyListenerId);
910 mapTopology = null;
911 mapTopologyListener = null;
912
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700913 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700914 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800915
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800916 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800917 public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
918 if (packetOutEventHandler != null) {
919 packetOutEventHandlers.add(packetOutEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800920 }
921 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800922
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800923 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800924 public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
925 packetOutEventHandlers.remove(packetOutEventHandler);
Jonathan Hart7804bea2014-01-07 10:50:52 -0800926 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800927
Jonathan Hart7804bea2014-01-07 10:50:52 -0800928 @Override
929 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
930 if (arpReplyEventHandler != null) {
931 arpReplyEventHandlers.add(arpReplyEventHandler);
932 }
933 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800934
Jonathan Hart7804bea2014-01-07 10:50:52 -0800935 @Override
936 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
937 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800938 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800939
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700940 /**
941 * Get all Flows that are currently in the datagrid.
942 *
943 * @return all Flows that are currently in the datagrid.
944 */
945 @Override
946 public Collection<FlowPath> getAllFlows() {
947 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
948
949 //
950 // Get all current entries
951 //
952 Collection<byte[]> values = mapFlow.values();
953 Kryo kryo = kryoFactory.newKryo();
954 for (byte[] valueBytes : values) {
955 //
956 // Decode the value
957 //
958 Input input = new Input(valueBytes);
959 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
960 allFlows.add(flowPath);
961 }
962 kryoFactory.deleteKryo(kryo);
963
964 return allFlows;
965 }
966
967 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800968 * Get a Flow for a given Flow ID.
969 *
970 * @param flowId the Flow ID of the Flow to get.
971 * @return the Flow if found, otherwise null.
972 */
973 @Override
974 public FlowPath getFlow(FlowId flowId) {
975 byte[] valueBytes = mapFlow.get(flowId.value());
976 if (valueBytes == null)
977 return null;
978
979 Kryo kryo = kryoFactory.newKryo();
980 //
981 // Decode the value
982 //
983 Input input = new Input(valueBytes);
984 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
985 kryoFactory.deleteKryo(kryo);
986
987 return flowPath;
988 }
989
990 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700991 * Send a notification that a Flow is added.
992 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700993 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700994 */
995 @Override
996 public void notificationSendFlowAdded(FlowPath flowPath) {
997 //
998 // Encode the value
999 //
1000 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1001 Kryo kryo = kryoFactory.newKryo();
1002 Output output = new Output(buffer, -1);
1003 kryo.writeObject(output, flowPath);
1004 byte[] valueBytes = output.toBytes();
1005 kryoFactory.deleteKryo(kryo);
1006
1007 //
1008 // Put the entry:
1009 // - Key : Flow ID (Long)
1010 // - Value : Serialized Flow (byte[])
1011 //
1012 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
1013 }
1014
1015 /**
1016 * Send a notification that a Flow is removed.
1017 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001018 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001019 */
1020 @Override
1021 public void notificationSendFlowRemoved(FlowId flowId) {
1022 //
1023 // Remove the entry:
1024 // - Key : Flow ID (Long)
1025 // - Value : Serialized Flow (byte[])
1026 //
1027 mapFlow.removeAsync(flowId.value());
1028 }
1029
1030 /**
1031 * Send a notification that a Flow is updated.
1032 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001033 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001034 */
1035 @Override
1036 public void notificationSendFlowUpdated(FlowPath flowPath) {
1037 // NOTE: Adding an entry with an existing key automatically updates it
1038 notificationSendFlowAdded(flowPath);
1039 }
1040
1041 /**
1042 * Send a notification that all Flows are removed.
1043 */
1044 @Override
1045 public void notificationSendAllFlowsRemoved() {
1046 //
1047 // Remove all entries
1048 // NOTE: We remove the entries one-by-one so the per-entry
1049 // notifications will be delivered.
1050 //
1051 // mapFlow.clear();
1052 Set<Long> keySet = mapFlow.keySet();
1053 for (Long key : keySet) {
1054 mapFlow.removeAsync(key);
1055 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001056 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001057
1058 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001059 * Get all Flow Entries that are currently in the datagrid.
1060 *
1061 * @return all Flow Entries that are currently in the datagrid.
1062 */
1063 @Override
1064 public Collection<FlowEntry> getAllFlowEntries() {
1065 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
1066
1067 //
1068 // Get all current entries
1069 //
1070 Collection<byte[]> values = mapFlowEntry.values();
1071 Kryo kryo = kryoFactory.newKryo();
1072 for (byte[] valueBytes : values) {
1073 //
1074 // Decode the value
1075 //
1076 Input input = new Input(valueBytes);
1077 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
1078 allFlowEntries.add(flowEntry);
1079 }
1080 kryoFactory.deleteKryo(kryo);
1081
1082 return allFlowEntries;
1083 }
1084
1085 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001086 * Get a Flow Entry for a given Flow Entry ID.
1087 *
1088 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
1089 * @return the Flow Entry if found, otherwise null.
1090 */
1091 @Override
1092 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
1093 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
1094 if (valueBytes == null)
1095 return null;
1096
1097 Kryo kryo = kryoFactory.newKryo();
1098 //
1099 // Decode the value
1100 //
1101 Input input = new Input(valueBytes);
1102 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
1103 kryoFactory.deleteKryo(kryo);
1104
1105 return flowEntry;
1106 }
1107
1108 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001109 * Send a notification that a FlowEntry is added.
1110 *
1111 * @param flowEntry the FlowEntry that is added.
1112 */
1113 @Override
1114 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
1115 //
1116 // Encode the value
1117 //
1118 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1119 Kryo kryo = kryoFactory.newKryo();
1120 Output output = new Output(buffer, -1);
1121 kryo.writeObject(output, flowEntry);
1122 byte[] valueBytes = output.toBytes();
1123 kryoFactory.deleteKryo(kryo);
1124
1125 //
1126 // Put the entry:
1127 // - Key : FlowEntry ID (Long)
1128 // - Value : Serialized FlowEntry (byte[])
1129 //
1130 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
1131 }
1132
1133 /**
1134 * Send a notification that a FlowEntry is removed.
1135 *
1136 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
1137 */
1138 @Override
1139 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
1140 //
1141 // Remove the entry:
1142 // - Key : FlowEntry ID (Long)
1143 // - Value : Serialized FlowEntry (byte[])
1144 //
1145 mapFlowEntry.removeAsync(flowEntryId.value());
1146 }
1147
1148 /**
1149 * Send a notification that a FlowEntry is updated.
1150 *
1151 * @param flowEntry the FlowEntry that is updated.
1152 */
1153 @Override
1154 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
1155 // NOTE: Adding an entry with an existing key automatically updates it
1156 notificationSendFlowEntryAdded(flowEntry);
1157 }
1158
1159 /**
1160 * Send a notification that all Flow Entries are removed.
1161 */
1162 @Override
1163 public void notificationSendAllFlowEntriesRemoved() {
1164 //
1165 // Remove all entries
1166 // NOTE: We remove the entries one-by-one so the per-entry
1167 // notifications will be delivered.
1168 //
1169 // mapFlowEntry.clear();
1170 Set<Long> keySet = mapFlowEntry.keySet();
1171 for (Long key : keySet) {
1172 mapFlowEntry.removeAsync(key);
1173 }
1174 }
1175
1176 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001177 * Get all Flow IDs that are currently in the datagrid.
1178 *
1179 * @return all Flow IDs that are currently in the datagrid.
1180 */
1181 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001182 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1183 Collection<Pair<FlowId, Dpid>> allFlowIds =
1184 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001185
1186 //
1187 // Get all current entries
1188 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001189 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001190 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1191 Long key = entry.getKey();
1192 byte[] valueBytes = entry.getValue();
1193
1194 FlowId flowId = new FlowId(key);
1195
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001196 //
1197 // Decode the value
1198 //
1199 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001200 Dpid dpid = kryo.readObject(input, Dpid.class);
1201
1202 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1203 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001204 }
1205 kryoFactory.deleteKryo(kryo);
1206
1207 return allFlowIds;
1208 }
1209
1210 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001211 * Get all Flow Entry IDs that are currently in the datagrid.
1212 *
1213 * @return all Flow Entry IDs that ae currently in the datagrid.
1214 */
1215 @Override
1216 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1217 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1218 new LinkedList<Pair<FlowEntryId, Dpid>>();
1219
1220 //
1221 // Get all current entries
1222 //
1223 Kryo kryo = kryoFactory.newKryo();
1224 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1225 Long key = entry.getKey();
1226 byte[] valueBytes = entry.getValue();
1227
1228 FlowEntryId flowEntryId = new FlowEntryId(key);
1229
1230 //
1231 // Decode the value
1232 //
1233 Input input = new Input(valueBytes);
1234 Dpid dpid = kryo.readObject(input, Dpid.class);
1235
1236 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1237 allFlowEntryIds.add(pair);
1238 }
1239 kryoFactory.deleteKryo(kryo);
1240
1241 return allFlowEntryIds;
1242 }
1243
1244 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001245 * Send a notification that a FlowId is added.
1246 *
1247 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001248 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001249 */
1250 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001251 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001252 //
1253 // Encode the value
1254 //
1255 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1256 Kryo kryo = kryoFactory.newKryo();
1257 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001258 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001259 byte[] valueBytes = output.toBytes();
1260 kryoFactory.deleteKryo(kryo);
1261
1262 //
1263 // Put the entry:
1264 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001265 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001266 //
1267 mapFlowId.putAsync(flowId.value(), valueBytes);
1268 }
1269
1270 /**
1271 * Send a notification that a FlowId is removed.
1272 *
1273 * @param flowId the FlowId that is removed.
1274 */
1275 @Override
1276 public void notificationSendFlowIdRemoved(FlowId flowId) {
1277 //
1278 // Remove the entry:
1279 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001280 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001281 //
1282 mapFlowId.removeAsync(flowId.value());
1283 }
1284
1285 /**
1286 * Send a notification that a FlowId is updated.
1287 *
1288 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001289 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001290 */
1291 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001292 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001293 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001294 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001295 }
1296
1297 /**
1298 * Send a notification that all Flow IDs are removed.
1299 */
1300 @Override
1301 public void notificationSendAllFlowIdsRemoved() {
1302 //
1303 // Remove all entries
1304 // NOTE: We remove the entries one-by-one so the per-entry
1305 // notifications will be delivered.
1306 //
1307 // mapFlowId.clear();
1308 Set<Long> keySet = mapFlowId.keySet();
1309 for (Long key : keySet) {
1310 mapFlowId.removeAsync(key);
1311 }
1312 }
1313
1314 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001315 * Send a notification that a FlowEntryId is added.
1316 *
1317 * @param flowEntryId the FlowEntryId that is added.
1318 * @param dpid the Switch Dpid.
1319 */
1320 @Override
1321 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1322 Dpid dpid) {
1323 //
1324 // Encode the value
1325 //
1326 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1327 Kryo kryo = kryoFactory.newKryo();
1328 Output output = new Output(buffer, -1);
1329 kryo.writeObject(output, dpid);
1330 byte[] valueBytes = output.toBytes();
1331 kryoFactory.deleteKryo(kryo);
1332
1333 //
1334 // Put the entry:
1335 // - Key : FlowEntryId (Long)
1336 // - Value : Serialized Switch Dpid (byte[])
1337 //
1338 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1339 }
1340
1341 /**
1342 * Send a notification that a FlowEntryId is removed.
1343 *
1344 * @param flowEntryId the FlowEntryId that is removed.
1345 */
1346 @Override
1347 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1348 //
1349 // Remove the entry:
1350 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001351 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001352 //
1353 mapFlowEntryId.removeAsync(flowEntryId.value());
1354 }
1355
1356 /**
1357 * Send a notification that a FlowEntryId is updated.
1358 *
1359 * @param flowEntryId the FlowEntryId that is updated.
1360 * @param dpid the Switch Dpid.
1361 */
1362 @Override
1363 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1364 Dpid dpid) {
1365 // NOTE: Adding an entry with an existing key automatically updates it
1366 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1367 }
1368
1369 /**
1370 * Send a notification that all Flow Entry IDs are removed.
1371 */
1372 @Override
1373 public void notificationSendAllFlowEntryIdsRemoved() {
1374 //
1375 // Remove all entries
1376 // NOTE: We remove the entries one-by-one so the per-entry
1377 // notifications will be delivered.
1378 //
1379 // mapFlowEntryId.clear();
1380 Set<Long> keySet = mapFlowEntryId.keySet();
1381 for (Long key : keySet) {
1382 mapFlowEntryId.removeAsync(key);
1383 }
1384 }
1385
1386 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001387 * Get all Topology Elements that are currently in the datagrid.
1388 *
1389 * @return all Topology Elements that are currently in the datagrid.
1390 */
1391 @Override
1392 public Collection<TopologyElement> getAllTopologyElements() {
1393 Collection<TopologyElement> allTopologyElements =
1394 new LinkedList<TopologyElement>();
1395
1396 //
1397 // Get all current entries
1398 //
1399 Collection<byte[]> values = mapTopology.values();
1400 Kryo kryo = kryoFactory.newKryo();
1401 for (byte[] valueBytes : values) {
1402 //
1403 // Decode the value
1404 //
1405 Input input = new Input(valueBytes);
1406 TopologyElement topologyElement =
1407 kryo.readObject(input, TopologyElement.class);
1408 allTopologyElements.add(topologyElement);
1409 }
1410 kryoFactory.deleteKryo(kryo);
1411
1412 return allTopologyElements;
1413 }
1414
1415 /**
1416 * Send a notification that a Topology Element is added.
1417 *
1418 * @param topologyElement the Topology Element that is added.
1419 */
1420 @Override
1421 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1422 //
1423 // Encode the value
1424 //
1425 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1426 Kryo kryo = kryoFactory.newKryo();
1427 Output output = new Output(buffer, -1);
1428 kryo.writeObject(output, topologyElement);
1429 byte[] valueBytes = output.toBytes();
1430 kryoFactory.deleteKryo(kryo);
1431
1432 //
1433 // Put the entry:
1434 // - Key : TopologyElement ID (String)
1435 // - Value : Serialized TopologyElement (byte[])
1436 //
1437 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1438 }
1439
1440 /**
1441 * Send a notification that a Topology Element is removed.
1442 *
1443 * @param topologyElement the Topology Element that is removed.
1444 */
1445 @Override
1446 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1447 //
1448 // Remove the entry:
1449 // - Key : TopologyElement ID (String)
1450 // - Value : Serialized TopologyElement (byte[])
1451 //
1452 mapTopology.removeAsync(topologyElement.elementId());
1453 }
1454
1455 /**
1456 * Send a notification that a Topology Element is updated.
1457 *
1458 * @param topologyElement the Topology Element that is updated.
1459 */
1460 @Override
1461 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1462 // NOTE: Adding an entry with an existing key automatically updates it
1463 notificationSendTopologyElementAdded(topologyElement);
1464 }
1465
1466 /**
1467 * Send a notification that all Topology Elements are removed.
1468 */
1469 @Override
1470 public void notificationSendAllTopologyElementsRemoved() {
1471 //
1472 // Remove all entries
1473 // NOTE: We remove the entries one-by-one so the per-entry
1474 // notifications will be delivered.
1475 //
1476 // mapTopology.clear();
1477 Set<String> keySet = mapTopology.keySet();
1478 for (String key : keySet) {
1479 mapTopology.removeAsync(key);
1480 }
1481 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001482
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001483 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -08001484 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
1485 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001486 }
Jonathan Hart7804bea2014-01-07 10:50:52 -08001487
1488 @Override
1489 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
1490 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
1491 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001492}