blob: 37ada35b3881150804ee24af9faea0dc711a0a35 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07008import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07009import java.util.Set;
10
11import com.esotericsoftware.kryo2.Kryo;
12import com.esotericsoftware.kryo2.io.Input;
13import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070014
15import net.floodlightcontroller.core.IFloodlightProviderService;
16import net.floodlightcontroller.core.module.FloodlightModuleContext;
17import net.floodlightcontroller.core.module.FloodlightModuleException;
18import net.floodlightcontroller.core.module.IFloodlightModule;
19import net.floodlightcontroller.core.module.IFloodlightService;
20
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070021import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
22import net.onrc.onos.ofcontroller.util.FlowId;
23import net.onrc.onos.ofcontroller.util.FlowPath;
24import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
25
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070026import org.slf4j.Logger;
27import org.slf4j.LoggerFactory;
28
29import com.hazelcast.config.Config;
30import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070031import com.hazelcast.core.EntryEvent;
32import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070033import com.hazelcast.core.Hazelcast;
34import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070035import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070036import com.hazelcast.instance.GroupProperties;
37
38/**
39 * A datagrid service that uses Hazelcast as a datagrid.
40 * The relevant data is stored in the Hazelcast datagrid and shared as
41 * appropriate in a multi-node cluster.
42 */
43public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044 private final static int MAX_BUFFER_SIZE = 64*1024;
45
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070046 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070047 protected IFloodlightProviderService floodlightProvider;
48
49 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070050 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070051 private Config hazelcastConfig = null;
52
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private KryoFactory kryoFactory = new KryoFactory();
54
55 // State related to the Flow map
56 protected static final String mapFlowName = "mapFlow";
57 private IFlowService flowService = null;
58 private IMap<Long, byte[]> mapFlow = null;
59 private MapFlowListener mapFlowListener = null;
60 private String mapFlowListenerId = null;
61
62 /**
63 * Class for receiving notifications for Flow state.
64 *
65 * The datagrid map is:
66 * - Key : Flow ID (Long)
67 * - Value : Serialized Flow (byte[])
68 */
69 class MapFlowListener implements EntryListener<Long, byte[]> {
70 /**
71 * Receive a notification that an entry is added.
72 *
73 * @param event the notification event for the entry.
74 */
75 public void entryAdded(EntryEvent event) {
76 Long keyLong = (Long)event.getKey();
77 byte[] valueBytes = (byte[])event.getValue();
78
79 //
80 // Decode the value and deliver the notification
81 //
82 Kryo kryo = kryoFactory.newKryo();
83 Input input = new Input(valueBytes);
84 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
85 kryoFactory.deleteKryo(kryo);
86 flowService.notificationRecvFlowAdded(flowPath);
87 }
88
89 /**
90 * Receive a notification that an entry is removed.
91 *
92 * @param event the notification event for the entry.
93 */
94 public void entryRemoved(EntryEvent event) {
95 Long keyLong = (Long)event.getKey();
96 byte[] valueBytes = (byte[])event.getValue();
97
98 //
99 // Decode the value and deliver the notification
100 //
101 Kryo kryo = kryoFactory.newKryo();
102 Input input = new Input(valueBytes);
103 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
104 kryoFactory.deleteKryo(kryo);
105 flowService.notificationRecvFlowRemoved(flowPath);
106 }
107
108 /**
109 * Receive a notification that an entry is updated.
110 *
111 * @param event the notification event for the entry.
112 */
113 public void entryUpdated(EntryEvent event) {
114 Long keyLong = (Long)event.getKey();
115 byte[] valueBytes = (byte[])event.getValue();
116
117 //
118 // Decode the value and deliver the notification
119 //
120 Kryo kryo = kryoFactory.newKryo();
121 Input input = new Input(valueBytes);
122 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
123 kryoFactory.deleteKryo(kryo);
124 flowService.notificationRecvFlowUpdated(flowPath);
125 }
126
127 /**
128 * Receive a notification that an entry is evicted.
129 *
130 * @param event the notification event for the entry.
131 */
132 public void entryEvicted(EntryEvent event) {
133 // NOTE: We don't use eviction for this map
134 }
135 }
136
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700137 /**
138 * Initialize the Hazelcast Datagrid operation.
139 *
140 * @param conf the configuration filename.
141 */
142 public void init(String configFilename) {
143 /*
144 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
145 System.setProperty("hazelcast.socket.send.buffer.size", "32");
146 */
147 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
148
149 // Init from configuration file
150 try {
151 hazelcastConfig = new FileSystemXmlConfig(configFilename);
152 } catch (FileNotFoundException e) {
153 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
154 }
155 /*
156 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
157 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
158 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
159 */
160 //
161 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
162 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
163 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
164 }
165
166 /**
167 * Shutdown the Hazelcast Datagrid operation.
168 */
169 public void finalize() {
170 close();
171 }
172
173 /**
174 * Shutdown the Hazelcast Datagrid operation.
175 */
176 public void close() {
177 Hazelcast.shutdownAll();
178 }
179
180 /**
181 * Get the collection of offered module services.
182 *
183 * @return the collection of offered module services.
184 */
185 @Override
186 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
187 Collection<Class<? extends IFloodlightService>> l =
188 new ArrayList<Class<? extends IFloodlightService>>();
189 l.add(IDatagridService.class);
190 return l;
191 }
192
193 /**
194 * Get the collection of implemented services.
195 *
196 * @return the collection of implemented services.
197 */
198 @Override
199 public Map<Class<? extends IFloodlightService>, IFloodlightService>
200 getServiceImpls() {
201 Map<Class<? extends IFloodlightService>,
Pavlin Radoslavov27da7532013-10-18 18:41:50 -0700202 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700203 new HashMap<Class<? extends IFloodlightService>,
204 IFloodlightService>();
205 m.put(IDatagridService.class, this);
206 return m;
207 }
208
209 /**
210 * Get the collection of modules this module depends on.
211 *
212 * @return the collection of modules this module depends on.
213 */
214 @Override
215 public Collection<Class<? extends IFloodlightService>>
216 getModuleDependencies() {
217 Collection<Class<? extends IFloodlightService>> l =
218 new ArrayList<Class<? extends IFloodlightService>>();
219 l.add(IFloodlightProviderService.class);
220 return l;
221 }
222
223 /**
224 * Initialize the module.
225 *
226 * @param context the module context to use for the initialization.
227 */
228 @Override
229 public void init(FloodlightModuleContext context)
230 throws FloodlightModuleException {
231 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
232
233 // Get the configuration file name and configure the Datagrid
234 Map<String, String> configMap = context.getConfigParams(this);
235 String configFilename = configMap.get(HazelcastConfigFile);
236 this.init(configFilename);
237 }
238
239 /**
240 * Startup module operation.
241 *
242 * @param context the module context to use for the startup.
243 */
244 @Override
245 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700246 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
247 }
248
249 /**
250 * Register Flow Service for receiving Flow-related notifications.
251 *
252 * NOTE: Only a single Flow Service can be registered.
253 *
254 * @param flowService the Flow Service to register.
255 */
256 @Override
257 public void registerFlowService(IFlowService flowService) {
258 this.flowService = flowService;
259 mapFlowListener = new MapFlowListener();
260 mapFlow = hazelcastInstance.getMap(mapFlowName);
261 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
262 }
263
264 /**
265 * De-register Flow Service for receiving Flow-related notifications.
266 *
267 * NOTE: Only a single Flow Service can be registered.
268 *
269 * @param flowService the Flow Service to de-register.
270 */
271 @Override
272 public void deregisterFlowService(IFlowService flowService) {
273 mapFlow.removeEntryListener(mapFlowListenerId);
274 mapFlow = null;
275 mapFlowListener = null;
276 this.flowService = null;
277 }
278
279 /**
280 * Get all Flows that are currently in the datagrid.
281 *
282 * @return all Flows that are currently in the datagrid.
283 */
284 @Override
285 public Collection<FlowPath> getAllFlows() {
286 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
287
288 //
289 // Get all current entries
290 //
291 Collection<byte[]> values = mapFlow.values();
292 Kryo kryo = kryoFactory.newKryo();
293 for (byte[] valueBytes : values) {
294 //
295 // Decode the value
296 //
297 Input input = new Input(valueBytes);
298 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
299 allFlows.add(flowPath);
300 }
301 kryoFactory.deleteKryo(kryo);
302
303 return allFlows;
304 }
305
306 /**
307 * Send a notification that a Flow is added.
308 *
309 * @param flowPath the flow that is added.
310 */
311 @Override
312 public void notificationSendFlowAdded(FlowPath flowPath) {
313 //
314 // Encode the value
315 //
316 byte[] buffer = new byte[MAX_BUFFER_SIZE];
317 Kryo kryo = kryoFactory.newKryo();
318 Output output = new Output(buffer, -1);
319 kryo.writeObject(output, flowPath);
320 byte[] valueBytes = output.toBytes();
321 kryoFactory.deleteKryo(kryo);
322
323 //
324 // Put the entry:
325 // - Key : Flow ID (Long)
326 // - Value : Serialized Flow (byte[])
327 //
328 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
329 }
330
331 /**
332 * Send a notification that a Flow is removed.
333 *
334 * @param flowId the Flow ID of the flow that is removed.
335 */
336 @Override
337 public void notificationSendFlowRemoved(FlowId flowId) {
338 //
339 // Remove the entry:
340 // - Key : Flow ID (Long)
341 // - Value : Serialized Flow (byte[])
342 //
343 mapFlow.removeAsync(flowId.value());
344 }
345
346 /**
347 * Send a notification that a Flow is updated.
348 *
349 * @param flowPath the flow that is updated.
350 */
351 @Override
352 public void notificationSendFlowUpdated(FlowPath flowPath) {
353 // NOTE: Adding an entry with an existing key automatically updates it
354 notificationSendFlowAdded(flowPath);
355 }
356
357 /**
358 * Send a notification that all Flows are removed.
359 */
360 @Override
361 public void notificationSendAllFlowsRemoved() {
362 //
363 // Remove all entries
364 // NOTE: We remove the entries one-by-one so the per-entry
365 // notifications will be delivered.
366 //
367 // mapFlow.clear();
368 Set<Long> keySet = mapFlow.keySet();
369 for (Long key : keySet) {
370 mapFlow.removeAsync(key);
371 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700372 }
373}