Renamed datagrid and datastore packages
net.onrc.onos.datagrid.* => net.onrc.onos.core.datagrid.*
net.onrc.onos.datastore.* => net.onrc.onos.core.datastore.*
Change-Id: Ibe1894a6fabae08ea7cfcbf6595f0c91b05ef497
diff --git a/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java
new file mode 100755
index 0000000..7d2b7f3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java
@@ -0,0 +1,287 @@
+package net.onrc.onos.core.datagrid;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.core.datagrid.web.DatagridWebRoutable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.GroupProperties;
+
+/**
+ * A datagrid service that uses Hazelcast as a datagrid.
+ * The relevant data is stored in the Hazelcast datagrid and shared as
+ * appropriate in a multi-node cluster.
+ */
+public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+ static final Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
+ private IRestApiService restApi;
+
+ static final String HAZELCAST_CONFIG_FILE = "datagridConfig";
+ private HazelcastInstance hazelcastInstance;
+ private Config hazelcastConfig;
+
+ //
+ // NOTE: eventChannels is kept thread safe by using explicit "synchronized"
+ // blocks below. Those are needed to protect the integrity of each entry
+ // instance, and avoid preemption during channel creation/startup.
+ //
+ private final Map<String, IEventChannel<?, ?>> eventChannels = new HashMap<>();
+
+ /**
+ * Initialize the Hazelcast Datagrid operation.
+ * <p/>
+ * Loads the Hazelcast XML configuration from the given file and then
+ * sets the event queue capacity and the socket buffer size properties.
+ *
+ * @param configFilename the configuration filename.
+ * @throws IllegalStateException if the configuration file is not found.
+ */
+ public void init(String configFilename) {
+ // Init from configuration file
+ try {
+ hazelcastConfig = new FileSystemXmlConfig(configFilename);
+ } catch (FileNotFoundException e) {
+ log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
+ // Fail here with the cause attached: without a configuration the
+ // setProperty() calls below would throw a bare
+ // NullPointerException that hides the real problem.
+ throw new IllegalStateException(
+ "Hazelcast XML configuration file not found: " + configFilename, e);
+ }
+
+ // These properties are set after the file is loaded, so they are
+ // applied on top of whatever the XML configuration contains.
+ hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
+ hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
+ hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
+ }
+
+ /**
+ * Shutdown the Hazelcast Datagrid operation when this object is finalized.
+ */
+ @Override
+ protected void finalize() {
+ close();
+ }
+
+ /**
+ * Shutdown the Hazelcast Datagrid operation. NOTE: calls the static Hazelcast.shutdownAll().
+ */
+ public void close() {
+ Hazelcast.shutdownAll();
+ }
+
+ /**
+ * Report the services this module offers to other modules.
+ *
+ * @return a new collection holding only IDatagridService.
+ */
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+ // Diamond inference keeps the element type without repeating it.
+ Collection<Class<? extends IFloodlightService>> offered = new ArrayList<>();
+ offered.add(IDatagridService.class);
+ return offered;
+ }
+
+ /**
+ * Report the service implementations provided by this module.
+ *
+ * @return a new map from IDatagridService to this instance.
+ */
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService>
+ getServiceImpls() {
+ // This object is itself the IDatagridService implementation, so the
+ // map has a single entry pointing back at it.
+ Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+ new HashMap<>();
+ impls.put(IDatagridService.class, this);
+ return impls;
+ }
+
+ /**
+ * Report the services this module depends on.
+ *
+ * @return a new collection with IFloodlightProviderService and IRestApiService.
+ */
+ @Override
+ public Collection<Class<? extends IFloodlightService>>
+ getModuleDependencies() {
+ Collection<Class<? extends IFloodlightService>> required = new ArrayList<>();
+ required.add(IFloodlightProviderService.class);
+ // The REST API service is fetched in init(FloodlightModuleContext).
+ required.add(IRestApiService.class);
+ return required;
+ }
+
+ /**
+ * Initialize the module.
+ *
+ * @param context the module context to use for the initialization.
+ * @throws FloodlightModuleException on error
+ */
+ @Override
+ public void init(FloodlightModuleContext context)
+ throws FloodlightModuleException {
+ restApi = context.getServiceImpl(IRestApiService.class);
+
+ // Get the configuration file name and configure the Datagrid
+ Map<String, String> configMap = context.getConfigParams(this);
+ String configFilename = configMap.get(HAZELCAST_CONFIG_FILE);
+ this.init(configFilename);
+ }
+
+ /**
+ * Startup module operation: create the Hazelcast instance and register the REST routes.
+ *
+ * @param context the module context to use for the startup.
+ */
+ @Override
+ public void startUp(FloodlightModuleContext context) {
+ hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+ // Publish the datagrid REST resources through the REST API service.
+ restApi.addRestletRoutable(new DatagridWebRoutable());
+ }
+
+ /**
+ * Look up or create the event channel with the given name, and start it.
+ * <p/>
+ * An existing channel is reused; a key/value type mismatch with it raises
+ * ClassCastException (thrown by createChannelImpl).
+ *
+ * @param channelName the event channel name.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ // Lookup, creation and startup all happen under the same lock.
+ synchronized (eventChannels) {
+ IEventChannel<K, V> channel = createChannelImpl(channelName, typeK, typeV);
+ channel.startup();
+ return channel;
+ }
+ }
+
+ /**
+ * Look up the event channel for a name, creating and registering it if absent.
+ * <p/>
+ * NOTE: The returned channel is not started here; the caller must call
+ * IEventChannel.startup() to startup the channel operation.
+ * NOTE: The caller must own the lock on "eventChannels".
+ *
+ * @param channelName the event channel name.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ * @throws ClassCastException if a channel with this name already exists
+ * with different key or value types.
+ */
+ private <K, V> IEventChannel<K, V> createChannelImpl(
+ String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<?, ?> existing = eventChannels.get(channelName);
+
+ if (existing != null) {
+ // Reuse the registered channel, but only for matching types.
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ //
+ if (!existing.verifyKeyValueTypes(typeK, typeV)) {
+ throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
+ }
+ @SuppressWarnings("unchecked")
+ IEventChannel<K, V> typedExisting = (IEventChannel<K, V>) existing;
+ return typedExisting;
+ }
+
+ // First request for this name: create and register the channel.
+ // It is not started here; that is left to the caller.
+ IEventChannel<K, V> created =
+ new HazelcastEventChannel<K, V>(hazelcastInstance, channelName, typeK, typeV);
+ eventChannels.put(channelName, created);
+ return created;
+ }
+
+ /**
+ * Register a listener on the named event channel, creating the channel if needed.
+ * <p/>
+ * NOTE: The channel is started automatically right after the listener
+ * is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV) {
+ synchronized (eventChannels) {
+ IEventChannel<K, V> channel = createChannelImpl(channelName, typeK, typeV);
+ // Register first, then start, so the listener is already in
+ // place when the channel is started.
+ channel.addListener(listener);
+ channel.startup();
+ return channel;
+ }
+ }
+
+ /**
+ * Unregister a listener from the named event channel, if the channel exists.
+ *
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ @Override
+ public <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener) {
+ synchronized (eventChannels) {
+ IEventChannel<?, ?> registered = eventChannels.get(channelName);
+ if (registered == null) {
+ return; // No such channel: nothing to remove
+ }
+
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ // NOTE: Using "ClassCastException" exception below doesn't
+ // work.
+ //
+ @SuppressWarnings("unchecked")
+ IEventChannel<K, V> typedChannel =
+ (IEventChannel<K, V>) registered;
+ typedChannel.removeListener(listener);
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java b/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java
new file mode 100644
index 0000000..8797dba
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java
@@ -0,0 +1,399 @@
+package net.onrc.onos.core.datagrid;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+/**
+ * A datagrid event channel that uses Hazelcast as a datagrid.
+ *
+ * @param <K> The class type of the key.
+ * @param <V> The class type of the value.
+ */
+public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
+ private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
+ private final String channelName; // The event channel name
+ private final Class<K> typeK; // The class type of the key
+ private final Class<V> typeV; // The class type of the value
+ private IMap<K, byte[]> channelMap; // The Hazelcast channel map
+ // The channel listeners
+ private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
+ new CopyOnWriteArrayList<>();
+
+ // The map entry listener
+ private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
+ private String mapListenerId; // The map listener ID
+
+ // TODO: We should use a single global KryoFactory instance
+ private final KryoFactory kryoFactory = new KryoFactory();
+
+ // Maximum serialized event size
+ private static final int MAX_BUFFER_SIZE = 64 * 1024;
+
+ /**
+ * Create a channel bound to a Hazelcast instance and a channel name.
+ * The channel map is not opened here; that happens in startup().
+ *
+ * @param newHazelcastInstance the Hazelcast instance to use.
+ * @param newChannelName the event channel name.
+ * @param newTypeK the type of the Key in the Key-Value store.
+ * @param newTypeV the type of the Value in the Key-Value store.
+ */
+ public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
+ String newChannelName, Class<K> newTypeK, Class<V> newTypeV) {
+ this.hazelcastInstance = newHazelcastInstance;
+ this.channelName = newChannelName;
+ this.typeK = newTypeK;
+ this.typeV = newTypeV;
+ }
+
+ /**
+ * Verify the key and value types of a channel.
+ *
+ * @param typeKToVerify the type of the key to verify.
+ * @param typeVToVerify the type of the value to verify.
+ * @return true if the key and value types of the channel match,
+ * otherwise false.
+ */
+ @Override
+ public boolean verifyKeyValueTypes(Class typeKToVerify,
+ Class typeVToVerify) {
+ return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
+ }
+
+ /**
+ * Shut the channel down when this object is finalized.
+ */
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ /**
+ * Open the Hazelcast map for this channel and register the entry listener (once only).
+ */
+ @Override
+ public void startup() {
+ if (channelMap != null) {
+ return; // Already started
+ }
+ channelMap = hazelcastInstance.getMap(channelName);
+ mapListenerId = channelMap.addEntryListener(mapEntryListener, true);
+ }
+
+ /**
+ * Shutdown the channel operation: unregister the map entry listener, if started.
+ */
+ @Override
+ public void shutdown() {
+ if (channelMap != null) {
+ channelMap.removeEntryListener(mapListenerId);
+ channelMap = null;
+ mapListenerId = null;
+ }
+ }
+
+ /**
+ * Add event channel listener.
+ * <p/>
+ * Adding a listener that is already registered has no effect.
+ *
+ * @param listener the listener to add.
+ */
+ @Override
+ public void addListener(IEventChannelListener<K, V> listener) {
+ // addIfAbsent() makes the duplicate check and the insert one atomic step.
+ listeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Remove event channel listener; has no effect if it was not registered.
+ *
+ * @param listener the listener to remove.
+ */
+ @Override
+ public void removeListener(IEventChannelListener<K, V> listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Store a key-value pair in the channel map.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ @Override
+ public void addEntry(K key, V value) {
+ //
+ // The map entry is:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ byte[] serialized = serializeValue(value);
+ channelMap.putAsync(key, serialized);
+ }
+
+ /**
+ * Store a short-lived key-value pair in the channel map.
+ * <p/>
+ * The added entry is transient and will automatically timeout after 1ms.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ @Override
+ public void addTransientEntry(K key, V value) {
+ //
+ // The map entry is:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ // - Timeout: 1ms
+ //
+ byte[] serialized = serializeValue(value);
+ channelMap.putAsync(key, serialized, 1L, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Serialize the value with Kryo.
+ *
+ * @param value the value to serialize.
+ * @return the serialized value.
+ */
+ private byte[] serializeValue(V value) {
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Start with MAX_BUFFER_SIZE bytes; -1 is the Output's maxBufferSize argument.
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, value);
+ return output.toBytes();
+ } finally {
+ // Always pair newKryo() with deleteKryo(), even when serialization throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Remove an entry from the channel, using the map's removeAsync().
+ *
+ * @param key the key of the entry to remove.
+ */
+ @Override
+ public void removeEntry(K key) {
+ //
+ // Remove the entry:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ channelMap.removeAsync(key);
+ }
+
+ /**
+ * Update an entry in the channel; implemented by delegating to addEntry().
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ @Override
+ public void updateEntry(K key, V value) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ addEntry(key, value);
+ }
+
+ /**
+ * Get an entry from the channel.
+ *
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null (also when the channel
+ * has not been started).
+ */
+ @Override
+ @Deprecated
+ public V getEntry(K key) {
+ if (channelMap == null) {
+ return null; // Not started: "nothing found", as in getAllEntries()
+ }
+ byte[] valueBytes = channelMap.get(key);
+ if (valueBytes == null) {
+ return null;
+ }
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ return kryo.readObject(new Input(valueBytes), typeV);
+ } finally {
+ kryoFactory.deleteKryo(kryo); // also runs when decoding throws
+ }
+ }
+
+ /**
+ * Get all entries in the channel.
+ *
+ * @return all entries that are currently in the channel; empty if the
+ * channel has not been started.
+ */
+ @Override
+ @Deprecated
+ public Collection<V> getAllEntries() {
+ Collection<V> allEntries = new LinkedList<V>();
+
+ if (channelMap == null) {
+ return allEntries; // Nothing found
+ }
+
+ // Get all entries
+ Collection<byte[]> values = channelMap.values();
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ for (byte[] valueBytes : values) {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ allEntries.add(kryo.readObject(input, typeV));
+ }
+ } finally {
+ // Always pair newKryo() with deleteKryo(), even if decoding throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+
+ return allEntries;
+ }
+
+ /**
+ * Remove every entry currently in the channel.
+ * <p/>
+ * Each key of the channel map is removed individually with the map's
+ * removeAsync().
+ */
+ @Override
+ @Deprecated
+ public void removeAllEntries() {
+ //
+ // NOTE: channelMap.clear() is deliberately not used: the entries
+ // are removed one-by-one so the per-entry notifications will be
+ // delivered.
+ //
+ for (K key : channelMap.keySet()) {
+ channelMap.removeAsync(key);
+ }
+ }
+
+ /**
+ * Class for receiving event notifications for the channel.
+ * <p/>
+ * The datagrid map is:
+ * - Key: Type K
+ * - Value: Serialized V (byte[])
+ */
+ private class MapEntryListener implements EntryListener<K, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryAdded(EntryEvent<K, byte[]> event) {
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(event.getValue());
+ V value = kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification: the first listener receives the
+ // decoded object itself, every later listener a deep copy, so
+ // no two listeners share one instance.
+ //
+ boolean first = true;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ listener.entryAdded(first ? value : kryo.copy(value));
+ first = false;
+ }
+ } finally {
+ // Always pair newKryo() with deleteKryo(), even when decoding
+ // or a listener callback throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryRemoved(EntryEvent<K, byte[]> event) {
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(event.getValue());
+ V value = kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification: the first listener receives the
+ // decoded object itself, every later listener a deep copy, so
+ // no two listeners share one instance.
+ //
+ boolean first = true;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ listener.entryRemoved(first ? value : kryo.copy(value));
+ first = false;
+ }
+ } finally {
+ // Always pair newKryo() with deleteKryo(), even when decoding
+ // or a listener callback throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryUpdated(EntryEvent<K, byte[]> event) {
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(event.getValue());
+ V value = kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification: the first listener receives the
+ // decoded object itself, every later listener a deep copy, so
+ // no two listeners share one instance.
+ //
+ boolean first = true;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ listener.entryUpdated(first ? value : kryo.copy(value));
+ first = false;
+ }
+ } finally {
+ // Always pair newKryo() with deleteKryo(), even when decoding
+ // or a listener callback throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is evicted; the event is ignored.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryEvicted(EntryEvent<K, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java
new file mode 100755
index 0000000..5fe52cc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.core.datagrid;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * Interface for providing Datagrid Service to other modules.
+ */
+public interface IDatagridService extends IFloodlightService {
+ /**
+ * Create an event channel, or return the existing one with that name.
+ * NOTE: The channel is started automatically.
+ *
+ * @param channelName the event channel name.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Add event channel listener.
+ * NOTE: The channel is started automatically right after the listener is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener);
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java b/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java
new file mode 100644
index 0000000..5874b31
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java
@@ -0,0 +1,98 @@
+package net.onrc.onos.core.datagrid;
+
+import java.util.Collection;
+
+/**
+ * Event Channel Interface.
+ * @param <K> The class type of the key.
+ * @param <V> The class type of the value.
+ */
+public interface IEventChannel<K, V> {
+ /**
+ * Startup the channel operation.
+ */
+ void startup();
+
+ /**
+ * Shutdown the channel operation.
+ */
+ void shutdown();
+
+ /**
+ * Verify the key and value types of a channel.
+ *
+ * @param typeK the type of the key to verify.
+ * @param typeV the type of the value to verify.
+ * @return true if both types match the channel's types, otherwise false.
+ */
+ boolean verifyKeyValueTypes(Class typeK, Class typeV);
+
+ /**
+ * Add event channel listener.
+ *
+ * @param listener the listener to add.
+ */
+ void addListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param listener the listener to remove.
+ */
+ void removeListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Add an entry to the channel.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ void addEntry(K key, V value);
+
+ /**
+ * Add a transient entry to the channel.
+ * The added entry is transient and will automatically timeout after 1ms.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ void addTransientEntry(K key, V value);
+
+ /**
+ * Remove an entry from the channel.
+ *
+ * @param key the key of the entry to remove.
+ */
+ void removeEntry(K key);
+
+ /**
+ * Update an entry in the channel.
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ void updateEntry(K key, V value);
+
+ /**
+ * Get an entry from the channel.
+ *
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null.
+ */
+ @Deprecated
+ V getEntry(K key);
+
+ /**
+ * Get all entries in the channel.
+ *
+ * @return all entries that are currently in the channel.
+ */
+ @Deprecated
+ Collection<V> getAllEntries();
+
+ /**
+ * Remove all entries in the channel.
+ */
+ @Deprecated
+ void removeAllEntries();
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java b/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java
new file mode 100644
index 0000000..ca18aa7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.core.datagrid;
+
+/**
+ * Event Channel Listener Interface: each callback receives only the entry value, not its key.
+ */
+public interface IEventChannelListener<K, V> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param value the value for the entry.
+ */
+ void entryAdded(V value);
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param value the value for the entry.
+ */
+ void entryRemoved(V value);
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param value the value for the entry.
+ */
+ void entryUpdated(V value);
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java b/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java
new file mode 100755
index 0000000..672f66d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java
@@ -0,0 +1,37 @@
+package net.onrc.onos.core.datagrid.web;
+
+import net.floodlightcontroller.restserver.RestletRoutable;
+
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.routing.Router;
+
+/**
+ * REST API implementation for the Datagrid.
+ */
+public class DatagridWebRoutable implements RestletRoutable {
+ /**
+ * Create the Restlet router and bind the datagrid REST paths to their resources.
+ */
+ @Override
+ public Restlet getRestlet(Context context) {
+ Router router = new Router(context);
+ router.attach("/add/intents/json", IntentResource.class);
+ router.attach("/get/intents/json", IntentResource.class);
+ router.attach("/get/intent/{intent_id}/json", IntentResource.class);
+ router.attach("/get/ng-events/json", GetNGEventsResource.class);
+ router.attach("/get/ng-flows/summary/json", GetNGFlowsSummaryResource.class);
+ router.attach("/get/intents/{category}/json", IntentResource.class);
+ router.attach("/get/intent/{category}/{intent_id}/json", IntentResource.class);
+ router.attach("/delete/intents/json", IntentResource.class);
+ return router;
+ }
+
+ /**
+ * Get the base path under which the Datagrid REST API is served.
+ */
+ @Override
+ public String basePath() {
+ return "/wm/onos/datagrid";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java
new file mode 100644
index 0000000..f319d23
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.datagrid.web;
+
+import java.util.Collection;
+
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.ofcontroller.networkgraph.TopologyEvent;
+import net.onrc.onos.ofcontroller.networkgraph.TopologyManager;
+
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetNGEventsResource extends ServerResource {
+
+ public static final Logger log = LoggerFactory.getLogger(GetNGEventsResource.class);
+
+ /**
+ * Dump every TopologyEvent in the topology event channel, one toString() per line.
+ *
+ * @return the events as newline-terminated text; empty if there are none.
+ */
+ @Get("json")
+ public String retrieve() {
+ IDatagridService datagridService =
+ (IDatagridService) getContext().getAttributes().
+ get(IDatagridService.class.getCanonicalName());
+ log.debug("Get network graph events");
+
+ IEventChannel<byte[], TopologyEvent> channel = datagridService.createChannel(
+ TopologyManager.EVENT_CHANNEL_NAME, byte[].class, TopologyEvent.class);
+
+ // Accumulate in a StringBuilder: += in the loop recopies the text each time.
+ StringBuilder result = new StringBuilder();
+ for (TopologyEvent event : channel.getAllEntries()) {
+ result.append(event.toString()).append('\n');
+ }
+ return result.toString();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java
new file mode 100644
index 0000000..0ebeec9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java
@@ -0,0 +1,114 @@
+package net.onrc.onos.core.datagrid.web;
+
+import java.util.ArrayList;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import net.onrc.onos.intent.Intent;
+import net.onrc.onos.intent.PathIntent;
+import net.onrc.onos.intent.ShortestPathIntent;
+import net.onrc.onos.intent.Intent.IntentState;
+import net.onrc.onos.intent.IntentMap;
+import net.onrc.onos.intent.runtime.IPathCalcRuntimeService;
+
+import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
+import net.onrc.onos.ofcontroller.networkgraph.Path;
+import net.onrc.onos.ofcontroller.util.CallerId;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.FlowPathType;
+import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Port;
+import net.onrc.onos.ofcontroller.util.SwitchPort;
+
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API call to get a summary of Flow Paths.
+ *
+ * NOTE: This REST API call is needed for the ONOS GUI.
+ *
+ * GET /wm/onos/datagrid/get/ng-flows/summary/json
+ */
+public class GetNGFlowsSummaryResource extends ServerResource {
+ public static final Logger log = LoggerFactory.getLogger(GetNGFlowsSummaryResource.class);
+
+ @Get("json")
+ public ArrayList<FlowPath> retrieve() {
+ ArrayList<FlowPath> result = new ArrayList<>();
+ SortedMap<Long, FlowPath> sortedFlowPaths = new TreeMap<>();
+
+ IPathCalcRuntimeService pathRuntime =
+ (IPathCalcRuntimeService) getContext().
+ getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+ log.debug("Get NG Flows Summary");
+
+
+ IntentMap parentIntentMap = pathRuntime.getHighLevelIntents();
+ IntentMap intentMap = pathRuntime.getPathIntents();
+ for (Intent parentIntent : parentIntentMap.getAllIntents()) {
+ // Get only installed Shortest Paths
+ if (parentIntent.getState() != IntentState.INST_ACK)
+ continue;
+ if (!(parentIntent instanceof ShortestPathIntent))
+ continue;
+ ShortestPathIntent spIntent = (ShortestPathIntent) parentIntent;
+
+ // Get the Path Intent
+ Intent intent = intentMap.getIntent(spIntent.getPathIntentId());
+ if (!(intent instanceof PathIntent))
+ continue;
+ PathIntent pathIntent = (PathIntent) intent;
+
+ // Decode the Shortest Path ID
+ String applnIntentId = parentIntent.getId();
+ String intentId = applnIntentId.split(":")[1];
+
+ // Create the Flow Path
+ FlowId flowId = new FlowId(intentId);
+ FlowPath flowPath = new FlowPath();
+ flowPath.setFlowId(flowId);
+ sortedFlowPaths.put(flowPath.flowId().value(), flowPath);
+
+ flowPath.setInstallerId(new CallerId("E"));
+ flowPath.setFlowEntryActions(null);
+ flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+ flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+
+ // Setup the Source and Destination DPID and Port
+ SwitchPort srcPort = flowPath.dataPath().srcPort();
+ SwitchPort dstPort = flowPath.dataPath().dstPort();
+ srcPort.setDpid(new Dpid(spIntent.getSrcSwitchDpid()));
+ srcPort.setPort(new Port((short) spIntent.getSrcPortNumber()));
+ dstPort.setDpid(new Dpid(spIntent.getDstSwitchDpid()));
+ dstPort.setPort(new Port((short) spIntent.getDstPortNumber()));
+
+ // Extract the Flow Entries
+ Path path = pathIntent.getPath();
+ FlowEntry flowEntry;
+ ArrayList<FlowEntry> flowEntries = new ArrayList<>();
+ for (LinkEvent linkEvent : path) {
+ Dpid dpid = new Dpid(linkEvent.getSrc().getDpid());
+ flowEntry = new FlowEntry();
+ flowEntry.setDpid(dpid);
+ flowEntries.add(flowEntry);
+ }
+ // Add the final Flow Entry
+ flowEntry = new FlowEntry();
+ flowEntry.setDpid(new Dpid(spIntent.getDstSwitchDpid()));
+ flowEntries.add(flowEntry);
+ flowPath.dataPath().setFlowEntries(flowEntries);
+ }
+
+ // Prepare the return result
+ for (FlowPath flowPath : sortedFlowPaths.values())
+ result.add(flowPath);
+
+ return result;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java
new file mode 100755
index 0000000..7d338b5
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java
@@ -0,0 +1,230 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package net.onrc.onos.core.datagrid.web;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import net.onrc.onos.intent.ConstrainedShortestPathIntent;
+import net.onrc.onos.intent.ShortestPathIntent;
+import net.onrc.onos.intent.IntentOperation;
+import net.onrc.onos.intent.IntentMap;
+import net.onrc.onos.intent.Intent;
+import net.onrc.onos.intent.runtime.IPathCalcRuntimeService;
+import net.onrc.onos.intent.IntentOperationList;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.resource.Post;
+import org.restlet.resource.ServerResource;
+import org.codehaus.jackson.map.ObjectMapper;
+import net.floodlightcontroller.util.MACAddress;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.restlet.resource.Delete;
+import org.restlet.resource.Get;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author nickkaranatsios
+ */
+public class IntentResource extends ServerResource {
+ private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
+ // TODO need to assign proper application id.
+ private final String APPLN_ID = "1";
+
+ @Post("json")
+ public String store(String jsonIntent) throws IOException {
+ IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext()
+ .getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+ if (pathRuntime == null) {
+ log.warn("Failed to get path calc runtime");
+ return "";
+ }
+ String reply = "";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode jNode = null;
+ try {
+ jNode = mapper.readValue(jsonIntent, JsonNode.class);
+ } catch (JsonGenerationException ex) {
+ log.error("JsonGeneration exception ", ex);
+ } catch (JsonMappingException ex) {
+ log.error("JsonMappingException occurred", ex);
+ } catch (IOException ex) {
+ log.error("IOException occurred", ex);
+ }
+
+ if (jNode != null) {
+ reply = parseJsonNode(jNode.getElements(), pathRuntime);
+ }
+ return reply;
+ }
+
+ @Delete("json")
+ public String store() {
+ IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext().
+ getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+ pathRuntime.purgeIntents();
+ // TODO no reply yet from the purge intents call
+ return "";
+
+ }
+
+ @Get("json")
+ public String retrieve() throws IOException {
+ IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext().
+ getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+
+ String intentCategory = (String) getRequestAttributes().get("category");
+ IntentMap intentMap = null;
+ if (intentCategory.equals("high")) {
+ intentMap = pathRuntime.getHighLevelIntents();
+ } else {
+ intentMap = pathRuntime.getPathIntents();
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ String restStr = "";
+
+ String intentId = (String) getRequestAttributes().get("intent_id");
+ ArrayNode arrayNode = mapper.createArrayNode();
+ Collection<Intent> intents = intentMap.getAllIntents();
+ if (!intents.isEmpty()) {
+ if ((intentId != null)) {
+ String applnIntentId = APPLN_ID + ":" + intentId;
+ Intent intent = intentMap.getIntent(applnIntentId);
+ if (intent != null) {
+ ObjectNode node = mapper.createObjectNode();
+ // TODO refactor/remove duplicate code.
+ node.put("intent_id", intentId);
+ node.put("status", intent.getState().toString());
+ LinkedList<String> logs = intent.getLogs();
+ ArrayNode logNode = mapper.createArrayNode();
+ for (String intentLog : logs) {
+ logNode.add(intentLog);
+ }
+ node.put("log", logNode);
+ arrayNode.add(node);
+ }
+ } else {
+ for (Intent intent : intents) {
+ ObjectNode node = mapper.createObjectNode();
+ String applnIntentId = intent.getId();
+ intentId = applnIntentId.split(":")[1];
+ node.put("intent_id", intentId);
+ node.put("status", intent.getState().toString());
+ LinkedList<String> logs = intent.getLogs();
+ ArrayNode logNode = mapper.createArrayNode();
+ for (String intentLog : logs) {
+ logNode.add(intentLog);
+ }
+ node.put("log", logNode);
+ arrayNode.add(node);
+ }
+ }
+ restStr = mapper.writeValueAsString(arrayNode);
+ }
+ return restStr;
+ }
+
+ private String parseJsonNode(Iterator<JsonNode> nodes,
+ IPathCalcRuntimeService pathRuntime) throws IOException {
+ IntentOperationList operations = new IntentOperationList();
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode arrayNode = mapper.createArrayNode();
+ while (nodes.hasNext()) {
+ JsonNode node = nodes.next();
+ if (node.isObject()) {
+ JsonNode data;
+ Iterator<String> fieldNames = node.getFieldNames();
+ Map<String, Object> fields = new HashMap<>();
+ while (fieldNames.hasNext()) {
+ String fieldName = fieldNames.next();
+ data = node.get(fieldName);
+ parseFields(data, fieldName, fields);
+ }
+ Intent intent = processIntent(fields, operations);
+ appendIntentStatus(intent, (String) fields.get("intent_id"), mapper, arrayNode);
+ }
+ }
+ pathRuntime.executeIntentOperations(operations);
+ return mapper.writeValueAsString(arrayNode);
+ }
+
+ private void appendIntentStatus(Intent intent, final String intentId,
+ ObjectMapper mapper, ArrayNode arrayNode) throws IOException {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("intent_id", intentId);
+ node.put("status", intent.getState().toString());
+ LinkedList<String> logs = intent.getLogs();
+ ArrayNode logNode = mapper.createArrayNode();
+ for (String intentLog : logs) {
+ logNode.add(intentLog);
+ }
+ node.put("log", logNode);
+ arrayNode.add(node);
+ }
+
+ private Intent processIntent(Map<String, Object> fields, IntentOperationList operations) {
+ String intentType = (String) fields.get("intent_type");
+ String intentOp = (String) fields.get("intent_op");
+ Intent intent;
+ String intentId = (String) fields.get("intent_id");
+ boolean pathFrozen = false;
+ if (intentId.startsWith("F")) { // TODO define REST API for frozen intents
+ pathFrozen = true;
+ intentId = intentId.substring(1);
+ }
+ String applnIntentId = APPLN_ID + ":" + intentId;
+
+ IntentOperation.Operator operation = IntentOperation.Operator.ADD;
+ if ((intentOp.equals("remove"))) {
+ operation = IntentOperation.Operator.REMOVE;
+ }
+ if (intentType.equals("shortest_intent_type")) {
+ ShortestPathIntent spi = new ShortestPathIntent(applnIntentId,
+ Long.decode((String) fields.get("srcSwitch")),
+ (long) fields.get("srcPort"),
+ MACAddress.valueOf((String) fields.get("srcMac")).toLong(),
+ Long.decode((String) fields.get("dstSwitch")),
+ (long) fields.get("dstPort"),
+ MACAddress.valueOf((String) fields.get("dstMac")).toLong());
+ spi.setPathFrozen(pathFrozen);
+ operations.add(operation, spi);
+ intent = spi;
+ } else {
+ ConstrainedShortestPathIntent cspi = new ConstrainedShortestPathIntent(applnIntentId,
+ Long.decode((String) fields.get("srcSwitch")),
+ (long) fields.get("srcPort"),
+ MACAddress.valueOf((String) fields.get("srcMac")).toLong(),
+ Long.decode((String) fields.get("dstSwitch")),
+ (long) fields.get("dstPort"),
+ MACAddress.valueOf((String) fields.get("dstMac")).toLong(),
+ (double) fields.get("bandwidth"));
+ cspi.setPathFrozen(pathFrozen);
+ operations.add(operation, cspi);
+ intent = cspi;
+ }
+ return intent;
+ }
+
+ private void parseFields(JsonNode node, String fieldName, Map<String, Object> fields) {
+ if ((node.isTextual())) {
+ fields.put(fieldName, node.getTextValue());
+ } else if ((node.isInt())) {
+ fields.put(fieldName, (long) node.getIntValue());
+ } else if (node.isDouble()) {
+ fields.put(fieldName, node.getDoubleValue());
+ } else if ((node.isLong())) {
+ fields.put(fieldName, node.getLongValue());
+ }
+ }
+}