blob: 481002f444e1be9ee1b746300b62d2da08e4f446 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
20
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -070021import net.onrc.onos.ofcontroller.flowmanager.IPathComputationService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070022import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070023import net.onrc.onos.ofcontroller.util.FlowId;
24import net.onrc.onos.ofcontroller.util.FlowPath;
25import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
26
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070027import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30import com.hazelcast.config.Config;
31import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070032import com.hazelcast.core.EntryEvent;
33import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070034import com.hazelcast.core.Hazelcast;
35import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070036import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070037import com.hazelcast.instance.GroupProperties;
38
39/**
40 * A datagrid service that uses Hazelcast as a datagrid.
41 * The relevant data is stored in the Hazelcast datagrid and shared as
42 * appropriate in a multi-node cluster.
43 */
44public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070045 private final static int MAX_BUFFER_SIZE = 64*1024;
46
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070047 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070048 protected IFloodlightProviderService floodlightProvider;
49
50 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070052 private Config hazelcastConfig = null;
53
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070054 private KryoFactory kryoFactory = new KryoFactory();
55
56 // State related to the Flow map
57 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -070058 private IPathComputationService pathComputationService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070059 private IMap<Long, byte[]> mapFlow = null;
60 private MapFlowListener mapFlowListener = null;
61 private String mapFlowListenerId = null;
62
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070063 // State related to the Network Topology map
64 protected static final String mapTopologyName = "mapTopology";
65 private IMap<String, byte[]> mapTopology = null;
66 private MapTopologyListener mapTopologyListener = null;
67 private String mapTopologyListenerId = null;
68
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070069 /**
70 * Class for receiving notifications for Flow state.
71 *
72 * The datagrid map is:
73 * - Key : Flow ID (Long)
74 * - Value : Serialized Flow (byte[])
75 */
76 class MapFlowListener implements EntryListener<Long, byte[]> {
77 /**
78 * Receive a notification that an entry is added.
79 *
80 * @param event the notification event for the entry.
81 */
82 public void entryAdded(EntryEvent event) {
83 Long keyLong = (Long)event.getKey();
84 byte[] valueBytes = (byte[])event.getValue();
85
86 //
87 // Decode the value and deliver the notification
88 //
89 Kryo kryo = kryoFactory.newKryo();
90 Input input = new Input(valueBytes);
91 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
92 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -070093 pathComputationService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070094 }
95
96 /**
97 * Receive a notification that an entry is removed.
98 *
99 * @param event the notification event for the entry.
100 */
101 public void entryRemoved(EntryEvent event) {
102 Long keyLong = (Long)event.getKey();
103 byte[] valueBytes = (byte[])event.getValue();
104
105 //
106 // Decode the value and deliver the notification
107 //
108 Kryo kryo = kryoFactory.newKryo();
109 Input input = new Input(valueBytes);
110 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
111 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700112 pathComputationService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700113 }
114
115 /**
116 * Receive a notification that an entry is updated.
117 *
118 * @param event the notification event for the entry.
119 */
120 public void entryUpdated(EntryEvent event) {
121 Long keyLong = (Long)event.getKey();
122 byte[] valueBytes = (byte[])event.getValue();
123
124 //
125 // Decode the value and deliver the notification
126 //
127 Kryo kryo = kryoFactory.newKryo();
128 Input input = new Input(valueBytes);
129 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
130 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700131 pathComputationService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700132 }
133
134 /**
135 * Receive a notification that an entry is evicted.
136 *
137 * @param event the notification event for the entry.
138 */
139 public void entryEvicted(EntryEvent event) {
140 // NOTE: We don't use eviction for this map
141 }
142 }
143
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700144 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700145 * Class for receiving notifications for Network Topology state.
146 *
147 * The datagrid map is:
148 * - Key: TopologyElement ID (String)
149 * - Value: Serialized TopologyElement (byte[])
150 */
151 class MapTopologyListener implements EntryListener<String, byte[]> {
152 /**
153 * Receive a notification that an entry is added.
154 *
155 * @param event the notification event for the entry.
156 */
157 public void entryAdded(EntryEvent event) {
158 String keyString = (String)event.getKey();
159 byte[] valueBytes = (byte[])event.getValue();
160
161 //
162 // Decode the value and deliver the notification
163 //
164 Kryo kryo = kryoFactory.newKryo();
165 Input input = new Input(valueBytes);
166 TopologyElement topologyElement =
167 kryo.readObject(input, TopologyElement.class);
168 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700169 pathComputationService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700170 }
171
172 /**
173 * Receive a notification that an entry is removed.
174 *
175 * @param event the notification event for the entry.
176 */
177 public void entryRemoved(EntryEvent event) {
178 String keyString = (String)event.getKey();
179 byte[] valueBytes = (byte[])event.getValue();
180
181 //
182 // Decode the value and deliver the notification
183 //
184 Kryo kryo = kryoFactory.newKryo();
185 Input input = new Input(valueBytes);
186 TopologyElement topologyElement =
187 kryo.readObject(input, TopologyElement.class);
188 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700189 pathComputationService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700190 }
191
192 /**
193 * Receive a notification that an entry is updated.
194 *
195 * @param event the notification event for the entry.
196 */
197 public void entryUpdated(EntryEvent event) {
198 String keyString = (String)event.getKey();
199 byte[] valueBytes = (byte[])event.getValue();
200
201 //
202 // Decode the value and deliver the notification
203 //
204 Kryo kryo = kryoFactory.newKryo();
205 Input input = new Input(valueBytes);
206 TopologyElement topologyElement =
207 kryo.readObject(input, TopologyElement.class);
208 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700209 pathComputationService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700210 }
211
212 /**
213 * Receive a notification that an entry is evicted.
214 *
215 * @param event the notification event for the entry.
216 */
217 public void entryEvicted(EntryEvent event) {
218 // NOTE: We don't use eviction for this map
219 }
220 }
221
222 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700223 * Initialize the Hazelcast Datagrid operation.
224 *
225 * @param conf the configuration filename.
226 */
227 public void init(String configFilename) {
228 /*
229 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
230 System.setProperty("hazelcast.socket.send.buffer.size", "32");
231 */
232 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
233
234 // Init from configuration file
235 try {
236 hazelcastConfig = new FileSystemXmlConfig(configFilename);
237 } catch (FileNotFoundException e) {
238 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
239 }
240 /*
241 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
242 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
243 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
244 */
245 //
246 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
247 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
248 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
249 }
250
251 /**
252 * Shutdown the Hazelcast Datagrid operation.
253 */
254 public void finalize() {
255 close();
256 }
257
258 /**
259 * Shutdown the Hazelcast Datagrid operation.
260 */
261 public void close() {
262 Hazelcast.shutdownAll();
263 }
264
265 /**
266 * Get the collection of offered module services.
267 *
268 * @return the collection of offered module services.
269 */
270 @Override
271 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
272 Collection<Class<? extends IFloodlightService>> l =
273 new ArrayList<Class<? extends IFloodlightService>>();
274 l.add(IDatagridService.class);
275 return l;
276 }
277
278 /**
279 * Get the collection of implemented services.
280 *
281 * @return the collection of implemented services.
282 */
283 @Override
284 public Map<Class<? extends IFloodlightService>, IFloodlightService>
285 getServiceImpls() {
286 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700287 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700288 new HashMap<Class<? extends IFloodlightService>,
289 IFloodlightService>();
290 m.put(IDatagridService.class, this);
291 return m;
292 }
293
294 /**
295 * Get the collection of modules this module depends on.
296 *
297 * @return the collection of modules this module depends on.
298 */
299 @Override
300 public Collection<Class<? extends IFloodlightService>>
301 getModuleDependencies() {
302 Collection<Class<? extends IFloodlightService>> l =
303 new ArrayList<Class<? extends IFloodlightService>>();
304 l.add(IFloodlightProviderService.class);
305 return l;
306 }
307
308 /**
309 * Initialize the module.
310 *
311 * @param context the module context to use for the initialization.
312 */
313 @Override
314 public void init(FloodlightModuleContext context)
315 throws FloodlightModuleException {
316 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
317
318 // Get the configuration file name and configure the Datagrid
319 Map<String, String> configMap = context.getConfigParams(this);
320 String configFilename = configMap.get(HazelcastConfigFile);
321 this.init(configFilename);
322 }
323
324 /**
325 * Startup module operation.
326 *
327 * @param context the module context to use for the startup.
328 */
329 @Override
330 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700331 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
332 }
333
334 /**
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700335 * Register Path Computation Service for receiving Flow-related
336 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700337 *
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700338 * NOTE: Only a single Path Computation Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700339 *
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700340 * @param pathComputationService the Path Computation Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700341 */
342 @Override
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700343 public void registerPathComputationService(IPathComputationService pathComputationService) {
344 this.pathComputationService = pathComputationService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700345
346 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700347 mapFlowListener = new MapFlowListener();
348 mapFlow = hazelcastInstance.getMap(mapFlowName);
349 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700350
351 // Initialize the Topology-related map state
352 mapTopologyListener = new MapTopologyListener();
353 mapTopology = hazelcastInstance.getMap(mapTopologyName);
354 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700355 }
356
357 /**
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700358 * De-register Path Computation Service for receiving Flow-related
359 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700360 *
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700361 * NOTE: Only a single Path Computation Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700362 *
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700363 * @param pathComputationService the Path Computation Service to
364 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700365 */
366 @Override
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700367 public void deregisterPathComputationService(IPathComputationService pathComputationService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700368 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700369 mapFlow.removeEntryListener(mapFlowListenerId);
370 mapFlow = null;
371 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700372
373 // Clear the Topology-related map state
374 mapTopology.removeEntryListener(mapTopologyListenerId);
375 mapTopology = null;
376 mapTopologyListener = null;
377
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700378 this.pathComputationService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700379 }
380
381 /**
382 * Get all Flows that are currently in the datagrid.
383 *
384 * @return all Flows that are currently in the datagrid.
385 */
386 @Override
387 public Collection<FlowPath> getAllFlows() {
388 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
389
390 //
391 // Get all current entries
392 //
393 Collection<byte[]> values = mapFlow.values();
394 Kryo kryo = kryoFactory.newKryo();
395 for (byte[] valueBytes : values) {
396 //
397 // Decode the value
398 //
399 Input input = new Input(valueBytes);
400 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
401 allFlows.add(flowPath);
402 }
403 kryoFactory.deleteKryo(kryo);
404
405 return allFlows;
406 }
407
408 /**
409 * Send a notification that a Flow is added.
410 *
411 * @param flowPath the flow that is added.
412 */
413 @Override
414 public void notificationSendFlowAdded(FlowPath flowPath) {
415 //
416 // Encode the value
417 //
418 byte[] buffer = new byte[MAX_BUFFER_SIZE];
419 Kryo kryo = kryoFactory.newKryo();
420 Output output = new Output(buffer, -1);
421 kryo.writeObject(output, flowPath);
422 byte[] valueBytes = output.toBytes();
423 kryoFactory.deleteKryo(kryo);
424
425 //
426 // Put the entry:
427 // - Key : Flow ID (Long)
428 // - Value : Serialized Flow (byte[])
429 //
430 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
431 }
432
433 /**
434 * Send a notification that a Flow is removed.
435 *
436 * @param flowId the Flow ID of the flow that is removed.
437 */
438 @Override
439 public void notificationSendFlowRemoved(FlowId flowId) {
440 //
441 // Remove the entry:
442 // - Key : Flow ID (Long)
443 // - Value : Serialized Flow (byte[])
444 //
445 mapFlow.removeAsync(flowId.value());
446 }
447
448 /**
449 * Send a notification that a Flow is updated.
450 *
451 * @param flowPath the flow that is updated.
452 */
453 @Override
454 public void notificationSendFlowUpdated(FlowPath flowPath) {
455 // NOTE: Adding an entry with an existing key automatically updates it
456 notificationSendFlowAdded(flowPath);
457 }
458
459 /**
460 * Send a notification that all Flows are removed.
461 */
462 @Override
463 public void notificationSendAllFlowsRemoved() {
464 //
465 // Remove all entries
466 // NOTE: We remove the entries one-by-one so the per-entry
467 // notifications will be delivered.
468 //
469 // mapFlow.clear();
470 Set<Long> keySet = mapFlow.keySet();
471 for (Long key : keySet) {
472 mapFlow.removeAsync(key);
473 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700474 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700475
476 /**
477 * Get all Topology Elements that are currently in the datagrid.
478 *
479 * @return all Topology Elements that are currently in the datagrid.
480 */
481 @Override
482 public Collection<TopologyElement> getAllTopologyElements() {
483 Collection<TopologyElement> allTopologyElements =
484 new LinkedList<TopologyElement>();
485
486 //
487 // Get all current entries
488 //
489 Collection<byte[]> values = mapTopology.values();
490 Kryo kryo = kryoFactory.newKryo();
491 for (byte[] valueBytes : values) {
492 //
493 // Decode the value
494 //
495 Input input = new Input(valueBytes);
496 TopologyElement topologyElement =
497 kryo.readObject(input, TopologyElement.class);
498 allTopologyElements.add(topologyElement);
499 }
500 kryoFactory.deleteKryo(kryo);
501
502 return allTopologyElements;
503 }
504
505 /**
506 * Send a notification that a Topology Element is added.
507 *
508 * @param topologyElement the Topology Element that is added.
509 */
510 @Override
511 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
512 //
513 // Encode the value
514 //
515 byte[] buffer = new byte[MAX_BUFFER_SIZE];
516 Kryo kryo = kryoFactory.newKryo();
517 Output output = new Output(buffer, -1);
518 kryo.writeObject(output, topologyElement);
519 byte[] valueBytes = output.toBytes();
520 kryoFactory.deleteKryo(kryo);
521
522 //
523 // Put the entry:
524 // - Key : TopologyElement ID (String)
525 // - Value : Serialized TopologyElement (byte[])
526 //
527 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
528 }
529
530 /**
531 * Send a notification that a Topology Element is removed.
532 *
533 * @param topologyElement the Topology Element that is removed.
534 */
535 @Override
536 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
537 //
538 // Remove the entry:
539 // - Key : TopologyElement ID (String)
540 // - Value : Serialized TopologyElement (byte[])
541 //
542 mapTopology.removeAsync(topologyElement.elementId());
543 }
544
545 /**
546 * Send a notification that a Topology Element is updated.
547 *
548 * @param topologyElement the Topology Element that is updated.
549 */
550 @Override
551 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
552 // NOTE: Adding an entry with an existing key automatically updates it
553 notificationSendTopologyElementAdded(topologyElement);
554 }
555
556 /**
557 * Send a notification that all Topology Elements are removed.
558 */
559 @Override
560 public void notificationSendAllTopologyElementsRemoved() {
561 //
562 // Remove all entries
563 // NOTE: We remove the entries one-by-one so the per-entry
564 // notifications will be delivered.
565 //
566 // mapTopology.clear();
567 Set<String> keySet = mapTopology.keySet();
568 for (String key : keySet) {
569 mapTopology.removeAsync(key);
570 }
571 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700572}