blob: 0c7be880cb4902776ec347b137bae5cf00542f87 [file] [log] [blame]
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07001package net.onrc.onos.datagrid;
2
3import java.io.FileNotFoundException;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -07007import java.util.LinkedList;
Jonathan Hart18ad55c2013-11-11 22:49:55 -08008import java.util.List;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -07009import java.util.Map;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070010import java.util.Set;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080011import java.util.concurrent.TimeUnit;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070012
13import net.floodlightcontroller.core.IFloodlightProviderService;
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070018import net.floodlightcontroller.restserver.IRestApiService;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070019import net.onrc.onos.datagrid.web.DatagridWebRoutable;
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070020import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
Jonathan Hart7804bea2014-01-07 10:50:52 -080021import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
22import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
23import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
24import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070025import net.onrc.onos.ofcontroller.topology.TopologyElement;
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070026import net.onrc.onos.ofcontroller.util.FlowEntry;
27import net.onrc.onos.ofcontroller.util.FlowEntryId;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070028import net.onrc.onos.ofcontroller.util.FlowId;
29import net.onrc.onos.ofcontroller.util.FlowPath;
30import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
31
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Jonathan Hart18ad55c2013-11-11 22:49:55 -080035import com.esotericsoftware.kryo2.Kryo;
36import com.esotericsoftware.kryo2.io.Input;
37import com.esotericsoftware.kryo2.io.Output;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070038import com.hazelcast.config.Config;
39import com.hazelcast.config.FileSystemXmlConfig;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070040import com.hazelcast.core.EntryEvent;
41import com.hazelcast.core.EntryListener;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070042import com.hazelcast.core.Hazelcast;
43import com.hazelcast.core.HazelcastInstance;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070044import com.hazelcast.core.IMap;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070045import com.hazelcast.instance.GroupProperties;
46
47/**
48 * A datagrid service that uses Hazelcast as a datagrid.
49 * The relevant data is stored in the Hazelcast datagrid and shared as
50 * appropriate in a multi-node cluster.
51 */
52public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070053 private final static int MAX_BUFFER_SIZE = 64*1024;
54
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070055 protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070056 protected IFloodlightProviderService floodlightProvider;
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -070057 protected IRestApiService restApi;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070058
59 protected static final String HazelcastConfigFile = "datagridConfig";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070060 private HazelcastInstance hazelcastInstance = null;
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -070061 private Config hazelcastConfig = null;
62
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070063 private KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov9a859022013-10-30 10:08:24 -070064 private IFlowEventHandlerService flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070065
66 // State related to the Flow map
67 protected static final String mapFlowName = "mapFlow";
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070068 private IMap<Long, byte[]> mapFlow = null;
69 private MapFlowListener mapFlowListener = null;
70 private String mapFlowListenerId = null;
71
Pavlin Radoslavovb7506842013-10-29 17:46:54 -070072 // State related to the Flow Entry map
73 protected static final String mapFlowEntryName = "mapFlowEntry";
74 private IMap<Long, byte[]> mapFlowEntry = null;
75 private MapFlowEntryListener mapFlowEntryListener = null;
76 private String mapFlowEntryListenerId = null;
77
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070078 // State related to the Network Topology map
79 protected static final String mapTopologyName = "mapTopology";
80 private IMap<String, byte[]> mapTopology = null;
81 private MapTopologyListener mapTopologyListener = null;
82 private String mapTopologyListenerId = null;
Jonathan Hart18ad55c2013-11-11 22:49:55 -080083
Jonathan Hart7804bea2014-01-07 10:50:52 -080084 // State related to the packet out map
85 protected static final String packetOutMapName = "packetOutMap";
86 private IMap<PacketOutNotification, byte[]> packetOutMap = null;
87 private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -080088
Jonathan Hart18ad55c2013-11-11 22:49:55 -080089 private final byte[] dummyByte = {0};
Jonathan Hart7804bea2014-01-07 10:50:52 -080090
91 // State related to the ARP reply map
92 protected static final String arpReplyMapName = "arpReplyMap";
93 private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
94 private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -070095
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -070096 /**
97 * Class for receiving notifications for Flow state.
98 *
99 * The datagrid map is:
100 * - Key : Flow ID (Long)
Pavlin Radoslavov5367d212013-11-07 11:18:51 -0800101 * - Value : Serialized FlowPath (byte[])
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700102 */
103 class MapFlowListener implements EntryListener<Long, byte[]> {
104 /**
105 * Receive a notification that an entry is added.
106 *
107 * @param event the notification event for the entry.
108 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800109 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800110 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800111 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700112
113 //
114 // Decode the value and deliver the notification
115 //
116 Kryo kryo = kryoFactory.newKryo();
117 Input input = new Input(valueBytes);
118 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
119 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700120 flowEventHandlerService.notificationRecvFlowAdded(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700121 }
122
123 /**
124 * Receive a notification that an entry is removed.
125 *
126 * @param event the notification event for the entry.
127 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800128 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800129 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800130 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700131
132 //
133 // Decode the value and deliver the notification
134 //
135 Kryo kryo = kryoFactory.newKryo();
136 Input input = new Input(valueBytes);
137 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
138 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700139 flowEventHandlerService.notificationRecvFlowRemoved(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700140 }
141
142 /**
143 * Receive a notification that an entry is updated.
144 *
145 * @param event the notification event for the entry.
146 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800147 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800148 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800149 byte[] valueBytes = event.getValue();
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700150
151 //
152 // Decode the value and deliver the notification
153 //
154 Kryo kryo = kryoFactory.newKryo();
155 Input input = new Input(valueBytes);
156 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
157 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700158 flowEventHandlerService.notificationRecvFlowUpdated(flowPath);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700159 }
160
161 /**
162 * Receive a notification that an entry is evicted.
163 *
164 * @param event the notification event for the entry.
165 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800166 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800167 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700168 // NOTE: We don't use eviction for this map
169 }
170 }
171
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700172 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700173 * Class for receiving notifications for FlowEntry state.
174 *
175 * The datagrid map is:
176 * - Key : FlowEntry ID (Long)
177 * - Value : Serialized FlowEntry (byte[])
178 */
179 class MapFlowEntryListener implements EntryListener<Long, byte[]> {
180 /**
181 * Receive a notification that an entry is added.
182 *
183 * @param event the notification event for the entry.
184 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800185 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800186 public void entryAdded(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800187 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700188
189 //
190 // Decode the value and deliver the notification
191 //
192 Kryo kryo = kryoFactory.newKryo();
193 Input input = new Input(valueBytes);
194 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
195 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700196 flowEventHandlerService.notificationRecvFlowEntryAdded(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700197 }
198
199 /**
200 * Receive a notification that an entry is removed.
201 *
202 * @param event the notification event for the entry.
203 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800204 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800205 public void entryRemoved(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800206 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700207
208 //
209 // Decode the value and deliver the notification
210 //
211 Kryo kryo = kryoFactory.newKryo();
212 Input input = new Input(valueBytes);
213 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
214 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700215 flowEventHandlerService.notificationRecvFlowEntryRemoved(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700216 }
217
218 /**
219 * Receive a notification that an entry is updated.
220 *
221 * @param event the notification event for the entry.
222 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800223 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800224 public void entryUpdated(EntryEvent<Long, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800225 byte[] valueBytes = event.getValue();
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700226
227 //
228 // Decode the value and deliver the notification
229 //
230 Kryo kryo = kryoFactory.newKryo();
231 Input input = new Input(valueBytes);
232 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
233 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700234 flowEventHandlerService.notificationRecvFlowEntryUpdated(flowEntry);
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700235 }
236
237 /**
238 * Receive a notification that an entry is evicted.
239 *
240 * @param event the notification event for the entry.
241 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800242 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800243 public void entryEvicted(EntryEvent<Long, byte[]> event) {
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700244 // NOTE: We don't use eviction for this map
245 }
246 }
247
248 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700249 * Class for receiving notifications for Network Topology state.
250 *
251 * The datagrid map is:
252 * - Key: TopologyElement ID (String)
253 * - Value: Serialized TopologyElement (byte[])
254 */
255 class MapTopologyListener implements EntryListener<String, byte[]> {
256 /**
257 * Receive a notification that an entry is added.
258 *
259 * @param event the notification event for the entry.
260 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800261 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800262 public void entryAdded(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800263 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700264
265 //
266 // Decode the value and deliver the notification
267 //
268 Kryo kryo = kryoFactory.newKryo();
269 Input input = new Input(valueBytes);
270 TopologyElement topologyElement =
271 kryo.readObject(input, TopologyElement.class);
272 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700273 flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700274 }
275
276 /**
277 * Receive a notification that an entry is removed.
278 *
279 * @param event the notification event for the entry.
280 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800281 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800282 public void entryRemoved(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800283 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700284
285 //
286 // Decode the value and deliver the notification
287 //
288 Kryo kryo = kryoFactory.newKryo();
289 Input input = new Input(valueBytes);
290 TopologyElement topologyElement =
291 kryo.readObject(input, TopologyElement.class);
292 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700293 flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700294 }
295
296 /**
297 * Receive a notification that an entry is updated.
298 *
299 * @param event the notification event for the entry.
300 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800301 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800302 public void entryUpdated(EntryEvent<String, byte[]> event) {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800303 byte[] valueBytes = event.getValue();
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700304
305 //
306 // Decode the value and deliver the notification
307 //
308 Kryo kryo = kryoFactory.newKryo();
309 Input input = new Input(valueBytes);
310 TopologyElement topologyElement =
311 kryo.readObject(input, TopologyElement.class);
312 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700313 flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700314 }
315
316 /**
317 * Receive a notification that an entry is evicted.
318 *
319 * @param event the notification event for the entry.
320 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800321 @Override
Pavlin Radoslavov95e6e902013-12-11 12:03:29 -0800322 public void entryEvicted(EntryEvent<String, byte[]> event) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700323 // NOTE: We don't use eviction for this map
324 }
325 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800326
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800327 /**
Jonathan Hart7804bea2014-01-07 10:50:52 -0800328 * Class for receiving notifications for sending packet-outs.
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800329 *
330 * The datagrid map is:
Jonathan Hart7804bea2014-01-07 10:50:52 -0800331 * - Key: Packet-out to send (PacketOutNotification)
332 * - Value: dummy value (we only need the key) (byte[])
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800333 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800334 class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800335 /**
336 * Receive a notification that an entry is added.
337 *
338 * @param event the notification event for the entry.
339 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800340 public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
341 for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
342 packetOutEventHandler.packetOutNotification(event.getKey());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800343 }
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800344 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800345
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800346 /**
347 * Receive a notification that an entry is removed.
348 *
349 * @param event the notification event for the entry.
350 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800351 public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800352 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800353 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800354
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800355 /**
356 * Receive a notification that an entry is updated.
357 *
358 * @param event the notification event for the entry.
359 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800360 public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800361 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800362 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800363
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800364 /**
365 * Receive a notification that an entry is evicted.
366 *
367 * @param event the notification event for the entry.
368 */
Jonathan Hart7804bea2014-01-07 10:50:52 -0800369 public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
Jonathan Hart799242b2013-11-12 12:57:29 -0800370 // Not used
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800371 }
372 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800373
374 /**
375 * Class for receiving notifications for sending packet-outs.
376 *
377 * The datagrid map is:
378 * - Key: Packet-out to send (PacketOutNotification)
379 * - Value: dummy value (we only need the key) (byte[])
380 */
381 class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
382 /**
383 * Receive a notification that an entry is added.
384 *
385 * @param event the notification event for the entry.
386 */
387 public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
388 for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
389 arpReplyEventHandler.arpReplyEvent(event.getKey());
390 }
391 }
392
393 // These methods aren't used for ARP replies
394 public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
395 public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
396 public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
397 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700398
399 /**
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700400 * Initialize the Hazelcast Datagrid operation.
401 *
402 * @param conf the configuration filename.
403 */
404 public void init(String configFilename) {
405 /*
406 System.setProperty("hazelcast.socket.receive.buffer.size", "32");
407 System.setProperty("hazelcast.socket.send.buffer.size", "32");
408 */
409 // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800410
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700411 // Init from configuration file
412 try {
413 hazelcastConfig = new FileSystemXmlConfig(configFilename);
414 } catch (FileNotFoundException e) {
415 log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
416 }
417 /*
418 hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
419 hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
420 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
421 */
422 //
423 hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
424 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
425 hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
426 }
427
428 /**
429 * Shutdown the Hazelcast Datagrid operation.
430 */
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800431 @Override
432 protected void finalize() {
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700433 close();
434 }
435
436 /**
437 * Shutdown the Hazelcast Datagrid operation.
438 */
439 public void close() {
440 Hazelcast.shutdownAll();
441 }
442
443 /**
444 * Get the collection of offered module services.
445 *
446 * @return the collection of offered module services.
447 */
448 @Override
449 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800450 Collection<Class<? extends IFloodlightService>> l =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700451 new ArrayList<Class<? extends IFloodlightService>>();
452 l.add(IDatagridService.class);
453 return l;
454 }
455
456 /**
457 * Get the collection of implemented services.
458 *
459 * @return the collection of implemented services.
460 */
461 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800462 public Map<Class<? extends IFloodlightService>, IFloodlightService>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700463 getServiceImpls() {
464 Map<Class<? extends IFloodlightService>,
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800465 IFloodlightService> m =
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700466 new HashMap<Class<? extends IFloodlightService>,
467 IFloodlightService>();
468 m.put(IDatagridService.class, this);
469 return m;
470 }
471
472 /**
473 * Get the collection of modules this module depends on.
474 *
475 * @return the collection of modules this module depends on.
476 */
477 @Override
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800478 public Collection<Class<? extends IFloodlightService>>
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700479 getModuleDependencies() {
480 Collection<Class<? extends IFloodlightService>> l =
481 new ArrayList<Class<? extends IFloodlightService>>();
482 l.add(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700483 l.add(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700484 return l;
485 }
486
487 /**
488 * Initialize the module.
489 *
490 * @param context the module context to use for the initialization.
491 */
492 @Override
493 public void init(FloodlightModuleContext context)
494 throws FloodlightModuleException {
495 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700496 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700497
498 // Get the configuration file name and configure the Datagrid
499 Map<String, String> configMap = context.getConfigParams(this);
500 String configFilename = configMap.get(HazelcastConfigFile);
501 this.init(configFilename);
502 }
503
504 /**
505 * Startup module operation.
506 *
507 * @param context the module context to use for the startup.
508 */
509 @Override
510 public void startUp(FloodlightModuleContext context) {
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700511 hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
Pavlin Radoslavovda7ef612013-10-30 16:12:14 -0700512
513 restApi.addRestletRoutable(new DatagridWebRoutable());
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800514
Jonathan Hart7804bea2014-01-07 10:50:52 -0800515 packetOutMap = hazelcastInstance.getMap(packetOutMapName);
516 packetOutMap.addEntryListener(new PacketOutMapListener(), true);
517
518 arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
519 arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700520 }
521
522 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700523 * Register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700524 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700525 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700526 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700527 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700528 * @param flowEventHandlerService the Flow Event Handler Service to register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700529 */
530 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700531 public void registerFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
532 this.flowEventHandlerService = flowEventHandlerService;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700533
534 // Initialize the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700535 mapFlowListener = new MapFlowListener();
536 mapFlow = hazelcastInstance.getMap(mapFlowName);
537 mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700538
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700539 // Initialize the FlowEntry-related map state
540 mapFlowEntryListener = new MapFlowEntryListener();
541 mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
542 mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
543
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700544 // Initialize the Topology-related map state
545 mapTopologyListener = new MapTopologyListener();
546 mapTopology = hazelcastInstance.getMap(mapTopologyName);
547 mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700548 }
549
550 /**
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700551 * De-register Flow Event Handler Service for receiving Flow-related
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700552 * notifications.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700553 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700554 * NOTE: Only a single Flow Event Handler Service can be registered.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700555 *
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700556 * @param flowEventHandlerService the Flow Event Handler Service to
Pavlin Radoslavov6b79f2b2013-10-26 21:31:10 -0700557 * de-register.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700558 */
559 @Override
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700560 public void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService) {
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700561 // Clear the Flow-related map state
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700562 mapFlow.removeEntryListener(mapFlowListenerId);
563 mapFlow = null;
564 mapFlowListener = null;
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700565
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700566 // Clear the FlowEntry-related map state
567 mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
568 mapFlowEntry = null;
569 mapFlowEntryListener = null;
570
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700571 // Clear the Topology-related map state
572 mapTopology.removeEntryListener(mapTopologyListenerId);
573 mapTopology = null;
574 mapTopologyListener = null;
575
Pavlin Radoslavov9a859022013-10-30 10:08:24 -0700576 this.flowEventHandlerService = null;
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700577 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800578
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800579 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800580 public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
581 if (packetOutEventHandler != null) {
582 packetOutEventHandlers.add(packetOutEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800583 }
584 }
585
586 @Override
Jonathan Hartc6325622014-01-14 16:37:50 -0800587 public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
588 packetOutEventHandlers.remove(packetOutEventHandler);
Jonathan Hart7804bea2014-01-07 10:50:52 -0800589 }
590
591 @Override
592 public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
593 if (arpReplyEventHandler != null) {
594 arpReplyEventHandlers.add(arpReplyEventHandler);
595 }
596 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800597
Jonathan Hart7804bea2014-01-07 10:50:52 -0800598 @Override
599 public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
600 arpReplyEventHandlers.remove(arpReplyEventHandler);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800601 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800602
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700603 /**
604 * Get all Flows that are currently in the datagrid.
605 *
606 * @return all Flows that are currently in the datagrid.
607 */
608 @Override
609 public Collection<FlowPath> getAllFlows() {
610 Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
611
612 //
613 // Get all current entries
614 //
615 Collection<byte[]> values = mapFlow.values();
616 Kryo kryo = kryoFactory.newKryo();
617 for (byte[] valueBytes : values) {
618 //
619 // Decode the value
620 //
621 Input input = new Input(valueBytes);
622 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
623 allFlows.add(flowPath);
624 }
625 kryoFactory.deleteKryo(kryo);
626
627 return allFlows;
628 }
629
630 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800631 * Get a Flow for a given Flow ID.
632 *
633 * @param flowId the Flow ID of the Flow to get.
634 * @return the Flow if found, otherwise null.
635 */
636 @Override
637 public FlowPath getFlow(FlowId flowId) {
638 byte[] valueBytes = mapFlow.get(flowId.value());
639 if (valueBytes == null)
640 return null;
641
642 Kryo kryo = kryoFactory.newKryo();
643 //
644 // Decode the value
645 //
646 Input input = new Input(valueBytes);
647 FlowPath flowPath = kryo.readObject(input, FlowPath.class);
648 kryoFactory.deleteKryo(kryo);
649
650 return flowPath;
651 }
652
653 /**
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700654 * Send a notification that a Flow is added.
655 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700656 * @param flowPath the Flow that is added.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700657 */
658 @Override
659 public void notificationSendFlowAdded(FlowPath flowPath) {
660 //
661 // Encode the value
662 //
663 byte[] buffer = new byte[MAX_BUFFER_SIZE];
664 Kryo kryo = kryoFactory.newKryo();
665 Output output = new Output(buffer, -1);
666 kryo.writeObject(output, flowPath);
667 byte[] valueBytes = output.toBytes();
668 kryoFactory.deleteKryo(kryo);
669
670 //
671 // Put the entry:
672 // - Key : Flow ID (Long)
673 // - Value : Serialized Flow (byte[])
674 //
675 mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
676 }
677
678 /**
679 * Send a notification that a Flow is removed.
680 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700681 * @param flowId the Flow ID of the Flow that is removed.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700682 */
683 @Override
684 public void notificationSendFlowRemoved(FlowId flowId) {
685 //
686 // Remove the entry:
687 // - Key : Flow ID (Long)
688 // - Value : Serialized Flow (byte[])
689 //
690 mapFlow.removeAsync(flowId.value());
691 }
692
693 /**
694 * Send a notification that a Flow is updated.
695 *
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700696 * @param flowPath the Flow that is updated.
Pavlin Radoslavov1308dc62013-10-25 15:54:31 -0700697 */
698 @Override
699 public void notificationSendFlowUpdated(FlowPath flowPath) {
700 // NOTE: Adding an entry with an existing key automatically updates it
701 notificationSendFlowAdded(flowPath);
702 }
703
704 /**
705 * Send a notification that all Flows are removed.
706 */
707 @Override
708 public void notificationSendAllFlowsRemoved() {
709 //
710 // Remove all entries
711 // NOTE: We remove the entries one-by-one so the per-entry
712 // notifications will be delivered.
713 //
714 // mapFlow.clear();
715 Set<Long> keySet = mapFlow.keySet();
716 for (Long key : keySet) {
717 mapFlow.removeAsync(key);
718 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700719 }
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700720
721 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700722 * Get all Flow Entries that are currently in the datagrid.
723 *
724 * @return all Flow Entries that are currently in the datagrid.
725 */
726 @Override
727 public Collection<FlowEntry> getAllFlowEntries() {
728 Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
729
730 //
731 // Get all current entries
732 //
733 Collection<byte[]> values = mapFlowEntry.values();
734 Kryo kryo = kryoFactory.newKryo();
735 for (byte[] valueBytes : values) {
736 //
737 // Decode the value
738 //
739 Input input = new Input(valueBytes);
740 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
741 allFlowEntries.add(flowEntry);
742 }
743 kryoFactory.deleteKryo(kryo);
744
745 return allFlowEntries;
746 }
747
748 /**
Pavlin Radoslavov379c9042013-11-26 15:40:49 -0800749 * Get a Flow Entry for a given Flow Entry ID.
750 *
751 * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
752 * @return the Flow Entry if found, otherwise null.
753 */
754 @Override
755 public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
756 byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
757 if (valueBytes == null)
758 return null;
759
760 Kryo kryo = kryoFactory.newKryo();
761 //
762 // Decode the value
763 //
764 Input input = new Input(valueBytes);
765 FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
766 kryoFactory.deleteKryo(kryo);
767
768 return flowEntry;
769 }
770
771 /**
Pavlin Radoslavovb7506842013-10-29 17:46:54 -0700772 * Send a notification that a FlowEntry is added.
773 *
774 * @param flowEntry the FlowEntry that is added.
775 */
776 @Override
777 public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
778 //
779 // Encode the value
780 //
781 byte[] buffer = new byte[MAX_BUFFER_SIZE];
782 Kryo kryo = kryoFactory.newKryo();
783 Output output = new Output(buffer, -1);
784 kryo.writeObject(output, flowEntry);
785 byte[] valueBytes = output.toBytes();
786 kryoFactory.deleteKryo(kryo);
787
788 //
789 // Put the entry:
790 // - Key : FlowEntry ID (Long)
791 // - Value : Serialized FlowEntry (byte[])
792 //
793 mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
794 }
795
796 /**
797 * Send a notification that a FlowEntry is removed.
798 *
799 * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
800 */
801 @Override
802 public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
803 //
804 // Remove the entry:
805 // - Key : FlowEntry ID (Long)
806 // - Value : Serialized FlowEntry (byte[])
807 //
808 mapFlowEntry.removeAsync(flowEntryId.value());
809 }
810
811 /**
812 * Send a notification that a FlowEntry is updated.
813 *
814 * @param flowEntry the FlowEntry that is updated.
815 */
816 @Override
817 public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
818 // NOTE: Adding an entry with an existing key automatically updates it
819 notificationSendFlowEntryAdded(flowEntry);
820 }
821
822 /**
823 * Send a notification that all Flow Entries are removed.
824 */
825 @Override
826 public void notificationSendAllFlowEntriesRemoved() {
827 //
828 // Remove all entries
829 // NOTE: We remove the entries one-by-one so the per-entry
830 // notifications will be delivered.
831 //
832 // mapFlowEntry.clear();
833 Set<Long> keySet = mapFlowEntry.keySet();
834 for (Long key : keySet) {
835 mapFlowEntry.removeAsync(key);
836 }
837 }
838
839 /**
Pavlin Radoslavovaaace7f2013-10-25 19:42:00 -0700840 * Get all Topology Elements that are currently in the datagrid.
841 *
842 * @return all Topology Elements that are currently in the datagrid.
843 */
844 @Override
845 public Collection<TopologyElement> getAllTopologyElements() {
846 Collection<TopologyElement> allTopologyElements =
847 new LinkedList<TopologyElement>();
848
849 //
850 // Get all current entries
851 //
852 Collection<byte[]> values = mapTopology.values();
853 Kryo kryo = kryoFactory.newKryo();
854 for (byte[] valueBytes : values) {
855 //
856 // Decode the value
857 //
858 Input input = new Input(valueBytes);
859 TopologyElement topologyElement =
860 kryo.readObject(input, TopologyElement.class);
861 allTopologyElements.add(topologyElement);
862 }
863 kryoFactory.deleteKryo(kryo);
864
865 return allTopologyElements;
866 }
867
868 /**
869 * Send a notification that a Topology Element is added.
870 *
871 * @param topologyElement the Topology Element that is added.
872 */
873 @Override
874 public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
875 //
876 // Encode the value
877 //
878 byte[] buffer = new byte[MAX_BUFFER_SIZE];
879 Kryo kryo = kryoFactory.newKryo();
880 Output output = new Output(buffer, -1);
881 kryo.writeObject(output, topologyElement);
882 byte[] valueBytes = output.toBytes();
883 kryoFactory.deleteKryo(kryo);
884
885 //
886 // Put the entry:
887 // - Key : TopologyElement ID (String)
888 // - Value : Serialized TopologyElement (byte[])
889 //
890 mapTopology.putAsync(topologyElement.elementId(), valueBytes);
891 }
892
893 /**
894 * Send a notification that a Topology Element is removed.
895 *
896 * @param topologyElement the Topology Element that is removed.
897 */
898 @Override
899 public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
900 //
901 // Remove the entry:
902 // - Key : TopologyElement ID (String)
903 // - Value : Serialized TopologyElement (byte[])
904 //
905 mapTopology.removeAsync(topologyElement.elementId());
906 }
907
908 /**
909 * Send a notification that a Topology Element is updated.
910 *
911 * @param topologyElement the Topology Element that is updated.
912 */
913 @Override
914 public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
915 // NOTE: Adding an entry with an existing key automatically updates it
916 notificationSendTopologyElementAdded(topologyElement);
917 }
918
919 /**
920 * Send a notification that all Topology Elements are removed.
921 */
922 @Override
923 public void notificationSendAllTopologyElementsRemoved() {
924 //
925 // Remove all entries
926 // NOTE: We remove the entries one-by-one so the per-entry
927 // notifications will be delivered.
928 //
929 // mapTopology.clear();
930 Set<String> keySet = mapTopology.keySet();
931 for (String key : keySet) {
932 mapTopology.removeAsync(key);
933 }
934 }
Yuta HIGUCHI67a7a3e2014-01-03 14:51:34 -0800935
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800936 @Override
Jonathan Hart7804bea2014-01-07 10:50:52 -0800937 public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
938 packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
Jonathan Hart18ad55c2013-11-11 22:49:55 -0800939 }
Jonathan Hart7804bea2014-01-07 10:50:52 -0800940
941 @Override
942 public void sendArpReplyNotification(ArpReplyNotification arpReply) {
943 arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
944 }
Pavlin Radoslavov1eee2c82013-10-15 02:30:32 -0700945}