blob: effbe815c0c32d4187f512538b1d5cfdd9ebe3ea [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hartd3003252013-11-15 09:44:46 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080022import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070023import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070024import net.onrc.onos.ofcontroller.util.FlowEntry;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070026import net.onrc.onos.ofcontroller.util.FlowId;
27import net.onrc.onos.ofcontroller.util.FlowPath;
28import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
29
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jonathan Hart18ad55c2013-11-11 22:49:55 -080033import com.esotericsoftware.kryo2.Kryo;
34import com.esotericsoftware.kryo2.io.Input;
35import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.config.Config;
37import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070038import com.hazelcast.core.EntryEvent;
39import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070040import com.hazelcast.core.Hazelcast;
41import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070042import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070043import com.hazelcast.instance.GroupProperties;
44
45/**
46 * A datagrid service that uses Hazelcast as a datagrid.
47 * The relevant data is stored in the Hazelcast datagrid and shared as
48 * appropriate in a multi-node cluster.
49 */
50public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private final static int MAX_BUFFER_SIZE = 64*1024;
52
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070053 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070055 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056
57 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070058 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070059 private Config hazelcastConfig = null;
60
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070062 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063
64 // State related to the Flow map
65 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070066 private IMap<Long, byte[]> mapFlow = null;
67 private MapFlowListener mapFlowListener = null;
68 private String mapFlowListenerId = null;
69
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070070 // State related to the Flow Entry map
71 protected static final String mapFlowEntryName = "mapFlowEntry";
72 private IMap<Long, byte[]> mapFlowEntry = null;
73 private MapFlowEntryListener mapFlowEntryListener = null;
74 private String mapFlowEntryListenerId = null;
75
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -080076 // State related to the Flow ID map
77 protected static final String mapFlowIdName = "mapFlowId";
78 private IMap<Long, byte[]> mapFlowId = null;
79 private MapFlowIdListener mapFlowIdListener = null;
80 private String mapFlowIdListenerId = null;
81
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070082 // State related to the Network Topology map
83 protected static final String mapTopologyName = "mapTopology";
84 private IMap<String, byte[]> mapTopology = null;
85 private MapTopologyListener mapTopologyListener = null;
86 private String mapTopologyListenerId = null;
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080087
Jonathan Hart18ad55c2013-11-11 22:49:55 -080088 // State related to the ARP map
89 protected static final String arpMapName = "arpMap";
Jonathan Hartd3003252013-11-15 09:44:46 -080090 private IMap<ArpMessage, byte[]> arpMap = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080091 private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
92 private final byte[] dummyByte = {0};
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070093
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 /**
95 * Class for receiving notifications for Flow state.
96 *
97 * The datagrid map is:
98 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -080099 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700100 */
101 class MapFlowListener implements EntryListener<Long, byte[]> {
102 /**
103 * Receive a notification that an entry is added.
104 *
105 * @param event the notification event for the entry.
106 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800107 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800108 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800109 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700110
111 //
112 // Decode the value and deliver the notification
113 //
114 Kryo kryo = kryoFactory.newKryo();
115 Input input = new Input(valueBytes);
116 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
117 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700118 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700119 }
120
121 /**
122 * Receive a notification that an entry is removed.
123 *
124 * @param event the notification event for the entry.
125 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800126 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800127 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800128 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700129
130 //
131 // Decode the value and deliver the notification
132 //
133 Kryo kryo = kryoFactory.newKryo();
134 Input input = new Input(valueBytes);
135 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
136 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700137 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700138 }
139
140 /**
141 * Receive a notification that an entry is updated.
142 *
143 * @param event the notification event for the entry.
144 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800145 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800146 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800147 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700148
149 //
150 // Decode the value and deliver the notification
151 //
152 Kryo kryo = kryoFactory.newKryo();
153 Input input = new Input(valueBytes);
154 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
155 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700156 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700157 }
158
159 /**
160 * Receive a notification that an entry is evicted.
161 *
162 * @param event the notification event for the entry.
163 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800164 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800165 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700166 // NOTE: We don't use eviction for this map
167 }
168 }
169
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700170 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700171 * Class for receiving notifications for FlowEntry state.
172 *
173 * The datagrid map is:
174 * - Key : FlowEntry ID (Long)
175 * - Value : Serialized FlowEntry (byte[])
176 */
177 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
178 /**
179 * Receive a notification that an entry is added.
180 *
181 * @param event the notification event for the entry.
182 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800183 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800184 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800185 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700186
187 //
188 // Decode the value and deliver the notification
189 //
190 Kryo kryo = kryoFactory.newKryo();
191 Input input = new Input(valueBytes);
192 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
193 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700194 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700195 }
196
197 /**
198 * Receive a notification that an entry is removed.
199 *
200 * @param event the notification event for the entry.
201 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800202 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800203 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800204 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700205
206 //
207 // Decode the value and deliver the notification
208 //
209 Kryo kryo = kryoFactory.newKryo();
210 Input input = new Input(valueBytes);
211 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
212 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700213 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700214 }
215
216 /**
217 * Receive a notification that an entry is updated.
218 *
219 * @param event the notification event for the entry.
220 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800221 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800222 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800223 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700224
225 //
226 // Decode the value and deliver the notification
227 //
228 Kryo kryo = kryoFactory.newKryo();
229 Input input = new Input(valueBytes);
230 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
231 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700232 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700233 }
234
235 /**
236 * Receive a notification that an entry is evicted.
237 *
238 * @param event the notification event for the entry.
239 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800240 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800241 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700242 // NOTE: We don't use eviction for this map
243 }
244 }
245
246 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800247 * Class for receiving notifications for FlowId state.
248 *
249 * The datagrid map is:
250 * - Key : FlowId (Long)
251 * - Value : Serialized FlowId (byte[])
252 */
253 class MapFlowIdListener implements EntryListener<Long, byte[]> {
254 /**
255 * Receive a notification that an entry is added.
256 *
257 * @param event the notification event for the entry.
258 */
259 public void entryAdded(EntryEvent<Long, byte[]> event) {
260 byte[] valueBytes = (byte[])event.getValue();
261
262 //
263 // Decode the value and deliver the notification
264 //
265 Kryo kryo = kryoFactory.newKryo();
266 Input input = new Input(valueBytes);
267 FlowId flowId = kryo.readObject(input, FlowId.class);
268 kryoFactory.deleteKryo(kryo);
269 flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
270 }
271
272 /**
273 * Receive a notification that an entry is removed.
274 *
275 * @param event the notification event for the entry.
276 */
277 public void entryRemoved(EntryEvent<Long, byte[]> event) {
278 byte[] valueBytes = (byte[])event.getValue();
279
280 //
281 // Decode the value and deliver the notification
282 //
283 Kryo kryo = kryoFactory.newKryo();
284 Input input = new Input(valueBytes);
285 FlowId flowId = kryo.readObject(input, FlowId.class);
286 kryoFactory.deleteKryo(kryo);
287 flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
288 }
289
290 /**
291 * Receive a notification that an entry is updated.
292 *
293 * @param event the notification event for the entry.
294 */
295 public void entryUpdated(EntryEvent<Long, byte[]> event) {
296 byte[] valueBytes = (byte[])event.getValue();
297
298 //
299 // Decode the value and deliver the notification
300 //
301 Kryo kryo = kryoFactory.newKryo();
302 Input input = new Input(valueBytes);
303 FlowId flowId = kryo.readObject(input, FlowId.class);
304 kryoFactory.deleteKryo(kryo);
305 flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
306 }
307
308 /**
309 * Receive a notification that an entry is evicted.
310 *
311 * @param event the notification event for the entry.
312 */
313 public void entryEvicted(EntryEvent<Long, byte[]> event) {
314 // NOTE: We don't use eviction for this map
315 }
316 }
317
318 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700319 * Class for receiving notifications for Network Topology state.
320 *
321 * The datagrid map is:
322 * - Key: TopologyElement ID (String)
323 * - Value: Serialized TopologyElement (byte[])
324 */
325 class MapTopologyListener implements EntryListener<String, byte[]> {
326 /**
327 * Receive a notification that an entry is added.
328 *
329 * @param event the notification event for the entry.
330 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800331 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800332 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800333 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700334
335 //
336 // Decode the value and deliver the notification
337 //
338 Kryo kryo = kryoFactory.newKryo();
339 Input input = new Input(valueBytes);
340 TopologyElement topologyElement =
341 kryo.readObject(input, TopologyElement.class);
342 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700343 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700344 }
345
346 /**
347 * Receive a notification that an entry is removed.
348 *
349 * @param event the notification event for the entry.
350 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800351 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800352 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800353 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700354
355 //
356 // Decode the value and deliver the notification
357 //
358 Kryo kryo = kryoFactory.newKryo();
359 Input input = new Input(valueBytes);
360 TopologyElement topologyElement =
361 kryo.readObject(input, TopologyElement.class);
362 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700363 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700364 }
365
366 /**
367 * Receive a notification that an entry is updated.
368 *
369 * @param event the notification event for the entry.
370 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800371 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800372 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800373 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700374
375 //
376 // Decode the value and deliver the notification
377 //
378 Kryo kryo = kryoFactory.newKryo();
379 Input input = new Input(valueBytes);
380 TopologyElement topologyElement =
381 kryo.readObject(input, TopologyElement.class);
382 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700383 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700384 }
385
386 /**
387 * Receive a notification that an entry is evicted.
388 *
389 * @param event the notification event for the entry.
390 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800391 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800392 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700393 // NOTE: We don't use eviction for this map
394 }
395 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800396
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800397 /**
398 * Class for receiving notifications for ARP requests.
399 *
400 * The datagrid map is:
401 * - Key: Request ID (String)
402 * - Value: ARP request packet (byte[])
403 */
Jonathan Hartd3003252013-11-15 09:44:46 -0800404 class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800405 /**
406 * Receive a notification that an entry is added.
407 *
408 * @param event the notification event for the entry.
409 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800410 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800411 public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800412 for (IArpEventHandler arpEventHandler : arpEventHandlers) {
413 arpEventHandler.arpRequestNotification(event.getKey());
414 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800415
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800416 //
417 // Decode the value and deliver the notification
418 //
419 /*
420 Kryo kryo = kryoFactory.newKryo();
421 Input input = new Input(valueBytes);
422 TopologyElement topologyElement =
423 kryo.readObject(input, TopologyElement.class);
424 kryoFactory.deleteKryo(kryo);
425 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
426 */
427 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800428
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800429 /**
430 * Receive a notification that an entry is removed.
431 *
432 * @param event the notification event for the entry.
433 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800434 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800435 public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800436 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800437 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800438
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800439 /**
440 * Receive a notification that an entry is updated.
441 *
442 * @param event the notification event for the entry.
443 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800444 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800445 public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800446 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800447 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800448
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800449 /**
450 * Receive a notification that an entry is evicted.
451 *
452 * @param event the notification event for the entry.
453 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800454 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -0800455 public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800456 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800457 }
458 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700459
460 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700461 * Initialize the Hazelcast Datagrid operation.
462 *
463 * @param conf the configuration filename.
464 */
465 public void init(String configFilename) {
466 /*
467 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
468 System.setProperty("hazelcast.socket.send.buffer.size", "32");
469 */
470 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800471
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700472 // Init from configuration file
473 try {
474 hazelcastConfig = new FileSystemXmlConfig(configFilename);
475 } catch (FileNotFoundException e) {
476 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
477 }
478 /*
479 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
480 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
481 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
482 */
483 //
484 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
485 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
486 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
487 }
488
489 /**
490 * Shutdown the Hazelcast Datagrid operation.
491 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800492 @Override
493 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700494 close();
495 }
496
497 /**
498 * Shutdown the Hazelcast Datagrid operation.
499 */
500 public void close() {
501 Hazelcast.shutdownAll();
502 }
503
504 /**
505 * Get the collection of offered module services.
506 *
507 * @return the collection of offered module services.
508 */
509 @Override
510 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800511 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700512 new ArrayList<Class<? extends IFloodlightService>>();
513 l.add(IDatagridService.class);
514 return l;
515 }
516
517 /**
518 * Get the collection of implemented services.
519 *
520 * @return the collection of implemented services.
521 */
522 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800523 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700524 getServiceImpls() {
525 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800526 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700527 new HashMap<Class<? extends IFloodlightService>,
528 IFloodlightService>();
529 m.put(IDatagridService.class, this);
530 return m;
531 }
532
533 /**
534 * Get the collection of modules this module depends on.
535 *
536 * @return the collection of modules this module depends on.
537 */
538 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800539 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700540 getModuleDependencies() {
541 Collection<Class<? extends IFloodlightService>> l =
542 new ArrayList<Class<? extends IFloodlightService>>();
543 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700544 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700545 return l;
546 }
547
548 /**
549 * Initialize the module.
550 *
551 * @param context the module context to use for the initialization.
552 */
553 @Override
554 public void init(FloodlightModuleContext context)
555 throws FloodlightModuleException {
556 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700557 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700558
559 // Get the configuration file name and configure the Datagrid
560 Map<String, String> configMap = context.getConfigParams(this);
561 String configFilename = configMap.get(HazelcastConfigFile);
562 this.init(configFilename);
563 }
564
565 /**
566 * Startup module operation.
567 *
568 * @param context the module context to use for the startup.
569 */
570 @Override
571 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700572 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700573
574 restApi.addRestletRoutable(new DatagridWebRoutable());
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800575
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800576 arpMap = hazelcastInstance.getMap(arpMapName);
577 arpMap.addEntryListener(new ArpMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700578 }
579
580 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700581 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700582 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700583 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700584 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700585 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700586 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700587 */
588 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700589 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
590 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700591
592 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700593 mapFlowListener = new MapFlowListener();
594 mapFlow = hazelcastInstance.getMap(mapFlowName);
595 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700596
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700597 // Initialize the FlowEntry-related map state
598 mapFlowEntryListener = new MapFlowEntryListener();
599 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
600 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
601
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800602 // Initialize the FlowId-related map state
603 mapFlowIdListener = new MapFlowIdListener();
604 mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
605 mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
606
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700607 // Initialize the Topology-related map state
608 mapTopologyListener = new MapTopologyListener();
609 mapTopology = hazelcastInstance.getMap(mapTopologyName);
610 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700611 }
612
613 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700614 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700615 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700616 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700617 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700618 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700619 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700620 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700621 */
622 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700623 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700624 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700625 mapFlow.removeEntryListener(mapFlowListenerId);
626 mapFlow = null;
627 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700628
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700629 // Clear the FlowEntry-related map state
630 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
631 mapFlowEntry = null;
632 mapFlowEntryListener = null;
633
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800634 // Clear the FlowId-related map state
635 mapFlowId.removeEntryListener(mapFlowIdListenerId);
636 mapFlowId = null;
637 mapFlowIdListener = null;
638
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700639 // Clear the Topology-related map state
640 mapTopology.removeEntryListener(mapTopologyListenerId);
641 mapTopology = null;
642 mapTopologyListener = null;
643
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700644 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700645 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800646
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800647 @Override
648 public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
649 if (arpEventHandler != null) {
650 arpEventHandlers.add(arpEventHandler);
651 }
652 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800653
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800654 @Override
655 public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
656 arpEventHandlers.remove(arpEventHandler);
657 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800658
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700659 /**
660 * Get all Flows that are currently in the datagrid.
661 *
662 * @return all Flows that are currently in the datagrid.
663 */
664 @Override
665 public Collection<FlowPath> getAllFlows() {
666 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
667
668 //
669 // Get all current entries
670 //
671 Collection<byte[]> values = mapFlow.values();
672 Kryo kryo = kryoFactory.newKryo();
673 for (byte[] valueBytes : values) {
674 //
675 // Decode the value
676 //
677 Input input = new Input(valueBytes);
678 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
679 allFlows.add(flowPath);
680 }
681 kryoFactory.deleteKryo(kryo);
682
683 return allFlows;
684 }
685
686 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800687 * Get a Flow for a given Flow ID.
688 *
689 * @param flowId the Flow ID of the Flow to get.
690 * @return the Flow if found, otherwise null.
691 */
692 @Override
693 public FlowPath getFlow(FlowId flowId) {
694 byte[] valueBytes = mapFlow.get(flowId.value());
695 if (valueBytes == null)
696 return null;
697
698 Kryo kryo = kryoFactory.newKryo();
699 //
700 // Decode the value
701 //
702 Input input = new Input(valueBytes);
703 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
704 kryoFactory.deleteKryo(kryo);
705
706 return flowPath;
707 }
708
709 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700710 * Send a notification that a Flow is added.
711 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700712 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700713 */
714 @Override
715 public void notificationSendFlowAdded(FlowPath flowPath) {
716 //
717 // Encode the value
718 //
719 byte[] buffer = new byte[MAX_BUFFER_SIZE];
720 Kryo kryo = kryoFactory.newKryo();
721 Output output = new Output(buffer, -1);
722 kryo.writeObject(output, flowPath);
723 byte[] valueBytes = output.toBytes();
724 kryoFactory.deleteKryo(kryo);
725
726 //
727 // Put the entry:
728 // - Key : Flow ID (Long)
729 // - Value : Serialized Flow (byte[])
730 //
731 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
732 }
733
734 /**
735 * Send a notification that a Flow is removed.
736 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700737 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700738 */
739 @Override
740 public void notificationSendFlowRemoved(FlowId flowId) {
741 //
742 // Remove the entry:
743 // - Key : Flow ID (Long)
744 // - Value : Serialized Flow (byte[])
745 //
746 mapFlow.removeAsync(flowId.value());
747 }
748
749 /**
750 * Send a notification that a Flow is updated.
751 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700752 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700753 */
754 @Override
755 public void notificationSendFlowUpdated(FlowPath flowPath) {
756 // NOTE: Adding an entry with an existing key automatically updates it
757 notificationSendFlowAdded(flowPath);
758 }
759
760 /**
761 * Send a notification that all Flows are removed.
762 */
763 @Override
764 public void notificationSendAllFlowsRemoved() {
765 //
766 // Remove all entries
767 // NOTE: We remove the entries one-by-one so the per-entry
768 // notifications will be delivered.
769 //
770 // mapFlow.clear();
771 Set<Long> keySet = mapFlow.keySet();
772 for (Long key : keySet) {
773 mapFlow.removeAsync(key);
774 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700775 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700776
777 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700778 * Get all Flow Entries that are currently in the datagrid.
779 *
780 * @return all Flow Entries that are currently in the datagrid.
781 */
782 @Override
783 public Collection<FlowEntry> getAllFlowEntries() {
784 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
785
786 //
787 // Get all current entries
788 //
789 Collection<byte[]> values = mapFlowEntry.values();
790 Kryo kryo = kryoFactory.newKryo();
791 for (byte[] valueBytes : values) {
792 //
793 // Decode the value
794 //
795 Input input = new Input(valueBytes);
796 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
797 allFlowEntries.add(flowEntry);
798 }
799 kryoFactory.deleteKryo(kryo);
800
801 return allFlowEntries;
802 }
803
804 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800805 * Get a Flow Entry for a given Flow Entry ID.
806 *
807 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
808 * @return the Flow Entry if found, otherwise null.
809 */
810 @Override
811 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
812 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
813 if (valueBytes == null)
814 return null;
815
816 Kryo kryo = kryoFactory.newKryo();
817 //
818 // Decode the value
819 //
820 Input input = new Input(valueBytes);
821 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
822 kryoFactory.deleteKryo(kryo);
823
824 return flowEntry;
825 }
826
827 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700828 * Send a notification that a FlowEntry is added.
829 *
830 * @param flowEntry the FlowEntry that is added.
831 */
832 @Override
833 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
834 //
835 // Encode the value
836 //
837 byte[] buffer = new byte[MAX_BUFFER_SIZE];
838 Kryo kryo = kryoFactory.newKryo();
839 Output output = new Output(buffer, -1);
840 kryo.writeObject(output, flowEntry);
841 byte[] valueBytes = output.toBytes();
842 kryoFactory.deleteKryo(kryo);
843
844 //
845 // Put the entry:
846 // - Key : FlowEntry ID (Long)
847 // - Value : Serialized FlowEntry (byte[])
848 //
849 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
850 }
851
852 /**
853 * Send a notification that a FlowEntry is removed.
854 *
855 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
856 */
857 @Override
858 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
859 //
860 // Remove the entry:
861 // - Key : FlowEntry ID (Long)
862 // - Value : Serialized FlowEntry (byte[])
863 //
864 mapFlowEntry.removeAsync(flowEntryId.value());
865 }
866
867 /**
868 * Send a notification that a FlowEntry is updated.
869 *
870 * @param flowEntry the FlowEntry that is updated.
871 */
872 @Override
873 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
874 // NOTE: Adding an entry with an existing key automatically updates it
875 notificationSendFlowEntryAdded(flowEntry);
876 }
877
878 /**
879 * Send a notification that all Flow Entries are removed.
880 */
881 @Override
882 public void notificationSendAllFlowEntriesRemoved() {
883 //
884 // Remove all entries
885 // NOTE: We remove the entries one-by-one so the per-entry
886 // notifications will be delivered.
887 //
888 // mapFlowEntry.clear();
889 Set<Long> keySet = mapFlowEntry.keySet();
890 for (Long key : keySet) {
891 mapFlowEntry.removeAsync(key);
892 }
893 }
894
895 /**
Pavlin Radoslavov2004fa02014-01-07 14:46:42 -0800896 * Get all Flow IDs that are currently in the datagrid.
897 *
898 * @return all Flow IDs that are currently in the datagrid.
899 */
900 @Override
901 public Collection<FlowId> getAllFlowIds() {
902 Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
903
904 //
905 // Get all current entries
906 //
907 Collection<byte[]> values = mapFlowId.values();
908 Kryo kryo = kryoFactory.newKryo();
909 for (byte[] valueBytes : values) {
910 //
911 // Decode the value
912 //
913 Input input = new Input(valueBytes);
914 FlowId flowId = kryo.readObject(input, FlowId.class);
915 allFlowIds.add(flowId);
916 }
917 kryoFactory.deleteKryo(kryo);
918
919 return allFlowIds;
920 }
921
922 /**
923 * Send a notification that a FlowId is added.
924 *
925 * @param flowId the FlowId that is added.
926 */
927 @Override
928 public void notificationSendFlowIdAdded(FlowId flowId) {
929 //
930 // Encode the value
931 //
932 byte[] buffer = new byte[MAX_BUFFER_SIZE];
933 Kryo kryo = kryoFactory.newKryo();
934 Output output = new Output(buffer, -1);
935 kryo.writeObject(output, flowId);
936 byte[] valueBytes = output.toBytes();
937 kryoFactory.deleteKryo(kryo);
938
939 //
940 // Put the entry:
941 // - Key : FlowId (Long)
942 // - Value : Serialized FlowId (byte[])
943 //
944 mapFlowId.putAsync(flowId.value(), valueBytes);
945 }
946
947 /**
948 * Send a notification that a FlowId is removed.
949 *
950 * @param flowId the FlowId that is removed.
951 */
952 @Override
953 public void notificationSendFlowIdRemoved(FlowId flowId) {
954 //
955 // Remove the entry:
956 // - Key : FlowId (Long)
957 // - Value : Serialized FlowId (byte[])
958 //
959 mapFlowId.removeAsync(flowId.value());
960 }
961
962 /**
963 * Send a notification that a FlowId is updated.
964 *
965 * @param flowId the FlowId that is updated.
966 */
967 @Override
968 public void notificationSendFlowIdUpdated(FlowId flowId) {
969 // NOTE: Adding an entry with an existing key automatically updates it
970 notificationSendFlowIdAdded(flowId);
971 }
972
973 /**
974 * Send a notification that all Flow IDs are removed.
975 */
976 @Override
977 public void notificationSendAllFlowIdsRemoved() {
978 //
979 // Remove all entries
980 // NOTE: We remove the entries one-by-one so the per-entry
981 // notifications will be delivered.
982 //
983 // mapFlowId.clear();
984 Set<Long> keySet = mapFlowId.keySet();
985 for (Long key : keySet) {
986 mapFlowId.removeAsync(key);
987 }
988 }
989
990 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700991 * Get all Topology Elements that are currently in the datagrid.
992 *
993 * @return all Topology Elements that are currently in the datagrid.
994 */
995 @Override
996 public Collection<TopologyElement> getAllTopologyElements() {
997 Collection<TopologyElement> allTopologyElements =
998 new LinkedList<TopologyElement>();
999
1000 //
1001 // Get all current entries
1002 //
1003 Collection<byte[]> values = mapTopology.values();
1004 Kryo kryo = kryoFactory.newKryo();
1005 for (byte[] valueBytes : values) {
1006 //
1007 // Decode the value
1008 //
1009 Input input = new Input(valueBytes);
1010 TopologyElement topologyElement =
1011 kryo.readObject(input, TopologyElement.class);
1012 allTopologyElements.add(topologyElement);
1013 }
1014 kryoFactory.deleteKryo(kryo);
1015
1016 return allTopologyElements;
1017 }
1018
1019 /**
1020 * Send a notification that a Topology Element is added.
1021 *
1022 * @param topologyElement the Topology Element that is added.
1023 */
1024 @Override
1025 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
1026 //
1027 // Encode the value
1028 //
1029 byte[] buffer = new byte[MAX_BUFFER_SIZE];
1030 Kryo kryo = kryoFactory.newKryo();
1031 Output output = new Output(buffer, -1);
1032 kryo.writeObject(output, topologyElement);
1033 byte[] valueBytes = output.toBytes();
1034 kryoFactory.deleteKryo(kryo);
1035
1036 //
1037 // Put the entry:
1038 // - Key : TopologyElement ID (String)
1039 // - Value : Serialized TopologyElement (byte[])
1040 //
1041 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
1042 }
1043
1044 /**
1045 * Send a notification that a Topology Element is removed.
1046 *
1047 * @param topologyElement the Topology Element that is removed.
1048 */
1049 @Override
1050 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
1051 //
1052 // Remove the entry:
1053 // - Key : TopologyElement ID (String)
1054 // - Value : Serialized TopologyElement (byte[])
1055 //
1056 mapTopology.removeAsync(topologyElement.elementId());
1057 }
1058
1059 /**
1060 * Send a notification that a Topology Element is updated.
1061 *
1062 * @param topologyElement the Topology Element that is updated.
1063 */
1064 @Override
1065 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
1066 // NOTE: Adding an entry with an existing key automatically updates it
1067 notificationSendTopologyElementAdded(topologyElement);
1068 }
1069
1070 /**
1071 * Send a notification that all Topology Elements are removed.
1072 */
1073 @Override
1074 public void notificationSendAllTopologyElementsRemoved() {
1075 //
1076 // Remove all entries
1077 // NOTE: We remove the entries one-by-one so the per-entry
1078 // notifications will be delivered.
1079 //
1080 // mapTopology.clear();
1081 Set<String> keySet = mapTopology.keySet();
1082 for (String key : keySet) {
1083 mapTopology.removeAsync(key);
1084 }
1085 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -08001086
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001087 @Override
Jonathan Hartd3003252013-11-15 09:44:46 -08001088 public void sendArpRequest(ArpMessage arpMessage) {
1089 //log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
1090 arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -08001091 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001092}