blob: 3b34a35ec7a1022b93d017058d8dba58abc4cd23 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070020import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070021
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070022import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070023import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070024import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070025import net.onrc.onos.ofcontroller.util.FlowEntry;
26import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070027import net.onrc.onos.ofcontroller.util.FlowId;
28import net.onrc.onos.ofcontroller.util.FlowPath;
29import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
30
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import com.hazelcast.config.Config;
35import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070036import com.hazelcast.core.EntryEvent;
37import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.core.Hazelcast;
39import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070041import com.hazelcast.instance.GroupProperties;
42
43/**
44 * A datagrid service that uses Hazelcast as a datagrid.
45 * The relevant data is stored in the Hazelcast datagrid and shared as
46 * appropriate in a multi-node cluster.
47 */
48public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070049 private final static int MAX_BUFFER_SIZE = 64*1024;
50
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070051 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070052 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070053 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070054
55 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070056 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070057 private Config hazelcastConfig = null;
58
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070060 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070061
62 // State related to the Flow map
63 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070064 private IMap<Long, byte[]> mapFlow = null;
65 private MapFlowListener mapFlowListener = null;
66 private String mapFlowListenerId = null;
67
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070068 // State related to the Flow Entry map
69 protected static final String mapFlowEntryName = "mapFlowEntry";
70 private IMap<Long, byte[]> mapFlowEntry = null;
71 private MapFlowEntryListener mapFlowEntryListener = null;
72 private String mapFlowEntryListenerId = null;
73
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070074 // State related to the Network Topology map
75 protected static final String mapTopologyName = "mapTopology";
76 private IMap<String, byte[]> mapTopology = null;
77 private MapTopologyListener mapTopologyListener = null;
78 private String mapTopologyListenerId = null;
79
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070080 /**
81 * Class for receiving notifications for Flow state.
82 *
83 * The datagrid map is:
84 * - Key : Flow ID (Long)
85 * - Value : Serialized Flow (byte[])
86 */
87 class MapFlowListener implements EntryListener<Long, byte[]> {
88 /**
89 * Receive a notification that an entry is added.
90 *
91 * @param event the notification event for the entry.
92 */
93 public void entryAdded(EntryEvent event) {
94 Long keyLong = (Long)event.getKey();
95 byte[] valueBytes = (byte[])event.getValue();
96
97 //
98 // Decode the value and deliver the notification
99 //
100 Kryo kryo = kryoFactory.newKryo();
101 Input input = new Input(valueBytes);
102 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
103 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700104 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700105 }
106
107 /**
108 * Receive a notification that an entry is removed.
109 *
110 * @param event the notification event for the entry.
111 */
112 public void entryRemoved(EntryEvent event) {
113 Long keyLong = (Long)event.getKey();
114 byte[] valueBytes = (byte[])event.getValue();
115
116 //
117 // Decode the value and deliver the notification
118 //
119 Kryo kryo = kryoFactory.newKryo();
120 Input input = new Input(valueBytes);
121 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
122 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700123 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700124 }
125
126 /**
127 * Receive a notification that an entry is updated.
128 *
129 * @param event the notification event for the entry.
130 */
131 public void entryUpdated(EntryEvent event) {
132 Long keyLong = (Long)event.getKey();
133 byte[] valueBytes = (byte[])event.getValue();
134
135 //
136 // Decode the value and deliver the notification
137 //
138 Kryo kryo = kryoFactory.newKryo();
139 Input input = new Input(valueBytes);
140 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
141 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700142 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700143 }
144
145 /**
146 * Receive a notification that an entry is evicted.
147 *
148 * @param event the notification event for the entry.
149 */
150 public void entryEvicted(EntryEvent event) {
151 // NOTE: We don't use eviction for this map
152 }
153 }
154
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700155 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700156 * Class for receiving notifications for FlowEntry state.
157 *
158 * The datagrid map is:
159 * - Key : FlowEntry ID (Long)
160 * - Value : Serialized FlowEntry (byte[])
161 */
162 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
163 /**
164 * Receive a notification that an entry is added.
165 *
166 * @param event the notification event for the entry.
167 */
168 public void entryAdded(EntryEvent event) {
169 Long keyLong = (Long)event.getKey();
170 byte[] valueBytes = (byte[])event.getValue();
171
172 //
173 // Decode the value and deliver the notification
174 //
175 Kryo kryo = kryoFactory.newKryo();
176 Input input = new Input(valueBytes);
177 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
178 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700179 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700180 }
181
182 /**
183 * Receive a notification that an entry is removed.
184 *
185 * @param event the notification event for the entry.
186 */
187 public void entryRemoved(EntryEvent event) {
188 Long keyLong = (Long)event.getKey();
189 byte[] valueBytes = (byte[])event.getValue();
190
191 //
192 // Decode the value and deliver the notification
193 //
194 Kryo kryo = kryoFactory.newKryo();
195 Input input = new Input(valueBytes);
196 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
197 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700198 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700199 }
200
201 /**
202 * Receive a notification that an entry is updated.
203 *
204 * @param event the notification event for the entry.
205 */
206 public void entryUpdated(EntryEvent event) {
207 Long keyLong = (Long)event.getKey();
208 byte[] valueBytes = (byte[])event.getValue();
209
210 //
211 // Decode the value and deliver the notification
212 //
213 Kryo kryo = kryoFactory.newKryo();
214 Input input = new Input(valueBytes);
215 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
216 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700217 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700218 }
219
220 /**
221 * Receive a notification that an entry is evicted.
222 *
223 * @param event the notification event for the entry.
224 */
225 public void entryEvicted(EntryEvent event) {
226 // NOTE: We don't use eviction for this map
227 }
228 }
229
230 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700231 * Class for receiving notifications for Network Topology state.
232 *
233 * The datagrid map is:
234 * - Key: TopologyElement ID (String)
235 * - Value: Serialized TopologyElement (byte[])
236 */
237 class MapTopologyListener implements EntryListener<String, byte[]> {
238 /**
239 * Receive a notification that an entry is added.
240 *
241 * @param event the notification event for the entry.
242 */
243 public void entryAdded(EntryEvent event) {
244 String keyString = (String)event.getKey();
245 byte[] valueBytes = (byte[])event.getValue();
246
247 //
248 // Decode the value and deliver the notification
249 //
250 Kryo kryo = kryoFactory.newKryo();
251 Input input = new Input(valueBytes);
252 TopologyElement topologyElement =
253 kryo.readObject(input, TopologyElement.class);
254 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700255 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700256 }
257
258 /**
259 * Receive a notification that an entry is removed.
260 *
261 * @param event the notification event for the entry.
262 */
263 public void entryRemoved(EntryEvent event) {
264 String keyString = (String)event.getKey();
265 byte[] valueBytes = (byte[])event.getValue();
266
267 //
268 // Decode the value and deliver the notification
269 //
270 Kryo kryo = kryoFactory.newKryo();
271 Input input = new Input(valueBytes);
272 TopologyElement topologyElement =
273 kryo.readObject(input, TopologyElement.class);
274 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700275 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700276 }
277
278 /**
279 * Receive a notification that an entry is updated.
280 *
281 * @param event the notification event for the entry.
282 */
283 public void entryUpdated(EntryEvent event) {
284 String keyString = (String)event.getKey();
285 byte[] valueBytes = (byte[])event.getValue();
286
287 //
288 // Decode the value and deliver the notification
289 //
290 Kryo kryo = kryoFactory.newKryo();
291 Input input = new Input(valueBytes);
292 TopologyElement topologyElement =
293 kryo.readObject(input, TopologyElement.class);
294 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700295 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700296 }
297
298 /**
299 * Receive a notification that an entry is evicted.
300 *
301 * @param event the notification event for the entry.
302 */
303 public void entryEvicted(EntryEvent event) {
304 // NOTE: We don't use eviction for this map
305 }
306 }
307
308 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700309 * Initialize the Hazelcast Datagrid operation.
310 *
311 * @param conf the configuration filename.
312 */
313 public void init(String configFilename) {
314 /*
315 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
316 System.setProperty("hazelcast.socket.send.buffer.size", "32");
317 */
318 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
319
320 // Init from configuration file
321 try {
322 hazelcastConfig = new FileSystemXmlConfig(configFilename);
323 } catch (FileNotFoundException e) {
324 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
325 }
326 /*
327 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
328 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
329 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
330 */
331 //
332 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
333 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
334 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
335 }
336
337 /**
338 * Shutdown the Hazelcast Datagrid operation.
339 */
340 public void finalize() {
341 close();
342 }
343
344 /**
345 * Shutdown the Hazelcast Datagrid operation.
346 */
347 public void close() {
348 Hazelcast.shutdownAll();
349 }
350
351 /**
352 * Get the collection of offered module services.
353 *
354 * @return the collection of offered module services.
355 */
356 @Override
357 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
358 Collection<Class<? extends IFloodlightService>> l =
359 new ArrayList<Class<? extends IFloodlightService>>();
360 l.add(IDatagridService.class);
361 return l;
362 }
363
364 /**
365 * Get the collection of implemented services.
366 *
367 * @return the collection of implemented services.
368 */
369 @Override
370 public Map<Class<? extends IFloodlightService>, IFloodlightService>
371 getServiceImpls() {
372 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700373 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700374 new HashMap<Class<? extends IFloodlightService>,
375 IFloodlightService>();
376 m.put(IDatagridService.class, this);
377 return m;
378 }
379
380 /**
381 * Get the collection of modules this module depends on.
382 *
383 * @return the collection of modules this module depends on.
384 */
385 @Override
386 public Collection<Class<? extends IFloodlightService>>
387 getModuleDependencies() {
388 Collection<Class<? extends IFloodlightService>> l =
389 new ArrayList<Class<? extends IFloodlightService>>();
390 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700391 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700392 return l;
393 }
394
395 /**
396 * Initialize the module.
397 *
398 * @param context the module context to use for the initialization.
399 */
400 @Override
401 public void init(FloodlightModuleContext context)
402 throws FloodlightModuleException {
403 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700404 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700405
406 // Get the configuration file name and configure the Datagrid
407 Map<String, String> configMap = context.getConfigParams(this);
408 String configFilename = configMap.get(HazelcastConfigFile);
409 this.init(configFilename);
410 }
411
412 /**
413 * Startup module operation.
414 *
415 * @param context the module context to use for the startup.
416 */
417 @Override
418 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700419 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700420
421 restApi.addRestletRoutable(new DatagridWebRoutable());
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700422 }
423
424 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700425 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700426 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700427 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700428 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700429 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700430 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700431 */
432 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700433 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
434 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700435
436 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700437 mapFlowListener = new MapFlowListener();
438 mapFlow = hazelcastInstance.getMap(mapFlowName);
439 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700440
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700441 // Initialize the FlowEntry-related map state
442 mapFlowEntryListener = new MapFlowEntryListener();
443 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
444 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
445
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700446 // Initialize the Topology-related map state
447 mapTopologyListener = new MapTopologyListener();
448 mapTopology = hazelcastInstance.getMap(mapTopologyName);
449 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700450 }
451
452 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700453 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700454 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700455 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700456 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700457 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700458 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700459 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700460 */
461 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700462 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700463 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700464 mapFlow.removeEntryListener(mapFlowListenerId);
465 mapFlow = null;
466 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700467
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700468 // Clear the FlowEntry-related map state
469 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
470 mapFlowEntry = null;
471 mapFlowEntryListener = null;
472
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700473 // Clear the Topology-related map state
474 mapTopology.removeEntryListener(mapTopologyListenerId);
475 mapTopology = null;
476 mapTopologyListener = null;
477
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700478 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700479 }
480
481 /**
482 * Get all Flows that are currently in the datagrid.
483 *
484 * @return all Flows that are currently in the datagrid.
485 */
486 @Override
487 public Collection<FlowPath> getAllFlows() {
488 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
489
490 //
491 // Get all current entries
492 //
493 Collection<byte[]> values = mapFlow.values();
494 Kryo kryo = kryoFactory.newKryo();
495 for (byte[] valueBytes : values) {
496 //
497 // Decode the value
498 //
499 Input input = new Input(valueBytes);
500 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
501 allFlows.add(flowPath);
502 }
503 kryoFactory.deleteKryo(kryo);
504
505 return allFlows;
506 }
507
508 /**
509 * Send a notification that a Flow is added.
510 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700511 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700512 */
513 @Override
514 public void notificationSendFlowAdded(FlowPath flowPath) {
515 //
516 // Encode the value
517 //
518 byte[] buffer = new byte[MAX_BUFFER_SIZE];
519 Kryo kryo = kryoFactory.newKryo();
520 Output output = new Output(buffer, -1);
521 kryo.writeObject(output, flowPath);
522 byte[] valueBytes = output.toBytes();
523 kryoFactory.deleteKryo(kryo);
524
525 //
526 // Put the entry:
527 // - Key : Flow ID (Long)
528 // - Value : Serialized Flow (byte[])
529 //
530 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
531 }
532
533 /**
534 * Send a notification that a Flow is removed.
535 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700536 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700537 */
538 @Override
539 public void notificationSendFlowRemoved(FlowId flowId) {
540 //
541 // Remove the entry:
542 // - Key : Flow ID (Long)
543 // - Value : Serialized Flow (byte[])
544 //
545 mapFlow.removeAsync(flowId.value());
546 }
547
548 /**
549 * Send a notification that a Flow is updated.
550 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700551 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700552 */
553 @Override
554 public void notificationSendFlowUpdated(FlowPath flowPath) {
555 // NOTE: Adding an entry with an existing key automatically updates it
556 notificationSendFlowAdded(flowPath);
557 }
558
559 /**
560 * Send a notification that all Flows are removed.
561 */
562 @Override
563 public void notificationSendAllFlowsRemoved() {
564 //
565 // Remove all entries
566 // NOTE: We remove the entries one-by-one so the per-entry
567 // notifications will be delivered.
568 //
569 // mapFlow.clear();
570 Set<Long> keySet = mapFlow.keySet();
571 for (Long key : keySet) {
572 mapFlow.removeAsync(key);
573 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700574 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700575
576 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700577 * Get all Flow Entries that are currently in the datagrid.
578 *
579 * @return all Flow Entries that are currently in the datagrid.
580 */
581 @Override
582 public Collection<FlowEntry> getAllFlowEntries() {
583 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
584
585 //
586 // Get all current entries
587 //
588 Collection<byte[]> values = mapFlowEntry.values();
589 Kryo kryo = kryoFactory.newKryo();
590 for (byte[] valueBytes : values) {
591 //
592 // Decode the value
593 //
594 Input input = new Input(valueBytes);
595 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
596 allFlowEntries.add(flowEntry);
597 }
598 kryoFactory.deleteKryo(kryo);
599
600 return allFlowEntries;
601 }
602
603 /**
604 * Send a notification that a FlowEntry is added.
605 *
606 * @param flowEntry the FlowEntry that is added.
607 */
608 @Override
609 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
610 //
611 // Encode the value
612 //
613 byte[] buffer = new byte[MAX_BUFFER_SIZE];
614 Kryo kryo = kryoFactory.newKryo();
615 Output output = new Output(buffer, -1);
616 kryo.writeObject(output, flowEntry);
617 byte[] valueBytes = output.toBytes();
618 kryoFactory.deleteKryo(kryo);
619
620 //
621 // Put the entry:
622 // - Key : FlowEntry ID (Long)
623 // - Value : Serialized FlowEntry (byte[])
624 //
625 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
626 }
627
628 /**
629 * Send a notification that a FlowEntry is removed.
630 *
631 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
632 */
633 @Override
634 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
635 //
636 // Remove the entry:
637 // - Key : FlowEntry ID (Long)
638 // - Value : Serialized FlowEntry (byte[])
639 //
640 mapFlowEntry.removeAsync(flowEntryId.value());
641 }
642
643 /**
644 * Send a notification that a FlowEntry is updated.
645 *
646 * @param flowEntry the FlowEntry that is updated.
647 */
648 @Override
649 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
650 // NOTE: Adding an entry with an existing key automatically updates it
651 notificationSendFlowEntryAdded(flowEntry);
652 }
653
654 /**
655 * Send a notification that all Flow Entries are removed.
656 */
657 @Override
658 public void notificationSendAllFlowEntriesRemoved() {
659 //
660 // Remove all entries
661 // NOTE: We remove the entries one-by-one so the per-entry
662 // notifications will be delivered.
663 //
664 // mapFlowEntry.clear();
665 Set<Long> keySet = mapFlowEntry.keySet();
666 for (Long key : keySet) {
667 mapFlowEntry.removeAsync(key);
668 }
669 }
670
671 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700672 * Get all Topology Elements that are currently in the datagrid.
673 *
674 * @return all Topology Elements that are currently in the datagrid.
675 */
676 @Override
677 public Collection<TopologyElement> getAllTopologyElements() {
678 Collection<TopologyElement> allTopologyElements =
679 new LinkedList<TopologyElement>();
680
681 //
682 // Get all current entries
683 //
684 Collection<byte[]> values = mapTopology.values();
685 Kryo kryo = kryoFactory.newKryo();
686 for (byte[] valueBytes : values) {
687 //
688 // Decode the value
689 //
690 Input input = new Input(valueBytes);
691 TopologyElement topologyElement =
692 kryo.readObject(input, TopologyElement.class);
693 allTopologyElements.add(topologyElement);
694 }
695 kryoFactory.deleteKryo(kryo);
696
697 return allTopologyElements;
698 }
699
700 /**
701 * Send a notification that a Topology Element is added.
702 *
703 * @param topologyElement the Topology Element that is added.
704 */
705 @Override
706 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
707 //
708 // Encode the value
709 //
710 byte[] buffer = new byte[MAX_BUFFER_SIZE];
711 Kryo kryo = kryoFactory.newKryo();
712 Output output = new Output(buffer, -1);
713 kryo.writeObject(output, topologyElement);
714 byte[] valueBytes = output.toBytes();
715 kryoFactory.deleteKryo(kryo);
716
717 //
718 // Put the entry:
719 // - Key : TopologyElement ID (String)
720 // - Value : Serialized TopologyElement (byte[])
721 //
722 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
723 }
724
725 /**
726 * Send a notification that a Topology Element is removed.
727 *
728 * @param topologyElement the Topology Element that is removed.
729 */
730 @Override
731 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
732 //
733 // Remove the entry:
734 // - Key : TopologyElement ID (String)
735 // - Value : Serialized TopologyElement (byte[])
736 //
737 mapTopology.removeAsync(topologyElement.elementId());
738 }
739
740 /**
741 * Send a notification that a Topology Element is updated.
742 *
743 * @param topologyElement the Topology Element that is updated.
744 */
745 @Override
746 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
747 // NOTE: Adding an entry with an existing key automatically updates it
748 notificationSendTopologyElementAdded(topologyElement);
749 }
750
751 /**
752 * Send a notification that all Topology Elements are removed.
753 */
754 @Override
755 public void notificationSendAllTopologyElementsRemoved() {
756 //
757 // Remove all entries
758 // NOTE: We remove the entries one-by-one so the per-entry
759 // notifications will be delivered.
760 //
761 // mapTopology.clear();
762 Set<String> keySet = mapTopology.keySet();
763 for (String key : keySet) {
764 mapTopology.removeAsync(key);
765 }
766 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700767}