blob: 7c86fc3f51604fefa10963f0611e7b6a1ae5542b [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
TeruU80ce5062014-03-03 17:16:13 -080020import net.onrc.onos.ofcontroller.devicemanager.IDeviceEventHandler;
21import net.onrc.onos.ofcontroller.devicemanager.OnosDevice;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070022import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart7804bea2014-01-07 10:50:52 -080023import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
24import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
25import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
26import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070027import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080028import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070029import net.onrc.onos.ofcontroller.util.FlowEntry;
30import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070031import net.onrc.onos.ofcontroller.util.FlowId;
32import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -080033import net.onrc.onos.ofcontroller.util.Pair;
Pavlin Radoslavov6b3b4ad2014-03-19 11:58:06 -070034import net.onrc.onos.ofcontroller.util.PerformanceMonitor;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070035import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
36
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -080040import com.esotericsoftware.kryo.Kryo;
41import com.esotericsoftware.kryo.io.Input;
42import com.esotericsoftware.kryo.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.config.Config;
44import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070045import com.hazelcast.core.EntryEvent;
46import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070047import com.hazelcast.core.Hazelcast;
48import com.hazelcast.core.HazelcastInstance;
Toshio Koide3738ee52014-02-12 14:57:39 -080049import com.hazelcast.core.IList;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070050import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070051import com.hazelcast.instance.GroupProperties;
Toshio Koide3738ee52014-02-12 14:57:39 -080052import net.onrc.onos.intent.Intent;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070053
54/**
55 * A datagrid service that uses Hazelcast as a datagrid.
56 * The relevant data is stored in the Hazelcast datagrid and shared as
57 * appropriate in a multi-node cluster.
58 */
59public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070060 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070062 static final Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
63 private IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070064
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070065 static final String HAZELCAST_CONFIG_FILE = "datagridConfig";
66 private HazelcastInstance hazelcastInstance;
67 private Config hazelcastConfig;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070068
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070069 private final KryoFactory kryoFactory = new KryoFactory();
70 private IFlowEventHandlerService flowEventHandlerService;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070071
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070072 private final Map<String, IEventChannel<?, ?>> eventChannels = new HashMap<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080073
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070074 // State related to the Flow map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070075 private static final String MAP_FLOW_NAME = "mapFlow";
76 private IMap<Long, byte[]> mapFlow;
77 private MapFlowListener mapFlowListener;
78 private String mapFlowListenerId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070079
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070080 // State related to the Flow Entry map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070081 private static final String MAP_FLOW_ENTRY_NAME = "mapFlowEntry";
82 private IMap<Long, byte[]> mapFlowEntry;
83 private MapFlowEntryListener mapFlowEntryListener;
84 private String mapFlowEntryListenerId;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070085
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080086 // State related to the Flow ID map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070087 private static final String MAP_FLOW_ID_NAME = "mapFlowId";
88 private IMap<Long, byte[]> mapFlowId;
89 private MapFlowIdListener mapFlowIdListener;
90 private String mapFlowIdListenerId;
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080091
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080092 // State related to the Flow Entry ID map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070093 private static final String MAP_FLOW_ENTRY_ID_NAME = "mapFlowEntryId";
94 private IMap<Long, byte[]> mapFlowEntryId;
95 private MapFlowEntryIdListener mapFlowEntryIdListener;
96 private String mapFlowEntryIdListenerId;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -080097
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070098 // State related to the Network Topology map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -070099 private static final String MAP_TOPOLOGY_NAME = "mapTopology";
100 private IMap<String, byte[]> mapTopology;
101 private MapTopologyListener mapTopologyListener;
102 private String mapTopologyListenerId;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800103
Jonathan Hart7804bea2014-01-07 10:50:52 -0800104 // State related to the packet out map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700105 private static final String PACKET_OUT_MAP_NAME = "packetOutMap";
106 private IMap<PacketOutNotification, byte[]> packetOutMap;
107 private final List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<>();
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800108
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800109 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700110
Jonathan Hart7804bea2014-01-07 10:50:52 -0800111 // State related to the ARP reply map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700112 private static final String ARP_REPLY_MAP_NAME = "arpReplyMap";
113 private IMap<ArpReplyNotification, byte[]> arpReplyMap;
114 private final List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<>();
115
116
117 private static final String INTENT_LIST_NAME = "intentList";
118 private IList<Intent> intentList;
Toshio Koide3738ee52014-02-12 14:57:39 -0800119
120 @Override
121 public void registerIntent(Collection<Intent> intents) {
122 intentList.addAll(intents);
123 }
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700124
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700125
TeruU80ce5062014-03-03 17:16:13 -0800126 // State related to the Network Device map
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700127 private static final String MAP_DEVICE_NAME = "mapDevice";
128 private IMap<Long, OnosDevice> mapDevice;
129 private final List<IDeviceEventHandler> deviceEventHandlers = new ArrayList<>();
130
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700131 /**
132 * Class for receiving notifications for Flow state.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700133 * <p/>
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700134 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700135 * - Key : Flow ID (Long)
136 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700137 */
138 class MapFlowListener implements EntryListener<Long, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700139 /**
140 * Receive a notification that an entry is added.
141 *
142 * @param event the notification event for the entry.
143 */
144 @Override
145 public void entryAdded(EntryEvent<Long, byte[]> event) {
146 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700147
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700148 //
149 // Decode the value and deliver the notification
150 //
151 Kryo kryo = kryoFactory.newKryo();
152 Input input = new Input(valueBytes);
153 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
154 kryoFactory.deleteKryo(kryo);
155 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
156 }
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700157
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700158 /**
159 * Receive a notification that an entry is removed.
160 *
161 * @param event the notification event for the entry.
162 */
163 @Override
164 public void entryRemoved(EntryEvent<Long, byte[]> event) {
165 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700166
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700167 //
168 // Decode the value and deliver the notification
169 //
170 Kryo kryo = kryoFactory.newKryo();
171 Input input = new Input(valueBytes);
172 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
173 kryoFactory.deleteKryo(kryo);
174 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
175 }
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700176
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700177 /**
178 * Receive a notification that an entry is updated.
179 *
180 * @param event the notification event for the entry.
181 */
182 @Override
183 public void entryUpdated(EntryEvent<Long, byte[]> event) {
184 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700185
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700186 //
187 // Decode the value and deliver the notification
188 //
189 Kryo kryo = kryoFactory.newKryo();
190 Input input = new Input(valueBytes);
191 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
192 kryoFactory.deleteKryo(kryo);
193 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
194 }
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700195
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700196 /**
197 * Receive a notification that an entry is evicted.
198 *
199 * @param event the notification event for the entry.
200 */
201 @Override
202 public void entryEvicted(EntryEvent<Long, byte[]> event) {
203 // NOTE: We don't use eviction for this map
204 }
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700205 }
206
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700207 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700208 * Class for receiving notifications for FlowEntry state.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700209 * <p/>
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700210 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700211 * - Key : FlowEntry ID (Long)
212 * - Value : Serialized FlowEntry (byte[])
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700213 */
214 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700215 /**
216 * Receive a notification that an entry is added.
217 *
218 * @param event the notification event for the entry.
219 */
220 @Override
221 public void entryAdded(EntryEvent<Long, byte[]> event) {
222 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700223
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700224 //
225 // Decode the value and deliver the notification
226 //
227 Kryo kryo = kryoFactory.newKryo();
228 Input input = new Input(valueBytes);
229 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
230 kryoFactory.deleteKryo(kryo);
231 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
232 }
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700233
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700234 /**
235 * Receive a notification that an entry is removed.
236 *
237 * @param event the notification event for the entry.
238 */
239 @Override
240 public void entryRemoved(EntryEvent<Long, byte[]> event) {
241 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700242
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700243 //
244 // Decode the value and deliver the notification
245 //
246 Kryo kryo = kryoFactory.newKryo();
247 Input input = new Input(valueBytes);
248 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
249 kryoFactory.deleteKryo(kryo);
250 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
251 }
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700252
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700253 /**
254 * Receive a notification that an entry is updated.
255 *
256 * @param event the notification event for the entry.
257 */
258 @Override
259 public void entryUpdated(EntryEvent<Long, byte[]> event) {
260 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700261
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700262 //
263 // Decode the value and deliver the notification
264 //
265 Kryo kryo = kryoFactory.newKryo();
266 Input input = new Input(valueBytes);
267 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
268 kryoFactory.deleteKryo(kryo);
269 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
270 }
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700271
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700272 /**
273 * Receive a notification that an entry is evicted.
274 *
275 * @param event the notification event for the entry.
276 */
277 @Override
278 public void entryEvicted(EntryEvent<Long, byte[]> event) {
279 // NOTE: We don't use eviction for this map
280 }
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700281 }
282
283 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800284 * Class for receiving notifications for FlowId state.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700285 * <p/>
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800286 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700287 * - Key : FlowId (Long)
288 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800289 */
290 class MapFlowIdListener implements EntryListener<Long, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700291 /**
292 * Receive a notification that an entry is added.
293 *
294 * @param event the notification event for the entry.
295 */
296 public void entryAdded(EntryEvent<Long, byte[]> event) {
297 Long keyLong = event.getKey();
298 FlowId flowId = new FlowId(keyLong);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800299
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700300 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800301
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700302 //
303 // Decode the value and deliver the notification
304 //
305 Kryo kryo = kryoFactory.newKryo();
306 Input input = new Input(valueBytes);
307 Dpid dpid = kryo.readObject(input, Dpid.class);
308 kryoFactory.deleteKryo(kryo);
309 flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
310 }
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800311
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700312 /**
313 * Receive a notification that an entry is removed.
314 *
315 * @param event the notification event for the entry.
316 */
317 public void entryRemoved(EntryEvent<Long, byte[]> event) {
318 Long keyLong = event.getKey();
319 FlowId flowId = new FlowId(keyLong);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800320
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700321 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800322
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700323 //
324 // Decode the value and deliver the notification
325 //
326 Kryo kryo = kryoFactory.newKryo();
327 Input input = new Input(valueBytes);
328 Dpid dpid = kryo.readObject(input, Dpid.class);
329 kryoFactory.deleteKryo(kryo);
330 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
331 }
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800332
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700333 /**
334 * Receive a notification that an entry is updated.
335 *
336 * @param event the notification event for the entry.
337 */
338 public void entryUpdated(EntryEvent<Long, byte[]> event) {
339 Long keyLong = event.getKey();
340 FlowId flowId = new FlowId(keyLong);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -0800341
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700342 byte[] valueBytes = event.getValue();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800343
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700344 //
345 // Decode the value and deliver the notification
346 //
347 Kryo kryo = kryoFactory.newKryo();
348 Input input = new Input(valueBytes);
349 Dpid dpid = kryo.readObject(input, Dpid.class);
350 kryoFactory.deleteKryo(kryo);
351 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
352 }
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800353
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700354 /**
355 * Receive a notification that an entry is evicted.
356 *
357 * @param event the notification event for the entry.
358 */
359 public void entryEvicted(EntryEvent<Long, byte[]> event) {
360 // NOTE: We don't use eviction for this map
361 }
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800362 }
363
364 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800365 * Class for receiving notifications for FlowEntryId state.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700366 * <p/>
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800367 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700368 * - Key : FlowEntryId (Long)
369 * - Value : Serialized Switch Dpid (byte[])
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800370 */
371 class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700372 /**
373 * Receive a notification that an entry is added.
374 *
375 * @param event the notification event for the entry.
376 */
377 public void entryAdded(EntryEvent<Long, byte[]> event) {
378 Long keyLong = event.getKey();
379 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800380
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700381 byte[] valueBytes = event.getValue();
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800382
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700383 //
384 // Decode the value and deliver the notification
385 //
386 Kryo kryo = kryoFactory.newKryo();
387 Input input = new Input(valueBytes);
388 Dpid dpid = kryo.readObject(input, Dpid.class);
389 kryoFactory.deleteKryo(kryo);
390 flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
391 }
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800392
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700393 /**
394 * Receive a notification that an entry is removed.
395 *
396 * @param event the notification event for the entry.
397 */
398 public void entryRemoved(EntryEvent<Long, byte[]> event) {
399 Long keyLong = event.getKey();
400 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800401
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700402 byte[] valueBytes = event.getValue();
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800403
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700404 //
405 // Decode the value and deliver the notification
406 //
407 Kryo kryo = kryoFactory.newKryo();
408 Input input = new Input(valueBytes);
409 Dpid dpid = kryo.readObject(input, Dpid.class);
410 kryoFactory.deleteKryo(kryo);
411 flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
412 }
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800413
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700414 /**
415 * Receive a notification that an entry is updated.
416 *
417 * @param event the notification event for the entry.
418 */
419 public void entryUpdated(EntryEvent<Long, byte[]> event) {
420 Long keyLong = event.getKey();
421 FlowEntryId flowEntryId = new FlowEntryId(keyLong);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800422
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700423 byte[] valueBytes = event.getValue();
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800424
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700425 //
426 // Decode the value and deliver the notification
427 //
428 Kryo kryo = kryoFactory.newKryo();
429 Input input = new Input(valueBytes);
430 Dpid dpid = kryo.readObject(input, Dpid.class);
431 kryoFactory.deleteKryo(kryo);
432 flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
433 }
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800434
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700435 /**
436 * Receive a notification that an entry is evicted.
437 *
438 * @param event the notification event for the entry.
439 */
440 public void entryEvicted(EntryEvent<Long, byte[]> event) {
441 // NOTE: We don't use eviction for this map
442 }
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800443 }
444
445 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700446 * Class for receiving notifications for Network Topology state.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700447 * <p/>
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700448 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700449 * - Key: TopologyElement ID (String)
450 * - Value: Serialized TopologyElement (byte[])
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700451 */
452 class MapTopologyListener implements EntryListener<String, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700453 /**
454 * Receive a notification that an entry is added.
455 *
456 * @param event the notification event for the entry.
457 */
458 @Override
459 public void entryAdded(EntryEvent<String, byte[]> event) {
460 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700461
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700462 //
463 // Decode the value and deliver the notification
464 //
465 Kryo kryo = kryoFactory.newKryo();
466 Input input = new Input(valueBytes);
467 TopologyElement topologyElement =
468 kryo.readObject(input, TopologyElement.class);
469 kryoFactory.deleteKryo(kryo);
470 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
471 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700472
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700473 /**
474 * Receive a notification that an entry is removed.
475 *
476 * @param event the notification event for the entry.
477 */
478 @Override
479 public void entryRemoved(EntryEvent<String, byte[]> event) {
480// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
481 String tag = "TopologyEntryRemoved.NotificationReceived";
482 PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
483 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700484
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700485 //
486 // Decode the value and deliver the notification
487 //
488 Kryo kryo = kryoFactory.newKryo();
489 Input input = new Input(valueBytes);
490 TopologyElement topologyElement =
491 kryo.readObject(input, TopologyElement.class);
492 kryoFactory.deleteKryo(kryo);
493 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
494// PerformanceMonitor.stop(tag);
495 m.stop();
496// PerformanceMonitor.report(tag);
497 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700498
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700499 /**
500 * Receive a notification that an entry is updated.
501 *
502 * @param event the notification event for the entry.
503 */
504 @Override
505 public void entryUpdated(EntryEvent<String, byte[]> event) {
506 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700507
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700508 //
509 // Decode the value and deliver the notification
510 //
511 Kryo kryo = kryoFactory.newKryo();
512 Input input = new Input(valueBytes);
513 TopologyElement topologyElement =
514 kryo.readObject(input, TopologyElement.class);
515 kryoFactory.deleteKryo(kryo);
516 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
517 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700518
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700519 /**
520 * Receive a notification that an entry is evicted.
521 *
522 * @param event the notification event for the entry.
523 */
524 @Override
525 public void entryEvicted(EntryEvent<String, byte[]> event) {
526 // NOTE: We don't use eviction for this map
527 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700528 }
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700529
530 /**
531 * MapDeviceListener - reacts to Device related events.
532 */
TeruU80ce5062014-03-03 17:16:13 -0800533 class MapDeviceListener implements EntryListener<Long, OnosDevice> {
534
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700535 @Override
536 public void entryAdded(EntryEvent<Long, OnosDevice> event) {
537 for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
538 deviceEventHandler.addDeviceEvent(event.getKey(), event.getValue());
539 }
540 }
541
542 @Override
543 public void entryRemoved(EntryEvent<Long, OnosDevice> event) {
544 for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
545 deviceEventHandler.deleteDeviceEvent(event.getKey(), event.getValue());
546 }
547 }
548
549 @Override
550 public void entryUpdated(EntryEvent<Long, OnosDevice> event) {
551 for (IDeviceEventHandler deviceEventHandler : deviceEventHandlers) {
552 deviceEventHandler.updateDeviceEvent(event.getKey(), event.getValue());
553 }
554 }
555
556 @Override
557 public void entryEvicted(EntryEvent<Long, OnosDevice> arg0) {
558 //Not used.
559 }
TeruU80ce5062014-03-03 17:16:13 -0800560 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800561
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800562 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800563 * Class for receiving notifications for sending packet-outs.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700564 * <p/>
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800565 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700566 * - Key: Packet-out to send (PacketOutNotification)
567 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800568 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800569 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700570 /**
571 * Receive a notification that an entry is added.
572 *
573 * @param event the notification event for the entry.
574 */
575 @Override
576 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
577 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
578 packetOutEventHandler.packetOutNotification(event.getKey());
579 }
580 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800581
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700582 /**
583 * Receive a notification that an entry is removed.
584 *
585 * @param event the notification event for the entry.
586 */
587 @Override
588 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
589 // Not used
590 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800591
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700592 /**
593 * Receive a notification that an entry is updated.
594 *
595 * @param event the notification event for the entry.
596 */
597 @Override
598 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
599 // Not used
600 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800601
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700602 /**
603 * Receive a notification that an entry is evicted.
604 *
605 * @param event the notification event for the entry.
606 */
607 @Override
608 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
609 // Not used
610 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800611 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700612
613 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800614 * Class for receiving notifications for sending packet-outs.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700615 * <p/>
Jonathan Hart7804bea2014-01-07 10:50:52 -0800616 * The datagrid map is:
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700617 * - Key: Packet-out to send (PacketOutNotification)
618 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart7804bea2014-01-07 10:50:52 -0800619 */
620 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700621 /**
622 * Receive a notification that an entry is added.
623 *
624 * @param event the notification event for the entry.
625 */
626 @Override
627 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
628 triggerEventHandler(event.getKey());
629 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800630
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700631 @Override
632 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {
633 triggerEventHandler(event.getKey());
634 }
635
636 @Override
637 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {
638 // Not used for ARP replies
639 }
640
641 @Override
642 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {
643 // Not used for ARP replies
644 }
645
646 /**
647 * Handle an event.
648 * @param notification notification
649 */
650 private void triggerEventHandler(ArpReplyNotification notification) {
651 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
652 arpReplyEventHandler.arpReplyEvent(notification);
653 }
654 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800655 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700656
657 /**
658 * Initialize the Hazelcast Datagrid operation.
659 *
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700660 * @param configFilename the configuration filename.
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700661 */
662 public void init(String configFilename) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700663 /*
664 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
665 System.setProperty("hazelcast.socket.send.buffer.size", "32");
666 */
667 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800668
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700669 // Init from configuration file
670 try {
671 hazelcastConfig = new FileSystemXmlConfig(configFilename);
672 } catch (FileNotFoundException e) {
673 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
674 }
675 /*
676 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
677 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
678 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
679 */
680 //
681 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
682 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
683 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700684 }
685
686 /**
687 * Shutdown the Hazelcast Datagrid operation.
688 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800689 @Override
690 protected void finalize() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700691 close();
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700692 }
693
694 /**
695 * Shutdown the Hazelcast Datagrid operation.
696 */
697 public void close() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700698 Hazelcast.shutdownAll();
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700699 }
700
701 /**
702 * Get the collection of offered module services.
703 *
704 * @return the collection of offered module services.
705 */
706 @Override
707 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800708 Collection<Class<? extends IFloodlightService>> l =
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700709 new ArrayList<Class<? extends IFloodlightService>>();
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700710 l.add(IDatagridService.class);
711 return l;
712 }
713
714 /**
715 * Get the collection of implemented services.
716 *
717 * @return the collection of implemented services.
718 */
719 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800720 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700721 getServiceImpls() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700722 Map<Class<? extends IFloodlightService>,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700723 IFloodlightService> m =
724 new HashMap<Class<? extends IFloodlightService>,
725 IFloodlightService>();
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700726 m.put(IDatagridService.class, this);
727 return m;
728 }
729
730 /**
731 * Get the collection of modules this module depends on.
732 *
733 * @return the collection of modules this module depends on.
734 */
735 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800736 public Collection<Class<? extends IFloodlightService>>
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700737 getModuleDependencies() {
738 Collection<Class<? extends IFloodlightService>> l =
739 new ArrayList<Class<? extends IFloodlightService>>();
740 l.add(IFloodlightProviderService.class);
741 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700742 return l;
743 }
744
745 /**
746 * Initialize the module.
747 *
748 * @param context the module context to use for the initialization.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700749 * @throws FloodlightModuleException on error
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700750 */
751 @Override
752 public void init(FloodlightModuleContext context)
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700753 throws FloodlightModuleException {
754 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700755
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700756 // Get the configuration file name and configure the Datagrid
757 Map<String, String> configMap = context.getConfigParams(this);
758 String configFilename = configMap.get(HAZELCAST_CONFIG_FILE);
759 this.init(configFilename);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700760 }
761
762 /**
763 * Startup module operation.
764 *
765 * @param context the module context to use for the startup.
766 */
767 @Override
768 public void startUp(FloodlightModuleContext context) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700769 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700770
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700771 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800772
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700773 packetOutMap = hazelcastInstance.getMap(PACKET_OUT_MAP_NAME);
774 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800775
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700776 arpReplyMap = hazelcastInstance.getMap(ARP_REPLY_MAP_NAME);
777 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
778 intentList = hazelcastInstance.getList(INTENT_LIST_NAME);
779
780 mapDevice = hazelcastInstance.getMap(MAP_DEVICE_NAME);
781 mapDevice.addEntryListener(new MapDeviceListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700782 }
783
784 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800785 * Create an event channel.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700786 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800787 * If the channel already exists, just return it.
788 * NOTE: The channel is started automatically.
789 *
790 * @param channelName the event channel name.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700791 * @param <K> the type of the Key in the Key-Value store.
792 * @param <V> the type of the Value in the Key-Value store.
793 * @param typeK the type of the Key in the Key-Value store.
794 * @param typeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800795 * @return the event channel for the channel name.
796 */
797 @Override
798 public <K, V> IEventChannel<K, V> createChannel(String channelName,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700799 Class<K> typeK, Class<V> typeV) {
800 IEventChannel<K, V> eventChannel =
801 createChannelImpl(channelName, typeK, typeV);
802 eventChannel.startup();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800803
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700804 return eventChannel;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800805 }
806
807 /**
808 * Create an event channel implementation.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700809 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800810 * If the channel already exists, just return it.
811 * NOTE: The caller must call IEventChannel.startup() to startup the
812 * channel operation.
813 *
814 * @param channelName the event channel name.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700815 * @param <K> the type of the Key in the Key-Value store.
816 * @param <V> the type of the Value in the Key-Value store.
817 * @param typeK the type of the Key in the Key-Value store.
818 * @param typeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800819 * @return the event channel for the channel name.
820 */
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -0800821 private synchronized <K, V> IEventChannel<K, V> createChannelImpl(
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700822 String channelName,
823 Class<K> typeK, Class<V> typeV) {
824 IEventChannel<K, V> castedEventChannel;
825 IEventChannel<?, ?> genericEventChannel =
826 eventChannels.get(channelName);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800827
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700828 // Add the channel if the first listener
829 if (genericEventChannel == null) {
830 castedEventChannel =
831 new HazelcastEventChannel<K, V>(hazelcastInstance,
832 channelName, typeK, typeV);
833 eventChannels.put(channelName, castedEventChannel);
834 } else {
835 //
836 // TODO: Find if we can use Java internal support to check for
837 // type mismatch.
838 //
839 if (!genericEventChannel.verifyKeyValueTypes(typeK, typeV)) {
840 throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
841 }
842 castedEventChannel = (IEventChannel<K, V>) genericEventChannel;
843 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800844
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700845 return castedEventChannel;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800846 }
847
848 /**
849 * Add event channel listener.
850 *
851 * NOTE: The channel is started automatically right after the listener
852 * is added.
853 *
854 * @param channelName the event channel name.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700855 * @param listener the listener to add.
856 * @param <K> the type of the Key in the Key-Value store.
857 * @param <V> the type of the Value in the Key-Value store.
858 * @param typeK the type of the Key in the Key-Value store.
859 * @param typeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800860 * @return the event channel for the channel name.
861 */
862 @Override
863 public <K, V> IEventChannel<K, V> addListener(String channelName,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700864 IEventChannelListener<K, V> listener,
865 Class<K> typeK, Class<V> typeV) {
866 IEventChannel<K, V> eventChannel =
867 createChannelImpl(channelName, typeK, typeV);
868 eventChannel.addListener(listener);
869 eventChannel.startup();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800870
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700871 return eventChannel;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800872 }
873
874 /**
875 * Remove event channel listener.
876 *
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700877 * @param <K> the type of the Key in the Key-Value store.
878 * @param <V> the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800879 * @param channelName the event channel name.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700880 * @param listener the listener to remove.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800881 */
882 @Override
883 public <K, V> void removeListener(String channelName,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700884 IEventChannelListener<K, V> listener) {
885 IEventChannel<K, V> castedEventChannel;
886 IEventChannel<?, ?> genericEventChannel =
887 eventChannels.get(channelName);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800888
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700889 if (genericEventChannel != null) {
890 //
891 // TODO: Find if we can use Java internal support to check for
892 // type mismatch.
893 // NOTE: Using "ClassCastException" exception below doesn't work.
894 //
895 castedEventChannel = (IEventChannel<K, V>) genericEventChannel;
896 castedEventChannel.removeListener(listener);
897 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800898 }
899
900 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700901 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700902 * notifications.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700903 * <p/>
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700904 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700905 *
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700906 * @param flowEventHandlerServiceToRegister the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700907 */
908 @Override
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700909 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerServiceToRegister) {
910 this.flowEventHandlerService = flowEventHandlerServiceToRegister;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700911
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700912 // Initialize the Flow-related map state
913 mapFlowListener = new MapFlowListener();
914 mapFlow = hazelcastInstance.getMap(MAP_FLOW_NAME);
915 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700916
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700917 // Initialize the FlowEntry-related map state
918 mapFlowEntryListener = new MapFlowEntryListener();
919 mapFlowEntry = hazelcastInstance.getMap(MAP_FLOW_ENTRY_NAME);
920 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700921
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700922 // Initialize the FlowId-related map state
923 mapFlowIdListener = new MapFlowIdListener();
924 mapFlowId = hazelcastInstance.getMap(MAP_FLOW_ID_NAME);
925 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800926
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700927 // Initialize the FlowEntryId-related map state
928 mapFlowEntryIdListener = new MapFlowEntryIdListener();
929 mapFlowEntryId = hazelcastInstance.getMap(MAP_FLOW_ENTRY_ID_NAME);
930 mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800931
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700932 // Initialize the Topology-related map state
933 mapTopologyListener = new MapTopologyListener();
934 mapTopology = hazelcastInstance.getMap(MAP_TOPOLOGY_NAME);
935 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700936 }
937
938 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700939 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700940 * notifications.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700941 * <p/>
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700942 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700943 *
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700944 * @param flowEventHandlerServiceToDeregister the Flow Event Handler Service to
945 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700946 */
947 @Override
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700948 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerServiceToDeregister) {
949 // Clear the Flow-related map state
950 mapFlow.removeEntryListener(mapFlowListenerId);
951 mapFlow = null;
952 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700953
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700954 // Clear the FlowEntry-related map state
955 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
956 mapFlowEntry = null;
957 mapFlowEntryListener = null;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700958
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700959 // Clear the FlowId-related map state
960 mapFlowId.removeEntryListener(mapFlowIdListenerId);
961 mapFlowId = null;
962 mapFlowIdListener = null;
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800963
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700964 // Clear the FlowEntryId-related map state
965 mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
966 mapFlowEntryId = null;
967 mapFlowEntryIdListener = null;
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -0800968
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700969 // Clear the Topology-related map state
970 mapTopology.removeEntryListener(mapTopologyListenerId);
971 mapTopology = null;
972 mapTopologyListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700973
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700974 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700975 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800976
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800977 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800978 public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700979 if (packetOutEventHandler != null) {
980 packetOutEventHandlers.add(packetOutEventHandler);
981 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800982 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800983
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800984 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800985 public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700986 packetOutEventHandlers.remove(packetOutEventHandler);
Jonathan Hart7804bea2014-01-07 10:50:52 -0800987 }
Yuta HIGUCHI2d5ac522014-01-22 10:21:41 -0800988
Jonathan Hart7804bea2014-01-07 10:50:52 -0800989 @Override
990 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700991 if (arpReplyEventHandler != null) {
992 arpReplyEventHandlers.add(arpReplyEventHandler);
993 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800994 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800995
Jonathan Hart7804bea2014-01-07 10:50:52 -0800996 @Override
997 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -0700998 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800999 }
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001000
TeruU80ce5062014-03-03 17:16:13 -08001001 @Override
1002 public void registerMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001003 if (deviceEventHandler != null) {
1004 deviceEventHandlers.add(deviceEventHandler);
1005 }
TeruU80ce5062014-03-03 17:16:13 -08001006 }
1007
1008 @Override
1009 public void deregisterMapDeviceEventHandler(IDeviceEventHandler deviceEventHandler) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001010 deviceEventHandlers.remove(deviceEventHandler);
TeruU80ce5062014-03-03 17:16:13 -08001011 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001012
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001013 /**
1014 * Get all Flows that are currently in the datagrid.
1015 *
1016 * @return all Flows that are currently in the datagrid.
1017 */
1018 @Override
1019 public Collection<FlowPath> getAllFlows() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001020 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001021
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001022 //
1023 // Get all current entries
1024 //
1025 Collection<byte[]> values = mapFlow.values();
1026 Kryo kryo = kryoFactory.newKryo();
1027 for (byte[] valueBytes : values) {
1028 //
1029 // Decode the value
1030 //
1031 Input input = new Input(valueBytes);
1032 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
1033 allFlows.add(flowPath);
1034 }
1035 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001036
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001037 return allFlows;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001038 }
1039
1040 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001041 * Get a Flow for a given Flow ID.
1042 *
1043 * @param flowId the Flow ID of the Flow to get.
1044 * @return the Flow if found, otherwise null.
1045 */
1046 @Override
1047 public FlowPath getFlow(FlowId flowId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001048 byte[] valueBytes = mapFlow.get(flowId.value());
1049 if (valueBytes == null) {
1050 return null;
1051 }
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001052
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001053 Kryo kryo = kryoFactory.newKryo();
1054 //
1055 // Decode the value
1056 //
1057 Input input = new Input(valueBytes);
1058 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
1059 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001060
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001061 return flowPath;
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001062 }
1063
1064 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001065 * Send a notification that a Flow is added.
1066 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001067 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001068 */
1069 @Override
1070 public void notificationSendFlowAdded(FlowPath flowPath) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001071 //
1072 // Encode the value
1073 //
1074 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1075 Kryo kryo = kryoFactory.newKryo();
1076 Output output = new Output(buffer, -1);
1077 kryo.writeObject(output, flowPath);
1078 byte[] valueBytes = output.toBytes();
1079 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001080
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001081 //
1082 // Put the entry:
1083 // - Key : Flow ID (Long)
1084 // - Value : Serialized Flow (byte[])
1085 //
1086 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001087 }
1088
1089 /**
1090 * Send a notification that a Flow is removed.
1091 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001092 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001093 */
1094 @Override
1095 public void notificationSendFlowRemoved(FlowId flowId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001096 //
1097 // Remove the entry:
1098 // - Key : Flow ID (Long)
1099 // - Value : Serialized Flow (byte[])
1100 //
1101 mapFlow.removeAsync(flowId.value());
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001102 }
1103
1104 /**
1105 * Send a notification that a Flow is updated.
1106 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001107 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001108 */
1109 @Override
1110 public void notificationSendFlowUpdated(FlowPath flowPath) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001111 // NOTE: Adding an entry with an existing key automatically updates it
1112 notificationSendFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07001113 }
1114
1115 /**
1116 * Send a notification that all Flows are removed.
1117 */
1118 @Override
1119 public void notificationSendAllFlowsRemoved() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001120 //
1121 // Remove all entries
1122 // NOTE: We remove the entries one-by-one so the per-entry
1123 // notifications will be delivered.
1124 //
1125 // mapFlow.clear();
1126 Set<Long> keySet = mapFlow.keySet();
1127 for (Long key : keySet) {
1128 mapFlow.removeAsync(key);
1129 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001130 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001131
1132 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001133 * Get all Flow Entries that are currently in the datagrid.
1134 *
1135 * @return all Flow Entries that are currently in the datagrid.
1136 */
1137 @Override
1138 public Collection<FlowEntry> getAllFlowEntries() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001139 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001140
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001141 //
1142 // Get all current entries
1143 //
1144 Collection<byte[]> values = mapFlowEntry.values();
1145 Kryo kryo = kryoFactory.newKryo();
1146 for (byte[] valueBytes : values) {
1147 //
1148 // Decode the value
1149 //
1150 Input input = new Input(valueBytes);
1151 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
1152 allFlowEntries.add(flowEntry);
1153 }
1154 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001155
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001156 return allFlowEntries;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001157 }
1158
1159 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001160 * Get a Flow Entry for a given Flow Entry ID.
1161 *
1162 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
1163 * @return the Flow Entry if found, otherwise null.
1164 */
1165 @Override
1166 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001167 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
1168 if (valueBytes == null) {
1169 return null;
1170 }
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001171
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001172 Kryo kryo = kryoFactory.newKryo();
1173 //
1174 // Decode the value
1175 //
1176 Input input = new Input(valueBytes);
1177 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
1178 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001179
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001180 return flowEntry;
Pavlin Radoslavov379c9042013-11-26 15:40:49 -08001181 }
1182
1183 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001184 * Send a notification that a FlowEntry is added.
1185 *
1186 * @param flowEntry the FlowEntry that is added.
1187 */
1188 @Override
1189 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001190 //
1191 // Encode the value
1192 //
1193 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1194 Kryo kryo = kryoFactory.newKryo();
1195 Output output = new Output(buffer, -1);
1196 kryo.writeObject(output, flowEntry);
1197 byte[] valueBytes = output.toBytes();
1198 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001199
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001200 //
1201 // Put the entry:
1202 // - Key : FlowEntry ID (Long)
1203 // - Value : Serialized FlowEntry (byte[])
1204 //
1205 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001206 }
1207
1208 /**
1209 * Send a notification that a FlowEntry is removed.
1210 *
1211 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
1212 */
1213 @Override
1214 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001215 //
1216 // Remove the entry:
1217 // - Key : FlowEntry ID (Long)
1218 // - Value : Serialized FlowEntry (byte[])
1219 //
1220 mapFlowEntry.removeAsync(flowEntryId.value());
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001221 }
1222
1223 /**
1224 * Send a notification that a FlowEntry is updated.
1225 *
1226 * @param flowEntry the FlowEntry that is updated.
1227 */
1228 @Override
1229 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001230 // NOTE: Adding an entry with an existing key automatically updates it
1231 notificationSendFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001232 }
1233
1234 /**
1235 * Send a notification that all Flow Entries are removed.
1236 */
1237 @Override
1238 public void notificationSendAllFlowEntriesRemoved() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001239 //
1240 // Remove all entries
1241 // NOTE: We remove the entries one-by-one so the per-entry
1242 // notifications will be delivered.
1243 //
1244 // mapFlowEntry.clear();
1245 Set<Long> keySet = mapFlowEntry.keySet();
1246 for (Long key : keySet) {
1247 mapFlowEntry.removeAsync(key);
1248 }
Pavlin Radoslavovb7506842013-10-29 17:46:54 -07001249 }
1250
1251 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001252 * Get all Flow IDs that are currently in the datagrid.
1253 *
1254 * @return all Flow IDs that are currently in the datagrid.
1255 */
1256 @Override
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001257 public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
1258 Collection<Pair<FlowId, Dpid>> allFlowIds =
1259 new LinkedList<Pair<FlowId, Dpid>>();
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001260
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001261 //
1262 // Get all current entries
1263 //
1264 Kryo kryo = kryoFactory.newKryo();
1265 for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
1266 Long key = entry.getKey();
1267 byte[] valueBytes = entry.getValue();
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001268
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001269 FlowId flowId = new FlowId(key);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001270
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001271 //
1272 // Decode the value
1273 //
1274 Input input = new Input(valueBytes);
1275 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001276
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001277 Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
1278 allFlowIds.add(pair);
1279 }
1280 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001281
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001282 return allFlowIds;
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001283 }
1284
1285 /**
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001286 * Get all Flow Entry IDs that are currently in the datagrid.
1287 *
1288 * @return all Flow Entry IDs that ae currently in the datagrid.
1289 */
1290 @Override
1291 public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001292 Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
1293 new LinkedList<Pair<FlowEntryId, Dpid>>();
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001294
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001295 //
1296 // Get all current entries
1297 //
1298 Kryo kryo = kryoFactory.newKryo();
1299 for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
1300 Long key = entry.getKey();
1301 byte[] valueBytes = entry.getValue();
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001302
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001303 FlowEntryId flowEntryId = new FlowEntryId(key);
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001304
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001305 //
1306 // Decode the value
1307 //
1308 Input input = new Input(valueBytes);
1309 Dpid dpid = kryo.readObject(input, Dpid.class);
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001310
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001311 Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
1312 allFlowEntryIds.add(pair);
1313 }
1314 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001315
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001316 return allFlowEntryIds;
Pavlin Radoslavova9c0c3b2014-01-09 10:54:45 -08001317 }
1318
1319 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001320 * Send a notification that a FlowId is added.
1321 *
1322 * @param flowId the FlowId that is added.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001323 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001324 */
1325 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001326 public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001327 //
1328 // Encode the value
1329 //
1330 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1331 Kryo kryo = kryoFactory.newKryo();
1332 Output output = new Output(buffer, -1);
1333 kryo.writeObject(output, dpid);
1334 byte[] valueBytes = output.toBytes();
1335 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001336
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001337 //
1338 // Put the entry:
1339 // - Key : FlowId (Long)
1340 // - Value : Serialized Switch Dpid (byte[])
1341 //
1342 mapFlowId.putAsync(flowId.value(), valueBytes);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001343 }
1344
1345 /**
1346 * Send a notification that a FlowId is removed.
1347 *
1348 * @param flowId the FlowId that is removed.
1349 */
1350 @Override
1351 public void notificationSendFlowIdRemoved(FlowId flowId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001352 //
1353 // Remove the entry:
1354 // - Key : FlowId (Long)
1355 // - Value : Serialized Switch Dpid (byte[])
1356 //
1357 mapFlowId.removeAsync(flowId.value());
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001358 }
1359
1360 /**
1361 * Send a notification that a FlowId is updated.
1362 *
1363 * @param flowId the FlowId that is updated.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001364 * @param dpid the Source Switch Dpid.
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001365 */
1366 @Override
Pavlin Radoslavov2194d112014-01-10 13:36:00 -08001367 public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001368 // NOTE: Adding an entry with an existing key automatically updates it
1369 notificationSendFlowIdAdded(flowId, dpid);
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001370 }
1371
1372 /**
1373 * Send a notification that all Flow IDs are removed.
1374 */
1375 @Override
1376 public void notificationSendAllFlowIdsRemoved() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001377 //
1378 // Remove all entries
1379 // NOTE: We remove the entries one-by-one so the per-entry
1380 // notifications will be delivered.
1381 //
1382 // mapFlowId.clear();
1383 Set<Long> keySet = mapFlowId.keySet();
1384 for (Long key : keySet) {
1385 mapFlowId.removeAsync(key);
1386 }
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -08001387 }
1388
1389 /**
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001390 * Send a notification that a FlowEntryId is added.
1391 *
1392 * @param flowEntryId the FlowEntryId that is added.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001393 * @param dpid the Switch Dpid.
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001394 */
1395 @Override
1396 public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001397 Dpid dpid) {
1398 //
1399 // Encode the value
1400 //
1401 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1402 Kryo kryo = kryoFactory.newKryo();
1403 Output output = new Output(buffer, -1);
1404 kryo.writeObject(output, dpid);
1405 byte[] valueBytes = output.toBytes();
1406 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001407
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001408 //
1409 // Put the entry:
1410 // - Key : FlowEntryId (Long)
1411 // - Value : Serialized Switch Dpid (byte[])
1412 //
1413 mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001414 }
1415
1416 /**
1417 * Send a notification that a FlowEntryId is removed.
1418 *
1419 * @param flowEntryId the FlowEntryId that is removed.
1420 */
1421 @Override
1422 public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001423 //
1424 // Remove the entry:
1425 // - Key : FlowEntryId (Long)
1426 // - Value : Serialized Switch Dpid (byte[])
1427 //
1428 mapFlowEntryId.removeAsync(flowEntryId.value());
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001429 }
1430
1431 /**
1432 * Send a notification that a FlowEntryId is updated.
1433 *
1434 * @param flowEntryId the FlowEntryId that is updated.
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001435 * @param dpid the Switch Dpid.
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001436 */
1437 @Override
1438 public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001439 Dpid dpid) {
1440 // NOTE: Adding an entry with an existing key automatically updates it
1441 notificationSendFlowEntryIdAdded(flowEntryId, dpid);
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001442 }
1443
1444 /**
1445 * Send a notification that all Flow Entry IDs are removed.
1446 */
1447 @Override
1448 public void notificationSendAllFlowEntryIdsRemoved() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001449 //
1450 // Remove all entries
1451 // NOTE: We remove the entries one-by-one so the per-entry
1452 // notifications will be delivered.
1453 //
1454 // mapFlowEntryId.clear();
1455 Set<Long> keySet = mapFlowEntryId.keySet();
1456 for (Long key : keySet) {
1457 mapFlowEntryId.removeAsync(key);
1458 }
Pavlin Radoslavov909da3c2014-01-09 04:04:33 -08001459 }
1460
1461 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001462 * Get all Topology Elements that are currently in the datagrid.
1463 *
1464 * @return all Topology Elements that are currently in the datagrid.
1465 */
1466 @Override
1467 public Collection<TopologyElement> getAllTopologyElements() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001468 Collection<TopologyElement> allTopologyElements =
1469 new LinkedList<TopologyElement>();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001470
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001471 //
1472 // Get all current entries
1473 //
1474 Collection<byte[]> values = mapTopology.values();
1475 Kryo kryo = kryoFactory.newKryo();
1476 for (byte[] valueBytes : values) {
1477 //
1478 // Decode the value
1479 //
1480 Input input = new Input(valueBytes);
1481 TopologyElement topologyElement =
1482 kryo.readObject(input, TopologyElement.class);
1483 allTopologyElements.add(topologyElement);
1484 }
1485 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001486
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001487 return allTopologyElements;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001488 }
1489
1490 /**
1491 * Send a notification that a Topology Element is added.
1492 *
1493 * @param topologyElement the Topology Element that is added.
1494 */
1495 @Override
1496 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001497 //
1498 // Encode the value
1499 //
1500 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1501 Kryo kryo = kryoFactory.newKryo();
1502 Output output = new Output(buffer, -1);
1503 kryo.writeObject(output, topologyElement);
1504 byte[] valueBytes = output.toBytes();
1505 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001506
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001507 //
1508 // Put the entry:
1509 // - Key : TopologyElement ID (String)
1510 // - Value : Serialized TopologyElement (byte[])
1511 //
1512 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1513
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001514 }
1515
1516 /**
1517 * Send a notification that a Topology Element is removed.
1518 *
1519 * @param topologyElement the Topology Element that is removed.
1520 */
1521 @Override
1522 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001523 //
1524 // Remove the entry:
1525 // - Key : TopologyElement ID (String)
1526 // - Value : Serialized TopologyElement (byte[])
1527 //
1528 mapTopology.removeAsync(topologyElement.elementId());
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001529 }
1530
1531 /**
1532 * Send a notification that a Topology Element is updated.
1533 *
1534 * @param topologyElement the Topology Element that is updated.
1535 */
1536 @Override
1537 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001538 // NOTE: Adding an entry with an existing key automatically updates it
1539 notificationSendTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001540 }
1541
1542 /**
1543 * Send a notification that all Topology Elements are removed.
1544 */
1545 @Override
1546 public void notificationSendAllTopologyElementsRemoved() {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001547 //
1548 // Remove all entries
1549 // NOTE: We remove the entries one-by-one so the per-entry
1550 // notifications will be delivered.
1551 //
1552 // mapTopology.clear();
1553 Set<String> keySet = mapTopology.keySet();
1554 for (String key : keySet) {
1555 mapTopology.removeAsync(key);
1556 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -07001557 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001558
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001559 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -08001560 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001561 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001562 }
Jonathan Hart7804bea2014-01-07 10:50:52 -08001563
Ray Milkey0ab2d8a2014-03-20 14:30:10 -07001564 @Override
1565 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
1566 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
1567 }
1568
1569 @Override
1570 public void sendNotificationDeviceAdded(Long mac, OnosDevice dev) {
1571 log.debug("DeviceAdded in datagrid. mac {}", dev.getMacAddress());
1572 mapDevice.putAsync(mac, dev);
1573 }
1574
1575 @Override
1576 public void sendNotificationDeviceDeleted(OnosDevice dev) {
1577 long mac = dev.getMacAddress().toLong();
1578 if (mapDevice.containsKey(mac)) {
1579 log.debug("DeviceDeleted in datagrid. mac {}", dev.getMacAddress());
1580 mapDevice.removeAsync(mac);
1581 }
1582 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001583}