blob: e80ef56fdcf6261f391e5e6cbcb1ba3f21225ecc [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart7804bea2014-01-07 10:50:52 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
22import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
23import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
24import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070025import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080026import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070027import net.onrc.onos.ofcontroller.util.FlowEntry;
28import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070029import net.onrc.onos.ofcontroller.util.FlowId;
30import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080031import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070032import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
33
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -080037import com.esotericsoftware.kryo.Kryo;
38import com.esotericsoftware.kryo.io.Input;
39import com.esotericsoftware.kryo.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.config.Config;
41import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.EntryEvent;
43import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070044import com.hazelcast.core.Hazelcast;
45import com.hazelcast.core.HazelcastInstance;
Toshio Koide3738ee52014-02-12 14:57:39 -080046import com.hazelcast.core.IList;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070047import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070048import com.hazelcast.instance.GroupProperties;
Toshio Koide3738ee52014-02-12 14:57:39 -080049import net.onrc.onos.intent.Intent;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070050
Pavlin Radoslavove2497672014-01-12 18:03:35 -080051import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
52
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070053/**
54 * A datagrid service that uses Hazelcast as a datagrid.
55 * The relevant data is stored in the Hazelcast datagrid and shared as
56 * appropriate in a multi-node cluster.
57 */
58public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private final static int MAX_BUFFER_SIZE = 64*1024;
60
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070061 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070062 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070063 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070064
65 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070067 private Config hazelcastConfig = null;
68
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070069 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070070 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070071
72 // State related to the Flow map
73 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070074 private IMap<Long, byte[]> mapFlow = null;
75 private MapFlowListener mapFlowListener = null;
76 private String mapFlowListenerId = null;
77
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070078 // State related to the Flow Entry map
79 protected static final String mapFlowEntryName = "mapFlowEntry";
80 private IMap<Long, byte[]> mapFlowEntry = null;
81 private MapFlowEntryListener mapFlowEntryListener = null;
82 private String mapFlowEntryListenerId = null;
83
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080084 // State related to the Flow ID map
85 protected static final String mapFlowIdName = "mapFlowId";
86 private IMap<Long, byte[]> mapFlowId = null;
87 private MapFlowIdListener mapFlowIdListener = null;
88 private String mapFlowIdListenerId = null;
89
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080090 // State related to the Flow Entry ID map
91 protected static final String mapFlowEntryIdName = "mapFlowEntryId";
92 private IMap<Long, byte[]> mapFlowEntryId = null;
93 private MapFlowEntryIdListener mapFlowEntryIdListener = null;
94 private String mapFlowEntryIdListenerId = null;
95
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070096 // State related to the Network Topology map
97 protected static final String mapTopologyName = "mapTopology";
98 private IMap<String, byte[]> mapTopology = null;
99 private MapTopologyListener mapTopologyListener = null;
100 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800101
Jonathan Hart7804bea2014-01-07 10:50:52 -0800102 // State related to the packet out map
103 protected static final String packetOutMapName = "packetOutMap";
104 private IMap<PacketOutNotification, byte[]> packetOutMap = null;
105 private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800106
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800107 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700108
Jonathan Hart7804bea2014-01-07 10:50:52 -0800109 // State related to the ARP reply map
110 protected static final String arpReplyMapName = "arpReplyMap";
111 private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
112 private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
Toshio Koide3738ee52014-02-12 14:57:39 -0800113
114
115 protected static final String intentListName = "intentList";
116 private IList<Intent> intentList = null;
117
118 @Override
119 public void registerIntent(Collection<Intent> intents) {
120 intentList.addAll(intents);
121 }
122
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700123
124 /**
125 * Class for receiving notifications for Flow state.
126 *
127 * The datagrid map is:
128 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800129 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700130 */
131 class MapFlowListener implements EntryListener<Long, byte[]> {
132 /**
133 * Receive a notification that an entry is added.
134 *
135 * @param event the notification event for the entry.
136 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800137 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800138 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800139 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700140
141 //
142 // Decode the value and deliver the notification
143 //
144 Kryo kryo = kryoFactory.newKryo();
145 Input input = new Input(valueBytes);
146 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
147 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700148 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700149 }
150
151 /**
152 * Receive a notification that an entry is removed.
153 *
154 * @param event the notification event for the entry.
155 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800156 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800157 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800158 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700159
160 //
161 // Decode the value and deliver the notification
162 //
163 Kryo kryo = kryoFactory.newKryo();
164 Input input = new Input(valueBytes);
165 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
166 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700167 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700168 }
169
170 /**
171 * Receive a notification that an entry is updated.
172 *
173 * @param event the notification event for the entry.
174 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800175 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800176 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800177 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700178
179 //
180 // Decode the value and deliver the notification
181 //
182 Kryo kryo = kryoFactory.newKryo();
183 Input input = new Input(valueBytes);
184 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
185 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700186 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700187 }
188
189 /**
190 * Receive a notification that an entry is evicted.
191 *
192 * @param event the notification event for the entry.
193 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800194 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800195 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700196 // NOTE: We don't use eviction for this map
197 }
198 }
199
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700200 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700201 * Class for receiving notifications for FlowEntry state.
202 *
203 * The datagrid map is:
204 * - Key : FlowEntry ID (Long)
205 * - Value : Serialized FlowEntry (byte[])
206 */
207 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
208 /**
209 * Receive a notification that an entry is added.
210 *
211 * @param event the notification event for the entry.
212 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800213 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800214 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800215 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700216
217 //
218 // Decode the value and deliver the notification
219 //
220 Kryo kryo = kryoFactory.newKryo();
221 Input input = new Input(valueBytes);
222 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
223 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700224 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700225 }
226
227 /**
228 * Receive a notification that an entry is removed.
229 *
230 * @param event the notification event for the entry.
231 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800232 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800233 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800234 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700235
236 //
237 // Decode the value and deliver the notification
238 //
239 Kryo kryo = kryoFactory.newKryo();
240 Input input = new Input(valueBytes);
241 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
242 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700243 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700244 }
245
246 /**
247 * Receive a notification that an entry is updated.
248 *
249 * @param event the notification event for the entry.
250 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800251 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800252 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800253 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700254
255 //
256 // Decode the value and deliver the notification
257 //
258 Kryo kryo = kryoFactory.newKryo();
259 Input input = new Input(valueBytes);
260 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
261 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700262 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700263 }
264
265 /**
266 * Receive a notification that an entry is evicted.
267 *
268 * @param event the notification event for the entry.
269 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800270 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800271 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700272 // NOTE: We don't use eviction for this map
273 }
274 }
275
276 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800277 * Class for receiving notifications for FlowId state.
278 *
279 * The datagrid map is:
280 * - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800281 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800282 */
283 class MapFlowIdListener implements EntryListener<Long, byte[]> {
284 /**
285 * Receive a notification that an entry is added.
286 *
287 * @param event the notification event for the entry.
288 */
289 public void entryAdded(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800290 Long keyLong = event.getKey();
291 FlowId flowId = new FlowId(keyLong);
292
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800293 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800294
295 //
296 // Decode the value and deliver the notification
297 //
298 Kryo kryo = kryoFactory.newKryo();
299 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800300 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800301 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800302 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800303 }
304
305 /**
306 * Receive a notification that an entry is removed.
307 *
308 * @param event the notification event for the entry.
309 */
310 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800311 Long keyLong = event.getKey();
312 FlowId flowId = new FlowId(keyLong);
313
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800314 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800315
316 //
317 // Decode the value and deliver the notification
318 //
319 Kryo kryo = kryoFactory.newKryo();
320 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800321 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800322 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800323 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800324 }
325
326 /**
327 * Receive a notification that an entry is updated.
328 *
329 * @param event the notification event for the entry.
330 */
331 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800332 Long keyLong = event.getKey();
333 FlowId flowId = new FlowId(keyLong);
334
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800335 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800336
337 //
338 // Decode the value and deliver the notification
339 //
340 Kryo kryo = kryoFactory.newKryo();
341 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800342 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800343 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800344 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800345 }
346
347 /**
348 * Receive a notification that an entry is evicted.
349 *
350 * @param event the notification event for the entry.
351 */
352 public void entryEvicted(EntryEvent<Long, byte[]> event) {
353 // NOTE: We don't use eviction for this map
354 }
355 }
356
357 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800358 * Class for receiving notifications for FlowEntryId state.
359 *
360 * The datagrid map is:
361 * - Key : FlowEntryId (Long)
362 * - Value : Serialized Switch Dpid (byte[])
363 */
364 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
365 /**
366 * Receive a notification that an entry is added.
367 *
368 * @param event the notification event for the entry.
369 */
370 public void entryAdded(EntryEvent<Long, byte[]> event) {
371 Long keyLong = event.getKey();
372 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
373
374 byte[] valueBytes = event.getValue();
375
376 //
377 // Decode the value and deliver the notification
378 //
379 Kryo kryo = kryoFactory.newKryo();
380 Input input = new Input(valueBytes);
381 Dpid dpid = kryo.readObject(input, Dpid.class);
382 kryoFactory.deleteKryo(kryo);
383 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
384 }
385
386 /**
387 * Receive a notification that an entry is removed.
388 *
389 * @param event the notification event for the entry.
390 */
391 public void entryRemoved(EntryEvent<Long, byte[]> event) {
392 Long keyLong = event.getKey();
393 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
394
395 byte[] valueBytes = event.getValue();
396
397 //
398 // Decode the value and deliver the notification
399 //
400 Kryo kryo = kryoFactory.newKryo();
401 Input input = new Input(valueBytes);
402 Dpid dpid = kryo.readObject(input, Dpid.class);
403 kryoFactory.deleteKryo(kryo);
404 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
405 }
406
407 /**
408 * Receive a notification that an entry is updated.
409 *
410 * @param event the notification event for the entry.
411 */
412 public void entryUpdated(EntryEvent<Long, byte[]> event) {
413 Long keyLong = event.getKey();
414 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
415
416 byte[] valueBytes = event.getValue();
417
418 //
419 // Decode the value and deliver the notification
420 //
421 Kryo kryo = kryoFactory.newKryo();
422 Input input = new Input(valueBytes);
423 Dpid dpid = kryo.readObject(input, Dpid.class);
424 kryoFactory.deleteKryo(kryo);
425 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
426 }
427
428 /**
429 * Receive a notification that an entry is evicted.
430 *
431 * @param event the notification event for the entry.
432 */
433 public void entryEvicted(EntryEvent<Long, byte[]> event) {
434 // NOTE: We don't use eviction for this map
435 }
436 }
437
438 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700439 * Class for receiving notifications for Network Topology state.
440 *
441 * The datagrid map is:
442 * - Key: TopologyElement ID (String)
443 * - Value: Serialized TopologyElement (byte[])
444 */
445 class MapTopologyListener implements EntryListener<String, byte[]> {
446 /**
447 * Receive a notification that an entry is added.
448 *
449 * @param event the notification event for the entry.
450 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800451 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800452 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800453 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700454
455 //
456 // Decode the value and deliver the notification
457 //
458 Kryo kryo = kryoFactory.newKryo();
459 Input input = new Input(valueBytes);
460 TopologyElement topologyElement =
461 kryo.readObject(input, TopologyElement.class);
462 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700463 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700464 }
465
466 /**
467 * Receive a notification that an entry is removed.
468 *
469 * @param event the notification event for the entry.
470 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800471 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800472 public void entryRemoved(EntryEvent<String, byte[]> event) {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800473// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
474 String tag = "TopologyEntryRemoved.NotificationReceived";
475 PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800476 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700477
478 //
479 // Decode the value and deliver the notification
480 //
481 Kryo kryo = kryoFactory.newKryo();
482 Input input = new Input(valueBytes);
483 TopologyElement topologyElement =
484 kryo.readObject(input, TopologyElement.class);
485 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700486 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800487// PerformanceMonitor.stop(tag);
488 m.stop();
489// PerformanceMonitor.report(tag);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700490 }
491
492 /**
493 * Receive a notification that an entry is updated.
494 *
495 * @param event the notification event for the entry.
496 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800497 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800498 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800499 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700500
501 //
502 // Decode the value and deliver the notification
503 //
504 Kryo kryo = kryoFactory.newKryo();
505 Input input = new Input(valueBytes);
506 TopologyElement topologyElement =
507 kryo.readObject(input, TopologyElement.class);
508 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700509 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700510 }
511
512 /**
513 * Receive a notification that an entry is evicted.
514 *
515 * @param event the notification event for the entry.
516 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800517 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800518 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700519 // NOTE: We don't use eviction for this map
520 }
521 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800522
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800523 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800524 * Class for receiving notifications for sending packet-outs.
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800525 *
526 * The datagrid map is:
Jonathan Hart7804bea2014-01-07 10:50:52 -0800527 * - Key: Packet-out to send (PacketOutNotification)
528 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800529 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800530 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800531 /**
532 * Receive a notification that an entry is added.
533 *
534 * @param event the notification event for the entry.
535 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800536 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800537 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
538 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
539 packetOutEventHandler.packetOutNotification(event.getKey());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800540 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800541 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800542
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800543 /**
544 * Receive a notification that an entry is removed.
545 *
546 * @param event the notification event for the entry.
547 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800548 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800549 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800550 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800551 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800552
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800553 /**
554 * Receive a notification that an entry is updated.
555 *
556 * @param event the notification event for the entry.
557 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800558 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800559 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800560 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800561 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800562
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800563 /**
564 * Receive a notification that an entry is evicted.
565 *
566 * @param event the notification event for the entry.
567 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800568 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800569 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800570 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800571 }
572 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700573
574 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800575 * Class for receiving notifications for sending packet-outs.
576 *
577 * The datagrid map is:
578 * - Key: Packet-out to send (PacketOutNotification)
579 * - Value: dummy value (we only need the key) (byte[])
580 */
581 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
582 /**
583 * Receive a notification that an entry is added.
584 *
585 * @param event the notification event for the entry.
586 */
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800587 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800588 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
589 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
590 arpReplyEventHandler.arpReplyEvent(event.getKey());
591 }
592 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800593
Jonathan Hart7804bea2014-01-07 10:50:52 -0800594 // These methods aren't used for ARP replies
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800595 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800596 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800597 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800598 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800599 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800600 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
601 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700602
603 /**
604 * Initialize the Hazelcast Datagrid operation.
605 *
606 * @param conf the configuration filename.
607 */
608 public void init(String configFilename) {
609 /*
610 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
611 System.setProperty("hazelcast.socket.send.buffer.size", "32");
612 */
613 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800614
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700615 // Init from configuration file
616 try {
617 hazelcastConfig = new FileSystemXmlConfig(configFilename);
618 } catch (FileNotFoundException e) {
619 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
620 }
621 /*
622 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
623 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
624 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
625 */
626 //
627 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
628 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
629 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
630 }
631
632 /**
633 * Shutdown the Hazelcast Datagrid operation.
634 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800635 @Override
636 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700637 close();
638 }
639
640 /**
641 * Shutdown the Hazelcast Datagrid operation.
642 */
643 public void close() {
644 Hazelcast.shutdownAll();
645 }
646
647 /**
648 * Get the collection of offered module services.
649 *
650 * @return the collection of offered module services.
651 */
652 @Override
653 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800654 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700655 new ArrayList<Class<? extends IFloodlightService>>();
656 l.add(IDatagridService.class);
657 return l;
658 }
659
660 /**
661 * Get the collection of implemented services.
662 *
663 * @return the collection of implemented services.
664 */
665 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800666 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700667 getServiceImpls() {
668 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800669 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700670 new HashMap<Class<? extends IFloodlightService>,
671 IFloodlightService>();
672 m.put(IDatagridService.class, this);
673 return m;
674 }
675
676 /**
677 * Get the collection of modules this module depends on.
678 *
679 * @return the collection of modules this module depends on.
680 */
681 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800682 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700683 getModuleDependencies() {
684 Collection<Class<? extends IFloodlightService>> l =
685 new ArrayList<Class<? extends IFloodlightService>>();
686 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700687 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700688 return l;
689 }
690
691 /**
692 * Initialize the module.
693 *
694 * @param context the module context to use for the initialization.
695 */
696 @Override
697 public void init(FloodlightModuleContext context)
698 throws FloodlightModuleException {
699 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700700 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700701
702 // Get the configuration file name and configure the Datagrid
703 Map<String, String> configMap = context.getConfigParams(this);
704 String configFilename = configMap.get(HazelcastConfigFile);
705 this.init(configFilename);
706 }
707
708 /**
709 * Startup module operation.
710 *
711 * @param context the module context to use for the startup.
712 */
713 @Override
714 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700715 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700716
717 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800718
Jonathan Hart7804bea2014-01-07 10:50:52 -0800719 packetOutMap = hazelcastInstance.getMap(packetOutMapName);
720 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800721
Jonathan Hart7804bea2014-01-07 10:50:52 -0800722 arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
723 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
Toshio Koide3738ee52014-02-12 14:57:39 -0800724 intentList = hazelcastInstance.getList(intentListName);
725
726
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700727 }
728
729 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700730 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700731 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700732 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700733 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700734 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700735 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700736 */
737 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700738 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
739 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700740
741 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700742 mapFlowListener = new MapFlowListener();
743 mapFlow = hazelcastInstance.getMap(mapFlowName);
744 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700745
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700746 // Initialize the FlowEntry-related map state
747 mapFlowEntryListener = new MapFlowEntryListener();
748 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
749 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
750
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800751 // Initialize the FlowId-related map state
752 mapFlowIdListener = new MapFlowIdListener();
753 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
754 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
755
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800756 // Initialize the FlowEntryId-related map state
757 mapFlowEntryIdListener = new MapFlowEntryIdListener();
758 mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
759 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
760
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700761 // Initialize the Topology-related map state
762 mapTopologyListener = new MapTopologyListener();
763 mapTopology = hazelcastInstance.getMap(mapTopologyName);
764 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700765 }
766
767 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700768 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700769 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700770 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700771 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700772 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700773 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700774 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700775 */
776 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700777 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700778 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700779 mapFlow.removeEntryListener(mapFlowListenerId);
780 mapFlow = null;
781 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700782
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700783 // Clear the FlowEntry-related map state
784 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
785 mapFlowEntry = null;
786 mapFlowEntryListener = null;
787
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800788 // Clear the FlowId-related map state
789 mapFlowId.removeEntryListener(mapFlowIdListenerId);
790 mapFlowId = null;
791 mapFlowIdListener = null;
792
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800793 // Clear the FlowEntryId-related map state
794 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
795 mapFlowEntryId = null;
796 mapFlowEntryIdListener = null;
797
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700798 // Clear the Topology-related map state
799 mapTopology.removeEntryListener(mapTopologyListenerId);
800 mapTopology = null;
801 mapTopologyListener = null;
802
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700803 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700804 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800805
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800806 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800807 public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
808 if (packetOutEventHandler != null) {
809 packetOutEventHandlers.add(packetOutEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800810 }
811 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800812
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800813 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800814 public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
815 packetOutEventHandlers.remove(packetOutEventHandler);
Jonathan Hart7804bea2014-01-07 10:50:52 -0800816 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800817
Jonathan Hart7804bea2014-01-07 10:50:52 -0800818 @Override
819 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
820 if (arpReplyEventHandler != null) {
821 arpReplyEventHandlers.add(arpReplyEventHandler);
822 }
823 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800824
Jonathan Hart7804bea2014-01-07 10:50:52 -0800825 @Override
826 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
827 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800828 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800829
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700830 /**
831 * Get all Flows that are currently in the datagrid.
832 *
833 * @return all Flows that are currently in the datagrid.
834 */
835 @Override
836 public Collection<FlowPath> getAllFlows() {
837 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
838
839 //
840 // Get all current entries
841 //
842 Collection<byte[]> values = mapFlow.values();
843 Kryo kryo = kryoFactory.newKryo();
844 for (byte[] valueBytes : values) {
845 //
846 // Decode the value
847 //
848 Input input = new Input(valueBytes);
849 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
850 allFlows.add(flowPath);
851 }
852 kryoFactory.deleteKryo(kryo);
853
854 return allFlows;
855 }
856
857 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800858 * Get a Flow for a given Flow ID.
859 *
860 * @param flowId the Flow ID of the Flow to get.
861 * @return the Flow if found, otherwise null.
862 */
863 @Override
864 public FlowPath getFlow(FlowId flowId) {
865 byte[] valueBytes = mapFlow.get(flowId.value());
866 if (valueBytes == null)
867 return null;
868
869 Kryo kryo = kryoFactory.newKryo();
870 //
871 // Decode the value
872 //
873 Input input = new Input(valueBytes);
874 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
875 kryoFactory.deleteKryo(kryo);
876
877 return flowPath;
878 }
879
880 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700881 * Send a notification that a Flow is added.
882 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700883 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700884 */
885 @Override
886 public void notificationSendFlowAdded(FlowPath flowPath) {
887 //
888 // Encode the value
889 //
890 byte[] buffer = new byte[MAX_BUFFER_SIZE];
891 Kryo kryo = kryoFactory.newKryo();
892 Output output = new Output(buffer, -1);
893 kryo.writeObject(output, flowPath);
894 byte[] valueBytes = output.toBytes();
895 kryoFactory.deleteKryo(kryo);
896
897 //
898 // Put the entry:
899 // - Key : Flow ID (Long)
900 // - Value : Serialized Flow (byte[])
901 //
902 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
903 }
904
905 /**
906 * Send a notification that a Flow is removed.
907 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700908 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700909 */
910 @Override
911 public void notificationSendFlowRemoved(FlowId flowId) {
912 //
913 // Remove the entry:
914 // - Key : Flow ID (Long)
915 // - Value : Serialized Flow (byte[])
916 //
917 mapFlow.removeAsync(flowId.value());
918 }
919
920 /**
921 * Send a notification that a Flow is updated.
922 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700923 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700924 */
925 @Override
926 public void notificationSendFlowUpdated(FlowPath flowPath) {
927 // NOTE: Adding an entry with an existing key automatically updates it
928 notificationSendFlowAdded(flowPath);
929 }
930
931 /**
932 * Send a notification that all Flows are removed.
933 */
934 @Override
935 public void notificationSendAllFlowsRemoved() {
936 //
937 // Remove all entries
938 // NOTE: We remove the entries one-by-one so the per-entry
939 // notifications will be delivered.
940 //
941 // mapFlow.clear();
942 Set<Long> keySet = mapFlow.keySet();
943 for (Long key : keySet) {
944 mapFlow.removeAsync(key);
945 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700946 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700947
948 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700949 * Get all Flow Entries that are currently in the datagrid.
950 *
951 * @return all Flow Entries that are currently in the datagrid.
952 */
953 @Override
954 public Collection<FlowEntry> getAllFlowEntries() {
955 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
956
957 //
958 // Get all current entries
959 //
960 Collection<byte[]> values = mapFlowEntry.values();
961 Kryo kryo = kryoFactory.newKryo();
962 for (byte[] valueBytes : values) {
963 //
964 // Decode the value
965 //
966 Input input = new Input(valueBytes);
967 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
968 allFlowEntries.add(flowEntry);
969 }
970 kryoFactory.deleteKryo(kryo);
971
972 return allFlowEntries;
973 }
974
975 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800976 * Get a Flow Entry for a given Flow Entry ID.
977 *
978 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
979 * @return the Flow Entry if found, otherwise null.
980 */
981 @Override
982 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
983 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
984 if (valueBytes == null)
985 return null;
986
987 Kryo kryo = kryoFactory.newKryo();
988 //
989 // Decode the value
990 //
991 Input input = new Input(valueBytes);
992 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
993 kryoFactory.deleteKryo(kryo);
994
995 return flowEntry;
996 }
997
998 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700999 * Send a notification that a FlowEntry is added.
1000 *
1001 * @param flowEntry the FlowEntry that is added.
1002 */
1003 @Override
1004 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
1005 //
1006 // Encode the value
1007 //
1008 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1009 Kryo kryo = kryoFactory.newKryo();
1010 Output output = new Output(buffer, -1);
1011 kryo.writeObject(output, flowEntry);
1012 byte[] valueBytes = output.toBytes();
1013 kryoFactory.deleteKryo(kryo);
1014
1015 //
1016 // Put the entry:
1017 // - Key : FlowEntry ID (Long)
1018 // - Value : Serialized FlowEntry (byte[])
1019 //
1020 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
1021 }
1022
1023 /**
1024 * Send a notification that a FlowEntry is removed.
1025 *
1026 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
1027 */
1028 @Override
1029 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
1030 //
1031 // Remove the entry:
1032 // - Key : FlowEntry ID (Long)
1033 // - Value : Serialized FlowEntry (byte[])
1034 //
1035 mapFlowEntry.removeAsync(flowEntryId.value());
1036 }
1037
1038 /**
1039 * Send a notification that a FlowEntry is updated.
1040 *
1041 * @param flowEntry the FlowEntry that is updated.
1042 */
1043 @Override
1044 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
1045 // NOTE: Adding an entry with an existing key automatically updates it
1046 notificationSendFlowEntryAdded(flowEntry);
1047 }
1048
1049 /**
1050 * Send a notification that all Flow Entries are removed.
1051 */
1052 @Override
1053 public void notificationSendAllFlowEntriesRemoved() {
1054 //
1055 // Remove all entries
1056 // NOTE: We remove the entries one-by-one so the per-entry
1057 // notifications will be delivered.
1058 //
1059 // mapFlowEntry.clear();
1060 Set<Long> keySet = mapFlowEntry.keySet();
1061 for (Long key : keySet) {
1062 mapFlowEntry.removeAsync(key);
1063 }
1064 }
1065
1066 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001067 * Get all Flow IDs that are currently in the datagrid.
1068 *
1069 * @return all Flow IDs that are currently in the datagrid.
1070 */
1071 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001072 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1073 Collection<Pair<FlowId, Dpid>> allFlowIds =
1074 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001075
1076 //
1077 // Get all current entries
1078 //
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001079 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001080 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1081 Long key = entry.getKey();
1082 byte[] valueBytes = entry.getValue();
1083
1084 FlowId flowId = new FlowId(key);
1085
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001086 //
1087 // Decode the value
1088 //
1089 Input input = new Input(valueBytes);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001090 Dpid dpid = kryo.readObject(input, Dpid.class);
1091
1092 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1093 allFlowIds.add(pair);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001094 }
1095 kryoFactory.deleteKryo(kryo);
1096
1097 return allFlowIds;
1098 }
1099
1100 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001101 * Get all Flow Entry IDs that are currently in the datagrid.
1102 *
1103 * @return all Flow Entry IDs that ae currently in the datagrid.
1104 */
1105 @Override
1106 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
1107 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1108 new LinkedList<Pair<FlowEntryId, Dpid>>();
1109
1110 //
1111 // Get all current entries
1112 //
1113 Kryo kryo = kryoFactory.newKryo();
1114 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1115 Long key = entry.getKey();
1116 byte[] valueBytes = entry.getValue();
1117
1118 FlowEntryId flowEntryId = new FlowEntryId(key);
1119
1120 //
1121 // Decode the value
1122 //
1123 Input input = new Input(valueBytes);
1124 Dpid dpid = kryo.readObject(input, Dpid.class);
1125
1126 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1127 allFlowEntryIds.add(pair);
1128 }
1129 kryoFactory.deleteKryo(kryo);
1130
1131 return allFlowEntryIds;
1132 }
1133
1134 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001135 * Send a notification that a FlowId is added.
1136 *
1137 * @param flowId the FlowId that is added.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001138 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001139 */
1140 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001141 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001142 //
1143 // Encode the value
1144 //
1145 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1146 Kryo kryo = kryoFactory.newKryo();
1147 Output output = new Output(buffer, -1);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001148 kryo.writeObject(output, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001149 byte[] valueBytes = output.toBytes();
1150 kryoFactory.deleteKryo(kryo);
1151
1152 //
1153 // Put the entry:
1154 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001155 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001156 //
1157 mapFlowId.putAsync(flowId.value(), valueBytes);
1158 }
1159
1160 /**
1161 * Send a notification that a FlowId is removed.
1162 *
1163 * @param flowId the FlowId that is removed.
1164 */
1165 @Override
1166 public void notificationSendFlowIdRemoved(FlowId flowId) {
1167 //
1168 // Remove the entry:
1169 // - Key : FlowId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001170 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001171 //
1172 mapFlowId.removeAsync(flowId.value());
1173 }
1174
1175 /**
1176 * Send a notification that a FlowId is updated.
1177 *
1178 * @param flowId the FlowId that is updated.
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001179 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001180 */
1181 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001182 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001183 // NOTE: Adding an entry with an existing key automatically updates it
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001184 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001185 }
1186
1187 /**
1188 * Send a notification that all Flow IDs are removed.
1189 */
1190 @Override
1191 public void notificationSendAllFlowIdsRemoved() {
1192 //
1193 // Remove all entries
1194 // NOTE: We remove the entries one-by-one so the per-entry
1195 // notifications will be delivered.
1196 //
1197 // mapFlowId.clear();
1198 Set<Long> keySet = mapFlowId.keySet();
1199 for (Long key : keySet) {
1200 mapFlowId.removeAsync(key);
1201 }
1202 }
1203
1204 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001205 * Send a notification that a FlowEntryId is added.
1206 *
1207 * @param flowEntryId the FlowEntryId that is added.
1208 * @param dpid the Switch Dpid.
1209 */
1210 @Override
1211 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
1212 Dpid dpid) {
1213 //
1214 // Encode the value
1215 //
1216 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1217 Kryo kryo = kryoFactory.newKryo();
1218 Output output = new Output(buffer, -1);
1219 kryo.writeObject(output, dpid);
1220 byte[] valueBytes = output.toBytes();
1221 kryoFactory.deleteKryo(kryo);
1222
1223 //
1224 // Put the entry:
1225 // - Key : FlowEntryId (Long)
1226 // - Value : Serialized Switch Dpid (byte[])
1227 //
1228 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
1229 }
1230
1231 /**
1232 * Send a notification that a FlowEntryId is removed.
1233 *
1234 * @param flowEntryId the FlowEntryId that is removed.
1235 */
1236 @Override
1237 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
1238 //
1239 // Remove the entry:
1240 // - Key : FlowEntryId (Long)
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001241 // - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001242 //
1243 mapFlowEntryId.removeAsync(flowEntryId.value());
1244 }
1245
1246 /**
1247 * Send a notification that a FlowEntryId is updated.
1248 *
1249 * @param flowEntryId the FlowEntryId that is updated.
1250 * @param dpid the Switch Dpid.
1251 */
1252 @Override
1253 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
1254 Dpid dpid) {
1255 // NOTE: Adding an entry with an existing key automatically updates it
1256 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
1257 }
1258
1259 /**
1260 * Send a notification that all Flow Entry IDs are removed.
1261 */
1262 @Override
1263 public void notificationSendAllFlowEntryIdsRemoved() {
1264 //
1265 // Remove all entries
1266 // NOTE: We remove the entries one-by-one so the per-entry
1267 // notifications will be delivered.
1268 //
1269 // mapFlowEntryId.clear();
1270 Set<Long> keySet = mapFlowEntryId.keySet();
1271 for (Long key : keySet) {
1272 mapFlowEntryId.removeAsync(key);
1273 }
1274 }
1275
1276 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001277 * Get all Topology Elements that are currently in the datagrid.
1278 *
1279 * @return all Topology Elements that are currently in the datagrid.
1280 */
1281 @Override
1282 public Collection<TopologyElement> getAllTopologyElements() {
1283 Collection<TopologyElement> allTopologyElements =
1284 new LinkedList<TopologyElement>();
1285
1286 //
1287 // Get all current entries
1288 //
1289 Collection<byte[]> values = mapTopology.values();
1290 Kryo kryo = kryoFactory.newKryo();
1291 for (byte[] valueBytes : values) {
1292 //
1293 // Decode the value
1294 //
1295 Input input = new Input(valueBytes);
1296 TopologyElement topologyElement =
1297 kryo.readObject(input, TopologyElement.class);
1298 allTopologyElements.add(topologyElement);
1299 }
1300 kryoFactory.deleteKryo(kryo);
1301
1302 return allTopologyElements;
1303 }
1304
1305 /**
1306 * Send a notification that a Topology Element is added.
1307 *
1308 * @param topologyElement the Topology Element that is added.
1309 */
1310 @Override
1311 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1312 //
1313 // Encode the value
1314 //
1315 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1316 Kryo kryo = kryoFactory.newKryo();
1317 Output output = new Output(buffer, -1);
1318 kryo.writeObject(output, topologyElement);
1319 byte[] valueBytes = output.toBytes();
1320 kryoFactory.deleteKryo(kryo);
1321
1322 //
1323 // Put the entry:
1324 // - Key : TopologyElement ID (String)
1325 // - Value : Serialized TopologyElement (byte[])
1326 //
1327 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1328 }
1329
1330 /**
1331 * Send a notification that a Topology Element is removed.
1332 *
1333 * @param topologyElement the Topology Element that is removed.
1334 */
1335 @Override
1336 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1337 //
1338 // Remove the entry:
1339 // - Key : TopologyElement ID (String)
1340 // - Value : Serialized TopologyElement (byte[])
1341 //
1342 mapTopology.removeAsync(topologyElement.elementId());
1343 }
1344
1345 /**
1346 * Send a notification that a Topology Element is updated.
1347 *
1348 * @param topologyElement the Topology Element that is updated.
1349 */
1350 @Override
1351 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1352 // NOTE: Adding an entry with an existing key automatically updates it
1353 notificationSendTopologyElementAdded(topologyElement);
1354 }
1355
1356 /**
1357 * Send a notification that all Topology Elements are removed.
1358 */
1359 @Override
1360 public void notificationSendAllTopologyElementsRemoved() {
1361 //
1362 // Remove all entries
1363 // NOTE: We remove the entries one-by-one so the per-entry
1364 // notifications will be delivered.
1365 //
1366 // mapTopology.clear();
1367 Set<String> keySet = mapTopology.keySet();
1368 for (String key : keySet) {
1369 mapTopology.removeAsync(key);
1370 }
1371 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001372
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001373 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -08001374 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
1375 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001376 }
Jonathan Hart7804bea2014-01-07 10:50:52 -08001377
1378 @Override
1379 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
1380 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
1381 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001382}