blob: cfc61828202913688ef0bdb4526de136962ac772 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
20
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070021import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070022import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070023import net.onrc.onos.ofcontroller.util.FlowId;
24import net.onrc.onos.ofcontroller.util.FlowPath;
25import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
26
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070027import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30import com.hazelcast.config.Config;
31import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070032import com.hazelcast.core.EntryEvent;
33import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070034import com.hazelcast.core.Hazelcast;
35import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070036import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070037import com.hazelcast.instance.GroupProperties;
38
39/**
40 * A datagrid service that uses Hazelcast as a datagrid.
41 * The relevant data is stored in the Hazelcast datagrid and shared as
42 * appropriate in a multi-node cluster.
43 */
44public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070045 private final static int MAX_BUFFER_SIZE = 64*1024;
46
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070047 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070048 protected IFloodlightProviderService floodlightProvider;
49
50 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070051 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070052 private Config hazelcastConfig = null;
53
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070054 private KryoFactory kryoFactory = new KryoFactory();
55
56 // State related to the Flow map
57 protected static final String mapFlowName = "mapFlow";
58 private IFlowService flowService = null;
59 private IMap<Long, byte[]> mapFlow = null;
60 private MapFlowListener mapFlowListener = null;
61 private String mapFlowListenerId = null;
62
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070063 // State related to the Network Topology map
64 protected static final String mapTopologyName = "mapTopology";
65 private IMap<String, byte[]> mapTopology = null;
66 private MapTopologyListener mapTopologyListener = null;
67 private String mapTopologyListenerId = null;
68
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070069 /**
70 * Class for receiving notifications for Flow state.
71 *
72 * The datagrid map is:
73 * - Key : Flow ID (Long)
74 * - Value : Serialized Flow (byte[])
75 */
76 class MapFlowListener implements EntryListener<Long, byte[]> {
77 /**
78 * Receive a notification that an entry is added.
79 *
80 * @param event the notification event for the entry.
81 */
82 public void entryAdded(EntryEvent event) {
83 Long keyLong = (Long)event.getKey();
84 byte[] valueBytes = (byte[])event.getValue();
85
86 //
87 // Decode the value and deliver the notification
88 //
89 Kryo kryo = kryoFactory.newKryo();
90 Input input = new Input(valueBytes);
91 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
92 kryoFactory.deleteKryo(kryo);
93 flowService.notificationRecvFlowAdded(flowPath);
94 }
95
96 /**
97 * Receive a notification that an entry is removed.
98 *
99 * @param event the notification event for the entry.
100 */
101 public void entryRemoved(EntryEvent event) {
102 Long keyLong = (Long)event.getKey();
103 byte[] valueBytes = (byte[])event.getValue();
104
105 //
106 // Decode the value and deliver the notification
107 //
108 Kryo kryo = kryoFactory.newKryo();
109 Input input = new Input(valueBytes);
110 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
111 kryoFactory.deleteKryo(kryo);
112 flowService.notificationRecvFlowRemoved(flowPath);
113 }
114
115 /**
116 * Receive a notification that an entry is updated.
117 *
118 * @param event the notification event for the entry.
119 */
120 public void entryUpdated(EntryEvent event) {
121 Long keyLong = (Long)event.getKey();
122 byte[] valueBytes = (byte[])event.getValue();
123
124 //
125 // Decode the value and deliver the notification
126 //
127 Kryo kryo = kryoFactory.newKryo();
128 Input input = new Input(valueBytes);
129 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
130 kryoFactory.deleteKryo(kryo);
131 flowService.notificationRecvFlowUpdated(flowPath);
132 }
133
134 /**
135 * Receive a notification that an entry is evicted.
136 *
137 * @param event the notification event for the entry.
138 */
139 public void entryEvicted(EntryEvent event) {
140 // NOTE: We don't use eviction for this map
141 }
142 }
143
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700144 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700145 * Class for receiving notifications for Network Topology state.
146 *
147 * The datagrid map is:
148 * - Key: TopologyElement ID (String)
149 * - Value: Serialized TopologyElement (byte[])
150 */
151 class MapTopologyListener implements EntryListener<String, byte[]> {
152 /**
153 * Receive a notification that an entry is added.
154 *
155 * @param event the notification event for the entry.
156 */
157 public void entryAdded(EntryEvent event) {
158 String keyString = (String)event.getKey();
159 byte[] valueBytes = (byte[])event.getValue();
160
161 //
162 // Decode the value and deliver the notification
163 //
164 Kryo kryo = kryoFactory.newKryo();
165 Input input = new Input(valueBytes);
166 TopologyElement topologyElement =
167 kryo.readObject(input, TopologyElement.class);
168 kryoFactory.deleteKryo(kryo);
169 flowService.notificationRecvTopologyElementAdded(topologyElement);
170 }
171
172 /**
173 * Receive a notification that an entry is removed.
174 *
175 * @param event the notification event for the entry.
176 */
177 public void entryRemoved(EntryEvent event) {
178 String keyString = (String)event.getKey();
179 byte[] valueBytes = (byte[])event.getValue();
180
181 //
182 // Decode the value and deliver the notification
183 //
184 Kryo kryo = kryoFactory.newKryo();
185 Input input = new Input(valueBytes);
186 TopologyElement topologyElement =
187 kryo.readObject(input, TopologyElement.class);
188 kryoFactory.deleteKryo(kryo);
189 flowService.notificationRecvTopologyElementRemoved(topologyElement);
190 }
191
192 /**
193 * Receive a notification that an entry is updated.
194 *
195 * @param event the notification event for the entry.
196 */
197 public void entryUpdated(EntryEvent event) {
198 String keyString = (String)event.getKey();
199 byte[] valueBytes = (byte[])event.getValue();
200
201 //
202 // Decode the value and deliver the notification
203 //
204 Kryo kryo = kryoFactory.newKryo();
205 Input input = new Input(valueBytes);
206 TopologyElement topologyElement =
207 kryo.readObject(input, TopologyElement.class);
208 kryoFactory.deleteKryo(kryo);
209 flowService.notificationRecvTopologyElementUpdated(topologyElement);
210 }
211
212 /**
213 * Receive a notification that an entry is evicted.
214 *
215 * @param event the notification event for the entry.
216 */
217 public void entryEvicted(EntryEvent event) {
218 // NOTE: We don't use eviction for this map
219 }
220 }
221
222 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700223 * Initialize the Hazelcast Datagrid operation.
224 *
225 * @param conf the configuration filename.
226 */
227 public void init(String configFilename) {
228 /*
229 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
230 System.setProperty("hazelcast.socket.send.buffer.size", "32");
231 */
232 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
233
234 // Init from configuration file
235 try {
236 hazelcastConfig = new FileSystemXmlConfig(configFilename);
237 } catch (FileNotFoundException e) {
238 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
239 }
240 /*
241 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
242 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
243 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
244 */
245 //
246 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
247 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
248 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
249 }
250
251 /**
252 * Shutdown the Hazelcast Datagrid operation.
253 */
254 public void finalize() {
255 close();
256 }
257
258 /**
259 * Shutdown the Hazelcast Datagrid operation.
260 */
261 public void close() {
262 Hazelcast.shutdownAll();
263 }
264
265 /**
266 * Get the collection of offered module services.
267 *
268 * @return the collection of offered module services.
269 */
270 @Override
271 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
272 Collection<Class<? extends IFloodlightService>> l =
273 new ArrayList<Class<? extends IFloodlightService>>();
274 l.add(IDatagridService.class);
275 return l;
276 }
277
278 /**
279 * Get the collection of implemented services.
280 *
281 * @return the collection of implemented services.
282 */
283 @Override
284 public Map<Class<? extends IFloodlightService>, IFloodlightService>
285 getServiceImpls() {
286 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700287 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700288 new HashMap<Class<? extends IFloodlightService>,
289 IFloodlightService>();
290 m.put(IDatagridService.class, this);
291 return m;
292 }
293
294 /**
295 * Get the collection of modules this module depends on.
296 *
297 * @return the collection of modules this module depends on.
298 */
299 @Override
300 public Collection<Class<? extends IFloodlightService>>
301 getModuleDependencies() {
302 Collection<Class<? extends IFloodlightService>> l =
303 new ArrayList<Class<? extends IFloodlightService>>();
304 l.add(IFloodlightProviderService.class);
305 return l;
306 }
307
308 /**
309 * Initialize the module.
310 *
311 * @param context the module context to use for the initialization.
312 */
313 @Override
314 public void init(FloodlightModuleContext context)
315 throws FloodlightModuleException {
316 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
317
318 // Get the configuration file name and configure the Datagrid
319 Map<String, String> configMap = context.getConfigParams(this);
320 String configFilename = configMap.get(HazelcastConfigFile);
321 this.init(configFilename);
322 }
323
324 /**
325 * Startup module operation.
326 *
327 * @param context the module context to use for the startup.
328 */
329 @Override
330 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700331 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
332 }
333
334 /**
335 * Register Flow Service for receiving Flow-related notifications.
336 *
337 * NOTE: Only a single Flow Service can be registered.
338 *
339 * @param flowService the Flow Service to register.
340 */
341 @Override
342 public void registerFlowService(IFlowService flowService) {
343 this.flowService = flowService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700344
345 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700346 mapFlowListener = new MapFlowListener();
347 mapFlow = hazelcastInstance.getMap(mapFlowName);
348 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700349
350 // Initialize the Topology-related map state
351 mapTopologyListener = new MapTopologyListener();
352 mapTopology = hazelcastInstance.getMap(mapTopologyName);
353 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700354 }
355
356 /**
357 * De-register Flow Service for receiving Flow-related notifications.
358 *
359 * NOTE: Only a single Flow Service can be registered.
360 *
361 * @param flowService the Flow Service to de-register.
362 */
363 @Override
364 public void deregisterFlowService(IFlowService flowService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700365 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700366 mapFlow.removeEntryListener(mapFlowListenerId);
367 mapFlow = null;
368 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700369
370 // Clear the Topology-related map state
371 mapTopology.removeEntryListener(mapTopologyListenerId);
372 mapTopology = null;
373 mapTopologyListener = null;
374
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700375 this.flowService = null;
376 }
377
378 /**
379 * Get all Flows that are currently in the datagrid.
380 *
381 * @return all Flows that are currently in the datagrid.
382 */
383 @Override
384 public Collection<FlowPath> getAllFlows() {
385 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
386
387 //
388 // Get all current entries
389 //
390 Collection<byte[]> values = mapFlow.values();
391 Kryo kryo = kryoFactory.newKryo();
392 for (byte[] valueBytes : values) {
393 //
394 // Decode the value
395 //
396 Input input = new Input(valueBytes);
397 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
398 allFlows.add(flowPath);
399 }
400 kryoFactory.deleteKryo(kryo);
401
402 return allFlows;
403 }
404
405 /**
406 * Send a notification that a Flow is added.
407 *
408 * @param flowPath the flow that is added.
409 */
410 @Override
411 public void notificationSendFlowAdded(FlowPath flowPath) {
412 //
413 // Encode the value
414 //
415 byte[] buffer = new byte[MAX_BUFFER_SIZE];
416 Kryo kryo = kryoFactory.newKryo();
417 Output output = new Output(buffer, -1);
418 kryo.writeObject(output, flowPath);
419 byte[] valueBytes = output.toBytes();
420 kryoFactory.deleteKryo(kryo);
421
422 //
423 // Put the entry:
424 // - Key : Flow ID (Long)
425 // - Value : Serialized Flow (byte[])
426 //
427 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
428 }
429
430 /**
431 * Send a notification that a Flow is removed.
432 *
433 * @param flowId the Flow ID of the flow that is removed.
434 */
435 @Override
436 public void notificationSendFlowRemoved(FlowId flowId) {
437 //
438 // Remove the entry:
439 // - Key : Flow ID (Long)
440 // - Value : Serialized Flow (byte[])
441 //
442 mapFlow.removeAsync(flowId.value());
443 }
444
445 /**
446 * Send a notification that a Flow is updated.
447 *
448 * @param flowPath the flow that is updated.
449 */
450 @Override
451 public void notificationSendFlowUpdated(FlowPath flowPath) {
452 // NOTE: Adding an entry with an existing key automatically updates it
453 notificationSendFlowAdded(flowPath);
454 }
455
456 /**
457 * Send a notification that all Flows are removed.
458 */
459 @Override
460 public void notificationSendAllFlowsRemoved() {
461 //
462 // Remove all entries
463 // NOTE: We remove the entries one-by-one so the per-entry
464 // notifications will be delivered.
465 //
466 // mapFlow.clear();
467 Set<Long> keySet = mapFlow.keySet();
468 for (Long key : keySet) {
469 mapFlow.removeAsync(key);
470 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700471 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700472
473 /**
474 * Get all Topology Elements that are currently in the datagrid.
475 *
476 * @return all Topology Elements that are currently in the datagrid.
477 */
478 @Override
479 public Collection<TopologyElement> getAllTopologyElements() {
480 Collection<TopologyElement> allTopologyElements =
481 new LinkedList<TopologyElement>();
482
483 //
484 // Get all current entries
485 //
486 Collection<byte[]> values = mapTopology.values();
487 Kryo kryo = kryoFactory.newKryo();
488 for (byte[] valueBytes : values) {
489 //
490 // Decode the value
491 //
492 Input input = new Input(valueBytes);
493 TopologyElement topologyElement =
494 kryo.readObject(input, TopologyElement.class);
495 allTopologyElements.add(topologyElement);
496 }
497 kryoFactory.deleteKryo(kryo);
498
499 return allTopologyElements;
500 }
501
502 /**
503 * Send a notification that a Topology Element is added.
504 *
505 * @param topologyElement the Topology Element that is added.
506 */
507 @Override
508 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
509 //
510 // Encode the value
511 //
512 byte[] buffer = new byte[MAX_BUFFER_SIZE];
513 Kryo kryo = kryoFactory.newKryo();
514 Output output = new Output(buffer, -1);
515 kryo.writeObject(output, topologyElement);
516 byte[] valueBytes = output.toBytes();
517 kryoFactory.deleteKryo(kryo);
518
519 //
520 // Put the entry:
521 // - Key : TopologyElement ID (String)
522 // - Value : Serialized TopologyElement (byte[])
523 //
524 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
525 }
526
527 /**
528 * Send a notification that a Topology Element is removed.
529 *
530 * @param topologyElement the Topology Element that is removed.
531 */
532 @Override
533 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
534 //
535 // Remove the entry:
536 // - Key : TopologyElement ID (String)
537 // - Value : Serialized TopologyElement (byte[])
538 //
539 mapTopology.removeAsync(topologyElement.elementId());
540 }
541
542 /**
543 * Send a notification that a Topology Element is updated.
544 *
545 * @param topologyElement the Topology Element that is updated.
546 */
547 @Override
548 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
549 // NOTE: Adding an entry with an existing key automatically updates it
550 notificationSendTopologyElementAdded(topologyElement);
551 }
552
553 /**
554 * Send a notification that all Topology Elements are removed.
555 */
556 @Override
557 public void notificationSendAllTopologyElementsRemoved() {
558 //
559 // Remove all entries
560 // NOTE: We remove the entries one-by-one so the per-entry
561 // notifications will be delivered.
562 //
563 // mapTopology.clear();
564 Set<String> keySet = mapTopology.keySet();
565 for (String key : keySet) {
566 mapTopology.removeAsync(key);
567 }
568 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700569}