Renamed datagrid and datastore packages
net.onrc.onos.datagrid.* => net.onrc.onos.core.datagrid.*
net.onrc.onos.datastore.* => net.onrc.onos.core.datastore.*
Change-Id: Ibe1894a6fabae08ea7cfcbf6595f0c91b05ef497
diff --git a/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java
new file mode 100755
index 0000000..7d2b7f3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/HazelcastDatagrid.java
@@ -0,0 +1,287 @@
+package net.onrc.onos.core.datagrid;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.core.datagrid.web.DatagridWebRoutable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.GroupProperties;
+
+/**
+ * A datagrid service that uses Hazelcast as a datagrid.
+ * The relevant data is stored in the Hazelcast datagrid and shared as
+ * appropriate in a multi-node cluster.
+ */
+public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+ static final Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
+ private IRestApiService restApi;
+
+ static final String HAZELCAST_CONFIG_FILE = "datagridConfig";
+ private HazelcastInstance hazelcastInstance;
+ private Config hazelcastConfig;
+
+ //
+ // NOTE: eventChannels is kept thread safe by using explicit "synchronized"
+ // blocks below. Those are needed to protect the integrity of each entry
+ // instance, and avoid preemption during channel creation/startup.
+ //
+ private final Map<String, IEventChannel<?, ?>> eventChannels = new HashMap<>();
+
+ /**
+ * Initialize the Hazelcast Datagrid operation.
+ *
+ * @param configFilename the configuration filename.
+ * @throws IllegalStateException if the configuration file is not found.
+ */
+ public void init(String configFilename) {
+ // Disabled tuning overrides, kept for reference:
+ // System.setProperty("hazelcast.socket.receive.buffer.size", "32");
+ // System.setProperty("hazelcast.socket.send.buffer.size", "32");
+ // System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
+
+ // Init from configuration file
+ try {
+ hazelcastConfig = new FileSystemXmlConfig(configFilename);
+ } catch (FileNotFoundException e) {
+ log.error("Error opening Hazelcast XML configuration. File not found: " + configFilename, e);
+ // Fail with the real cause; the setProperty() calls below need a loaded config
+ throw new IllegalStateException("Hazelcast XML configuration not found: " + configFilename, e);
+ }
+ // Disabled tuning overrides, kept for reference:
+ // hazelcastConfig.setProperty(GroupProperties.PROP_IO_THREAD_COUNT, "1");
+ // hazelcastConfig.setProperty(GroupProperties.PROP_OPERATION_THREAD_COUNT, "1");
+ // hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_THREAD_COUNT, "1");
+ hazelcastConfig.setProperty(GroupProperties.PROP_EVENT_QUEUE_CAPACITY, "4000000");
+ hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_RECEIVE_BUFFER_SIZE, "4096");
+ hazelcastConfig.setProperty(GroupProperties.PROP_SOCKET_SEND_BUFFER_SIZE, "4096");
+ }
+
+ /**
+ * Finalizer hook: shuts down the Hazelcast Datagrid operation via close().
+ */
+ @Override
+ protected void finalize() {
+ close();
+ }
+
+ /**
+ * Shutdown the Hazelcast Datagrid operation.
+ */
+ public void close() {
+ Hazelcast.shutdownAll(); // NOTE(review): not scoped to hazelcastInstance; presumably stops every Hazelcast instance in this JVM - confirm
+ }
+
+ /**
+ * Report which services this module offers.
+ *
+ * @return a collection holding only {@link IDatagridService}.
+ */
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+ // This module exports a single service
+ Collection<Class<? extends IFloodlightService>> offered = new ArrayList<>();
+ offered.add(IDatagridService.class);
+ return offered;
+ }
+
+ /**
+ * Get the collection of implemented services.
+ *
+ * @return a map from {@link IDatagridService} to this module, which
+ * implements that service itself.
+ */
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService>
+ getServiceImpls() {
+ Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+ new HashMap<>();
+ // The module object doubles as the service implementation
+ impls.put(IDatagridService.class, this);
+ return impls;
+ }
+
+ /**
+ * Get the collection of modules this module depends on.
+ *
+ * @return the collection of services this module depends on.
+ */
+ @Override
+ public Collection<Class<? extends IFloodlightService>>
+ getModuleDependencies() {
+ Collection<Class<? extends IFloodlightService>> required = new ArrayList<>();
+ // Insertion order: the Floodlight provider first, then the REST API
+ required.add(IFloodlightProviderService.class);
+ required.add(IRestApiService.class);
+ return required;
+ }
+
+ /**
+ * Initialize the module: look up the REST API service and load the datagrid configuration.
+ * @param context the module context to use for the initialization.
+ * @throws FloodlightModuleException if the "datagridConfig" module parameter is not set.
+ */
+ @Override
+ public void init(FloodlightModuleContext context) throws FloodlightModuleException {
+ restApi = context.getServiceImpl(IRestApiService.class);
+ // Get the configuration file name and configure the Datagrid
+ Map<String, String> configMap = context.getConfigParams(this);
+ String configFilename = configMap.get(HAZELCAST_CONFIG_FILE);
+ if (configFilename == null) {
+ throw new FloodlightModuleException("Missing module parameter: " + HAZELCAST_CONFIG_FILE);
+ }
+ this.init(configFilename);
+ }
+
+ /**
+ * Startup module operation: create the Hazelcast instance from the
+ * configuration prepared by init() and register the datagrid REST routes.
+ * @param context the module context to use for the startup.
+ */
+ @Override
+ public void startUp(FloodlightModuleContext context) {
+ hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+
+ restApi.addRestletRoutable(new DatagridWebRoutable());
+ }
+
+ /**
+ * Create (or look up) an event channel and start it.
+ * If the channel already exists, the existing one is returned.
+ * NOTE: Runs while holding the lock on "eventChannels".
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ * @throws ClassCastException if an existing channel has other key/value types.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ synchronized (eventChannels) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.startup();
+ return eventChannel;
+ }
+ }
+
+ /**
+ * Look up the event channel for a name, creating it on first use.
+ * <p/>
+ * An existing channel is returned only after checking that its key and
+ * value types match the requested ones.
+ * NOTE: The returned channel is not started here; the caller must call
+ * IEventChannel.startup() to startup the channel operation.
+ * NOTE: The caller must own the lock on "eventChannels".
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ * @throws ClassCastException if the channel exists with other key/value types.
+ */
+ private <K, V> IEventChannel<K, V> createChannelImpl(
+ String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<?, ?> existing = eventChannels.get(channelName);
+ if (existing != null) {
+ // Reuse the registered channel, but only for matching types
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ //
+ boolean typesMatch = existing.verifyKeyValueTypes(typeK, typeV);
+ if (!typesMatch) {
+ throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
+ }
+ @SuppressWarnings("unchecked")
+ IEventChannel<K, V> typedChannel = (IEventChannel<K, V>) existing;
+ return typedChannel;
+ }
+
+ // First request for this name: create and register the channel
+ IEventChannel<K, V> created = new HazelcastEventChannel<K, V>(
+ hazelcastInstance, channelName, typeK, typeV);
+ eventChannels.put(channelName, created);
+ return created;
+ }
+
+ /**
+ * Add event channel listener.
+ * The channel is created on first use, and it is started right after the
+ * listener is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ * @throws ClassCastException if an existing channel has other key/value types.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV) {
+ synchronized (eventChannels) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.addListener(listener);
+ eventChannel.startup();
+
+ return eventChannel;
+ }
+ }
+
+ /**
+ * Remove event channel listener; does nothing for an unknown channel name.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ */
+ @Override
+ public <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener) {
+ synchronized (eventChannels) {
+ IEventChannel<?, ?> registered = eventChannels.get(channelName);
+ if (registered == null) {
+ return; // No such channel: nothing to remove
+ }
+
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ // NOTE: Using "ClassCastException" exception below doesn't
+ // work.
+ //
+ @SuppressWarnings("unchecked")
+ IEventChannel<K, V> typedChannel =
+ (IEventChannel<K, V>) registered;
+ typedChannel.removeListener(listener);
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java b/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java
new file mode 100644
index 0000000..8797dba
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/HazelcastEventChannel.java
@@ -0,0 +1,399 @@
+package net.onrc.onos.core.datagrid;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+/**
+ * A datagrid event channel that uses Hazelcast as a datagrid.
+ *
+ * @param <K> The class type of the key.
+ * @param <V> The class type of the value.
+ */
+public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
+ private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
+ private final String channelName; // The event channel name
+ private final Class<K> typeK; // The class type of the key
+ private final Class<V> typeV; // The class type of the value
+ private IMap<K, byte[]> channelMap; // The Hazelcast channel map
+ // The channel listeners
+ private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
+ new CopyOnWriteArrayList<>();
+
+ // The map entry listener
+ private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
+ private String mapListenerId; // The map listener ID
+
+ // TODO: We should use a single global KryoFactory instance
+ private final KryoFactory kryoFactory = new KryoFactory();
+
+ // Maximum serialized event size
+ private static final int MAX_BUFFER_SIZE = 64 * 1024;
+
+ /**
+ * Constructor for a given event channel name.
+ * Only stores its arguments; the Hazelcast map is obtained in startup().
+ * @param newHazelcastInstance the Hazelcast instance to use.
+ * @param newChannelName the event channel name.
+ * @param newTypeK the type of the Key in the Key-Value store.
+ * @param newTypeV the type of the Value in the Key-Value store.
+ */
+ public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
+ String newChannelName, Class<K> newTypeK,
+ Class<V> newTypeV) {
+ hazelcastInstance = newHazelcastInstance;
+ channelName = newChannelName;
+ typeK = newTypeK;
+ typeV = newTypeV;
+ }
+
+ /**
+ * Check that this channel's key and value types equal the given ones.
+ * @param typeKToVerify the type of the key to verify.
+ * @param typeVToVerify the type of the value to verify.
+ * @return true if both the key and the value types match, otherwise false.
+ */
+ @Override
+ public boolean verifyKeyValueTypes(Class typeKToVerify, Class typeVToVerify) {
+ if (!typeK.equals(typeKToVerify)) {
+ return false;
+ }
+ return typeV.equals(typeVToVerify);
+ }
+
+ /**
+ * Finalizer hook: releases the channel by calling shutdown().
+ */
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ /**
+ * Startup the channel operation: get the map and attach the entry listener.
+ */
+ @Override
+ public void startup() {
+ if (channelMap != null) {
+ return; // Already started
+ }
+ channelMap = hazelcastInstance.getMap(channelName);
+ mapListenerId = channelMap.addEntryListener(mapEntryListener, true);
+ }
+
+ /** Shutdown the channel operation: detach the map entry listener. */
+ @Override
+ public void shutdown() {
+ if (channelMap == null) {
+ return; // Not started: nothing to release
+ }
+ channelMap.removeEntryListener(mapListenerId);
+ // Drop the references so that startup() can attach again
+ channelMap = null;
+ mapListenerId = null;
+ }
+
+ /**
+ * Add event channel listener.
+ * <p/>
+ * A listener that is already registered is not added a second time.
+ *
+ * @param listener the listener to add.
+ */
+ @Override
+ public void addListener(IEventChannelListener<K, V> listener) {
+ // addIfAbsent makes the duplicate check and the insertion one atomic step
+ listeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Remove event channel listener from the list of registered listeners.
+ *
+ * @param listener the listener to remove.
+ */
+ @Override
+ public void removeListener(IEventChannelListener<K, V> listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Add an entry to the channel using an asynchronous put.
+ * NOTE: Uses channelMap, which is assigned only by startup().
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add; it is stored serialized.
+ */
+ @Override
+ public void addEntry(K key, V value) {
+ byte[] valueBytes = serializeValue(value);
+ //
+ // Put the entry in the map:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ channelMap.putAsync(key, valueBytes);
+ }
+
+ /**
+ * Add a transient entry to the channel using an asynchronous put.
+ * <p/>
+ * The added entry is transient and will automatically timeout after 1ms.
+ * NOTE: Uses channelMap, which is assigned only by startup().
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add; it is stored serialized.
+ */
+ @Override
+ public void addTransientEntry(K key, V value) {
+ byte[] valueBytes = serializeValue(value);
+ //
+ // Put the entry in the map:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ // - Timeout: 1ms
+ //
+ channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Serialize the value with a Kryo instance taken from the factory.
+ *
+ * @param value the value to serialize.
+ * @return the serialized value.
+ */
+ private byte[] serializeValue(V value) {
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Encode the value
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, value);
+ return output.toBytes();
+ } finally {
+ // Hand the Kryo instance back even if the encoding throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Remove an entry from the channel using an asynchronous remove.
+ * NOTE: Uses channelMap, which is assigned only by startup().
+ * @param key the key of the entry to remove.
+ */
+ @Override
+ public void removeEntry(K key) {
+ //
+ // Remove the entry:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ channelMap.removeAsync(key);
+ }
+
+ /**
+ * Update an entry in the channel; implemented by delegating to addEntry().
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ @Override
+ public void updateEntry(K key, V value) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ addEntry(key, value);
+ }
+
+ /**
+ * Get an entry from the channel and decode its stored bytes.
+ *
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null.
+ */
+ @Override
+ @Deprecated
+ public V getEntry(K key) {
+ byte[] valueBytes = channelMap.get(key);
+ if (valueBytes == null) {
+ return null;
+ }
+
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ return kryo.readObject(input, typeV);
+ } finally {
+ // Hand the Kryo instance back even if the decoding throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Get all entries in the channel, decoded from their stored bytes.
+ *
+ * @return all entries that are currently in the channel; empty if the channel is not started.
+ */
+ @Override
+ @Deprecated
+ public Collection<V> getAllEntries() {
+ Collection<V> allEntries = new LinkedList<V>();
+
+ if (channelMap == null) {
+ return allEntries; // Nothing found
+ }
+
+ //
+ // Get all entries
+ //
+ Collection<byte[]> values = channelMap.values();
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ for (byte[] valueBytes : values) {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ allEntries.add(kryo.readObject(input, typeV));
+ }
+ } finally {
+ // Hand the Kryo instance back even if the decoding throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ return allEntries;
+ }
+
+ /**
+ * Remove all entries in the channel, one asynchronous remove per key.
+ */
+ @Override
+ @Deprecated
+ public void removeAllEntries() {
+ //
+ // Remove all entries
+ //
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // channelMap.clear();
+ Set<K> keySet = channelMap.keySet();
+ for (K key : keySet) {
+ channelMap.removeAsync(key);
+ }
+ }
+
+ /**
+ * Class for receiving event notifications for the channel.
+ * <p/>
+ * The datagrid map is:
+ * - Key: Type K
+ * - Value: Serialized V (byte[])
+ */
+ private class MapEntryListener implements EntryListener<K, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryAdded(EntryEvent<K, byte[]> event) {
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ V value = kryo.readObject(input, typeV);
+
+ // Deliver the notification
+ int index = 0;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryAdded(copyValue);
+ }
+ } finally {
+ // Hand the Kryo instance back even if decoding or a listener throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryRemoved(EntryEvent<K, byte[]> event) {
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ V value = kryo.readObject(input, typeV);
+
+ // Deliver the notification
+ int index = 0;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryRemoved(copyValue);
+ }
+ } finally {
+ // Hand the Kryo instance back even if decoding or a listener throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryUpdated(EntryEvent<K, byte[]> event) {
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ // Decode the value
+ Input input = new Input(valueBytes);
+ V value = kryo.readObject(input, typeV);
+
+ // Deliver the notification
+ int index = 0;
+ for (IEventChannelListener<K, V> listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryUpdated(copyValue);
+ }
+ } finally {
+ // Hand the Kryo instance back even if decoding or a listener throws
+ kryoFactory.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryEvicted(EntryEvent<K, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java
new file mode 100755
index 0000000..5fe52cc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IDatagridService.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.core.datagrid;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * Interface for providing Datagrid Service (named event channels) to other modules.
+ */
+public interface IDatagridService extends IFloodlightService {
+ /**
+ * Create an event channel, or return the existing one with that name.
+ * NOTE: The channel is started automatically.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Add event channel listener.
+ * NOTE: The channel is started automatically right after the listener
+ * is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @param <K> the type of the Key in the Key-Value store.
+ * @param <V> the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener);
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java b/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java
new file mode 100644
index 0000000..5874b31
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IEventChannel.java
@@ -0,0 +1,98 @@
+package net.onrc.onos.core.datagrid;
+
+import java.util.Collection;
+
+/**
+ * Event Channel Interface: a key-value channel (keys of type K, values of type V) with listeners.
+ */
+public interface IEventChannel<K, V> {
+ /**
+ * Startup the channel operation.
+ */
+ void startup();
+
+ /**
+ * Shutdown the channel operation.
+ */
+ void shutdown();
+
+ /**
+ * Verify the key and value types of a channel.
+ *
+ * @param typeK the type of the key to verify.
+ * @param typeV the type of the value to verify.
+ * @return true if the key and value types of the channel match,
+ * otherwise false.
+ */
+ boolean verifyKeyValueTypes(Class typeK, Class typeV);
+
+ /**
+ * Add event channel listener.
+ *
+ * @param listener the listener to add.
+ */
+ void addListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param listener the listener to remove.
+ */
+ void removeListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Add an entry to the channel.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ void addEntry(K key, V value);
+
+ /**
+ * Add a transient entry to the channel.
+ * <p/>
+ * The added entry is transient and will automatically timeout after 1ms.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ void addTransientEntry(K key, V value);
+
+ /**
+ * Remove an entry from the channel.
+ *
+ * @param key the key of the entry to remove.
+ */
+ void removeEntry(K key);
+
+ /**
+ * Update an entry in the channel.
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ void updateEntry(K key, V value);
+
+ /**
+ * Get an entry from the channel.
+ * NOTE(review): deprecated without a documented replacement.
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null.
+ */
+ @Deprecated
+ V getEntry(K key);
+
+ /**
+ * Get all entries in the channel.
+ * NOTE(review): deprecated without a documented replacement.
+ * @return all entries that are currently in the channel.
+ */
+ @Deprecated
+ Collection<V> getAllEntries();
+
+ /**
+ * Remove all entries in the channel (deprecated without a documented replacement).
+ */
+ @Deprecated
+ void removeAllEntries();
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java b/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java
new file mode 100644
index 0000000..ca18aa7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/IEventChannelListener.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.core.datagrid;
+
+/**
+ * Event Channel Listener Interface: every callback receives only the entry value (type V).
+ */
+public interface IEventChannelListener<K, V> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param value the value for the entry.
+ */
+ void entryAdded(V value);
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param value the value for the entry.
+ */
+ void entryRemoved(V value);
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param value the value for the entry.
+ */
+ void entryUpdated(V value);
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java b/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java
new file mode 100755
index 0000000..672f66d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/DatagridWebRoutable.java
@@ -0,0 +1,37 @@
+package net.onrc.onos.core.datagrid.web;
+
+import net.floodlightcontroller.restserver.RestletRoutable;
+
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.routing.Router;
+
+/**
+ * REST API implementation for the Datagrid.
+ */
+public class DatagridWebRoutable implements RestletRoutable {
+ /**
+ * Create the Restlet router and bind the datagrid paths to their resource classes.
+ */
+ @Override
+ public Restlet getRestlet(Context context) {
+ Router router = new Router(context);
+ router.attach("/add/intents/json", IntentResource.class);
+ router.attach("/get/intents/json", IntentResource.class);
+ router.attach("/get/intent/{intent_id}/json", IntentResource.class);
+ router.attach("/get/ng-events/json", GetNGEventsResource.class);
+ router.attach("/get/ng-flows/summary/json", GetNGFlowsSummaryResource.class);
+ router.attach("/get/intents/{category}/json", IntentResource.class);
+ router.attach("/get/intent/{category}/{intent_id}/json", IntentResource.class);
+ router.attach("/delete/intents/json", IntentResource.class);
+ return router;
+ }
+
+ /**
+ * Get the base path under which the Datagrid REST routes are attached.
+ */
+ @Override
+ public String basePath() {
+ return "/wm/onos/datagrid";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java
new file mode 100644
index 0000000..f319d23
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGEventsResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.datagrid.web;
+
+import java.util.Collection;
+
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.ofcontroller.networkgraph.TopologyEvent;
+import net.onrc.onos.ofcontroller.networkgraph.TopologyManager;
+
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API call to list the events currently in the topology event channel.
+ * GET /wm/onos/datagrid/get/ng-events/json
+ */
+public class GetNGEventsResource extends ServerResource {
+ public static final Logger log = LoggerFactory.getLogger(GetNGEventsResource.class);
+
+ /**
+ * @return the string form of every event in the channel, one event per line.
+ */
+ @Get("json")
+ public String retrieve() {
+ IDatagridService datagridService =
+ (IDatagridService) getContext().getAttributes().
+ get(IDatagridService.class.getCanonicalName());
+ log.debug("Get network graph events");
+ IEventChannel<byte[], TopologyEvent> channel = datagridService.createChannel(
+ TopologyManager.EVENT_CHANNEL_NAME, byte[].class, TopologyEvent.class);
+ Collection<TopologyEvent> entries = channel.getAllEntries();
+ // A StringBuilder avoids re-copying the whole result for each event
+ StringBuilder result = new StringBuilder();
+ for (TopologyEvent event : entries) {
+ result.append(event.toString()).append("\n");
+ }
+ return result.toString();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java
new file mode 100644
index 0000000..0ebeec9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/GetNGFlowsSummaryResource.java
@@ -0,0 +1,114 @@
+package net.onrc.onos.core.datagrid.web;
+
+import java.util.ArrayList;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import net.onrc.onos.intent.Intent;
+import net.onrc.onos.intent.PathIntent;
+import net.onrc.onos.intent.ShortestPathIntent;
+import net.onrc.onos.intent.Intent.IntentState;
+import net.onrc.onos.intent.IntentMap;
+import net.onrc.onos.intent.runtime.IPathCalcRuntimeService;
+
+import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
+import net.onrc.onos.ofcontroller.networkgraph.Path;
+import net.onrc.onos.ofcontroller.util.CallerId;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.FlowPathType;
+import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Port;
+import net.onrc.onos.ofcontroller.util.SwitchPort;
+
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API call to get a summary of Flow Paths.
+ *
+ * NOTE: This REST API call is needed for the ONOS GUI.
+ *
+ * GET /wm/onos/datagrid/get/ng-flows/summary/json
+ */
+public class GetNGFlowsSummaryResource extends ServerResource {
+ public static final Logger log = LoggerFactory.getLogger(GetNGFlowsSummaryResource.class);
+
+ /**
+ * Build one FlowPath summary per installed shortest-path intent.
+ * @return the flow paths ordered by flow ID; empty if the runtime is unavailable.
+ */
+ @Get("json")
+ public ArrayList<FlowPath> retrieve() {
+ SortedMap<Long, FlowPath> sortedFlowPaths = new TreeMap<>();
+ IPathCalcRuntimeService pathRuntime =
+ (IPathCalcRuntimeService) getContext().
+ getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+ log.debug("Get NG Flows Summary");
+ if (pathRuntime == null) {
+ log.warn("Failed to get path calc runtime");
+ return new ArrayList<FlowPath>();
+ }
+ IntentMap parentIntentMap = pathRuntime.getHighLevelIntents();
+ IntentMap intentMap = pathRuntime.getPathIntents();
+ for (Intent parentIntent : parentIntentMap.getAllIntents()) {
+ // Get only installed Shortest Paths
+ if (parentIntent.getState() != IntentState.INST_ACK || !(parentIntent instanceof ShortestPathIntent)) {
+ continue;
+ }
+ ShortestPathIntent spIntent = (ShortestPathIntent) parentIntent;
+
+ // Get the Path Intent
+ Intent intent = intentMap.getIntent(spIntent.getPathIntentId());
+ if (!(intent instanceof PathIntent)) {
+ continue;
+ }
+ PathIntent pathIntent = (PathIntent) intent;
+
+ // Decode the Shortest Path ID
+ String applnIntentId = parentIntent.getId();
+ String intentId = applnIntentId.split(":")[1];
+
+ // Create the Flow Path
+ FlowId flowId = new FlowId(intentId);
+ FlowPath flowPath = new FlowPath();
+ flowPath.setFlowId(flowId);
+ sortedFlowPaths.put(flowPath.flowId().value(), flowPath);
+
+ flowPath.setInstallerId(new CallerId("E"));
+ flowPath.setFlowEntryActions(null);
+ flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+ flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+
+ // Setup the Source and Destination DPID and Port
+ SwitchPort srcPort = flowPath.dataPath().srcPort();
+ SwitchPort dstPort = flowPath.dataPath().dstPort();
+ srcPort.setDpid(new Dpid(spIntent.getSrcSwitchDpid()));
+ srcPort.setPort(new Port((short) spIntent.getSrcPortNumber()));
+ dstPort.setDpid(new Dpid(spIntent.getDstSwitchDpid()));
+ dstPort.setPort(new Port((short) spIntent.getDstPortNumber()));
+
+ // Extract the Flow Entries
+ Path path = pathIntent.getPath();
+ ArrayList<FlowEntry> flowEntries = new ArrayList<>();
+ for (LinkEvent linkEvent : path) {
+ Dpid dpid = new Dpid(linkEvent.getSrc().getDpid());
+ FlowEntry flowEntry = new FlowEntry();
+ flowEntry.setDpid(dpid);
+ flowEntries.add(flowEntry);
+ }
+ // Add the final Flow Entry
+ FlowEntry lastEntry = new FlowEntry();
+ lastEntry.setDpid(new Dpid(spIntent.getDstSwitchDpid()));
+ flowEntries.add(lastEntry);
+ flowPath.dataPath().setFlowEntries(flowEntries);
+ }
+
+ // Prepare the return result
+ return new ArrayList<>(sortedFlowPaths.values());
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java b/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java
new file mode 100755
index 0000000..7d338b5
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datagrid/web/IntentResource.java
@@ -0,0 +1,230 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package net.onrc.onos.core.datagrid.web;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import net.onrc.onos.intent.ConstrainedShortestPathIntent;
+import net.onrc.onos.intent.ShortestPathIntent;
+import net.onrc.onos.intent.IntentOperation;
+import net.onrc.onos.intent.IntentMap;
+import net.onrc.onos.intent.Intent;
+import net.onrc.onos.intent.runtime.IPathCalcRuntimeService;
+import net.onrc.onos.intent.IntentOperationList;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.resource.Post;
+import org.restlet.resource.ServerResource;
+import org.codehaus.jackson.map.ObjectMapper;
+import net.floodlightcontroller.util.MACAddress;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.restlet.resource.Delete;
+import org.restlet.resource.Get;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author nickkaranatsios
+ */
+public class IntentResource extends ServerResource {
+ private final static Logger log = LoggerFactory.getLogger(IntentResource.class);
+ // Prefix joined with ':' to every REST intent ID. TODO need to assign proper application id.
+ private final String APPLN_ID = "1";
+
+ /** Handles POST: parses the JSON body and submits the intents it describes. */
+ @Post("json")
+ public String store(String jsonIntent) throws IOException {
+     IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext()
+             .getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+     if (pathRuntime == null) {
+         log.warn("Failed to get path calc runtime");
+         return "";
+     }
+     JsonNode jNode = null;
+     try {
+         jNode = new ObjectMapper().readValue(jsonIntent, JsonNode.class);
+     } catch (JsonGenerationException ex) {
+         log.error("JsonGeneration exception ", ex);
+     } catch (JsonMappingException ex) {
+         log.error("JsonMappingException occurred", ex);
+     } catch (IOException ex) {
+         log.error("IOException occurred", ex);
+     }
+     return (jNode == null) ? "" : parseJsonNode(jNode.getElements(), pathRuntime);
+ }
+
+ /** Handles DELETE: purges all intents; replies "" because purge reports no result yet. */
+ @Delete("json")
+ public String store() {
+     IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext()
+             .getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+     if (pathRuntime == null) {
+         // Same guard as the POST handler: a missing service must not cause an NPE.
+         log.warn("Failed to get path calc runtime");
+         return "";
+     }
+     pathRuntime.purgeIntents();
+     // TODO no reply yet from the purge intents call
+     return "";
+ }
+
+ @Get("json")
+ public String retrieve() throws IOException {
+ IPathCalcRuntimeService pathRuntime = (IPathCalcRuntimeService) getContext().
+ getAttributes().get(IPathCalcRuntimeService.class.getCanonicalName());
+
+ String intentCategory = (String) getRequestAttributes().get("category");
+ IntentMap intentMap = null;
+ if (intentCategory.equals("high")) {
+ intentMap = pathRuntime.getHighLevelIntents();
+ } else {
+ intentMap = pathRuntime.getPathIntents();
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ String restStr = "";
+
+ String intentId = (String) getRequestAttributes().get("intent_id");
+ ArrayNode arrayNode = mapper.createArrayNode();
+ Collection<Intent> intents = intentMap.getAllIntents();
+ if (!intents.isEmpty()) {
+ if ((intentId != null)) {
+ String applnIntentId = APPLN_ID + ":" + intentId;
+ Intent intent = intentMap.getIntent(applnIntentId);
+ if (intent != null) {
+ ObjectNode node = mapper.createObjectNode();
+ // TODO refactor/remove duplicate code.
+ node.put("intent_id", intentId);
+ node.put("status", intent.getState().toString());
+ LinkedList<String> logs = intent.getLogs();
+ ArrayNode logNode = mapper.createArrayNode();
+ for (String intentLog : logs) {
+ logNode.add(intentLog);
+ }
+ node.put("log", logNode);
+ arrayNode.add(node);
+ }
+ } else {
+ for (Intent intent : intents) {
+ ObjectNode node = mapper.createObjectNode();
+ String applnIntentId = intent.getId();
+ intentId = applnIntentId.split(":")[1];
+ node.put("intent_id", intentId);
+ node.put("status", intent.getState().toString());
+ LinkedList<String> logs = intent.getLogs();
+ ArrayNode logNode = mapper.createArrayNode();
+ for (String intentLog : logs) {
+ logNode.add(intentLog);
+ }
+ node.put("log", logNode);
+ arrayNode.add(node);
+ }
+ }
+ restStr = mapper.writeValueAsString(arrayNode);
+ }
+ return restStr;
+ }
+
+ /** Queues one intent operation per JSON object, executes the batch, returns status as JSON. */
+ private String parseJsonNode(Iterator<JsonNode> nodes,
+         IPathCalcRuntimeService pathRuntime) throws IOException {
+     ObjectMapper mapper = new ObjectMapper();
+     ArrayNode statusArray = mapper.createArrayNode();
+     IntentOperationList operations = new IntentOperationList();
+     while (nodes.hasNext()) {
+         JsonNode element = nodes.next();
+         // Elements that are not JSON objects are skipped.
+         if (!element.isObject()) {
+             continue;
+         }
+         Map<String, Object> fields = new HashMap<>();
+         for (Iterator<String> names = element.getFieldNames(); names.hasNext();) {
+             String name = names.next();
+             parseFields(element.get(name), name, fields);
+         }
+         Intent intent = processIntent(fields, operations);
+         appendIntentStatus(intent, (String) fields.get("intent_id"), mapper, statusArray);
+     }
+     pathRuntime.executeIntentOperations(operations);
+     return mapper.writeValueAsString(statusArray);
+ }
+
+ /** Appends an {intent_id, status, log} object describing {@code intent} to {@code arrayNode}. */
+ private void appendIntentStatus(Intent intent, final String intentId,
+         ObjectMapper mapper, ArrayNode arrayNode) throws IOException {
+     ArrayNode logEntries = mapper.createArrayNode();
+     for (String entry : intent.getLogs()) {
+         logEntries.add(entry);
+     }
+     ObjectNode status = mapper.createObjectNode();
+     status.put("intent_id", intentId);
+     status.put("status", intent.getState().toString());
+     status.put("log", logEntries);
+     arrayNode.add(status);
+ }
+
+ /**
+  * Builds the intent described by {@code fields} and queues it on {@code operations}.
+  * An "intent_id" starting with "F" marks the path frozen (the prefix is dropped);
+  * "intent_op" "remove" queues a REMOVE, anything else an ADD; "intent_type"
+  * "shortest_intent_type" yields a ShortestPathIntent, anything else a constrained one.
+  *
+  * @return the created intent
+  */
+ private Intent processIntent(Map<String, Object> fields, IntentOperationList operations) {
+     String intentId = (String) fields.get("intent_id");
+     boolean pathFrozen = intentId.startsWith("F"); // TODO define REST API for frozen intents
+     if (pathFrozen) {
+         intentId = intentId.substring(1);
+     }
+     String applnIntentId = APPLN_ID + ":" + intentId;
+     // Constant-first equals: a request without "intent_op"/"intent_type" must not NPE.
+     IntentOperation.Operator operation = "remove".equals(fields.get("intent_op"))
+             ? IntentOperation.Operator.REMOVE : IntentOperation.Operator.ADD;
+
+     // parseFields stores integral JSON numbers as Long and fractional ones as Double,
+     // so go through Number: a direct (double) cast fails for e.g. "bandwidth": 10.
+     long srcSwitch = Long.decode((String) fields.get("srcSwitch"));
+     long srcPort = ((Number) fields.get("srcPort")).longValue();
+     long srcMac = MACAddress.valueOf((String) fields.get("srcMac")).toLong();
+     long dstSwitch = Long.decode((String) fields.get("dstSwitch"));
+     long dstPort = ((Number) fields.get("dstPort")).longValue();
+     long dstMac = MACAddress.valueOf((String) fields.get("dstMac")).toLong();
+
+     if ("shortest_intent_type".equals(fields.get("intent_type"))) {
+         ShortestPathIntent spi = new ShortestPathIntent(applnIntentId,
+                 srcSwitch, srcPort, srcMac, dstSwitch, dstPort, dstMac);
+         spi.setPathFrozen(pathFrozen);
+         operations.add(operation, spi);
+         return spi;
+     }
+     double bandwidth = ((Number) fields.get("bandwidth")).doubleValue();
+     ConstrainedShortestPathIntent cspi = new ConstrainedShortestPathIntent(applnIntentId,
+             srcSwitch, srcPort, srcMac, dstSwitch, dstPort, dstMac, bandwidth);
+     cspi.setPathFrozen(pathFrozen);
+     operations.add(operation, cspi);
+     return cspi;
+ }
+
+ /** Stores a textual, int, long or double node in {@code fields}; other node kinds are ignored. */
+ private void parseFields(JsonNode node, String fieldName, Map<String, Object> fields) {
+     if (node.isTextual()) {
+         fields.put(fieldName, node.getTextValue());
+     } else if (node.isInt() || node.isLong()) {
+         // Integral values are always stored as Long, whatever their JSON width.
+         fields.put(fieldName, Long.valueOf(node.getLongValue()));
+     } else if (node.isDouble()) {
+         fields.put(fieldName, Double.valueOf(node.getDoubleValue()));
+     }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/DataStoreClient.java b/src/main/java/net/onrc/onos/core/datastore/DataStoreClient.java
new file mode 100644
index 0000000..7ec6251
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/DataStoreClient.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.core.datastore;
+
+import net.onrc.onos.core.datastore.hazelcast.HZClient;
+import net.onrc.onos.core.datastore.ramcloud.RCClient;
+
+/**
+ * Static access point to the Key-Value store client of the backend selected by the
+ * "net.onrc.onos.core.datastore.backend" system property (default "hazelcast").
+ * This class probably needs to be a service.
+ */
+public class DataStoreClient {
+    private static final String BACKEND =
+            System.getProperty("net.onrc.onos.core.datastore.backend", "hazelcast");
+
+    // Suppresses default constructor, ensuring non-instantiability.
+    private DataStoreClient() {}
+
+    /** Returns the RCClient when the backend is "ramcloud", the HZClient for any other value. */
+    public static IKVClient getClient() {
+        // TODO read config and return appropriate IKVClient
+        if ("ramcloud".equals(BACKEND)) {
+            return RCClient.getClient();
+        }
+        return HZClient.getClient();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/IKVClient.java b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
new file mode 100644
index 0000000..e298548
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
@@ -0,0 +1,165 @@
+package net.onrc.onos.core.datastore;
+
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+
+/**
+ * Interface for a client class used to access the Key-Value store.
+ */
+public interface IKVClient {
+ /** Gets the {@link IKVTable} for the table named {@code tableName}. */
+ public IKVTable getTable(final String tableName);
+
+ /**
+ * Drop table.
+ *
+ * Behavior of IKVTable instances accessing dropped table is undefined.
+ *
+ * @param table IKVTable to drop.
+ */
+ public void dropTable(IKVTable table);
+
+ /**
+ * Create a Key-Value entry on table.
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @param value value of the entry
+ * @return version of the created entry
+ * @throws ObjectExistsException if an entry was not expected to exist but does
+ */
+ public long create(IKVTableID tableId, byte[] key, byte[] value) throws ObjectExistsException;
+
+ /**
+ * Create a Key-Value entry on table, without existence checking.
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @param value value of the entry
+ * @return version of the created entry
+ */
+ public long forceCreate(IKVTableID tableId, byte[] key, byte[] value);
+
+ /**
+ * Read a Key-Value entry from table.
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @return Corresponding {@link IKVEntry}
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ */
+ public IKVEntry read(IKVTableID tableId, byte[] key) throws ObjectDoesntExistException;
+
+ /**
+ * Update an existing Key-Value entry in table.
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @param value new value of the entry
+ * @param version
+ * expected version in the data store
+ * @return version after update
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ * @throws WrongVersionException
+ */
+ public long update(IKVTableID tableId, byte[] key, byte[] value, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Update an existing Key-Value entry in table, without checking version.
+ *
+ * FIXME remove this method and use forceCreate for this purpose?
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @param value new value of the entry
+ * @return version after update
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ */
+ @Deprecated
+ public long update(IKVTableID tableId, byte[] key, byte[] value)
+ throws ObjectDoesntExistException;
+
+ // TODO Adding serialized value as parameter to this interface may
+ // give an option to improve performance on some backends.
+ /**
+ * Remove an existing Key-Value entry in table
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @param version
+ * expected version in the data store
+ * @return version of removed object
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ * @throws WrongVersionException
+ */
+ public long delete(IKVTableID tableId, byte[] key, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Remove a Key-Value entry in table
+ *
+ * @param tableId ID of the table
+ * @param key key of the entry
+ * @return version of removed object or -1, if it did not exist.
+ */
+ public long forceDelete(IKVTableID tableId, byte[] key);
+
+ /**
+ * Get all the entries in table.
+ * @param tableId ID of the table
+ * @return entries in this table.
+ */
+ public Iterable<IKVEntry> getAllEntries(IKVTableID tableId);
+
+ /**
+ *
+ * @see #create(IKVTableID, byte[], byte[])
+ *
+ * @return IMultiEntryOperation for this operation
+ *
+ */
+ public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value);
+ /** @see #forceCreate(IKVTableID, byte[], byte[]) */
+ public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key,
+ byte[] value);
+ /** @see #read(IKVTableID, byte[]) */
+ public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key);
+ /** @see #update(IKVTableID, byte[], byte[], long) */
+ public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,
+ long version);
+ /** @see #delete(IKVTableID, byte[], long) */
+ public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,
+ long version);
+ /** @see #forceDelete(IKVTableID, byte[]) */
+ public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key);
+
+ /**
+ * Batch delete operation
+ * @param ops delete operations
+ * @return true if failed operation exists
+ */
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops);
+
+ /**
+ * Batch write operation
+ * @param ops write operations
+ * @return true if failed operation exists
+ */
+ public boolean multiWrite(final List<IMultiEntryOperation> ops);
+
+ /**
+ * Batch read operation
+ * @param ops read operations
+ * @return true if failed operation exists
+ */
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops);
+
+ /**
+ * Version number which represents that the object does not exist, or has
+ * never been read from the DB before.
+ */
+ public long VERSION_NONEXISTENT();
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/IKVTable.java b/src/main/java/net/onrc/onos/core/datastore/IKVTable.java
new file mode 100644
index 0000000..864b270
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/IKVTable.java
@@ -0,0 +1,112 @@
+package net.onrc.onos.core.datastore;
+
+
+/**
+ * Interface for a class to represent a Table in a Key-Value store.
+ */
+public interface IKVTable {
+
+ /**
+ * Version number which represents that the object does not exist, or has
+ * never been read from the DB before.
+ */
+ public long VERSION_NONEXISTENT();
+
+ /**
+ * Interface for a class to represent an entry in Table.
+ */
+ public static interface IKVEntry {
+
+ public byte[] getKey();
+
+ public byte[] getValue();
+
+ public long getVersion();
+
+ }
+
+ /**
+ * @return ID to identify this table.
+ */
+ public IKVTableID getTableId();
+
+ /**
+ * Create a Key-Value entry on table.
+ *
+ * @param key key of the entry
+ * @param value value of the entry
+ * @return version of the created entry
+ * @throws ObjectExistsException if an entry was not expected to exist but does
+ */
+ public long create(byte[] key, byte[] value) throws ObjectExistsException;
+
+ /**
+ * Create a Key-Value entry on table, without existence checking.
+ *
+ * @param key key of the entry
+ * @param value value of the entry
+ * @return version of the created entry
+ */
+ public long forceCreate(byte[] key, byte[] value);
+
+ /**
+ * Read a Key-Value entry from table.
+ *
+ * @param key key of the entry
+ * @return Corresponding {@link IKVEntry}
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ */
+ public IKVEntry read(byte[] key) throws ObjectDoesntExistException;
+
+ /**
+ * Update an existing Key-Value entry in table.
+ *
+ * @param key key of the entry
+ * @param value new value of the entry
+ * @param version
+ * expected version in the data store
+ * @return version after update
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ * @throws WrongVersionException
+ */
+ public long update(byte[] key, byte[] value, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Update an existing Key-Value entry in table, without checking version.
+ *
+ * @param key key of the entry
+ * @param value new value of the entry
+ * @return version after update
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ */
+ public long update(byte[] key, byte[] value)
+ throws ObjectDoesntExistException;
+
+ /**
+ * Remove an existing Key-Value entry in table
+ *
+ * @param key key of the entry
+ * @param version
+ * expected version in the data store
+ * @return version of removed object
+ * @throws ObjectDoesntExistException if the entry was expected but not found
+ * @throws WrongVersionException
+ */
+ public long delete(byte[] key, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Remove a Key-Value entry in table
+ *
+ * @param key key of the entry
+ * @return version of removed object or VERSION_NONEXISTENT, if it did not exist.
+ */
+ public long forceDelete(byte[] key);
+
+ /**
+ * Get all the entries in table.
+ * @return entries in this table.
+ */
+ public Iterable<IKVEntry> getAllEntries();
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/core/datastore/IKVTableID.java b/src/main/java/net/onrc/onos/core/datastore/IKVTableID.java
new file mode 100644
index 0000000..c736908
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/IKVTableID.java
@@ -0,0 +1,7 @@
+package net.onrc.onos.core.datastore;
+
+public interface IKVTableID {
+ /** @return the name of the table this ID identifies. */
+ public String getTableName();
+
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/core/datastore/IMultiEntryOperation.java b/src/main/java/net/onrc/onos/core/datastore/IMultiEntryOperation.java
new file mode 100644
index 0000000..535fe95
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/IMultiEntryOperation.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.core.datastore;
+
+/**
+ * Interface for a class to specify which K-V pair to batch read/write/delete.
+ */
+public interface IMultiEntryOperation {
+ /** Execution status of an operation. */
+ public enum STATUS {
+ NOT_EXECUTED, SUCCESS, FAILED
+ }
+ /** Kind of Key-Value operation an entry requests. */
+ public enum OPERATION {
+ CREATE, FORCE_CREATE, UPDATE, READ, DELETE, FORCE_DELETE
+ }
+
+ public boolean hasSucceeded();
+
+ public STATUS getStatus();
+
+ public IKVTableID getTableId();
+
+ public byte[] getKey();
+
+ public byte[] getValue();
+
+ public long getVersion();
+
+ public OPERATION getOperation();
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/IMultiObjectOperation.java b/src/main/java/net/onrc/onos/core/datastore/IMultiObjectOperation.java
new file mode 100644
index 0000000..898327f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/IMultiObjectOperation.java
@@ -0,0 +1,9 @@
+package net.onrc.onos.core.datastore;
+
+import net.onrc.onos.core.datastore.utils.KVObject;
+
+public interface IMultiObjectOperation {
+ /** @return the {@link KVObject} of this operation (presumably its target; confirm in implementations). */
+ public KVObject getObject();
+
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/core/datastore/ObjectDoesntExistException.java b/src/main/java/net/onrc/onos/core/datastore/ObjectDoesntExistException.java
new file mode 100644
index 0000000..06d21ca
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ObjectDoesntExistException.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.core.datastore;
+
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+/**
+ * Exception thrown when object was expected, but not found in data store.
+ */
+public class ObjectDoesntExistException extends RejectRulesException {
+ private static final long serialVersionUID = 859082748533417866L;
+
+ public ObjectDoesntExistException(final String message) {
+ super(message);
+ }
+
+ public ObjectDoesntExistException(final IKVTableID tableID,
+ final byte[] key, final Throwable cause) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID, cause);
+ }
+
+ public ObjectDoesntExistException(final IKVTableID tableID, final byte[] key) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ObjectExistsException.java b/src/main/java/net/onrc/onos/core/datastore/ObjectExistsException.java
new file mode 100644
index 0000000..75c4e4d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ObjectExistsException.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.core.datastore;
+
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+/** Thrown when an object was found in the data store although it was not expected there. */
+public class ObjectExistsException extends RejectRulesException {
+    private static final long serialVersionUID = -1488647215779909457L;
+
+    public ObjectExistsException(final String message) {
+        super(message);
+    }
+
+    public ObjectExistsException(final IKVTableID tableID, final byte[] key, final Throwable cause) {
+        super(describe(tableID, key), cause);
+    }
+
+    public ObjectExistsException(final IKVTableID tableID, final byte[] key) {
+        super(describe(tableID, key));
+    }
+
+    /** Builds the detail message from the key (formatted by ByteArrayUtil) and the table ID. */
+    private static String describe(final IKVTableID tableID, final byte[] key) {
+        return ByteArrayUtil.toHexStringBuffer(key, ":") + " already exist on table:" + tableID;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/RCProtos.java b/src/main/java/net/onrc/onos/core/datastore/RCProtos.java
new file mode 100644
index 0000000..5e823ee
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/RCProtos.java
@@ -0,0 +1,2177 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: protobuf/ramcloud.proto
+
+package net.onrc.onos.core.datastore;
+
+public final class RCProtos {
+ private RCProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface SwitchPropertyOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required int64 dpid = 1;
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ boolean hasDpid();
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ long getDpid();
+
+ // required int32 status = 2;
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ boolean hasStatus();
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ int getStatus();
+
+ // optional bytes value = 3;
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ boolean hasValue();
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ com.google.protobuf.ByteString getValue();
+ }
+ /**
+ * Protobuf type {@code RCProtos.SwitchProperty}
+ */
+ public static final class SwitchProperty extends
+ com.google.protobuf.GeneratedMessage
+ implements SwitchPropertyOrBuilder {
+ // Use SwitchProperty.newBuilder() to construct.
+ private SwitchProperty(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private SwitchProperty(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final SwitchProperty defaultInstance;
+ public static SwitchProperty getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public SwitchProperty getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private SwitchProperty(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ dpid_ = input.readInt64();
+ break;
+ }
+ case 16: {
+ bitField0_ |= 0x00000002;
+ status_ = input.readInt32();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000004;
+ value_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_SwitchProperty_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_SwitchProperty_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ net.onrc.onos.core.datastore.RCProtos.SwitchProperty.class, net.onrc.onos.core.datastore.RCProtos.SwitchProperty.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<SwitchProperty> PARSER =
+ new com.google.protobuf.AbstractParser<SwitchProperty>() {
+ public SwitchProperty parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new SwitchProperty(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<SwitchProperty> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required int64 dpid = 1;
+ public static final int DPID_FIELD_NUMBER = 1;
+ private long dpid_;
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public boolean hasDpid() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public long getDpid() {
+ return dpid_;
+ }
+
+ // required int32 status = 2;
+ public static final int STATUS_FIELD_NUMBER = 2;
+ private int status_;
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public boolean hasStatus() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public int getStatus() {
+ return status_;
+ }
+
+ // optional bytes value = 3;
+ public static final int VALUE_FIELD_NUMBER = 3;
+ private com.google.protobuf.ByteString value_;
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public boolean hasValue() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public com.google.protobuf.ByteString getValue() {
+ return value_;
+ }
+
+ private void initFields() {
+ dpid_ = 0L;
+ status_ = 0;
+ value_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasDpid()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasStatus()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeInt64(1, dpid_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeInt32(2, status_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, value_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(1, dpid_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(2, status_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, value_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.SwitchProperty parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(net.onrc.onos.core.datastore.RCProtos.SwitchProperty prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code RCProtos.SwitchProperty}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements net.onrc.onos.core.datastore.RCProtos.SwitchPropertyOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_SwitchProperty_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_SwitchProperty_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ net.onrc.onos.core.datastore.RCProtos.SwitchProperty.class, net.onrc.onos.core.datastore.RCProtos.SwitchProperty.Builder.class);
+ }
+
+ // Construct using net.onrc.onos.core.datastore.RCProtos.SwitchProperty.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ dpid_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ status_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ value_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_SwitchProperty_descriptor;
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.SwitchProperty getDefaultInstanceForType() {
+ return net.onrc.onos.core.datastore.RCProtos.SwitchProperty.getDefaultInstance();
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.SwitchProperty build() {
+ net.onrc.onos.core.datastore.RCProtos.SwitchProperty result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.SwitchProperty buildPartial() {
+ net.onrc.onos.core.datastore.RCProtos.SwitchProperty result = new net.onrc.onos.core.datastore.RCProtos.SwitchProperty(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.dpid_ = dpid_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.status_ = status_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.value_ = value_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof net.onrc.onos.core.datastore.RCProtos.SwitchProperty) {
+ return mergeFrom((net.onrc.onos.core.datastore.RCProtos.SwitchProperty)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(net.onrc.onos.core.datastore.RCProtos.SwitchProperty other) {
+ if (other == net.onrc.onos.core.datastore.RCProtos.SwitchProperty.getDefaultInstance()) return this;
+ if (other.hasDpid()) {
+ setDpid(other.getDpid());
+ }
+ if (other.hasStatus()) {
+ setStatus(other.getStatus());
+ }
+ if (other.hasValue()) {
+ setValue(other.getValue());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasDpid()) {
+
+ return false;
+ }
+ if (!hasStatus()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ net.onrc.onos.core.datastore.RCProtos.SwitchProperty parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (net.onrc.onos.core.datastore.RCProtos.SwitchProperty) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required int64 dpid = 1;
+ private long dpid_ ;
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public boolean hasDpid() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public long getDpid() {
+ return dpid_;
+ }
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public Builder setDpid(long value) {
+ bitField0_ |= 0x00000001;
+ dpid_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int64 dpid = 1;</code>
+ */
+ public Builder clearDpid() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ dpid_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // required int32 status = 2;
+ private int status_ ;
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public boolean hasStatus() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public int getStatus() {
+ return status_;
+ }
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public Builder setStatus(int value) {
+ bitField0_ |= 0x00000002;
+ status_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int32 status = 2;</code>
+ */
+ public Builder clearStatus() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ status_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes value = 3;
+ private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public boolean hasValue() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public com.google.protobuf.ByteString getValue() {
+ return value_;
+ }
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public Builder setValue(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ value_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes value = 3;</code>
+ */
+ public Builder clearValue() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ value_ = getDefaultInstance().getValue();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:RCProtos.SwitchProperty)
+ }
+
+ // Creates the shared default instance with the noInit constructor and then
+ // calls initFields() on it. The assignment comes first, so defaultInstance
+ // is already non-null while initFields() runs.
+ // NOTE(review): initFields() is outside this view; presumably it assigns the
+ // per-field default values - confirm.
+ static {
+ defaultInstance = new SwitchProperty(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:RCProtos.SwitchProperty)
+ }
+
+ /**
+  * Read accessors implemented by both {@code PortProperty} and its
+  * {@code Builder}: a presence check and a getter for each of the four fields.
+  */
+ public interface PortPropertyOrBuilder extends com.google.protobuf.MessageOrBuilder {
+
+   /**
+    * <code>required int64 dpid = 1;</code>
+    *
+    * @return whether {@code dpid} is set
+    */
+   boolean hasDpid();
+
+   /**
+    * <code>required int64 dpid = 1;</code>
+    *
+    * @return the current {@code dpid} value
+    */
+   long getDpid();
+
+   /**
+    * <code>required int64 number = 2;</code>
+    *
+    * @return whether {@code number} is set
+    */
+   boolean hasNumber();
+
+   /**
+    * <code>required int64 number = 2;</code>
+    *
+    * @return the current {@code number} value
+    */
+   long getNumber();
+
+   /**
+    * <code>required int32 status = 3;</code>
+    *
+    * @return whether {@code status} is set
+    */
+   boolean hasStatus();
+
+   /**
+    * <code>required int32 status = 3;</code>
+    *
+    * @return the current {@code status} value
+    */
+   int getStatus();
+
+   /**
+    * <code>optional bytes value = 4;</code>
+    *
+    * @return whether {@code value} is set
+    */
+   boolean hasValue();
+
+   /**
+    * <code>optional bytes value = 4;</code>
+    *
+    * @return the current {@code value} bytes
+    */
+   com.google.protobuf.ByteString getValue();
+ }
+ /**
+  * Protobuf type {@code RCProtos.PortProperty}
+  *
+  * <p>Message with three required fields ({@code dpid}, {@code number},
+  * {@code status}) and one optional {@code value} payload. Field presence is
+  * recorded in {@code bitField0_} using bits {@code 0x1}, {@code 0x2},
+  * {@code 0x4} and {@code 0x8} respectively.
+  */
+ public static final class PortProperty extends
+     com.google.protobuf.GeneratedMessage
+     implements PortPropertyOrBuilder {
+   // Use PortProperty.newBuilder() to construct.
+   private PortProperty(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+     super(builder);
+     this.unknownFields = builder.getUnknownFields();
+   }
+   // Called by the static initializer below to create the default instance;
+   // the noInit argument itself is never read.
+   private PortProperty(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+   private static final PortProperty defaultInstance;
+   public static PortProperty getDefaultInstance() {
+     return defaultInstance;
+   }
+
+   public PortProperty getDefaultInstanceForType() {
+     return defaultInstance;
+   }
+
+   private final com.google.protobuf.UnknownFieldSet unknownFields;
+   @java.lang.Override
+   public final com.google.protobuf.UnknownFieldSet
+       getUnknownFields() {
+     return this.unknownFields;
+   }
+
+   /**
+    * Decodes a message from {@code input}. Tags are read in a loop until a
+    * tag of 0 is seen or {@code parseUnknownField} returns false; tags that
+    * match no known field are routed to {@code parseUnknownField}.
+    *
+    * <p>Each case label is a wire tag, {@code (field number << 3) | wire type}:
+    * 8, 16 and 24 are the varint fields 1-3, 34 is the length-delimited field 4.
+    *
+    * @throws com.google.protobuf.InvalidProtocolBufferException on any read
+    *     failure; the exception carries this partially decoded message and,
+    *     for a wrapped {@code IOException}, the original exception as cause
+    */
+   private PortProperty(
+       com.google.protobuf.CodedInputStream input,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     initFields();
+     com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+         com.google.protobuf.UnknownFieldSet.newBuilder();
+     try {
+       boolean done = false;
+       while (!done) {
+         int tag = input.readTag();
+         switch (tag) {
+           case 0:
+             done = true;
+             break;
+           case 8: {
+             bitField0_ |= 0x00000001;
+             dpid_ = input.readInt64();
+             break;
+           }
+           case 16: {
+             bitField0_ |= 0x00000002;
+             number_ = input.readInt64();
+             break;
+           }
+           case 24: {
+             bitField0_ |= 0x00000004;
+             status_ = input.readInt32();
+             break;
+           }
+           case 34: {
+             bitField0_ |= 0x00000008;
+             value_ = input.readBytes();
+             break;
+           }
+           default: {
+             if (!parseUnknownField(input, unknownFields,
+                                    extensionRegistry, tag)) {
+               done = true;
+             }
+             break;
+           }
+         }
+       }
+     } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+       throw e.setUnfinishedMessage(this);
+     } catch (java.io.IOException e) {
+       // Keep the original exception as the cause so its stack trace survives.
+       com.google.protobuf.InvalidProtocolBufferException wrapped =
+           new com.google.protobuf.InvalidProtocolBufferException(e.getMessage());
+       wrapped.initCause(e);
+       throw wrapped.setUnfinishedMessage(this);
+     } finally {
+       // Runs on success and failure alike, so the final field is always assigned.
+       this.unknownFields = unknownFields.build();
+       makeExtensionsImmutable();
+     }
+   }
+   public static final com.google.protobuf.Descriptors.Descriptor
+       getDescriptor() {
+     return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_PortProperty_descriptor;
+   }
+
+   protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+       internalGetFieldAccessorTable() {
+     return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_PortProperty_fieldAccessorTable
+         .ensureFieldAccessorsInitialized(
+             net.onrc.onos.core.datastore.RCProtos.PortProperty.class, net.onrc.onos.core.datastore.RCProtos.PortProperty.Builder.class);
+   }
+
+   /**
+    * Parser for {@code PortProperty}; each call constructs a message through
+    * the stream-decoding constructor. Declared {@code final} so the shared
+    * parser cannot be reassigned by other code.
+    */
+   public static final com.google.protobuf.Parser<PortProperty> PARSER =
+       new com.google.protobuf.AbstractParser<PortProperty>() {
+     public PortProperty parsePartialFrom(
+         com.google.protobuf.CodedInputStream input,
+         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+         throws com.google.protobuf.InvalidProtocolBufferException {
+       return new PortProperty(input, extensionRegistry);
+     }
+   };
+
+   @java.lang.Override
+   public com.google.protobuf.Parser<PortProperty> getParserForType() {
+     return PARSER;
+   }
+
+   private int bitField0_;
+   // required int64 dpid = 1;
+   public static final int DPID_FIELD_NUMBER = 1;
+   private long dpid_;
+   /**
+    * <code>required int64 dpid = 1;</code>
+    */
+   public boolean hasDpid() {
+     return ((bitField0_ & 0x00000001) == 0x00000001);
+   }
+   /**
+    * <code>required int64 dpid = 1;</code>
+    */
+   public long getDpid() {
+     return dpid_;
+   }
+
+   // required int64 number = 2;
+   public static final int NUMBER_FIELD_NUMBER = 2;
+   private long number_;
+   /**
+    * <code>required int64 number = 2;</code>
+    */
+   public boolean hasNumber() {
+     return ((bitField0_ & 0x00000002) == 0x00000002);
+   }
+   /**
+    * <code>required int64 number = 2;</code>
+    */
+   public long getNumber() {
+     return number_;
+   }
+
+   // required int32 status = 3;
+   public static final int STATUS_FIELD_NUMBER = 3;
+   private int status_;
+   /**
+    * <code>required int32 status = 3;</code>
+    */
+   public boolean hasStatus() {
+     return ((bitField0_ & 0x00000004) == 0x00000004);
+   }
+   /**
+    * <code>required int32 status = 3;</code>
+    */
+   public int getStatus() {
+     return status_;
+   }
+
+   // optional bytes value = 4;
+   public static final int VALUE_FIELD_NUMBER = 4;
+   private com.google.protobuf.ByteString value_;
+   /**
+    * <code>optional bytes value = 4;</code>
+    */
+   public boolean hasValue() {
+     return ((bitField0_ & 0x00000008) == 0x00000008);
+   }
+   /**
+    * <code>optional bytes value = 4;</code>
+    */
+   public com.google.protobuf.ByteString getValue() {
+     return value_;
+   }
+
+   /** Assigns the default value to every field: zeros and an empty byte string. */
+   private void initFields() {
+     dpid_ = 0L;
+     number_ = 0L;
+     status_ = 0;
+     value_ = com.google.protobuf.ByteString.EMPTY;
+   }
+   // Cached isInitialized() result: -1 = not computed yet, 0 = false, 1 = true.
+   private byte memoizedIsInitialized = -1;
+   /** Returns true when all three required fields are set; the answer is cached. */
+   public final boolean isInitialized() {
+     byte isInitialized = memoizedIsInitialized;
+     if (isInitialized != -1) return isInitialized == 1;
+
+     if (!hasDpid()) {
+       memoizedIsInitialized = 0;
+       return false;
+     }
+     if (!hasNumber()) {
+       memoizedIsInitialized = 0;
+       return false;
+     }
+     if (!hasStatus()) {
+       memoizedIsInitialized = 0;
+       return false;
+     }
+     memoizedIsInitialized = 1;
+     return true;
+   }
+
+   /** Writes every field that is set, in field-number order, then the unknown fields. */
+   public void writeTo(com.google.protobuf.CodedOutputStream output)
+       throws java.io.IOException {
+     // Return value is discarded; the call populates memoizedSerializedSize.
+     getSerializedSize();
+     if (((bitField0_ & 0x00000001) == 0x00000001)) {
+       output.writeInt64(1, dpid_);
+     }
+     if (((bitField0_ & 0x00000002) == 0x00000002)) {
+       output.writeInt64(2, number_);
+     }
+     if (((bitField0_ & 0x00000004) == 0x00000004)) {
+       output.writeInt32(3, status_);
+     }
+     if (((bitField0_ & 0x00000008) == 0x00000008)) {
+       output.writeBytes(4, value_);
+     }
+     getUnknownFields().writeTo(output);
+   }
+
+   // Cached getSerializedSize() result; -1 means not computed yet.
+   private int memoizedSerializedSize = -1;
+   /** Returns the summed encoded size of the set fields plus the unknown fields; cached after the first call. */
+   public int getSerializedSize() {
+     int size = memoizedSerializedSize;
+     if (size != -1) return size;
+
+     size = 0;
+     if (((bitField0_ & 0x00000001) == 0x00000001)) {
+       size += com.google.protobuf.CodedOutputStream
+         .computeInt64Size(1, dpid_);
+     }
+     if (((bitField0_ & 0x00000002) == 0x00000002)) {
+       size += com.google.protobuf.CodedOutputStream
+         .computeInt64Size(2, number_);
+     }
+     if (((bitField0_ & 0x00000004) == 0x00000004)) {
+       size += com.google.protobuf.CodedOutputStream
+         .computeInt32Size(3, status_);
+     }
+     if (((bitField0_ & 0x00000008) == 0x00000008)) {
+       size += com.google.protobuf.CodedOutputStream
+         .computeBytesSize(4, value_);
+     }
+     size += getUnknownFields().getSerializedSize();
+     memoizedSerializedSize = size;
+     return size;
+   }
+
+   private static final long serialVersionUID = 0L;
+   @java.lang.Override
+   protected java.lang.Object writeReplace()
+       throws java.io.ObjectStreamException {
+     return super.writeReplace();
+   }
+
+   // The parseFrom / parseDelimitedFrom overloads below all delegate to PARSER.
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       com.google.protobuf.ByteString data)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     return PARSER.parseFrom(data);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       com.google.protobuf.ByteString data,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     return PARSER.parseFrom(data, extensionRegistry);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(byte[] data)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     return PARSER.parseFrom(data);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       byte[] data,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     return PARSER.parseFrom(data, extensionRegistry);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(java.io.InputStream input)
+       throws java.io.IOException {
+     return PARSER.parseFrom(input);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       java.io.InputStream input,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws java.io.IOException {
+     return PARSER.parseFrom(input, extensionRegistry);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseDelimitedFrom(java.io.InputStream input)
+       throws java.io.IOException {
+     return PARSER.parseDelimitedFrom(input);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseDelimitedFrom(
+       java.io.InputStream input,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws java.io.IOException {
+     return PARSER.parseDelimitedFrom(input, extensionRegistry);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       com.google.protobuf.CodedInputStream input)
+       throws java.io.IOException {
+     return PARSER.parseFrom(input);
+   }
+   public static net.onrc.onos.core.datastore.RCProtos.PortProperty parseFrom(
+       com.google.protobuf.CodedInputStream input,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws java.io.IOException {
+     return PARSER.parseFrom(input, extensionRegistry);
+   }
+
+   public static Builder newBuilder() { return Builder.create(); }
+   public Builder newBuilderForType() { return newBuilder(); }
+   public static Builder newBuilder(net.onrc.onos.core.datastore.RCProtos.PortProperty prototype) {
+     return newBuilder().mergeFrom(prototype);
+   }
+   public Builder toBuilder() { return newBuilder(this); }
+
+   @java.lang.Override
+   protected Builder newBuilderForType(
+       com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+     Builder builder = new Builder(parent);
+     return builder;
+   }
+   /**
+    * Protobuf type {@code RCProtos.PortProperty}
+    *
+    * <p>Mutable builder; it keeps its own copy of each field and its own
+    * {@code bitField0_} presence bits, which {@code buildPartial()} copies
+    * into the new message.
+    */
+   public static final class Builder extends
+       com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements net.onrc.onos.core.datastore.RCProtos.PortPropertyOrBuilder {
+     public static final com.google.protobuf.Descriptors.Descriptor
+         getDescriptor() {
+       return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_PortProperty_descriptor;
+     }
+
+     protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+         internalGetFieldAccessorTable() {
+       return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_PortProperty_fieldAccessorTable
+           .ensureFieldAccessorsInitialized(
+               net.onrc.onos.core.datastore.RCProtos.PortProperty.class, net.onrc.onos.core.datastore.RCProtos.PortProperty.Builder.class);
+     }
+
+     // Construct using net.onrc.onos.core.datastore.RCProtos.PortProperty.newBuilder()
+     private Builder() {
+       maybeForceBuilderInitialization();
+     }
+
+     private Builder(
+         com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+       super(parent);
+       maybeForceBuilderInitialization();
+     }
+     private void maybeForceBuilderInitialization() {
+       if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+       }
+     }
+     private static Builder create() {
+       return new Builder();
+     }
+
+     /** Resets every field to its default and clears all four presence bits. */
+     public Builder clear() {
+       super.clear();
+       dpid_ = 0L;
+       bitField0_ = (bitField0_ & ~0x00000001);
+       number_ = 0L;
+       bitField0_ = (bitField0_ & ~0x00000002);
+       status_ = 0;
+       bitField0_ = (bitField0_ & ~0x00000004);
+       value_ = com.google.protobuf.ByteString.EMPTY;
+       bitField0_ = (bitField0_ & ~0x00000008);
+       return this;
+     }
+
+     public Builder clone() {
+       return create().mergeFrom(buildPartial());
+     }
+
+     public com.google.protobuf.Descriptors.Descriptor
+         getDescriptorForType() {
+       return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_PortProperty_descriptor;
+     }
+
+     public net.onrc.onos.core.datastore.RCProtos.PortProperty getDefaultInstanceForType() {
+       return net.onrc.onos.core.datastore.RCProtos.PortProperty.getDefaultInstance();
+     }
+
+     /** Builds the message and rejects it when a required field is missing. */
+     public net.onrc.onos.core.datastore.RCProtos.PortProperty build() {
+       net.onrc.onos.core.datastore.RCProtos.PortProperty result = buildPartial();
+       if (!result.isInitialized()) {
+         throw newUninitializedMessageException(result);
+       }
+       return result;
+     }
+
+     /** Builds the message without checking required fields; values are copied even when their presence bit is clear. */
+     public net.onrc.onos.core.datastore.RCProtos.PortProperty buildPartial() {
+       net.onrc.onos.core.datastore.RCProtos.PortProperty result = new net.onrc.onos.core.datastore.RCProtos.PortProperty(this);
+       int from_bitField0_ = bitField0_;
+       int to_bitField0_ = 0;
+       if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+         to_bitField0_ |= 0x00000001;
+       }
+       result.dpid_ = dpid_;
+       if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+         to_bitField0_ |= 0x00000002;
+       }
+       result.number_ = number_;
+       if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+         to_bitField0_ |= 0x00000004;
+       }
+       result.status_ = status_;
+       if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+         to_bitField0_ |= 0x00000008;
+       }
+       result.value_ = value_;
+       result.bitField0_ = to_bitField0_;
+       onBuilt();
+       return result;
+     }
+
+     public Builder mergeFrom(com.google.protobuf.Message other) {
+       if (other instanceof net.onrc.onos.core.datastore.RCProtos.PortProperty) {
+         return mergeFrom((net.onrc.onos.core.datastore.RCProtos.PortProperty)other);
+       } else {
+         super.mergeFrom(other);
+         return this;
+       }
+     }
+
+     /** Copies every field that is set on {@code other}, plus its unknown fields, into this builder. */
+     public Builder mergeFrom(net.onrc.onos.core.datastore.RCProtos.PortProperty other) {
+       if (other == net.onrc.onos.core.datastore.RCProtos.PortProperty.getDefaultInstance()) return this;
+       if (other.hasDpid()) {
+         setDpid(other.getDpid());
+       }
+       if (other.hasNumber()) {
+         setNumber(other.getNumber());
+       }
+       if (other.hasStatus()) {
+         setStatus(other.getStatus());
+       }
+       if (other.hasValue()) {
+         setValue(other.getValue());
+       }
+       this.mergeUnknownFields(other.getUnknownFields());
+       return this;
+     }
+
+     public final boolean isInitialized() {
+       if (!hasDpid()) {
+
+         return false;
+       }
+       if (!hasNumber()) {
+
+         return false;
+       }
+       if (!hasStatus()) {
+
+         return false;
+       }
+       return true;
+     }
+
+     /**
+      * Parses a message from {@code input} and merges it into this builder.
+      * When parsing fails, the unfinished message attached to the exception
+      * is still merged (in the finally block) before the exception propagates.
+      */
+     public Builder mergeFrom(
+         com.google.protobuf.CodedInputStream input,
+         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+         throws java.io.IOException {
+       net.onrc.onos.core.datastore.RCProtos.PortProperty parsedMessage = null;
+       try {
+         parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+         parsedMessage = (net.onrc.onos.core.datastore.RCProtos.PortProperty) e.getUnfinishedMessage();
+         throw e;
+       } finally {
+         if (parsedMessage != null) {
+           mergeFrom(parsedMessage);
+         }
+       }
+       return this;
+     }
+     private int bitField0_;
+
+     // required int64 dpid = 1;
+     private long dpid_ ;
+     /**
+      * <code>required int64 dpid = 1;</code>
+      */
+     public boolean hasDpid() {
+       return ((bitField0_ & 0x00000001) == 0x00000001);
+     }
+     /**
+      * <code>required int64 dpid = 1;</code>
+      */
+     public long getDpid() {
+       return dpid_;
+     }
+     /**
+      * <code>required int64 dpid = 1;</code>
+      */
+     public Builder setDpid(long value) {
+       bitField0_ |= 0x00000001;
+       dpid_ = value;
+       onChanged();
+       return this;
+     }
+     /**
+      * <code>required int64 dpid = 1;</code>
+      */
+     public Builder clearDpid() {
+       bitField0_ = (bitField0_ & ~0x00000001);
+       dpid_ = 0L;
+       onChanged();
+       return this;
+     }
+
+     // required int64 number = 2;
+     private long number_ ;
+     /**
+      * <code>required int64 number = 2;</code>
+      */
+     public boolean hasNumber() {
+       return ((bitField0_ & 0x00000002) == 0x00000002);
+     }
+     /**
+      * <code>required int64 number = 2;</code>
+      */
+     public long getNumber() {
+       return number_;
+     }
+     /**
+      * <code>required int64 number = 2;</code>
+      */
+     public Builder setNumber(long value) {
+       bitField0_ |= 0x00000002;
+       number_ = value;
+       onChanged();
+       return this;
+     }
+     /**
+      * <code>required int64 number = 2;</code>
+      */
+     public Builder clearNumber() {
+       bitField0_ = (bitField0_ & ~0x00000002);
+       number_ = 0L;
+       onChanged();
+       return this;
+     }
+
+     // required int32 status = 3;
+     private int status_ ;
+     /**
+      * <code>required int32 status = 3;</code>
+      */
+     public boolean hasStatus() {
+       return ((bitField0_ & 0x00000004) == 0x00000004);
+     }
+     /**
+      * <code>required int32 status = 3;</code>
+      */
+     public int getStatus() {
+       return status_;
+     }
+     /**
+      * <code>required int32 status = 3;</code>
+      */
+     public Builder setStatus(int value) {
+       bitField0_ |= 0x00000004;
+       status_ = value;
+       onChanged();
+       return this;
+     }
+     /**
+      * <code>required int32 status = 3;</code>
+      */
+     public Builder clearStatus() {
+       bitField0_ = (bitField0_ & ~0x00000004);
+       status_ = 0;
+       onChanged();
+       return this;
+     }
+
+     // optional bytes value = 4;
+     private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;
+     /**
+      * <code>optional bytes value = 4;</code>
+      */
+     public boolean hasValue() {
+       return ((bitField0_ & 0x00000008) == 0x00000008);
+     }
+     /**
+      * <code>optional bytes value = 4;</code>
+      */
+     public com.google.protobuf.ByteString getValue() {
+       return value_;
+     }
+     /**
+      * <code>optional bytes value = 4;</code>
+      *
+      * @throws NullPointerException if {@code value} is null
+      */
+     public Builder setValue(com.google.protobuf.ByteString value) {
+       if (value == null) {
+         throw new NullPointerException();
+       }
+       bitField0_ |= 0x00000008;
+       value_ = value;
+       onChanged();
+       return this;
+     }
+     /**
+      * <code>optional bytes value = 4;</code>
+      */
+     public Builder clearValue() {
+       bitField0_ = (bitField0_ & ~0x00000008);
+       value_ = getDefaultInstance().getValue();
+       onChanged();
+       return this;
+     }
+
+     // @@protoc_insertion_point(builder_scope:RCProtos.PortProperty)
+   }
+
+   // Create the shared default instance first, then give its fields their
+   // default values; the order of these two statements is kept as generated.
+   static {
+     defaultInstance = new PortProperty(true);
+     defaultInstance.initFields();
+   }
+
+   // @@protoc_insertion_point(class_scope:RCProtos.PortProperty)
+ }
+
+ /**
+  * Read accessors for the six {@code LinkProperty} fields: a presence check
+  * and a getter for each.
+  */
+ public interface LinkPropertyOrBuilder extends com.google.protobuf.MessageOrBuilder {
+
+   /**
+    * <code>required bytes srcSwId = 1;</code>
+    *
+    * @return whether {@code srcSwId} is set
+    */
+   boolean hasSrcSwId();
+
+   /**
+    * <code>required bytes srcSwId = 1;</code>
+    *
+    * @return the current {@code srcSwId} bytes
+    */
+   com.google.protobuf.ByteString getSrcSwId();
+
+   /**
+    * <code>required bytes srcPortId = 2;</code>
+    *
+    * @return whether {@code srcPortId} is set
+    */
+   boolean hasSrcPortId();
+
+   /**
+    * <code>required bytes srcPortId = 2;</code>
+    *
+    * @return the current {@code srcPortId} bytes
+    */
+   com.google.protobuf.ByteString getSrcPortId();
+
+   /**
+    * <code>required bytes dstSwId = 3;</code>
+    *
+    * @return whether {@code dstSwId} is set
+    */
+   boolean hasDstSwId();
+
+   /**
+    * <code>required bytes dstSwId = 3;</code>
+    *
+    * @return the current {@code dstSwId} bytes
+    */
+   com.google.protobuf.ByteString getDstSwId();
+
+   /**
+    * <code>required bytes dstPortId = 4;</code>
+    *
+    * @return whether {@code dstPortId} is set
+    */
+   boolean hasDstPortId();
+
+   /**
+    * <code>required bytes dstPortId = 4;</code>
+    *
+    * @return the current {@code dstPortId} bytes
+    */
+   com.google.protobuf.ByteString getDstPortId();
+
+   /**
+    * <code>required int32 status = 5;</code>
+    *
+    * @return whether {@code status} is set
+    */
+   boolean hasStatus();
+
+   /**
+    * <code>required int32 status = 5;</code>
+    *
+    * @return the current {@code status} value
+    */
+   int getStatus();
+
+   /**
+    * <code>optional bytes value = 6;</code>
+    *
+    * @return whether {@code value} is set
+    */
+   boolean hasValue();
+
+   /**
+    * <code>optional bytes value = 6;</code>
+    *
+    * @return the current {@code value} bytes
+    */
+   com.google.protobuf.ByteString getValue();
+ }
+ /**
+ * Protobuf type {@code RCProtos.LinkProperty}
+ */
+ public static final class LinkProperty extends
+ com.google.protobuf.GeneratedMessage
+ implements LinkPropertyOrBuilder {
+ /**
+  * Creates a message from a builder and adopts the builder's unknown fields.
+  * Use LinkProperty.newBuilder() to construct.
+  */
+ private LinkProperty(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+   super(builder);
+   unknownFields = builder.getUnknownFields();
+ }
+
+ /**
+  * Creates an instance whose unknown-field set is the default
+  * {@code UnknownFieldSet} instance. The {@code noInit} argument is not read;
+  * it only distinguishes this constructor from the others.
+  */
+ private LinkProperty(boolean noInit) {
+   unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance();
+ }
+
+ /** Shared default instance; assigned once in the class's static initializer. */
+ private static final LinkProperty defaultInstance;
+
+ /** Returns the shared default {@code LinkProperty} instance. */
+ public static LinkProperty getDefaultInstance() {
+   return defaultInstance;
+ }
+
+ /** Instance-level variant of {@link #getDefaultInstance()}. */
+ public LinkProperty getDefaultInstanceForType() {
+   return getDefaultInstance();
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ /**
+  * Decodes a message from {@code input}. Tags are read in a loop until a tag
+  * of 0 is seen or {@code parseUnknownField} returns false; tags that match
+  * no known field are routed to {@code parseUnknownField}.
+  *
+  * <p>Each case label is a wire tag, {@code (field number << 3) | wire type}:
+  * 10, 18, 26, 34 and 50 are the length-delimited fields 1-4 and 6, and 40 is
+  * the varint field 5.
+  *
+  * @param input coded stream to decode from
+  * @param extensionRegistry registry passed to {@code parseUnknownField}
+  * @throws com.google.protobuf.InvalidProtocolBufferException on any read
+  *     failure; the exception carries this partially decoded message and, for
+  *     a wrapped {@code IOException}, the original exception as its cause
+  */
+ private LinkProperty(
+     com.google.protobuf.CodedInputStream input,
+     com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+     throws com.google.protobuf.InvalidProtocolBufferException {
+   initFields();
+   com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+       com.google.protobuf.UnknownFieldSet.newBuilder();
+   try {
+     boolean done = false;
+     while (!done) {
+       int tag = input.readTag();
+       switch (tag) {
+         case 0:
+           done = true;
+           break;
+         case 10: {
+           bitField0_ |= 0x00000001;
+           srcSwId_ = input.readBytes();
+           break;
+         }
+         case 18: {
+           bitField0_ |= 0x00000002;
+           srcPortId_ = input.readBytes();
+           break;
+         }
+         case 26: {
+           bitField0_ |= 0x00000004;
+           dstSwId_ = input.readBytes();
+           break;
+         }
+         case 34: {
+           bitField0_ |= 0x00000008;
+           dstPortId_ = input.readBytes();
+           break;
+         }
+         case 40: {
+           bitField0_ |= 0x00000010;
+           status_ = input.readInt32();
+           break;
+         }
+         case 50: {
+           bitField0_ |= 0x00000020;
+           value_ = input.readBytes();
+           break;
+         }
+         default: {
+           if (!parseUnknownField(input, unknownFields,
+                                  extensionRegistry, tag)) {
+             done = true;
+           }
+           break;
+         }
+       }
+     }
+   } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+     throw e.setUnfinishedMessage(this);
+   } catch (java.io.IOException e) {
+     // Keep the original exception as the cause so its stack trace survives.
+     com.google.protobuf.InvalidProtocolBufferException wrapped =
+         new com.google.protobuf.InvalidProtocolBufferException(e.getMessage());
+     wrapped.initCause(e);
+     throw wrapped.setUnfinishedMessage(this);
+   } finally {
+     // Runs on success and failure alike, so the final field is always assigned.
+     this.unknownFields = unknownFields.build();
+     makeExtensionsImmutable();
+   }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_LinkProperty_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_LinkProperty_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ net.onrc.onos.core.datastore.RCProtos.LinkProperty.class, net.onrc.onos.core.datastore.RCProtos.LinkProperty.Builder.class);
+ }
+
+ /**
+  * Parser for {@code LinkProperty}; each call constructs a message through
+  * the stream-decoding constructor. Declared {@code final} so the shared
+  * parser cannot be reassigned by other code.
+  */
+ public static final com.google.protobuf.Parser<LinkProperty> PARSER =
+     new com.google.protobuf.AbstractParser<LinkProperty>() {
+   public LinkProperty parsePartialFrom(
+       com.google.protobuf.CodedInputStream input,
+       com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+       throws com.google.protobuf.InvalidProtocolBufferException {
+     return new LinkProperty(input, extensionRegistry);
+   }
+ };
+
+ /** Returns the shared {@link #PARSER}. */
+ @java.lang.Override
+ public com.google.protobuf.Parser<LinkProperty> getParserForType() {
+   return PARSER;
+ }
+
+ /** Presence bits, one per field; the bit for each field is tested in its has-method. */
+ private int bitField0_;
+
+ // required bytes srcSwId = 1;
+ public static final int SRCSWID_FIELD_NUMBER = 1;
+ private com.google.protobuf.ByteString srcSwId_;
+
+ /**
+  * <code>required bytes srcSwId = 1;</code>
+  *
+  * @return true when presence bit {@code 0x1} is set
+  */
+ public boolean hasSrcSwId() {
+   return (bitField0_ & 0x00000001) != 0;
+ }
+
+ /**
+  * <code>required bytes srcSwId = 1;</code>
+  *
+  * @return the stored {@code srcSwId} bytes
+  */
+ public com.google.protobuf.ByteString getSrcSwId() {
+   return srcSwId_;
+ }
+
+ // required bytes srcPortId = 2;
+ public static final int SRCPORTID_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString srcPortId_;
+
+ /**
+  * <code>required bytes srcPortId = 2;</code>
+  *
+  * @return true when presence bit {@code 0x2} is set
+  */
+ public boolean hasSrcPortId() {
+   return (bitField0_ & 0x00000002) != 0;
+ }
+
+ /**
+  * <code>required bytes srcPortId = 2;</code>
+  *
+  * @return the stored {@code srcPortId} bytes
+  */
+ public com.google.protobuf.ByteString getSrcPortId() {
+   return srcPortId_;
+ }
+
+ // required bytes dstSwId = 3;
+ public static final int DSTSWID_FIELD_NUMBER = 3;
+ private com.google.protobuf.ByteString dstSwId_;
+
+ /**
+  * <code>required bytes dstSwId = 3;</code>
+  *
+  * @return true when presence bit {@code 0x4} is set
+  */
+ public boolean hasDstSwId() {
+   return (bitField0_ & 0x00000004) != 0;
+ }
+
+ /**
+  * <code>required bytes dstSwId = 3;</code>
+  *
+  * @return the stored {@code dstSwId} bytes
+  */
+ public com.google.protobuf.ByteString getDstSwId() {
+   return dstSwId_;
+ }
+
+ // required bytes dstPortId = 4;
+ public static final int DSTPORTID_FIELD_NUMBER = 4;
+ private com.google.protobuf.ByteString dstPortId_;
+
+ /**
+  * <code>required bytes dstPortId = 4;</code>
+  *
+  * @return true when presence bit {@code 0x8} is set
+  */
+ public boolean hasDstPortId() {
+   return (bitField0_ & 0x00000008) != 0;
+ }
+
+ /**
+  * <code>required bytes dstPortId = 4;</code>
+  *
+  * @return the stored {@code dstPortId} bytes
+  */
+ public com.google.protobuf.ByteString getDstPortId() {
+   return dstPortId_;
+ }
+
+ // required int32 status = 5;
+ public static final int STATUS_FIELD_NUMBER = 5;
+ private int status_;
+
+ /**
+  * <code>required int32 status = 5;</code>
+  *
+  * @return true when presence bit {@code 0x10} is set
+  */
+ public boolean hasStatus() {
+   return (bitField0_ & 0x00000010) != 0;
+ }
+
+ /**
+  * <code>required int32 status = 5;</code>
+  *
+  * @return the stored {@code status} value
+  */
+ public int getStatus() {
+   return status_;
+ }
+
+ // optional bytes value = 6;
+ public static final int VALUE_FIELD_NUMBER = 6;
+ private com.google.protobuf.ByteString value_;
+
+ /**
+  * <code>optional bytes value = 6;</code>
+  *
+  * @return true when presence bit {@code 0x20} is set
+  */
+ public boolean hasValue() {
+   return (bitField0_ & 0x00000020) != 0;
+ }
+
+ /**
+  * <code>optional bytes value = 6;</code>
+  *
+  * @return the stored {@code value} bytes
+  */
+ public com.google.protobuf.ByteString getValue() {
+   return value_;
+ }
+
+ private void initFields() {
+ srcSwId_ = com.google.protobuf.ByteString.EMPTY;
+ srcPortId_ = com.google.protobuf.ByteString.EMPTY;
+ dstSwId_ = com.google.protobuf.ByteString.EMPTY;
+ dstPortId_ = com.google.protobuf.ByteString.EMPTY;
+ status_ = 0;
+ value_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ // Cached isInitialized() result: -1 = not computed yet, 0 = false, 1 = true.
+ private byte memoizedIsInitialized = -1;
+
+ /**
+  * Reports whether all five required fields are set. The answer is computed
+  * on the first call and served from {@code memoizedIsInitialized} afterwards.
+  *
+  * @return true when srcSwId, srcPortId, dstSwId, dstPortId and status are all present
+  */
+ public final boolean isInitialized() {
+   final byte cached = memoizedIsInitialized;
+   if (cached != -1) {
+     return cached == 1;
+   }
+
+   final boolean complete = hasSrcSwId()
+       && hasSrcPortId()
+       && hasDstSwId()
+       && hasDstPortId()
+       && hasStatus();
+   memoizedIsInitialized = (byte) (complete ? 1 : 0);
+   return complete;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, srcSwId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, srcPortId_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, dstSwId_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, dstPortId_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt32(5, status_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBytes(6, value_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, srcSwId_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, srcPortId_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, dstSwId_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, dstPortId_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(5, status_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, value_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static net.onrc.onos.core.datastore.RCProtos.LinkProperty parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(net.onrc.onos.core.datastore.RCProtos.LinkProperty prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code RCProtos.LinkProperty}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements net.onrc.onos.core.datastore.RCProtos.LinkPropertyOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_LinkProperty_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_LinkProperty_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ net.onrc.onos.core.datastore.RCProtos.LinkProperty.class, net.onrc.onos.core.datastore.RCProtos.LinkProperty.Builder.class);
+ }
+
+ // Construct using net.onrc.onos.core.datastore.RCProtos.LinkProperty.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ srcSwId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ srcPortId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ dstSwId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ dstPortId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ status_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ value_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return net.onrc.onos.core.datastore.RCProtos.internal_static_ProtoBuffer_LinkProperty_descriptor;
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.LinkProperty getDefaultInstanceForType() {
+ return net.onrc.onos.core.datastore.RCProtos.LinkProperty.getDefaultInstance();
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.LinkProperty build() {
+ net.onrc.onos.core.datastore.RCProtos.LinkProperty result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public net.onrc.onos.core.datastore.RCProtos.LinkProperty buildPartial() {
+ net.onrc.onos.core.datastore.RCProtos.LinkProperty result = new net.onrc.onos.core.datastore.RCProtos.LinkProperty(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.srcSwId_ = srcSwId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.srcPortId_ = srcPortId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.dstSwId_ = dstSwId_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.dstPortId_ = dstPortId_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.status_ = status_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.value_ = value_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof net.onrc.onos.core.datastore.RCProtos.LinkProperty) {
+ return mergeFrom((net.onrc.onos.core.datastore.RCProtos.LinkProperty)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(net.onrc.onos.core.datastore.RCProtos.LinkProperty other) {
+ if (other == net.onrc.onos.core.datastore.RCProtos.LinkProperty.getDefaultInstance()) return this;
+ if (other.hasSrcSwId()) {
+ setSrcSwId(other.getSrcSwId());
+ }
+ if (other.hasSrcPortId()) {
+ setSrcPortId(other.getSrcPortId());
+ }
+ if (other.hasDstSwId()) {
+ setDstSwId(other.getDstSwId());
+ }
+ if (other.hasDstPortId()) {
+ setDstPortId(other.getDstPortId());
+ }
+ if (other.hasStatus()) {
+ setStatus(other.getStatus());
+ }
+ if (other.hasValue()) {
+ setValue(other.getValue());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasSrcSwId()) {
+
+ return false;
+ }
+ if (!hasSrcPortId()) {
+
+ return false;
+ }
+ if (!hasDstSwId()) {
+
+ return false;
+ }
+ if (!hasDstPortId()) {
+
+ return false;
+ }
+ if (!hasStatus()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ net.onrc.onos.core.datastore.RCProtos.LinkProperty parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (net.onrc.onos.core.datastore.RCProtos.LinkProperty) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required bytes srcSwId = 1;
+ private com.google.protobuf.ByteString srcSwId_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>required bytes srcSwId = 1;</code>
+ */
+ public boolean hasSrcSwId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>required bytes srcSwId = 1;</code>
+ */
+ public com.google.protobuf.ByteString getSrcSwId() {
+ return srcSwId_;
+ }
+ /**
+ * <code>required bytes srcSwId = 1;</code>
+ */
+ public Builder setSrcSwId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ srcSwId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required bytes srcSwId = 1;</code>
+ */
+ public Builder clearSrcSwId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ srcSwId_ = getDefaultInstance().getSrcSwId();
+ onChanged();
+ return this;
+ }
+
+ // required bytes srcPortId = 2;
+ private com.google.protobuf.ByteString srcPortId_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>required bytes srcPortId = 2;</code>
+ */
+ public boolean hasSrcPortId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>required bytes srcPortId = 2;</code>
+ */
+ public com.google.protobuf.ByteString getSrcPortId() {
+ return srcPortId_;
+ }
+ /**
+ * <code>required bytes srcPortId = 2;</code>
+ */
+ public Builder setSrcPortId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ srcPortId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required bytes srcPortId = 2;</code>
+ */
+ public Builder clearSrcPortId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ srcPortId_ = getDefaultInstance().getSrcPortId();
+ onChanged();
+ return this;
+ }
+
+ // required bytes dstSwId = 3;
+ private com.google.protobuf.ByteString dstSwId_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>required bytes dstSwId = 3;</code>
+ */
+ public boolean hasDstSwId() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>required bytes dstSwId = 3;</code>
+ */
+ public com.google.protobuf.ByteString getDstSwId() {
+ return dstSwId_;
+ }
+ /**
+ * <code>required bytes dstSwId = 3;</code>
+ */
+ public Builder setDstSwId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ dstSwId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required bytes dstSwId = 3;</code>
+ */
+ public Builder clearDstSwId() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ dstSwId_ = getDefaultInstance().getDstSwId();
+ onChanged();
+ return this;
+ }
+
+ // required bytes dstPortId = 4;
+ private com.google.protobuf.ByteString dstPortId_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>required bytes dstPortId = 4;</code>
+ */
+ public boolean hasDstPortId() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>required bytes dstPortId = 4;</code>
+ */
+ public com.google.protobuf.ByteString getDstPortId() {
+ return dstPortId_;
+ }
+ /**
+ * <code>required bytes dstPortId = 4;</code>
+ */
+ public Builder setDstPortId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ dstPortId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required bytes dstPortId = 4;</code>
+ */
+ public Builder clearDstPortId() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ dstPortId_ = getDefaultInstance().getDstPortId();
+ onChanged();
+ return this;
+ }
+
+ // required int32 status = 5;
+ private int status_ ;
+ /**
+ * <code>required int32 status = 5;</code>
+ */
+ public boolean hasStatus() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>required int32 status = 5;</code>
+ */
+ public int getStatus() {
+ return status_;
+ }
+ /**
+ * <code>required int32 status = 5;</code>
+ */
+ public Builder setStatus(int value) {
+ bitField0_ |= 0x00000010;
+ status_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int32 status = 5;</code>
+ */
+ public Builder clearStatus() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ status_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes value = 6;
+ private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes value = 6;</code>
+ */
+ public boolean hasValue() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bytes value = 6;</code>
+ */
+ public com.google.protobuf.ByteString getValue() {
+ return value_;
+ }
+ /**
+ * <code>optional bytes value = 6;</code>
+ */
+ public Builder setValue(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ value_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes value = 6;</code>
+ */
+ public Builder clearValue() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ value_ = getDefaultInstance().getValue();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:RCProtos.LinkProperty)
+ }
+
+ static {
+ defaultInstance = new LinkProperty(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:RCProtos.LinkProperty)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_ProtoBuffer_SwitchProperty_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_ProtoBuffer_SwitchProperty_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_ProtoBuffer_PortProperty_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_ProtoBuffer_PortProperty_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_ProtoBuffer_LinkProperty_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_ProtoBuffer_LinkProperty_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static { // Builds the file descriptor from the serialized data below; the assigner then fills in the per-message descriptors and accessor tables.
+ java.lang.String[] descriptorData = {
+ "\n\027protobuf/ramcloud.proto\022\013ProtoBuffer\"=" +
+ "\n\016SwitchProperty\022\014\n\004dpid\030\001 \002(\003\022\016\n\006status" +
+ "\030\002 \002(\005\022\r\n\005value\030\003 \001(\014\"K\n\014PortProperty\022\014\n" +
+ "\004dpid\030\001 \002(\003\022\016\n\006number\030\002 \002(\003\022\016\n\006status\030\003 " +
+ "\002(\005\022\r\n\005value\030\004 \001(\014\"u\n\014LinkProperty\022\017\n\007sr" +
+ "cSwId\030\001 \002(\014\022\021\n\tsrcPortId\030\002 \002(\014\022\017\n\007dstSwI" +
+ "d\030\003 \002(\014\022\021\n\tdstPortId\030\004 \002(\014\022\016\n\006status\030\005 \002" +
+ "(\005\022\r\n\005value\030\006 \001(\014B&\n\027net.onrc.onos.datas" +
+ "toreB\013ProtoBuffer"
+ }; // NOTE(review): the data above still names "net.onrc.onos.datastore" and "ProtoBuffer", while this code lives in net.onrc.onos.core.datastore.RCProtos -- presumably renamed by hand; TODO confirm and regenerate with protoc.
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_ProtoBuffer_SwitchProperty_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_ProtoBuffer_SwitchProperty_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_ProtoBuffer_SwitchProperty_descriptor,
+ new java.lang.String[] { "Dpid", "Status", "Value", });
+ internal_static_ProtoBuffer_PortProperty_descriptor =
+ getDescriptor().getMessageTypes().get(1);
+ internal_static_ProtoBuffer_PortProperty_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_ProtoBuffer_PortProperty_descriptor,
+ new java.lang.String[] { "Dpid", "Number", "Status", "Value", });
+ internal_static_ProtoBuffer_LinkProperty_descriptor =
+ getDescriptor().getMessageTypes().get(2);
+ internal_static_ProtoBuffer_LinkProperty_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_ProtoBuffer_LinkProperty_descriptor,
+ new java.lang.String[] { "SrcSwId", "SrcPortId", "DstSwId", "DstPortId", "Status", "Value", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/RejectRulesException.java b/src/main/java/net/onrc/onos/core/datastore/RejectRulesException.java
new file mode 100644
index 0000000..351d1a1
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/RejectRulesException.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.datastore;
+
+//
+// Not sure if we really need this base class.
+// Just copied hierarchy from RAMCloud.
+//
+/**
+ * Base exception class for conditional write, etc. failure.
+ */
+public class RejectRulesException extends Exception {
+ private static final long serialVersionUID = -1444683012320423530L;
+ /** Creates the exception with a detail message. */
+ public RejectRulesException(final String message) {
+ super(message);
+ }
+ /** Creates the exception with a detail message and the underlying cause. */
+ public RejectRulesException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ /** Creates the exception from the underlying cause alone. */
+ public RejectRulesException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/WrongVersionException.java b/src/main/java/net/onrc/onos/core/datastore/WrongVersionException.java
new file mode 100644
index 0000000..568f5e1
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/WrongVersionException.java
@@ -0,0 +1,31 @@
+package net.onrc.onos.core.datastore;
+
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+/**
+ * Exception thrown when conditional operation failed due to version mismatch.
+ */
+public class WrongVersionException extends RejectRulesException {
+    private static final long serialVersionUID = -1644202495890190823L;
+
+    /** Creates the exception with a caller-supplied detail message. */
+    public WrongVersionException(final String message) {
+        super(message);
+    }
+
+    // It would be best if {@code cause} carried the version actually encountered, but it
+    // currently does not, so only the expected version appears in the message.
+    public WrongVersionException(final IKVTableID tableID, final byte[] key, final long expectedVersion, final Throwable cause) {
+        super(describe(tableID, key, expectedVersion), cause);
+    }
+
+    /** Reports the expected version together with the version actually found. */
+    public WrongVersionException(final IKVTableID tableID, final byte[] key, final long expectedVersion, final long encounteredVersion) {
+        super(describe(tableID, key, expectedVersion) + " but found:" + encounteredVersion);
+    }
+
+    /** Builds the message prefix shared by both constructors: hex key, table and expected version. */
+    private static String describe(final IKVTableID tableID, final byte[] key, final long expectedVersion) {
+        return ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:" + tableID + " was expected to be version:" + expectedVersion;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
new file mode 100644
index 0000000..b21ef8e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -0,0 +1,345 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.io.FileNotFoundException;
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.core.datastore.IKVClient;
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
+import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public class HZClient implements IKVClient {
+ private static final Logger log = LoggerFactory.getLogger(HZClient.class);
+
+ static final long VERSION_NONEXISTENT = 0L;
+
+ private static final String MAP_PREFIX = "datastore://";
+
+ // make this path configurable
+ private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
+ private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
+
+ // Note: xml configuration will overwrite this value if present
+ private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
+
+ private final HazelcastInstance hazelcastInstance;
+
+ private static final HZClient THE_INSTANCE = new HZClient();
+
+ public static HZClient getClient() {
+ return THE_INSTANCE;
+ }
+
+ private HZClient() {
+ hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
+ }
+
+ /**
+  * Creates the Hazelcast instance backing this datastore: a client of an existing cluster when
+  * Client mode is enabled and a cluster member is found, otherwise a full member (Instance mode).
+  * @param hazelcastConfigFileName path of the base Hazelcast XML configuration; an Error is thrown if it does not exist
+  */
+ private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
+     Config baseHzConfig = null;
+     try {
+         baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+     } catch (FileNotFoundException e) {
+         log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+         throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+     }
+
+     // use xml config if present, if not use System.property
+     MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+     if (mapConfig != null) {
+         backupCount = mapConfig.getBackupCount();
+     }
+
+     if (useClientMode) {
+         log.info("Configuring Hazelcast datastore as Client mode");
+         ClientConfig clientConfig = new ClientConfig();
+         final int port = baseHzConfig.getNetworkConfig().getPort();
+         String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
+         clientConfig.addAddress(server + ":" + port);
+
+         // copy group config from base Hazelcast configuration
+         clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+         clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+         // TODO We probably need to figure out what else need to be
+         // derived from baseConfig
+
+         registerSerializer(clientConfig.getSerializationConfig());
+         log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+         try {
+             HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
+             if (!client.getCluster().getMembers().isEmpty()) {
+                 log.debug("Members in cluster: {}", client.getCluster().getMembers());
+                 return client;
+             }
+             log.info("Failed to find cluster member, falling back to Instance mode");
+             // Shut the unusable client down; otherwise it stays alive next to the member started below.
+             client.getLifecycleService().shutdown();
+         } catch (IllegalStateException e) {
+             log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+         }
+         useClientMode = false;
+     }
+     log.info("Configuring Hazelcast datastore as Instance mode");
+
+     // To run 2 Hazelcast instance in 1 JVM,
+     // we probably need to something like below
+     //int port = hazelcastConfig.getNetworkConfig().getPort();
+     //hazelcastConfig.getNetworkConfig().setPort(port+1);
+
+     registerSerializer(baseHzConfig.getSerializationConfig());
+     return Hazelcast.newHazelcastInstance(baseHzConfig);
+ }
+
+ /**
+ * Register serializer for VersionedValue class used to imitate value version.
+ * @param config serialization config on which the VersionedValueSerializableFactory class is registered under its FACTORY_ID
+ */
+ private static void registerSerializer(final SerializationConfig config) {
+ config.addDataSerializableFactoryClass(
+ VersionedValueSerializableFactory.FACTORY_ID,
+ VersionedValueSerializableFactory.class);
+ }
+
+ /**
+  * Returns the table backed by the Hazelcast map named {@code MAP_PREFIX + tableName}; in Instance
+  * mode the map config is first forced to strongly consistent settings.
+  */
+ @Override
+ public IKVTable getTable(final String tableName) {
+     final String mapName = MAP_PREFIX + tableName;
+     if (!useClientMode) {
+         // config only available in Instance Mode
+         // Client Mode must rely on hazelcast.xml to be properly configured.
+         MapConfig config = hazelcastInstance.getConfig().getMapConfig(mapName);
+         // config for this map to be strong consistent
+         config.setReadBackupData(false);
+         // A near-cache max-size of 0 means "unlimited" to Hazelcast, so remove the near-cache config instead.
+         config.setNearCacheConfig(null);
+         if (config.getBackupCount() != backupCount) {
+             config.setAsyncBackupCount(0);
+             config.setBackupCount(backupCount);
+         }
+     }
+     // Look the map up only after the config is adjusted (presumably settings are captured when the map is first created -- TODO confirm).
+     IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(mapName);
+     return new HZTable(tableName, map);
+ }
+
+ @Override
+ public void dropTable(final IKVTable table) { // table must be an HZTable (it is cast below)
+ ((HZTable) table).getBackendMap().clear(); // only clear() is called here: entries are removed, nothing in this method destroys the map itself
+ }
+
+ /** Creates {@code key} with {@code value} via the table's create(); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
+         throws ObjectExistsException {
+     return ((IKVTable) tableId).create(key, value);
+ }
+
+ /** Writes {@code value} under {@code key} via the table's forceCreate(); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+     return ((IKVTable) tableId).forceCreate(key, value);
+ }
+
+ /** Reads the entry stored under {@code key} via the table's read(); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public IKVEntry read(final IKVTableID tableId, final byte[] key)
+         throws ObjectDoesntExistException {
+     return ((IKVTable) tableId).read(key);
+ }
+
+ /** Version-checked update via the table's update(key, value, version); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
+         final long version) throws ObjectDoesntExistException,
+         WrongVersionException {
+     return ((IKVTable) tableId).update(key, value, version);
+ }
+
+ /** Update without a version argument, via the table's update(key, value); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
+         throws ObjectDoesntExistException {
+     return ((IKVTable) tableId).update(key, value);
+ }
+
+ /** Version-checked delete via the table's delete(key, version); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long delete(final IKVTableID tableId, final byte[] key, final long version)
+         throws ObjectDoesntExistException, WrongVersionException {
+     return ((IKVTable) tableId).delete(key, version);
+ }
+
+ /** Deletes {@code key} via the table's forceDelete(); {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public long forceDelete(final IKVTableID tableId, final byte[] key) {
+     return ((IKVTable) tableId).forceDelete(key);
+ }
+
+ /** Returns whatever the table's getAllEntries() yields; {@code tableId} must be the IKVTable itself (it is cast). */
+ @Override
+ public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
+     return ((IKVTable) tableId).getAllEntries();
+ }
+
+ @Override
+ public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
+ }
+
+ @Override
+ public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+ }
+
+ @Override
+ public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
+ }
+
+ /** Executes each DELETE / FORCE_DELETE op in order, storing version and status on it; returns true if any op failed. */
+ @Override
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+     boolean failExists = false;
+     for (IMultiEntryOperation op : ops) {
+         // Cast to the interface, as multiWrite does, instead of requiring the concrete HZMultiEntryOperation.
+         IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+         switch (mop.getOperation()) {
+         case DELETE:
+             try {
+                 mop.setVersion(delete(mop.getTableId(), mop.getKey(), mop.getVersion()));
+                 mop.setStatus(STATUS.SUCCESS);
+             } catch (ObjectDoesntExistException | WrongVersionException e) {
+                 log.error(mop + " failed.", e);
+                 mop.setStatus(STATUS.FAILED);
+                 failExists = true;
+             }
+             break;
+         case FORCE_DELETE:
+             mop.setVersion(forceDelete(mop.getTableId(), mop.getKey()));
+             mop.setStatus(STATUS.SUCCESS);
+             break;
+         default:
+             throw new UnsupportedOperationException(mop.toString());
+         }
+     }
+     return failExists;
+ }
+
+ @Override
+ public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+ // there may be room to batch to improve performance
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ switch (mop.getOperation()) {
+ case CREATE:
+ try {
+ long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectExistsException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ case FORCE_CREATE:
+ {
+ final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ }
+ case UPDATE:
+ try {
+ long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
+ }
+
+ /** Starts an asynchronous get for every op, then checks each op with hasSucceeded(); returns true if any did not succeed. */
+ @Override
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+     // Phase 1: issue getAsync for every op before inspecting any result.
+     for (IMultiEntryOperation request : ops) {
+         IModifiableMultiEntryOperation modifiable = (IModifiableMultiEntryOperation) request;
+         HZTable hzTable = (HZTable) request.getTableId();
+         HZMultiEntryOperation actual = (HZMultiEntryOperation) modifiable.getActualOperation();
+         actual.setFuture(hzTable.getBackendMap().getAsync(request.getKey()));
+     }
+     // Phase 2: a successful op has already had its status updated, so only failures are noted here.
+     boolean anyFailed = false;
+     for (IMultiEntryOperation request : ops) {
+         if (!((IModifiableMultiEntryOperation) request).hasSucceeded()) {
+             anyFailed = true;
+         }
+     }
+     return anyFailed;
+ }
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return VERSION_NONEXISTENT;
+ }
+
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java
new file mode 100644
index 0000000..a4842ef
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZMultiEntryOperation.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+/**
+ * One entry of a multi-entry (batched) operation against a Hazelcast-backed table.
+ * <p>
+ * A read operation carries a {@link Future} (installed with {@link #setFuture(Future)})
+ * that is resolved lazily by the accessors. A write operation carries the value and
+ * version to be written in {@code writeValue}; its status and version are updated
+ * by the backend through the {@link IModifiableMultiEntryOperation} setters.
+ */
+public class HZMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+    private static final Logger log = LoggerFactory.getLogger(HZMultiEntryOperation.class);
+
+    private final HZTable table;
+    private final byte[] key;
+    protected final OPERATION operation;
+    private STATUS status;
+
+    // for read op
+    private Future<VersionedValue> future;
+    // for write op (also caches the last value obtained by a read)
+    private VersionedValue writeValue;
+
+    /**
+     * Constructor for Read/ForceDelete Operation.
+     *
+     * @param table table the entry belongs to
+     * @param key key of the entry
+     * @param operation kind of operation
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final OPERATION operation) {
+        this.table = table;
+        this.key = key;
+        this.status = STATUS.NOT_EXECUTED;
+        this.operation = operation;
+
+        this.future = null;
+        this.writeValue = null;
+    }
+
+    /**
+     * Constructor for Other Operations.
+     *
+     * @param table table the entry belongs to
+     * @param key key of the entry
+     * @param value value to write
+     * @param version version associated with the value
+     * @param operation kind of operation
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final byte[] value, final long version, final OPERATION operation) {
+        this.table = table;
+        this.key = key;
+        this.status = STATUS.NOT_EXECUTED;
+        this.operation = operation;
+
+        this.future = null;
+        this.writeValue = new VersionedValue(value, version);
+    }
+
+    /**
+     * Tells whether this operation succeeded.
+     * For a read this waits for the pending future first.
+     *
+     * @return true if the status is SUCCESS
+     */
+    @Override
+    public boolean hasSucceeded() {
+        if (future != null) {
+            // Read: resolving the future sets the status (FAILED when nothing was read).
+            get();
+        }
+        return this.status == STATUS.SUCCESS;
+    }
+
+    /**
+     * Returns the status, resolving the pending future first for a read.
+     *
+     * @return current status of this operation
+     */
+    @Override
+    public STATUS getStatus() {
+        if (future != null) {
+            get();
+        }
+        return status;
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+        return table;
+    }
+
+    @Override
+    public byte[] getKey() {
+        return key;
+    }
+
+    /**
+     * Returns the value read (read op) or the value to write (write op).
+     *
+     * @return the value, or null if the read failed or no value is attached
+     */
+    @Override
+    public byte[] getValue() {
+        final VersionedValue current = (future != null) ? get() : writeValue;
+        return (current == null) ? null : current.getValue();
+    }
+
+    /**
+     * Returns the version read (read op) or the version attached to the write.
+     *
+     * @return the version, or HZClient.VERSION_NONEXISTENT if the read failed
+     *         or no value is attached
+     */
+    @Override
+    public long getVersion() {
+        final VersionedValue current = (future != null) ? get() : writeValue;
+        return (current == null) ? HZClient.VERSION_NONEXISTENT : current.getVersion();
+    }
+
+    @Override
+    public OPERATION getOperation() {
+        return operation;
+    }
+
+    /**
+     * Evaluate Future object and set Status and Value+Version.
+     * <p>
+     * When no future is attached (write operation) nothing is evaluated, the
+     * status is left untouched and the attached write value is returned.
+     *
+     * @return the value read or null on failure.
+     */
+    private VersionedValue get() {
+        if (future == null) {
+            return writeValue;
+        }
+        try {
+            final VersionedValue value = future.get();
+            if (value == null) {
+                // The future completed but produced no entry: report a failed read
+                // instead of dereferencing null.
+                log.error("{} has failed. No value was returned.", this);
+                setStatus(STATUS.FAILED);
+                return null;
+            }
+            setValue(value.getValue(), value.getVersion());
+            setStatus(STATUS.SUCCESS);
+            return value;
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for code further up the stack.
+            Thread.currentThread().interrupt();
+            log.error(this + " has failed.", e);
+            setStatus(STATUS.FAILED);
+            return null;
+        } catch (CancellationException | ExecutionException e) {
+            log.error(this + " has failed.", e);
+            setStatus(STATUS.FAILED);
+            return null;
+        }
+    }
+
+    @Override
+    public void setValue(final byte[] value, final long version) {
+        writeValue = new VersionedValue(value, version);
+    }
+
+    /**
+     * Records a new version while keeping the currently attached value.
+     * <p>
+     * For a read operation the accessors re-evaluate the future, so whatever
+     * is recorded here is superseded by the value read.
+     *
+     * @param version version to record
+     */
+    @Override
+    public void setVersion(final long version) {
+        final byte[] currentValue = (writeValue == null) ? null : writeValue.getValue();
+        writeValue = new VersionedValue(currentValue, version);
+    }
+
+    @Override
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    @Override
+    public IModifiableMultiEntryOperation getActualOperation() {
+        return this;
+    }
+
+    /**
+     * Attaches the pending asynchronous read result.
+     *
+     * @param future future yielding the stored value
+     */
+    void setFuture(final Future<VersionedValue> future) {
+        this.future = future;
+    }
+
+    @Override
+    public String toString() {
+        return "[HZMultiEntryOperation table=" + table + ", key="
+                + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
+                + ", status=" + status + ", writeValue=" + writeValue + "]";
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
new file mode 100644
index 0000000..f071ecc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZTable.java
@@ -0,0 +1,325 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class HZTable implements IKVTable, IKVTableID {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(HZTable.class);
+
+ // not sure how strict this should be managed
+ private static final AtomicLong initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
+
+ /**
+ * generate a new initial version for an entry.
+ * @return initial value
+ */
+ protected static long getInitialVersion() {
+ long version = initialVersion.incrementAndGet();
+ if (version == HZClient.VERSION_NONEXISTENT) {
+ // used up whole 64bit space?
+ version = initialVersion.incrementAndGet();
+ }
+ return version;
+ }
+
+ /**
+ * increment version, avoiding VERSION_NONEXISTENT.
+ * @param version
+ * @return
+ */
+ protected static long getNextVersion(final long version) {
+ long nextVersion = version + 1;
+ if (nextVersion == HZClient.VERSION_NONEXISTENT) {
+ ++nextVersion;
+ }
+ return nextVersion;
+ }
+
+ /**
+  * Value/version pair kept in the backing Hazelcast map.
+  * <p>
+  * Serialized as: version (long), value length (int), value bytes.
+  */
+ static class VersionedValue implements IdentifiedDataSerializable {
+     private static final long serialVersionUID = -3149375966890712708L;
+
+     private byte[] value;
+     private long version;
+
+     /**
+      * Creates an empty value with version HZClient.VERSION_NONEXISTENT.
+      * Instantiated by VersionedValueSerializableFactory.
+      */
+     protected VersionedValue() {
+         value = new byte[0];
+         version = HZClient.VERSION_NONEXISTENT;
+     }
+
+     /**
+      * @param value value bytes (may be null)
+      * @param version version of the value
+      */
+     public VersionedValue(final byte[] value, final long version) {
+         this.value = value;
+         this.version = version;
+     }
+
+     public byte[] getValue() {
+         return value;
+     }
+
+     public long getVersion() {
+         return version;
+     }
+
+     public void setValue(final byte[] value) {
+         this.value = value;
+     }
+
+     /** Advances the version via getNextVersion. */
+     public void setNextVersion() {
+         this.version = getNextVersion(this.version);
+     }
+
+     @Override
+     public void writeData(final ObjectDataOutput out) throws IOException {
+         out.writeLong(version);
+         // The constructor accepts a null value; write it as zero length
+         // instead of failing with a NullPointerException.
+         final int length = (value == null) ? 0 : value.length;
+         out.writeInt(length);
+         if (length > 0) {
+             out.write(value);
+         }
+     }
+
+     @Override
+     public void readData(final ObjectDataInput in) throws IOException {
+         version = in.readLong();
+         final int valueLen = in.readInt();
+         value = new byte[valueLen];
+         in.readFully(value);
+     }
+
+     @Override
+     public int getFactoryId() {
+         return VersionedValueSerializableFactory.FACTORY_ID;
+     }
+
+     @Override
+     public int getId() {
+         return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+     }
+
+     @Override
+     public int hashCode() {
+         final int prime = 31;
+         int result = 1;
+         result = prime * result + (int) (version ^ (version >>> 32));
+         result = prime * result + Arrays.hashCode(value);
+         return result;
+     }
+
+     @Override
+     public boolean equals(final Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null || getClass() != obj.getClass()) {
+             return false;
+         }
+         final VersionedValue other = (VersionedValue) obj;
+         return version == other.version && Arrays.equals(value, other.value);
+     }
+ }
+
+ // TODO Refactor and extract common parts
+ /**
+  * Key/value/version triple returned to callers of this table.
+  */
+ public static class Entry implements IKVEntry {
+     final byte[] key;
+     byte[] value;
+     long version;
+
+     /**
+      * Creates an entry without a value, versioned HZClient.VERSION_NONEXISTENT.
+      *
+      * @param key key of the entry
+      */
+     public Entry(final byte[] key) {
+         this(key, null, HZClient.VERSION_NONEXISTENT);
+     }
+
+     /**
+      * @param key key of the entry
+      * @param value value of the entry
+      * @param version version of the entry
+      */
+     public Entry(final byte[] key, final byte[] value, final long version) {
+         this.key = key;
+         this.setValue(value);
+         this.setVersion(version);
+     }
+
+     @Override
+     public byte[] getKey() {
+         return this.key;
+     }
+
+     @Override
+     public byte[] getValue() {
+         return this.value;
+     }
+
+     void setValue(final byte[] value) {
+         this.value = value;
+     }
+
+     @Override
+     public long getVersion() {
+         return this.version;
+     }
+
+     void setVersion(final long version) {
+         this.version = version;
+     }
+ }
+
+
+
+ private final String mapName;
+ private final IMap<byte[], VersionedValue> map;
+
+ /**
+ * Creates a table view over a Hazelcast map.
+ *
+ * @param mapName name reported by getTableName()
+ * @param map backing map holding the versioned values
+ */
+ public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
+ this.mapName = mapName;
+ this.map = map;
+ }
+
+ /**
+ * @return the map name this table was constructed with
+ */
+ @Override
+ public String getTableName() {
+ return mapName;
+ }
+
+ /**
+ * @return this object; an HZTable serves as its own table id
+ */
+ @Override
+ public IKVTableID getTableId() {
+ return this;
+ }
+
+ /**
+  * Stores {@code value} under {@code key} only if the key has no entry yet.
+  *
+  * @param key key of the entry
+  * @param value value to store
+  * @return the freshly generated initial version of the new entry
+  * @throws ObjectExistsException if the map already holds an entry for the key
+  */
+ @Override
+ public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
+     final long newVersion = getInitialVersion();
+     final VersionedValue candidate = new VersionedValue(value, newVersion);
+     if (map.putIfAbsent(key, candidate) != null) {
+         throw new ObjectExistsException(this, key);
+     }
+     return newVersion;
+ }
+
+ /**
+  * Stores {@code value} under {@code key}, replacing any existing entry.
+  *
+  * @param key key of the entry
+  * @param value value to store
+  * @return the freshly generated initial version of the entry
+  */
+ @Override
+ public long forceCreate(final byte[] key, final byte[] value) {
+     final long newVersion = getInitialVersion();
+     final VersionedValue replacement = new VersionedValue(value, newVersion);
+     map.set(key, replacement);
+     return newVersion;
+ }
+
+ /**
+  * Reads the entry stored under {@code key}.
+  *
+  * @param key key of the entry
+  * @return entry carrying the key, the stored value and its version
+  * @throws ObjectDoesntExistException if the map holds no entry for the key
+  */
+ @Override
+ public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+     final VersionedValue stored = map.get(key);
+     if (stored != null) {
+         return new Entry(key, stored.getValue(), stored.getVersion());
+     }
+     throw new ObjectDoesntExistException(this, key);
+ }
+
+ /**
+  * Replaces the value stored under {@code key} if its version matches.
+  * <p>
+  * The check and the write are done while holding the map's lock for the key.
+  *
+  * @param key key of the entry
+  * @param value new value
+  * @param version version the stored entry is expected to have
+  * @return the version assigned to the updated entry
+  * @throws ObjectDoesntExistException if the map holds no entry for the key
+  * @throws WrongVersionException if the stored version differs from {@code version}
+  */
+ @Override
+ public long update(final byte[] key, final byte[] value, final long version)
+         throws ObjectDoesntExistException, WrongVersionException {
+
+     // Lock before entering the try block: if lock() itself fails, the finally
+     // clause must not call unlock() on a lock that was never acquired.
+     map.lock(key);
+     try {
+         final VersionedValue oldValue = map.get(key);
+         if (oldValue == null) {
+             throw new ObjectDoesntExistException(this, key);
+         }
+         if (oldValue.getVersion() != version) {
+             throw new WrongVersionException(this, key, version, oldValue.getVersion());
+         }
+         final long nextVersion = getNextVersion(version);
+         map.set(key, new VersionedValue(value, nextVersion));
+         return nextVersion;
+     } finally {
+         map.unlock(key);
+     }
+ }
+
+ /**
+  * Replaces the value stored under {@code key} regardless of its version.
+  * <p>
+  * The read and the write are done while holding the map's lock for the key.
+  *
+  * @param key key of the entry
+  * @param value new value
+  * @return the version assigned to the updated entry
+  * @throws ObjectDoesntExistException if the map holds no entry for the key
+  */
+ @Override
+ public long update(final byte[] key, final byte[] value)
+         throws ObjectDoesntExistException {
+
+     // Lock before entering the try block: if lock() itself fails, the finally
+     // clause must not call unlock() on a lock that was never acquired.
+     map.lock(key);
+     try {
+         final VersionedValue valueInMap = map.get(key);
+         if (valueInMap == null) {
+             throw new ObjectDoesntExistException(this, key);
+         }
+         valueInMap.setValue(value);
+         valueInMap.setNextVersion();
+         map.set(key, valueInMap);
+         return valueInMap.getVersion();
+     } finally {
+         map.unlock(key);
+     }
+ }
+
+ /**
+  * Deletes the entry stored under {@code key} if its version matches.
+  * <p>
+  * The check and the removal are done while holding the map's lock for the key.
+  *
+  * @param key key of the entry
+  * @param version version the stored entry is expected to have
+  * @return the version of the deleted entry
+  * @throws ObjectDoesntExistException if the map holds no entry for the key
+  * @throws WrongVersionException if the stored version differs from {@code version}
+  */
+ @Override
+ public long delete(final byte[] key, final long version)
+         throws ObjectDoesntExistException, WrongVersionException {
+
+     // Lock before entering the try block: if lock() itself fails, the finally
+     // clause must not call unlock() on a lock that was never acquired.
+     map.lock(key);
+     try {
+         final VersionedValue oldValue = map.get(key);
+         if (oldValue == null) {
+             throw new ObjectDoesntExistException(this, key);
+         }
+         if (oldValue.getVersion() != version) {
+             throw new WrongVersionException(this, key, version, oldValue.getVersion());
+         }
+         map.delete(key);
+         return oldValue.getVersion();
+     } finally {
+         map.unlock(key);
+     }
+ }
+
+ /**
+  * Removes the entry stored under {@code key} without any version check.
+  *
+  * @param key key of the entry
+  * @return version of the removed entry, or HZClient.VERSION_NONEXISTENT
+  *         if the map held no entry for the key
+  */
+ @Override
+ public long forceDelete(final byte[] key) {
+     final VersionedValue removed = map.remove(key);
+     return (removed != null) ? removed.getVersion() : HZClient.VERSION_NONEXISTENT;
+ }
+
+ /**
+  * Copies every entry of the backing map into a list.
+  *
+  * @return list with one Entry (key, value, version) per map entry
+  */
+ @Override
+ public Iterable<IKVEntry> getAllEntries() {
+     final Set<IMap.Entry<byte[], VersionedValue>> snapshot = map.entrySet();
+     final List<IKVEntry> result = new ArrayList<IKVTable.IKVEntry>(snapshot.size());
+     for (final IMap.Entry<byte[], VersionedValue> mapEntry : snapshot) {
+         final Entry copy = new Entry(mapEntry.getKey(), mapEntry.getValue().getValue(), mapEntry.getValue().getVersion());
+         result.add(copy);
+     }
+     return result;
+ }
+
+ /**
+ * @return a short description containing the map name
+ */
+ @Override
+ public String toString() {
+ return "[HZTable " + mapName + "]";
+ }
+
+ /**
+ * Exposes the backing Hazelcast map to classes of this package.
+ *
+ * @return the map this table was constructed with
+ */
+ IMap<byte[], VersionedValue> getBackendMap() {
+ return this.map;
+ }
+
+ /**
+ * @return HZClient.VERSION_NONEXISTENT, the version value this table
+ * reports for an entry that does not exist
+ */
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return HZClient.VERSION_NONEXISTENT;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
new file mode 100644
index 0000000..7e77be7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+/**
+ * Hazelcast serialization factory that instantiates {@link HZTable.VersionedValue}.
+ */
+public class VersionedValueSerializableFactory implements
+        DataSerializableFactory {
+    // revisit these magic numbers
+    public static final int FACTORY_ID = 1;
+
+    public static final int VERSIONED_VALUE_ID = 1;
+
+    /**
+     * Creates an empty instance for the given type id.
+     *
+     * @param typeId type id of the object to instantiate
+     * @return a new VersionedValue for VERSIONED_VALUE_ID, null for any other id
+     */
+    @Override
+    public IdentifiedDataSerializable create(final int typeId) {
+        if (typeId == VERSIONED_VALUE_ID) {
+            return new HZTable.VersionedValue();
+        }
+        // unknown type id
+        return null;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/internal/IModifiableMultiEntryOperation.java b/src/main/java/net/onrc/onos/core/datastore/internal/IModifiableMultiEntryOperation.java
new file mode 100644
index 0000000..a607add
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/internal/IModifiableMultiEntryOperation.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.core.datastore.internal;
+
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+
+/**
+ * Interface for backend to realize IMultiEntryOperation.
+ *
+ * Backend implementations must use these methods to update an
+ * IMultiEntryOperation in order to support KVObject.
+ */
+public interface IModifiableMultiEntryOperation extends IMultiEntryOperation {
+
+ /**
+ * Set value and version.
+ *
+ * Expected to be called by multiRead implementations.
+ * @param value value that was read
+ * @param version version of the value that was read
+ */
+ public void setValue(final byte[] value, final long version);
+
+ /**
+ * Update version of the value.
+ *
+ * Expected to be called by multiWrite and multiRead implementations.
+ * @param version new version of the value
+ */
+ public void setVersion(long version);
+
+ /**
+ * Update status.
+ *
+ * Backend implementation is expected to update to SUCCESS or FAILED after
+ * datastore operation.
+ * @param status outcome of the datastore operation
+ */
+ public void setStatus(STATUS status);
+
+ /**
+ * Return the actual IModifiableMultiEntryOperation if this is a wrapper, this otherwise.
+ * @return the actual IModifiableMultiEntryOperation that directly interacts with the data store
+ */
+ public IModifiableMultiEntryOperation getActualOperation();
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
new file mode 100644
index 0000000..18ef642
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
@@ -0,0 +1,572 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import net.onrc.onos.core.datastore.IKVClient;
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
+import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
+import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
+
+public class RCClient implements IKVClient {
+ private static final Logger log = LoggerFactory.getLogger(RCClient.class);
+
+ private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
+ public static final Configuration config = getConfiguration();
+
+ // Value taken from RAMCloud's Status.h
+ // FIXME These constants should be defined by JRamCloud
+ public static final int STATUS_OK = 0;
+
+ // FIXME come up with a proper way to retrieve configuration
+ public static final int MAX_MULTI_READS = Math.max(1, Integer
+ .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
+
+ public static final int MAX_MULTI_WRITES = Math.max(1, Integer
+ .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
+
+ private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
+ @Override
+ protected JRamCloud initialValue() {
+ return new JRamCloud(getCoordinatorUrl(config));
+ }
+ };
+
+ /**
+ * Returns the calling thread's JRamCloud instance from the thread-local
+ * holder, which creates one on a thread's first access.
+ *
+ * @return JRamCloud instance intended to be used only within the
+ * same thread.
+ * @note Do not store the returned instance in a member variable, etc. which
+ * may be accessed later by another thread.
+ */
+ static JRamCloud getJRamCloudClient() {
+ return tlsRCClient.get();
+ }
+
+ // Currently RCClient is state-less
+ private static final RCClient theInstance= new RCClient();
+
+ /**
+ * @return the shared RCClient instance
+ */
+ public static RCClient getClient() {
+ return theInstance;
+ }
+
+ /**
+  * Loads the configuration from the file named by the system property
+  * {@code ramcloud.config.path}, falling back to DB_CONFIG_FILE.
+  *
+  * @return the loaded configuration
+  */
+ public static final Configuration getConfiguration() {
+     final String path = System.getProperty("ramcloud.config.path", DB_CONFIG_FILE);
+     return getConfiguration(new File(path));
+ }
+
+ /**
+  * Loads a properties configuration from {@code configFile}.
+  *
+  * @param configFile properties file to load
+  * @return the loaded configuration
+  * @throws IllegalArgumentException if {@code configFile} is null, is not a
+  *         regular file, or cannot be loaded
+  */
+ public static final Configuration getConfiguration(final File configFile) {
+     if (configFile == null) {
+         throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
+     }
+     if (!configFile.isFile()) {
+         throw new IllegalArgumentException("Location of configuration must be a file");
+     }
+
+     final Configuration loaded;
+     try {
+         loaded = new PropertiesConfiguration(configFile);
+     } catch (ConfigurationException cause) {
+         throw new IllegalArgumentException("Could not load configuration at: " + configFile, cause);
+     }
+     return loaded;
+ }
+
+ /**
+  * Builds the coordinator locator by joining the configured
+  * {@code ramcloud.coordinatorIp} and {@code ramcloud.coordinatorPort}
+  * values with a comma.
+  *
+  * @param configuration configuration to read the two keys from
+  * @return the joined string, using the built-in defaults for missing keys
+  */
+ public static String getCoordinatorUrl(final Configuration configuration) {
+     final String hostPart = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
+     final String portPart = configuration.getString("ramcloud.coordinatorPort", "port=12246");
+     return hostPart + "," + portPart;
+ }
+
+ /**
+ * Builds a CREATE operation by delegating to RCMultiEntryOperation.create.
+ */
+ @Override
+ public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
+ return RCMultiEntryOperation.create(tableId, key, value);
+ }
+
+ /**
+  * Writes {@code value} under {@code key} with the rejectIfExists rule set.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @param value value to store
+  * @return version returned by the write, or JRamCloud.VERSION_NONEXISTENT
+  *         when an unexpected RejectRulesException was logged
+  * @throws ObjectExistsException if RAMCloud reports the object already exists
+  */
+ @Override
+ public long create(IKVTableID tableId, byte[] key, byte[] value)
+         throws ObjectExistsException {
+
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     final RejectRules mustNotExist = new RejectRules();
+     mustNotExist.rejectIfExists();
+
+     long writtenVersion;
+     try {
+         writtenVersion = client.write(table.getTableID(), key, value, mustNotExist);
+     } catch (JRamCloud.ObjectExistsException e) {
+         throw new ObjectExistsException(table, key, e);
+     } catch (JRamCloud.RejectRulesException e) {
+         log.error("Unexpected RejectRulesException", e);
+         writtenVersion = JRamCloud.VERSION_NONEXISTENT;
+     }
+     return writtenVersion;
+ }
+
+ /**
+ * Builds a FORCE_CREATE operation by delegating to RCMultiEntryOperation.forceCreate.
+ */
+ @Override
+ public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
+ return RCMultiEntryOperation.forceCreate(tableId, key, value);
+ }
+
+ /**
+  * Writes {@code value} under {@code key} without any reject rules.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @param value value to store
+  * @return version returned by the write
+  */
+ @Override
+ public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+     return client.write(table.getTableID(), key, value);
+ }
+
+ /**
+ * Builds a read operation by delegating to RCMultiEntryOperation.read.
+ */
+ @Override
+ public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
+ return RCMultiEntryOperation.read(tableId, key);
+ }
+
+ /**
+  * Reads the entry stored under {@code key} with the rejectIfDoesntExists rule set.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @return entry built from the object read, or null when an unexpected
+  *         RejectRulesException was logged
+  * @throws ObjectDoesntExistException if RAMCloud reports the object does not exist
+  */
+ @Override
+ public IKVEntry read(IKVTableID tableId, byte[] key)
+         throws ObjectDoesntExistException {
+
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     final RejectRules mustExist = new RejectRules();
+     mustExist.rejectIfDoesntExists();
+
+     try {
+         final JRamCloud.Object found = client.read(table.getTableID(), key, mustExist);
+         return new Entry(found.key, found.value, found.version);
+     } catch (JRamCloud.ObjectDoesntExistException e) {
+         throw new ObjectDoesntExistException(table, key, e);
+     } catch (JRamCloud.RejectRulesException e) {
+         log.error("Unexpected RejectRulesException", e);
+     }
+     return null;
+ }
+
+ /**
+ * Builds an update operation by delegating to RCMultiEntryOperation.update.
+ */
+ @Override
+ public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
+ return RCMultiEntryOperation.update(tableId, key, value, version);
+ }
+
+ /**
+  * Writes {@code value} under {@code key} with the rejectIfDoesntExists and
+  * rejectIfNeVersion({@code version}) rules set.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @param value new value
+  * @param version version passed to rejectIfNeVersion
+  * @return version returned by the write, or JRamCloud.VERSION_NONEXISTENT
+  *         when an unexpected RejectRulesException was logged
+  * @throws ObjectDoesntExistException if RAMCloud reports the object does not exist
+  * @throws WrongVersionException if RAMCloud reports a version mismatch
+  */
+ @Override
+ public long update(IKVTableID tableId, byte[] key, byte[] value,
+         long version) throws ObjectDoesntExistException,
+         WrongVersionException {
+
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     final RejectRules conditions = new RejectRules();
+     conditions.rejectIfDoesntExists();
+     conditions.rejectIfNeVersion(version);
+
+     long writtenVersion;
+     try {
+         writtenVersion = client.write(table.getTableID(), key, value, conditions);
+     } catch (JRamCloud.ObjectDoesntExistException e) {
+         throw new ObjectDoesntExistException(table, key, e);
+     } catch (JRamCloud.WrongVersionException e) {
+         throw new WrongVersionException(table, key, version, e);
+     } catch (JRamCloud.RejectRulesException e) {
+         log.error("Unexpected RejectRulesException", e);
+         writtenVersion = JRamCloud.VERSION_NONEXISTENT;
+     }
+     return writtenVersion;
+ }
+
+
+ /**
+  * Writes {@code value} under {@code key} with only the rejectIfDoesntExists
+  * rule set (no version check).
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @param value new value
+  * @return version returned by the write, or JRamCloud.VERSION_NONEXISTENT
+  *         when an unexpected RejectRulesException was logged
+  * @throws ObjectDoesntExistException if RAMCloud reports the object does not exist
+  */
+ @Override
+ public long update(IKVTableID tableId, byte[] key, byte[] value)
+         throws ObjectDoesntExistException {
+
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     final RejectRules mustExist = new RejectRules();
+     mustExist.rejectIfDoesntExists();
+
+     long writtenVersion;
+     try {
+         writtenVersion = client.write(table.getTableID(), key, value, mustExist);
+     } catch (JRamCloud.ObjectDoesntExistException e) {
+         throw new ObjectDoesntExistException(table, key, e);
+     } catch (JRamCloud.RejectRulesException e) {
+         log.error("Unexpected RejectRulesException", e);
+         writtenVersion = JRamCloud.VERSION_NONEXISTENT;
+     }
+     return writtenVersion;
+ }
+
+ /**
+ * Builds a delete operation by delegating to RCMultiEntryOperation.delete.
+ */
+ @Override
+ public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
+ return RCMultiEntryOperation.delete(tableId, key, value, version);
+ }
+
+ /**
+  * Removes the entry under {@code key} with the rejectIfDoesntExists and
+  * rejectIfNeVersion({@code version}) rules set.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @param version version passed to rejectIfNeVersion
+  * @return version returned by the remove, or JRamCloud.VERSION_NONEXISTENT
+  *         when an unexpected RejectRulesException was logged
+  * @throws ObjectDoesntExistException if RAMCloud reports the object does not exist
+  * @throws WrongVersionException if RAMCloud reports a version mismatch
+  */
+ @Override
+ public long delete(IKVTableID tableId, byte[] key, long version)
+         throws ObjectDoesntExistException, WrongVersionException {
+
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     final RejectRules conditions = new RejectRules();
+     conditions.rejectIfDoesntExists();
+     conditions.rejectIfNeVersion(version);
+
+     long removedVersion;
+     try {
+         removedVersion = client.remove(table.getTableID(), key, conditions);
+     } catch (JRamCloud.ObjectDoesntExistException e) {
+         throw new ObjectDoesntExistException(table, key, e);
+     } catch (JRamCloud.WrongVersionException e) {
+         throw new WrongVersionException(table, key, version, e);
+     } catch (JRamCloud.RejectRulesException e) {
+         log.error("Unexpected RejectRulesException", e);
+         removedVersion = JRamCloud.VERSION_NONEXISTENT;
+     }
+     return removedVersion;
+ }
+
+ /**
+ * Builds a force-delete operation by delegating to RCMultiEntryOperation.forceDelete.
+ */
+ @Override
+ public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
+ return RCMultiEntryOperation.forceDelete(tableId, key);
+ }
+
+ /**
+  * Removes the entry under {@code key} without any reject rules.
+  *
+  * @param tableId RCTableID instance
+  * @param key key of the entry
+  * @return version returned by the remove
+  */
+ @Override
+ public long forceDelete(IKVTableID tableId, byte[] key) {
+     final RCTableID table = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+     return client.remove(table.getTableID(), key);
+ }
+
+ /**
+ * Returns an Iterable over the table's entries; each call to its iterator()
+ * creates a new RCTableIterator.
+ *
+ * @param tableId RCTableID instance
+ */
+ @Override
+ public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
+ return new RCTableEntryIterable((RCTableID) tableId);
+ }
+
+ /**
+ * Iterable over a RAMCloud table that hands out a fresh RCTableIterator
+ * on every iterator() call.
+ */
+ static class RCTableEntryIterable implements Iterable<IKVEntry> {
+ private final RCTableID tableId;
+
+ /**
+ * @param tableId id of the table to enumerate
+ */
+ public RCTableEntryIterable(final RCTableID tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public Iterator<IKVEntry> iterator() {
+ return new RCClient.RCTableIterator(tableId);
+ }
+ }
+
+ /**
+  * Iterator over the entries of a RAMCloud table, backed by a TableEnumerator2.
+  * <p>
+  * The enumerator is created from the JRamCloud instance of the thread that
+  * constructs the iterator, whereas remove() uses the instance of the thread
+  * calling it.
+  */
+ public static class RCTableIterator implements Iterator<IKVEntry> {
+     private final RCTableID tableId;
+     protected final TableEnumerator2 enumerator;
+     // object returned by the most recent next(); null if none or already removed
+     private JRamCloud.Object last;
+
+     /**
+      * @param tableId id of the table to enumerate
+      */
+     public RCTableIterator(final RCTableID tableId) {
+         this.tableId = tableId;
+         this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
+         this.last = null;
+     }
+
+     @Override
+     public boolean hasNext() {
+         return this.enumerator.hasNext();
+     }
+
+     @Override
+     public RCTable.Entry next() {
+         last = enumerator.next();
+         return new RCTable.Entry(last.key, last.value, last.version);
+     }
+
+     /**
+      * Removes the entry returned by the last next() call, but only if its
+      * version is still the one that was enumerated. A rejected removal is
+      * logged at trace level and otherwise ignored. Does nothing when there
+      * is no such entry.
+      */
+     @Override
+     public void remove() {
+         if (last != null) {
+             // A single lookup of the thread-local client suffices; the extra
+             // discarded getJRamCloudClient() call was redundant.
+             JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+             RejectRules rules = new RejectRules();
+             rules.rejectIfNeVersion(last.version);
+             try {
+                 rcClient.remove(tableId.getTableID(), last.key, rules);
+             } catch (RejectRulesException e) {
+                 log.trace("remove failed", e);
+             }
+             last = null;
+         }
+     }
+ }
+
+ /**
+  * Executes the read operations in batches of at most MAX_MULTI_READS.
+  * <p>
+  * An ArrayList that already fits into one batch is dispatched as is;
+  * anything else is copied batch by batch.
+  *
+  * @param ops operations to execute; each must be an RCMultiEntryOperation
+  * @return true if any batch reported a failure
+  */
+ @Override
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+
+     final boolean fitsSingleBatch = ops.size() <= MAX_MULTI_READS;
+     if (fitsSingleBatch && ops instanceof ArrayList) {
+         @SuppressWarnings({ "unchecked", "rawtypes" })
+         final ArrayList<RCMultiEntryOperation> direct = (ArrayList) ops;
+         return multiReadInternal(direct);
+     }
+
+     boolean anyFailed = false;
+     final ArrayList<RCMultiEntryOperation> batch = new ArrayList<>();
+     for (final IMultiEntryOperation op : ops) {
+         batch.add((RCMultiEntryOperation) op);
+         if (batch.size() >= MAX_MULTI_READS) {
+             // batch is full: dispatch multiRead
+             anyFailed |= multiReadInternal(batch);
+             batch.clear();
+         }
+     }
+
+     if (!batch.isEmpty()) {
+         // dispatch the remainder
+         anyFailed |= multiReadInternal(batch);
+     }
+
+     return anyFailed;
+ }
+
+ /**
+  * Executes the write operations in batches of at most MAX_MULTI_WRITES.
+  * <p>
+  * An ArrayList that already fits into one batch is dispatched as is;
+  * anything else is copied batch by batch.
+  *
+  * @param ops operations to execute; each must be an RCMultiEntryOperation
+  * @return true if any batch reported a failure
+  */
+ @Override
+ public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+
+     final boolean fitsSingleBatch = ops.size() <= MAX_MULTI_WRITES;
+     if (fitsSingleBatch && ops instanceof ArrayList) {
+         @SuppressWarnings({ "unchecked", "rawtypes" })
+         final ArrayList<RCMultiEntryOperation> direct = (ArrayList) ops;
+         return multiWriteInternal(direct);
+     }
+
+     boolean anyFailed = false;
+     final ArrayList<RCMultiEntryOperation> batch = new ArrayList<>();
+     for (final IMultiEntryOperation op : ops) {
+         batch.add((RCMultiEntryOperation) op);
+         if (batch.size() >= MAX_MULTI_WRITES) {
+             // batch is full: dispatch multiWrite
+             anyFailed |= multiWriteInternal(batch);
+             batch.clear();
+         }
+     }
+
+     if (!batch.isEmpty()) {
+         // dispatch the remainder
+         anyFailed |= multiWriteInternal(batch);
+     }
+
+     return anyFailed;
+ }
+
+ @Override
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+
+ // TODO implement multiRemove JNI, etc. if we need performance
+
+ boolean failExists = false;
+ JRamCloud rcClient = getJRamCloudClient();
+
+ for (IMultiEntryOperation iop : ops) {
+ RCMultiEntryOperation op = (RCMultiEntryOperation)iop;
+ switch (op.getOperation()) {
+ case DELETE:
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(op.getVersion());
+
+ try {
+ final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
+ op.entry.setVersion(removedVersion);
+ op.status = STATUS.SUCCESS;
+ } catch (JRamCloud.ObjectDoesntExistException|JRamCloud.WrongVersionException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ failExists = true;
+ op.status = STATUS.FAILED;
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ failExists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
+
+ case FORCE_DELETE:
+ final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
+ if (removedVersion != VERSION_NONEXISTENT) {
+ op.entry.setVersion(removedVersion);
+ op.status = STATUS.SUCCESS;
+ } else {
+ log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId );
+ failExists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
+
+ default:
+ log.error("Invalid operation {} specified on multiDelete", op.getOperation() );
+ failExists = true;
+ op.status = STATUS.FAILED;
+ break;
+ }
+ }
+ return failExists;
+ }
+
+ /**
+  * Executes one batch of reads through a single JRamCloud multiRead call.
+  * <p>
+  * Every operation ends up SUCCESS or FAILED: FAILED when its result is
+  * missing, null, or carries JRamCloud.VERSION_NONEXISTENT.
+  *
+  * @param ops batch to execute (callers keep it within MAX_MULTI_READS)
+  * @return true if at least one operation failed
+  */
+ private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
+     final int reqs = ops.size();
+     if (reqs == 0) {
+         // nothing to read: skip the native call altogether
+         return false;
+     }
+
+     boolean failExists = false;
+     JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+     MultiReadObject multiReadObjects = new MultiReadObject(reqs);
+
+     // setup multi-read operation objects
+     for (int i = 0; i < reqs; ++i) {
+         IMultiEntryOperation op = ops.get(i);
+         multiReadObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey());
+     }
+
+     // execute
+     JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
+     if (results.length != reqs) {
+         log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+         failExists = true;
+     }
+
+     // Never index past the request list, even if more results than requests come back.
+     final int answered = Math.min(results.length, reqs);
+     for (int i = 0; i < answered; ++i) {
+         IModifiableMultiEntryOperation op = ops.get(i);
+         if (results[i] == null) {
+             log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
+             failExists = true;
+             op.setStatus(STATUS.FAILED);
+             continue;
+         }
+         assert (Arrays.equals(results[i].key, op.getKey()));
+
+         op.setValue(results[i].value, results[i].version);
+         if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
+             failExists = true;
+             op.setStatus(STATUS.FAILED);
+         } else {
+             op.setStatus(STATUS.SUCCESS);
+         }
+     }
+
+     // Requests that received no result at all did not succeed either.
+     for (int i = answered; i < reqs; ++i) {
+         ops.get(i).setStatus(STATUS.FAILED);
+     }
+
+     return failExists;
+ }
+
+ /**
+  * Executes one batch of writes through a single JRamCloud multiWrite call.
+  * <p>
+  * Reject rules are derived from each operation's kind (CREATE, FORCE_CREATE,
+  * UPDATE). If the batch contains any other kind, that operation is marked
+  * FAILED and the method returns before anything is written. Otherwise every
+  * operation ends up SUCCESS (with its version updated) or FAILED.
+  *
+  * @param ops batch to execute (callers keep it within MAX_MULTI_WRITES)
+  * @return true if at least one operation failed
+  */
+ private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
+     final int reqs = ops.size();
+     if (reqs == 0) {
+         // nothing to write: skip the native call altogether
+         return false;
+     }
+
+     boolean failExists = false;
+     JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+     MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
+
+     for (int i = 0; i < reqs; ++i) {
+
+         IModifiableMultiEntryOperation op = ops.get(i);
+         RejectRules rules = new RejectRules();
+
+         switch (op.getOperation()) {
+         case CREATE:
+             rules.rejectIfExists();
+             break;
+         case FORCE_CREATE:
+             // no reject rule
+             break;
+         case UPDATE:
+             rules.rejectIfDoesntExists();
+             rules.rejectIfNeVersion(op.getVersion());
+             break;
+
+         default:
+             log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation() );
+             failExists = true;
+             op.setStatus(STATUS.FAILED);
+             return failExists;
+         }
+         multiWriteObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
+     }
+
+     MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, reqs, multiWriteObjects.rules);
+     if (results.length != reqs) {
+         log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+         failExists = true;
+     }
+
+     // Never index past the request list, even if more results than requests come back.
+     final int answered = Math.min(results.length, reqs);
+     for (int i = 0; i < answered; ++i) {
+         IModifiableMultiEntryOperation op = ops.get(i);
+
+         if (results[i] != null
+                 && results[i].getStatus() == RCClient.STATUS_OK) {
+             op.setStatus(STATUS.SUCCESS);
+             op.setVersion(results[i].getVersion());
+         } else {
+             op.setStatus(STATUS.FAILED);
+             failExists = true;
+         }
+     }
+
+     // Requests that received no result at all did not succeed either.
+     for (int i = answered; i < reqs; ++i) {
+         ops.get(i).setStatus(STATUS.FAILED);
+     }
+
+     return failExists;
+ }
+
+ private static final ConcurrentHashMap<String, RCTable> tables = new ConcurrentHashMap<>();
+
+ /**
+  * Returns the cached RCTable for {@code tableName}, creating and caching
+  * one on first use. If two threads race to create it, both get the
+  * instance that made it into the cache.
+  *
+  * @param tableName name of the table
+  * @return the cached table object
+  */
+ @Override
+ public IKVTable getTable(final String tableName) {
+     final RCTable cached = tables.get(tableName);
+     if (cached != null) {
+         return cached;
+     }
+
+     final RCTable created = new RCTable(tableName);
+     final RCTable winner = tables.putIfAbsent(tableName, created);
+     return (winner != null) ? winner : created;
+ }
+
+ /**
+ * Drops the table in RAMCloud by name and removes it from the table cache.
+ *
+ * @param table table to drop
+ */
+ @Override
+ public void dropTable(IKVTable table) {
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+ rcClient.dropTable(table.getTableId().getTableName());
+ tables.remove(table.getTableId().getTableName());
+ }
+
+ static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
+
+ /**
+ * @return this client's VERSION_NONEXISTENT constant, which is
+ * JRamCloud.VERSION_NONEXISTENT
+ */
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return VERSION_NONEXISTENT;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCMultiEntryOperation.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCMultiEntryOperation.java
new file mode 100644
index 0000000..9b8c8b6
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCMultiEntryOperation.java
@@ -0,0 +1,115 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry;
+
+// FIXME move or extract this
+/**
+ * One element of a multi-entry request: the target table, the
+ * {@link Entry} (key, value, version) being operated on, the requested
+ * operation and the status recorded for it.
+ */
+public class RCMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+    protected final RCTableID tableId;
+    protected final Entry entry;
+    protected final OPERATION operation;
+    protected STATUS status;
+
+    /**
+     * Creates an operation whose status starts as {@code NOT_EXECUTED}.
+     *
+     * @param tableId table to operate on; cast to {@link RCTableID}
+     * @param entry entry holding the key, value and version used by the operation
+     * @param operation kind of operation
+     */
+    public RCMultiEntryOperation(final IKVTableID tableId, final Entry entry, final OPERATION operation) {
+        this.tableId = (RCTableID) tableId;
+        this.entry = entry;
+        this.operation = operation;
+        this.status = STATUS.NOT_EXECUTED;
+    }
+
+    /** Builds a CREATE operation; the entry version starts as {@code RCClient.VERSION_NONEXISTENT}. */
+    public static IMultiEntryOperation create(final IKVTableID tableId, final byte[] key, final byte[] value) {
+        final Entry fresh = new Entry(key, value, RCClient.VERSION_NONEXISTENT);
+        return new RCMultiEntryOperation(tableId, fresh, OPERATION.CREATE);
+    }
+
+    /** Builds a FORCE_CREATE operation; the entry version starts as {@code RCClient.VERSION_NONEXISTENT}. */
+    public static IMultiEntryOperation forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+        final Entry fresh = new Entry(key, value, RCClient.VERSION_NONEXISTENT);
+        return new RCMultiEntryOperation(tableId, fresh, OPERATION.FORCE_CREATE);
+    }
+
+    /**
+     * Builds a READ operation.
+     *
+     * @param tableId table to read from
+     * @param key key of an Entry to read
+     */
+    public static IMultiEntryOperation read(final IKVTableID tableId, final byte[] key) {
+        final Entry target = new Entry(key);
+        return new RCMultiEntryOperation(tableId, target, OPERATION.READ);
+    }
+
+    /** Builds an UPDATE operation carrying the caller-supplied {@code value} and {@code version}. */
+    public static IMultiEntryOperation update(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
+        final Entry target = new Entry(key, value, version);
+        return new RCMultiEntryOperation(tableId, target, OPERATION.UPDATE);
+    }
+
+    /** Builds a DELETE operation carrying the caller-supplied {@code value} and {@code version}. */
+    public static IMultiEntryOperation delete(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
+        final Entry target = new Entry(key, value, version);
+        return new RCMultiEntryOperation(tableId, target, OPERATION.DELETE);
+    }
+
+    /** Builds a FORCE_DELETE operation for {@code key}. */
+    public static IMultiEntryOperation forceDelete(final IKVTableID tableId, final byte[] key) {
+        final Entry target = new Entry(key);
+        return new RCMultiEntryOperation(tableId, target, OPERATION.FORCE_DELETE);
+    }
+
+    /** @return true only when the recorded status is {@code STATUS.SUCCESS} */
+    @Override
+    public boolean hasSucceeded() {
+        return STATUS.SUCCESS == this.status;
+    }
+
+    @Override
+    public STATUS getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+        return this.tableId;
+    }
+
+    @Override
+    public OPERATION getOperation() {
+        return this.operation;
+    }
+
+    /** @return the key array held by the entry (not copied) */
+    @Override
+    public byte[] getKey() {
+        return this.entry.key;
+    }
+
+    /** @return the value array held by the entry (not copied) */
+    @Override
+    public byte[] getValue() {
+        return this.entry.value;
+    }
+
+    @Override
+    public long getVersion() {
+        return this.entry.version;
+    }
+
+    /** Replaces the entry's value, then its version via {@link #setVersion(long)}. */
+    @Override
+    public void setValue(byte[] value, final long version) {
+        this.entry.setValue(value);
+        setVersion(version);
+    }
+
+    @Override
+    public void setVersion(final long version) {
+        this.entry.setVersion(version);
+    }
+
+    /** @return this object; it is its own modifiable operation */
+    @Override
+    public IModifiableMultiEntryOperation getActualOperation() {
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("[RCMultiEntryOperation tableId=");
+        text.append(tableId).append(", entry=").append(entry);
+        text.append(", operation=").append(operation).append(", status=").append(status);
+        return text.append("]").toString();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTable.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTable.java
new file mode 100644
index 0000000..098215e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTable.java
@@ -0,0 +1,134 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IKVTable} representing a table in RAMCloud. Every data operation
+ * is forwarded to the client returned by {@code RCClient.getClient()},
+ * together with this table's {@link RCTableID}.
+ */
+public class RCTable implements IKVTable {
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(RCTable.class);
+
+    /** Key, value and version of one table entry; arrays are held by reference. */
+    public static class Entry implements IKVEntry {
+        final byte[] key;
+        byte[] value;
+        long version;
+
+        /** Creates an entry with no value and version {@code RCClient.VERSION_NONEXISTENT}. */
+        public Entry(final byte[] key) {
+            this(key, null, RCClient.VERSION_NONEXISTENT);
+        }
+
+        /**
+         * @param key entry key
+         * @param value entry value, may be null
+         * @param version entry version
+         */
+        public Entry(final byte[] key, final byte[] value, final long version) {
+            this.key = key;
+            setValue(value);
+            setVersion(version);
+        }
+
+        @Override
+        public byte[] getKey() {
+            return this.key;
+        }
+
+        @Override
+        public byte[] getValue() {
+            return this.value;
+        }
+
+        @Override
+        public long getVersion() {
+            return this.version;
+        }
+
+        void setValue(byte[] value) {
+            this.value = value;
+        }
+
+        void setVersion(long version) {
+            this.version = version;
+        }
+    }
+
+    private final RCTableID rcTableId;
+
+    /**
+     * Creates the table handle and immediately resolves its RAMCloud table ID.
+     * {@code rcTableName} must be unique cluster wide.
+     *
+     * @param rcTableName RAMCloud table name
+     */
+    RCTable(final String rcTableName) {
+        final RCTableID id = new RCTableID(rcTableName);
+        this.rcTableId = id;
+
+        // Trigger RAMCloud ID allocation. If lazy allocation is OK, remove.
+        id.getTableID();
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+        return rcTableId;
+    }
+
+    /** @return the table name held by this table's ID */
+    public String getTableName() {
+        return rcTableId.getTableName();
+    }
+
+    /** Delegates to the client's {@code create} for this table. */
+    @Override
+    public long create(final byte[] key, final byte[] value)
+            throws ObjectExistsException {
+        return RCClient.getClient().create(rcTableId, key, value);
+    }
+
+    /** Delegates to the client's {@code forceCreate} for this table. */
+    @Override
+    public long forceCreate(final byte[] key, final byte[] value) {
+        return RCClient.getClient().forceCreate(this.rcTableId, key, value);
+    }
+
+    /** Delegates to the client's {@code read} for this table. */
+    @Override
+    public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+        return RCClient.getClient().read(this.rcTableId, key);
+    }
+
+    /** Delegates to the client's versioned {@code update} for this table. */
+    @Override
+    public long update(final byte[] key, final byte[] value, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+        return RCClient.getClient().update(this.rcTableId, key, value, version);
+    }
+
+    /** Delegates to the client's unversioned {@code update} for this table. */
+    @Override
+    public long update(final byte[] key, final byte[] value)
+            throws ObjectDoesntExistException {
+        return RCClient.getClient().update(this.rcTableId, key, value);
+    }
+
+    /** Delegates to the client's versioned {@code delete} for this table. */
+    @Override
+    public long delete(final byte[] key, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+        return RCClient.getClient().delete(this.rcTableId, key, version);
+    }
+
+    /** Delegates to the client's {@code forceDelete} for this table. */
+    @Override
+    public long forceDelete(final byte[] key) {
+        return RCClient.getClient().forceDelete(this.rcTableId, key);
+    }
+
+    /** Delegates to the client's {@code getAllEntries}, passing {@link #getTableId()}. */
+    @Override
+    public Iterable<IKVEntry> getAllEntries() {
+        return RCClient.getClient().getAllEntries(getTableId());
+    }
+
+    @Override
+    public long VERSION_NONEXISTENT() {
+        return RCClient.VERSION_NONEXISTENT;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTableID.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTableID.java
new file mode 100644
index 0000000..f69addc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCTableID.java
@@ -0,0 +1,60 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import java.util.Objects;
+
+import net.onrc.onos.core.datastore.IKVTableID;
+
+/**
+ * Table identifier for RAMCloud: a table name plus the numeric RAMCloud
+ * table ID, which is resolved on first use through {@code createTable} and
+ * then cached in this object.
+ *
+ * NOTE(review): the lazy resolution of the cached ID is not synchronized.
+ */
+public class RCTableID implements IKVTableID {
+    private final String tableName;
+    // 0 means "not resolved yet"; see getTableID() and resetTableID().
+    private long tableID;
+
+    /**
+     * @param tableName RAMCloud table name; the numeric ID is left unresolved
+     */
+    public RCTableID(String tableName) {
+        this.tableName = tableName;
+        this.tableID = 0;
+    }
+
+    @Override
+    public String getTableName() {
+        return tableName;
+    }
+
+    // following is RAMCloud specific
+
+    /**
+     * Returns the numeric table ID. When no ID is cached (initially, or after
+     * {@link #resetTableID()}) it is obtained by calling {@code createTable}
+     * on the JRamCloud client with the table name, and cached.
+     *
+     * @return numeric RAMCloud table ID
+     */
+    public long getTableID() {
+        if (tableID != 0) {
+            return tableID;
+        }
+        tableID = RCClient.getJRamCloudClient().createTable(tableName);
+        return tableID;
+    }
+
+    /** Clears the cached ID so that the next {@link #getTableID()} resolves it again. */
+    void resetTableID() {
+        this.tableID = 0;
+    }
+
+    /** Note: resolves the table ID through {@link #getTableID()} if it is not cached yet. */
+    @Override
+    public String toString() {
+        return "[" + tableName + "]@" + getTableID();
+    }
+
+    /**
+     * Hashes the table name only. The numeric ID is left out on purpose: it
+     * is mutable ({@link #resetTableID()}) and resolving it requires a client
+     * call, so including it could change the hash while this object is
+     * stored in a hash-based collection. The result stays consistent with
+     * {@link #equals(Object)}, which requires equal names.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(tableName);
+    }
+
+    /**
+     * Two IDs are equal when they are of the same class and have the same
+     * table name and the same numeric table ID.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        RCTableID other = (RCTableID) obj;
+        // Compare names first so the ID lookup is skipped for different tables;
+        // primitive == avoids boxing both longs.
+        return Objects.equals(tableName, other.tableName)
+                && getTableID() == other.getTableID();
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/core/datastore/topology/KVDevice.java b/src/main/java/net/onrc/onos/core/datastore/topology/KVDevice.java
new file mode 100644
index 0000000..96411a9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/topology/KVDevice.java
@@ -0,0 +1,222 @@
+package net.onrc.onos.core.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.topology.KVLink.STATUS;
+import net.onrc.onos.core.datastore.utils.ByteArrayComparator;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.DeviceEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+/**
+ * Device object.
+ *
+ * A device is keyed by its MAC address and holds a sorted set of port IDs.
+ * {@link #serialize()} as written here puts only the MAC and the port IDs
+ * into the property map; {@code ip} and {@code lastSeenTime} are plain
+ * in-memory fields.
+ *
+ * TODO switch to ProtoBuf, etc.
+ */
+public class KVDevice extends KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVDevice.class);
+
+    // One Kryo instance per thread, with every serialized type registered up front.
+    private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            kryo.setRegistrationRequired(true);
+            kryo.setReferences(false);
+            kryo.register(byte[].class);
+            kryo.register(byte[][].class);
+            kryo.register(HashMap.class);
+            // TODO check if we should explicitly specify EnumSerializer
+            kryo.register(STATUS.class);
+            return kryo;
+        }
+    };
+
+    public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
+
+    // FIXME these should be Enum or some number, not String
+    private static final String PROP_MAC = "mac";
+    private static final String PROP_PORT_IDS = "port-ids";
+
+    private final byte[] mac;
+    private TreeSet<byte[]> portIds;
+    // true when portIds has to be written into the property map by the next serialize()
+    private transient boolean isPortIdsModified;
+
+    // Assume there is only one ip on a device now.
+    private int ip;
+    private long lastSeenTime;
+
+    /**
+     * Builds the data store key of a device.
+     * Assuming mac is unique cluster-wide.
+     *
+     * @param mac MAC address of the device
+     * @return key bytes produced by {@code DeviceEvent.getDeviceID}
+     */
+    public static byte[] getDeviceID(final byte[] mac) {
+        return DeviceEvent.getDeviceID(mac).array();
+    }
+
+    /**
+     * Extracts the MAC address from a device key.
+     *
+     * @param key device key: a leading char {@code 'D'} followed by the MAC bytes
+     * @return the bytes that follow the leading char
+     * @throws IllegalArgumentException if the key does not start with {@code 'D'}
+     */
+    public static byte[] getMacFromKey(final byte[] key) {
+        ByteBuffer keyBuf = ByteBuffer.wrap(key);
+        if (keyBuf.getChar() != 'D') {
+            throw new IllegalArgumentException("Invalid Device key");
+        }
+        byte[] mac = new byte[keyBuf.remaining()];
+        keyBuf.get(mac);
+        return mac;
+    }
+
+    /**
+     * Creates a device bound to the global device table, with an empty port
+     * ID set that is marked as modified.
+     *
+     * @param mac MAC address of the device; stored by reference
+     */
+    public KVDevice(final byte[] mac) {
+        super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
+
+        this.mac = mac;
+        this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+        this.isPortIdsModified = true;
+    }
+
+    /**
+     * Get an instance from Key.
+     *
+     * @note You need to call `read()` to get the DB content.
+     * @param key device key, see {@link #getMacFromKey(byte[])}
+     * @return KVDevice instance
+     */
+    public static KVDevice createFromKey(final byte[] key) {
+        return new KVDevice(getMacFromKey(key));
+    }
+
+    /** @return an Iterable whose iterators walk the global device table */
+    public static Iterable<KVDevice> getAllDevices() {
+        return new DeviceEnumerator();
+    }
+
+    /** Iterable that hands out a new {@link DeviceIterator} on every call. */
+    public static class DeviceEnumerator implements Iterable<KVDevice> {
+
+        @Override
+        public Iterator<KVDevice> iterator() {
+            return new DeviceIterator();
+        }
+    }
+
+    /** Iterator that turns each entry of the global device table into a KVDevice. */
+    public static class DeviceIterator extends AbstractObjectIterator<KVDevice> {
+
+        public DeviceIterator() {
+            super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME));
+        }
+
+        /** Builds the device from the entry key, then loads the entry value and version into it. */
+        @Override
+        public KVDevice next() {
+            IKVEntry o = enumerator.next();
+            KVDevice e = KVDevice.createFromKey(o.getKey());
+            e.deserialize(o.getValue(), o.getVersion());
+            return e;
+        }
+    }
+
+    /** @return the MAC array held by this object (not a copy) */
+    public byte[] getMac() {
+        // TODO may need to clone() to be sure this object will be immutable.
+        return mac;
+    }
+
+    /** @return the data store key of this device */
+    public byte[] getId() {
+        return getKey();
+    }
+
+    /** Adds a port ID; marks the set as modified if it actually changed. */
+    public void addPortId(final byte[] portId) {
+        // TODO: Should we copy portId, or reference is OK.
+        isPortIdsModified |= portIds.add(portId);
+    }
+
+    /** Removes a port ID; marks the set as modified if it actually changed. */
+    public void removePortId(final byte[] portId) {
+        isPortIdsModified |= portIds.remove(portId);
+    }
+
+    /** Removes every port ID and unconditionally marks the set as modified. */
+    public void emptyPortIds() {
+        portIds.clear();
+        this.isPortIdsModified = true;
+    }
+
+    /** Adds all given port IDs; marks the set as modified if it actually changed. */
+    public void addAllToPortIds(final Collection<byte[]> portIds) {
+        // TODO: Should we copy portId, or reference is OK.
+        isPortIdsModified |= this.portIds.addAll(portIds);
+    }
+
+    /**
+     *
+     * @return Unmodifiable Set view of all the PortIds;
+     */
+    public Set<byte[]> getAllPortIds() {
+        return Collections.unmodifiableSet(portIds);
+    }
+
+    /**
+     * Writes the MAC and, when the port ID set is flagged as modified, the
+     * port IDs into the property map, then serializes that map with Kryo.
+     *
+     * @return serialized property map
+     */
+    @Override
+    public byte[] serialize() {
+        Map<Object, Object> map = getPropertyMap();
+
+        map.put(PROP_MAC, mac);
+        if (isPortIdsModified) {
+            byte[][] portIdArray = new byte[portIds.size()][];
+            map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+            isPortIdsModified = false;
+        }
+
+        return serializePropertyMap(deviceKryo.get(), map);
+    }
+
+    /**
+     * Restores the property map from {@code bytes} and rebuilds the port ID
+     * set from it.
+     *
+     * @param bytes serialized property map
+     * @return false if the property map could not be deserialized, true otherwise
+     */
+    @Override
+    protected boolean deserialize(final byte[] bytes) {
+        boolean success = deserializePropertyMap(deviceKryo.get(), bytes);
+        if (!success) {
+            // This message used to say "Link" (copy-paste from KVLink).
+            log.error("Deserializing Device: {} failed.", this);
+            return false;
+        }
+        Map<Object, Object> map = this.getPropertyMap();
+
+        if (this.portIds == null) {
+            this.portIds = new TreeSet<>(
+                    ByteArrayComparator.BYTEARRAY_COMPARATOR);
+        }
+        byte[][] portIdArray = (byte[][]) map.get(PROP_PORT_IDS);
+        if (portIdArray != null) {
+            this.portIds.clear();
+            this.portIds.addAll(Arrays.asList(portIdArray));
+            isPortIdsModified = false;
+        } else {
+            // trigger write on next serialize
+            isPortIdsModified = true;
+        }
+
+        return success;
+    }
+
+    @Override
+    public String toString() {
+        // TODO output all properties?
+        return "[" + this.getClass().getSimpleName()
+                + " " + ByteArrayUtil.toHexStringBuffer(mac, ":") + "]";
+    }
+
+    public int getIp() {
+        return ip;
+    }
+
+    public void setIp(int ip) {
+        this.ip = ip;
+    }
+
+    public long getLastSeenTime() {
+        return lastSeenTime;
+    }
+
+    public void setLastSeenTime(long lastSeenTime) {
+        this.lastSeenTime = lastSeenTime;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/topology/KVLink.java b/src/main/java/net/onrc/onos/core/datastore/topology/KVLink.java
new file mode 100644
index 0000000..971cb6d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/topology/KVLink.java
@@ -0,0 +1,217 @@
+package net.onrc.onos.core.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.RCProtos.LinkProperty;
+import net.onrc.onos.core.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Link object in data store.
+ *
+ * A link is keyed by its source and destination switch ports and carries a
+ * {@link STATUS}, which is stored as the enum ordinal.
+ */
+public class KVLink extends KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVLink.class);
+
+    // One Kryo instance per thread, with every serialized type registered up front.
+    private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            kryo.setRegistrationRequired(true);
+            kryo.setReferences(false);
+            kryo.register(byte[].class);
+            kryo.register(byte[][].class);
+            kryo.register(HashMap.class);
+            // TODO check if we should explicitly specify EnumSerializer
+            kryo.register(STATUS.class);
+            return kryo;
+        }
+    };
+
+    /** A (dpid, port number) pair naming one end of a link. */
+    public static class SwitchPort {
+        public final Long dpid;
+        public final Long number;
+
+        public SwitchPort(final Long dpid, final Long number) {
+            this.dpid = dpid;
+            this.number = number;
+        }
+
+        /** @return the port key, as built by {@code KVPort.getPortID} */
+        public byte[] getPortID() {
+            return KVPort.getPortID(dpid, number);
+        }
+
+        /** @return the switch key, as built by {@code KVSwitch.getSwitchID} */
+        public byte[] getSwitchID() {
+            return KVSwitch.getSwitchID(dpid);
+        }
+
+        @Override
+        public String toString() {
+            return "(" + Long.toHexString(dpid) + "@" + number + ")";
+        }
+
+    }
+
+    public static final String GLOBAL_LINK_TABLE_NAME = "G:Link";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+        INACTIVE, ACTIVE;
+    }
+
+    private final SwitchPort src;
+    private final SwitchPort dst;
+    private STATUS status;
+
+    /**
+     * Builds the data store key of a link.
+     *
+     * @return key bytes produced by {@code LinkEvent.getLinkID}
+     */
+    public static byte[] getLinkID(final Long src_dpid, final Long src_port_no,
+            final Long dst_dpid, final Long dst_port_no) {
+        return LinkEvent.getLinkID(src_dpid, src_port_no, dst_dpid,
+                dst_port_no).array();
+    }
+
+    /**
+     * Parses a link key held in a byte array.
+     *
+     * @param key link key
+     * @return {src dpid, src port number, dst dpid, dst port number}
+     */
+    public static long[] getLinkTupleFromKey(final byte[] key) {
+        return getLinkTupleFromKey(ByteBuffer.wrap(key));
+    }
+
+    /**
+     * Parses a link key: a leading char {@code 'L'} followed by the source
+     * port key and the destination port key. Reading starts at the buffer's
+     * current position.
+     *
+     * @param keyBuf buffer positioned at the start of the link key
+     * @return {src dpid, src port number, dst dpid, dst port number}
+     * @throws IllegalArgumentException if the key does not start with {@code 'L'}
+     */
+    public static long[] getLinkTupleFromKey(final ByteBuffer keyBuf) {
+        if (keyBuf.getChar() != 'L') {
+            throw new IllegalArgumentException("Invalid Link key");
+        }
+        final long[] srcPortPair = KVPort.getPortPairFromKey(keyBuf.slice());
+        // The slice above does not move keyBuf, so skip the source port key here.
+        // Advancing relative to the current position (instead of seeking to the
+        // absolute offset 2 + PORTID_BYTES) also works for buffers that did not
+        // start at position 0.
+        keyBuf.position(keyBuf.position() + PortEvent.PORTID_BYTES);
+        final long[] dstPortPair = KVPort.getPortPairFromKey(keyBuf.slice());
+
+        return new long[] {
+                srcPortPair[0], srcPortPair[1], dstPortPair[0], dstPortPair[1],
+        };
+    }
+
+    /**
+     * Creates a link bound to the global link table; status starts as INACTIVE.
+     */
+    public KVLink(final Long src_dpid, final Long src_port_no,
+            final Long dst_dpid, final Long dst_port_no) {
+        super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
+                src_port_no, dst_dpid, dst_port_no));
+
+        src = new SwitchPort(src_dpid, src_port_no);
+        dst = new SwitchPort(dst_dpid, dst_port_no);
+        status = STATUS.INACTIVE;
+    }
+
+    /**
+     * Get an instance from Key.
+     *
+     * @note You need to call `read()` to get the DB content.
+     * @param key link key, see {@link #getLinkTupleFromKey(byte[])}
+     * @return KVLink instance
+     */
+    public static KVLink createFromKey(final byte[] key) {
+        long[] linkTuple = getLinkTupleFromKey(key);
+        return new KVLink(linkTuple[0], linkTuple[1], linkTuple[2],
+                linkTuple[3]);
+    }
+
+    /** @return an Iterable whose iterators walk the global link table */
+    public static Iterable<KVLink> getAllLinks() {
+        return new LinkEnumerator();
+    }
+
+    /** Iterable that hands out a new {@link LinkIterator} on every call. */
+    public static class LinkEnumerator implements Iterable<KVLink> {
+
+        @Override
+        public Iterator<KVLink> iterator() {
+            return new LinkIterator();
+        }
+    }
+
+    /** Iterator that turns each entry of the global link table into a KVLink. */
+    public static class LinkIterator extends AbstractObjectIterator<KVLink> {
+
+        public LinkIterator() {
+            super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME));
+        }
+
+        /** Builds the link from the entry key, then loads the entry value and version into it. */
+        @Override
+        public KVLink next() {
+            IKVEntry o = enumerator.next();
+            KVLink e = KVLink.createFromKey(o.getKey());
+            e.deserialize(o.getValue(), o.getVersion());
+            return e;
+        }
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    public SwitchPort getSrc() {
+        return src;
+    }
+
+    public SwitchPort getDst() {
+        return dst;
+    }
+
+    /** @return the data store key of this link */
+    public byte[] getId() {
+        return getKey();
+    }
+
+    /**
+     * Encodes the end points and the status ordinal as a LinkProperty
+     * message; a non-empty property map is Kryo-serialized into its value
+     * field.
+     *
+     * @return serialized LinkProperty message
+     */
+    @Override
+    public byte[] serialize() {
+        Map<Object, Object> map = getPropertyMap();
+
+        LinkProperty.Builder link = LinkProperty.newBuilder();
+        link.setSrcSwId(ByteString.copyFrom(src.getSwitchID()));
+        link.setSrcPortId(ByteString.copyFrom(src.getPortID()));
+        link.setDstSwId(ByteString.copyFrom(dst.getSwitchID()));
+        link.setDstPortId(ByteString.copyFrom(dst.getPortID()));
+        link.setStatus(status.ordinal());
+
+        if (!map.isEmpty()) {
+            byte[] propMaps = serializePropertyMap(linkKryo.get(), map);
+            link.setValue(ByteString.copyFrom(propMaps));
+        }
+
+        return link.build().toByteArray();
+    }
+
+    /**
+     * Restores the property map and the status from a LinkProperty message.
+     *
+     * @param bytes serialized LinkProperty message
+     * @return false if the message cannot be parsed, the property map cannot
+     *         be deserialized, or the stored status ordinal does not name a
+     *         {@link STATUS} member; true otherwise
+     */
+    @Override
+    protected boolean deserialize(final byte[] bytes) {
+        try {
+            LinkProperty link = LinkProperty.parseFrom(bytes);
+            byte[] props = link.getValue().toByteArray();
+            boolean success = deserializePropertyMap(linkKryo.get(), props);
+
+            // The ordinal comes from stored data: validate it rather than let an
+            // ArrayIndexOutOfBoundsException escape a method that reports
+            // failure through its return value.
+            final STATUS[] statuses = STATUS.values();
+            final int ordinal = link.getStatus();
+            if (ordinal < 0 || ordinal >= statuses.length) {
+                log.error("Deserializing Link: {} failed. Invalid status ordinal: {}", this, ordinal);
+                return false;
+            }
+            this.status = statuses[ordinal];
+
+            return success;
+        } catch (InvalidProtocolBufferException e) {
+            log.error("Deserializing Link: " + this + " failed.", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        // TODO output all properties?
+        return "[" + this.getClass().getSimpleName()
+                + " " + src + "->" + dst + " STATUS:" + status + "]";
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/topology/KVPort.java b/src/main/java/net/onrc/onos/core/datastore/topology/KVPort.java
new file mode 100644
index 0000000..9ab2d2a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/topology/KVPort.java
@@ -0,0 +1,205 @@
+package net.onrc.onos.core.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.RCProtos.PortProperty;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Port object in data store.
+ *
+ * A port is keyed by the dpid of its switch and its port number, and
+ * carries a {@link STATUS}, which is stored as the enum ordinal.
+ *
+ * Note: This class will not maintain invariants.
+ * e.g., It will NOT automatically remove Links or Devices on Port,
+ * when deleting a Port.
+ */
+public class KVPort extends KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVPort.class);
+
+    // One Kryo instance per thread, with every serialized type registered up front.
+    private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            kryo.setRegistrationRequired(true);
+            kryo.setReferences(false);
+            kryo.register(byte[].class);
+            kryo.register(byte[][].class);
+            kryo.register(HashMap.class);
+            // TODO check if we should explicitly specify EnumSerializer
+            kryo.register(STATUS.class);
+            return kryo;
+        }
+    };
+
+    public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+        INACTIVE, ACTIVE;
+    }
+
+    private final Long dpid;
+    private final Long number;
+
+    private STATUS status;
+
+    /**
+     * Builds the data store key of a port.
+     *
+     * @return key bytes produced by {@code PortEvent.getPortID}
+     */
+    public static byte[] getPortID(final Long dpid, final Long number) {
+        return PortEvent.getPortID(dpid, number).array();
+    }
+
+    /**
+     * Parses a port key held in a byte array.
+     *
+     * @param key port key
+     * @return {dpid, port number}
+     */
+    public static long[] getPortPairFromKey(final byte[] key) {
+        return getPortPairFromKey(ByteBuffer.wrap(key));
+    }
+
+    /**
+     * Parses a port key: char {@code 'S'}, the dpid as a long, char
+     * {@code 'P'}, the port number as a long. Reading starts at the buffer's
+     * current position.
+     *
+     * @param keyBuf buffer positioned at the start of the port key
+     * @return {dpid, port number}
+     * @throws IllegalArgumentException if either marker char is wrong
+     */
+    public static long[] getPortPairFromKey(final ByteBuffer keyBuf) {
+        if (keyBuf.getChar() != 'S') {
+            throw invalidPortKey(keyBuf);
+        }
+        long[] pair = new long[2];
+        pair[0] = keyBuf.getLong();
+        if (keyBuf.getChar() != 'P') {
+            throw invalidPortKey(keyBuf);
+        }
+        pair[1] = keyBuf.getLong();
+        return pair;
+
+    }
+
+    /** Builds the exception thrown for a malformed port key, including a hex dump when available. */
+    private static IllegalArgumentException invalidPortKey(final ByteBuffer keyBuf) {
+        // array() throws for read-only or non array-backed buffers, which would
+        // hide the real problem; only dump the backing array when there is one.
+        final Object dump = keyBuf.hasArray()
+                ? ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":")
+                : "";
+        return new IllegalArgumentException("Invalid Port key:" + keyBuf
+                + " "
+                + dump);
+    }
+
+    /** @return the dpid encoded in {@code key} */
+    public static long getDpidFromKey(final byte[] key) {
+        return getPortPairFromKey(key)[0];
+    }
+
+    /** @return the port number encoded in {@code key} */
+    public static long getNumberFromKey(final byte[] key) {
+        return getPortPairFromKey(key)[1];
+    }
+
+    // FIXME specify DPID,number here, or Should caller specify the key it self?
+    // In other words, should layer above have the control of the ID?
+    /**
+     * Creates a port bound to the global port table; status starts as INACTIVE.
+     *
+     * @param dpid dpid of the switch the port belongs to
+     * @param number port number
+     */
+    public KVPort(final Long dpid, final Long number) {
+        super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
+
+        this.dpid = dpid;
+        this.number = number;
+        this.status = STATUS.INACTIVE;
+    }
+
+    /**
+     * Get an instance from Key.
+     *
+     * @note You need to call `read()` to get the DB content.
+     * @param key port key, see {@link #getPortPairFromKey(byte[])}
+     * @return KVPort instance
+     */
+    public static KVPort createFromKey(final byte[] key) {
+        long[] pair = getPortPairFromKey(key);
+        return new KVPort(pair[0], pair[1]);
+    }
+
+    /** @return an Iterable whose iterators walk the global port table */
+    public static Iterable<KVPort> getAllPorts() {
+        return new PortEnumerator();
+    }
+
+    /** Iterable that hands out a new {@link PortIterator} on every call. */
+    public static class PortEnumerator implements Iterable<KVPort> {
+
+        @Override
+        public Iterator<KVPort> iterator() {
+            return new PortIterator();
+        }
+    }
+
+    /** Iterator that turns each entry of the global port table into a KVPort. */
+    public static class PortIterator extends AbstractObjectIterator<KVPort> {
+
+        public PortIterator() {
+            super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME));
+        }
+
+        /** Builds the port from the entry key, then loads the entry value and version into it. */
+        @Override
+        public KVPort next() {
+            IKVEntry o = enumerator.next();
+            KVPort e = KVPort.createFromKey(o.getKey());
+            e.deserialize(o.getValue(), o.getVersion());
+            return e;
+        }
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    public Long getDpid() {
+        return dpid;
+    }
+
+    public Long getNumber() {
+        return number;
+    }
+
+    /** @return the data store key of this port */
+    public byte[] getId() {
+        return getKey();
+    }
+
+    /**
+     * Encodes dpid, port number and the status ordinal as a PortProperty
+     * message; a non-empty property map is Kryo-serialized into its value
+     * field.
+     *
+     * @return serialized PortProperty message
+     */
+    @Override
+    public byte[] serialize() {
+        Map<Object, Object> map = getPropertyMap();
+
+        PortProperty.Builder port = PortProperty.newBuilder();
+        port.setDpid(dpid);
+        port.setNumber(number);
+        port.setStatus(status.ordinal());
+
+        if (!map.isEmpty()) {
+            byte[] propMaps = serializePropertyMap(portKryo.get(), map);
+            port.setValue(ByteString.copyFrom(propMaps));
+        }
+
+        return port.build().toByteArray();
+    }
+
+    /**
+     * Restores the property map and the status from a PortProperty message.
+     *
+     * @param bytes serialized PortProperty message
+     * @return false if the message cannot be parsed, the property map cannot
+     *         be deserialized, or the stored status ordinal does not name a
+     *         {@link STATUS} member; true otherwise
+     */
+    @Override
+    protected boolean deserialize(final byte[] bytes) {
+        try {
+            PortProperty port = PortProperty.parseFrom(bytes);
+            byte[] props = port.getValue().toByteArray();
+            boolean success = deserializePropertyMap(portKryo.get(), props);
+
+            // The ordinal comes from stored data: validate it rather than let an
+            // ArrayIndexOutOfBoundsException escape a method that reports
+            // failure through its return value.
+            final STATUS[] statuses = STATUS.values();
+            final int ordinal = port.getStatus();
+            if (ordinal < 0 || ordinal >= statuses.length) {
+                log.error("Deserializing Port: {} failed. Invalid status ordinal: {}", this, ordinal);
+                return false;
+            }
+            this.status = statuses[ordinal];
+
+            return success;
+        } catch (InvalidProtocolBufferException e) {
+            log.error("Deserializing Port: " + this + " failed.", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        // TODO output all properties?
+        return "[" + this.getClass().getSimpleName()
+                + " 0x" + Long.toHexString(dpid) + "@" + number
+                + " STATUS:" + status + "]";
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/topology/KVSwitch.java b/src/main/java/net/onrc/onos/core/datastore/topology/KVSwitch.java
new file mode 100644
index 0000000..eb27f7a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/topology/KVSwitch.java
@@ -0,0 +1,174 @@
+package net.onrc.onos.core.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.RCProtos.SwitchProperty;
+import net.onrc.onos.core.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.SwitchEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Switch object in data store.
+ *
+ * A switch is keyed by its dpid and carries a {@link STATUS}, which is
+ * stored as the enum ordinal.
+ *
+ * Note: This class will not maintain invariants.
+ * e.g., It will NOT automatically remove Ports on Switch,
+ * when deleting a Switch.
+ */
+public class KVSwitch extends KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVSwitch.class);
+
+    // One Kryo instance per thread, with every serialized type registered up front.
+    private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            kryo.setRegistrationRequired(true);
+            kryo.setReferences(false);
+            kryo.register(byte[].class);
+            kryo.register(byte[][].class);
+            kryo.register(HashMap.class);
+            // TODO check if we should explicitly specify EnumSerializer
+            kryo.register(STATUS.class);
+            return kryo;
+        }
+    };
+
+    public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+        INACTIVE, ACTIVE;
+    }
+
+    private final Long dpid;
+    private STATUS status;
+
+    /**
+     * Builds the data store key of a switch.
+     *
+     * @return key bytes produced by {@code SwitchEvent.getSwitchID}
+     */
+    public static byte[] getSwitchID(final Long dpid) {
+        return SwitchEvent.getSwitchID(dpid).array();
+    }
+
+    /**
+     * Parses a switch key held in a byte array.
+     *
+     * @param key switch key
+     * @return the dpid encoded in the key
+     */
+    public static long getDpidFromKey(final byte[] key) {
+        return getDpidFromKey(ByteBuffer.wrap(key));
+    }
+
+    /**
+     * Parses a switch key: char {@code 'S'} followed by the dpid as a long.
+     * Reading starts at the buffer's current position.
+     *
+     * @param keyBuf buffer positioned at the start of the switch key
+     * @return the dpid encoded in the key
+     * @throws IllegalArgumentException if the key does not start with {@code 'S'}
+     */
+    public static long getDpidFromKey(final ByteBuffer keyBuf) {
+        if (keyBuf.getChar() != 'S') {
+            throw new IllegalArgumentException("Invalid Switch key");
+        }
+        return keyBuf.getLong();
+    }
+
+    // FIXME specify DPID here, or Should caller specify the key it self?
+    // In other words, should layer above have the control of the ID?
+    /**
+     * Creates a switch bound to the global switch table; status starts as INACTIVE.
+     *
+     * @param dpid dpid of the switch
+     */
+    public KVSwitch(final Long dpid) {
+        super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME), getSwitchID(dpid));
+
+        this.dpid = dpid;
+        this.status = STATUS.INACTIVE;
+    }
+
+    /**
+     * Get an instance from Key.
+     *
+     * @note You need to call `read()` to get the DB content.
+     * @param key switch key, see {@link #getDpidFromKey(byte[])}
+     * @return KVSwitch instance
+     */
+    public static KVSwitch createFromKey(final byte[] key) {
+        return new KVSwitch(getDpidFromKey(key));
+    }
+
+    /** @return an Iterable whose iterators walk the global switch table */
+    public static Iterable<KVSwitch> getAllSwitches() {
+        return new SwitchEnumerator();
+    }
+
+    /** Iterable that hands out a new {@link SwitchIterator} on every call. */
+    public static class SwitchEnumerator implements Iterable<KVSwitch> {
+
+        @Override
+        public Iterator<KVSwitch> iterator() {
+            return new SwitchIterator();
+        }
+    }
+
+    /** Iterator that turns each entry of the global switch table into a KVSwitch. */
+    public static class SwitchIterator extends AbstractObjectIterator<KVSwitch> {
+
+        public SwitchIterator() {
+            super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME));
+        }
+
+        /** Builds the switch from the entry key, then loads the entry value and version into it. */
+        @Override
+        public KVSwitch next() {
+            IKVEntry o = enumerator.next();
+            KVSwitch e = KVSwitch.createFromKey(o.getKey());
+            e.deserialize(o.getValue(), o.getVersion());
+            return e;
+        }
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    public void setStatus(final STATUS status) {
+        this.status = status;
+    }
+
+    public Long getDpid() {
+        return dpid;
+    }
+
+    /** @return the data store key of this switch */
+    public byte[] getId() {
+        return getKey();
+    }
+
+    /**
+     * Encodes the dpid and the status ordinal as a SwitchProperty message; a
+     * non-empty property map is Kryo-serialized into its value field.
+     *
+     * @return serialized SwitchProperty message
+     */
+    @Override
+    public byte[] serialize() {
+        Map<Object, Object> map = getPropertyMap();
+
+        SwitchProperty.Builder sw = SwitchProperty.newBuilder();
+        sw.setDpid(dpid);
+        sw.setStatus(status.ordinal());
+
+        if (!map.isEmpty()) {
+            byte[] propMaps = serializePropertyMap(switchKryo.get(), map);
+            sw.setValue(ByteString.copyFrom(propMaps));
+        }
+
+        return sw.build().toByteArray();
+    }
+
+    /**
+     * Restores the property map and the status from a SwitchProperty message.
+     *
+     * @param bytes serialized SwitchProperty message
+     * @return false if the message cannot be parsed, the property map cannot
+     *         be deserialized, or the stored status ordinal does not name a
+     *         {@link STATUS} member; true otherwise
+     */
+    @Override
+    protected boolean deserialize(final byte[] bytes) {
+        try {
+            SwitchProperty sw = SwitchProperty.parseFrom(bytes);
+            byte[] props = sw.getValue().toByteArray();
+            boolean success = deserializePropertyMap(switchKryo.get(), props);
+
+            // The ordinal comes from stored data: validate it rather than let an
+            // ArrayIndexOutOfBoundsException escape a method that reports
+            // failure through its return value.
+            final STATUS[] statuses = STATUS.values();
+            final int ordinal = sw.getStatus();
+            if (ordinal < 0 || ordinal >= statuses.length) {
+                log.error("Deserializing Switch: {} failed. Invalid status ordinal: {}", this, ordinal);
+                return false;
+            }
+            this.status = statuses[ordinal];
+
+            return success;
+        } catch (InvalidProtocolBufferException e) {
+            log.error("Deserializing Switch: " + this + " failed.", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        // TODO output all properties?
+        return "[" + this.getClass().getSimpleName()
+                + " 0x" + Long.toHexString(dpid) + " STATUS:" + status + "]";
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayComparator.java
new file mode 100644
index 0000000..f937e50
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayComparator.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.core.datastore.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * Comparator which compares the content of byte[] rather than the array
+ * references.
+ *
+ * Expected to be used with TreeMap, etc. when you want to use byte[] as a key.
+ * The ordering is the one defined by {@link ByteBuffer#compareTo(ByteBuffer)}.
+ */
+public final class ByteArrayComparator implements Comparator<byte[]> {
+
+    /**
+     * Shared instance, for callers that want to avoid one instantiation per Map.
+     */
+    public static final ByteArrayComparator BYTEARRAY_COMPARATOR = new ByteArrayComparator();
+
+    /**
+     * @param o1 first array, must not be null
+     * @param o2 second array, must not be null
+     * @return negative, zero or positive as {@code o1} sorts before, equal to or after {@code o2}
+     */
+    @Override
+    public int compare(final byte[] o1, final byte[] o2) {
+        final ByteBuffer left = ByteBuffer.wrap(o1);
+        return left.compareTo(ByteBuffer.wrap(o2));
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayUtil.java b/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayUtil.java
new file mode 100644
index 0000000..14cd419
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/utils/ByteArrayUtil.java
@@ -0,0 +1,53 @@
+package net.onrc.onos.core.datastore.utils;
+
+import java.nio.ByteBuffer;
+
+/** Static helpers for rendering byte arrays as text. */
+public class ByteArrayUtil {
+
+    // Suppresses default constructor, ensuring non-instantiability.
+    private ByteArrayUtil() {}
+
+    /**
+     * Returns a new StringBuffer with each byte in {@code bytes} rendered
+     * as an unsigned hex number without zero padding (0x0a gives "a",
+     * 0x80 gives "80"), separated by {@code sep}.
+     *
+     * @param bytes byte array to convert; null yields an empty StringBuffer
+     * @param sep separator between each byte
+     * @return {@code bytes} converted to a StringBuffer
+     */
+    public static StringBuffer toHexStringBuffer(final byte[] bytes,
+            final String sep) {
+        return toHexStringBuffer(bytes, sep, new StringBuffer());
+    }
+
+    /**
+     * Appends each byte in {@code bytes}, rendered as an unsigned hex
+     * number without zero padding, to {@code buf}, separated by {@code sep}.
+     *
+     * @param bytes byte array to convert; null appends nothing
+     * @param sep separator between each byte
+     * @param buf StringBuffer to append to.
+     * @return {@code buf}
+     */
+    public static StringBuffer toHexStringBuffer(final byte[] bytes,
+            final String sep, final StringBuffer buf) {
+        if (bytes == null) {
+            return buf;
+        }
+
+        boolean hasWritten = false;
+        for (final byte b : bytes) {
+            if (hasWritten) {
+                buf.append(sep);
+            }
+            // Mask to 0..255 first: toHexString(int) on a negative byte would
+            // sign-extend it to eight digits ("ffffff80" instead of "80").
+            buf.append(Integer.toHexString(b & 0xff));
+            hasWritten = true;
+        }
+
+        return buf;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/utils/KVObject.java b/src/main/java/net/onrc/onos/core/datastore/utils/KVObject.java
new file mode 100644
index 0000000..522ced0
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/utils/KVObject.java
@@ -0,0 +1,470 @@
+package net.onrc.onos.core.datastore.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVClient;
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.IMultiEntryOperation;
+import net.onrc.onos.core.datastore.IMultiObjectOperation;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.WrongVersionException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An object persisted as the value blob of a single key-value pair in an
+ * {@link IKVTable}. By default the value is the Kryo-serialized property map.
+ */
+public class KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVObject.class);
+
+    // Default Kryo serializer, one instance per thread.
+    // Each sub-class should prepare its own serializer, which has required
+    // objects registered for better performance.
+    private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            Kryo kryo = new Kryo();
+            // kryo.setRegistrationRequired(true);
+            // kryo.setReferences(false);
+            return kryo;
+        }
+    };
+
+    private final IKVTable table;
+    private final byte[] key;
+
+    /**
+     * Version of the serialized value stored on the data store, or
+     * {@link IKVTable#VERSION_NONEXISTENT()} if this is a new object.
+     */
+    private long version;
+
+    /**
+     * Map to store user-defined properties.
+     */
+    private Map<Object, Object> propertyMap;
+
+    /** Creates an object without a value, at version {@code VERSION_NONEXISTENT()}. */
+    public KVObject(final IKVTable table, final byte[] key) {
+        this(table, key, null, table.VERSION_NONEXISTENT());
+    }
+
+    /**
+     * Creates an object, deserializing {@code value} if it is not null.
+     *
+     * @throws IllegalArgumentException if {@code table} or {@code key} is null
+     */
+    public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
+        if (table == null) {
+            throw new IllegalArgumentException("table cannot be null");
+        }
+        if (key == null) {
+            throw new IllegalArgumentException("key cannot be null");
+        }
+        this.table = table;
+        this.key = key;
+        this.version = version;
+        this.propertyMap = new HashMap<Object, Object>();
+
+        if (value != null) {
+            deserialize(value);
+        }
+    }
+
+    /** Always throws: sub-classes are expected to implement their own equivalent. */
+    protected static KVObject createFromKey(final byte[] key) {
+        throw new UnsupportedOperationException(
+                "createFromKey() is not expected to be called for KVObject");
+    }
+
+    public IKVTable getTable() {
+        return table;
+    }
+
+    public IKVTableID getTableId() {
+        return table.getTableId();
+    }
+
+    public byte[] getKey() {
+        return key;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
+    /**
+     * Returns the user-defined object properties without triggering
+     * deserialization.
+     *
+     * @return property map; null only after {@link #replacePropertyMap(Map)} was given null
+     */
+    protected Map<Object, Object> getPropertyMap() {
+        return this.propertyMap;
+    }
+
+    /** Installs {@code newMap} as the property map and returns the previous one. */
+    protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
+        Map<Object, Object> oldMap = this.propertyMap;
+        this.propertyMap = newMap;
+        return oldMap;
+    }
+
+    /**
+     * Serializes this object with the default Kryo instance.
+     *
+     * Sub-classes should override this method to customize serialization.
+     *
+     * @return serialized byte array
+     */
+    public byte[] serialize() {
+        return serializePropertyMap(defaultKryo.get(), this.propertyMap);
+    }
+
+    /**
+     * Serializes {@code propMap} with {@code kryo}.
+     *
+     * @return Kryo serialized bytes, at most 1 MiB
+     */
+    protected byte[] serializePropertyMap(final Kryo kryo,
+            final Map<Object, Object> propMap) {
+        // Grow on demand up to the 1 MiB ceiling that has always applied,
+        // rather than allocating a full 1 MiB scratch array on every call.
+        final Output output = new Output(4096, 1024 * 1024);
+        kryo.writeObject(output, propMap);
+        return output.toBytes();
+    }
+
+    /**
+     * Deserialize using value and version stored in data store.
+     *
+     * @param bytes serialized bytes
+     * @param version version of this {@code bytes}
+     * @return true if success
+     */
+    public boolean deserialize(final byte[] bytes, final long version) {
+        this.version = version;
+        return deserialize(bytes);
+    }
+
+    /**
+     * Deserialize object.
+     *
+     * Sub-classes should override this method to customize deserialization.
+     *
+     * @param bytes serialized byte array
+     * @return true if success
+     */
+    protected boolean deserialize(final byte[] bytes) {
+        return deserializePropertyMap(defaultKryo.get(), bytes);
+    }
+
+    /**
+     * Deserialize and set {@link #propertyMap}.
+     * @param kryo serializer to use
+     * @param bytes Kryo serialized Map object; null or empty yields an empty map
+     * @return true if success
+     */
+    protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
+        if (map == null) {
+            map = new HashMap<>();
+        }
+        this.propertyMap = map;
+        return true;
+    }
+
+    /**
+     * Deserializes {@code bytes} into a Map of class {@code type} using {@code kryo}.
+     *
+     * @return the deserialized map, or null if {@code bytes} is null or empty
+     */
+    protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
+            final byte[] bytes, final Class<T> type) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        return kryo.readObject(new Input(bytes), type);
+    }
+
+    /**
+     * Create an Object in DataStore.
+     *
+     * Fails if the Object with same key already exists.
+     *
+     * @throws ObjectExistsException if an Object with the same key already exists
+     */
+    public void create() throws ObjectExistsException {
+        if (this.propertyMap == null) {
+            log.warn("No object map was set. Setting empty Map.");
+            replacePropertyMap(new HashMap<Object, Object>());
+        }
+        this.version = table.create(key, this.serialize());
+    }
+
+    /** Like {@link #create()}, but via {@code IKVTable.forceCreate()}, which declares no exception. */
+    public void forceCreate() {
+        if (this.propertyMap == null) {
+            log.warn("No object map was set. Setting empty Map.");
+            replacePropertyMap(new HashMap<Object, Object>());
+        }
+        this.version = table.forceCreate(key, this.serialize());
+    }
+
+    /**
+     * Read an Object from DataStore.
+     *
+     * Fails if the Object with the key does not exist.
+     *
+     * @throws ObjectDoesntExistException if no Object is stored under the key
+     */
+    public void read() throws ObjectDoesntExistException {
+        IKVEntry e = table.read(key);
+        deserialize(e.getValue(), e.getVersion());
+    }
+
+    /**
+     * Update an existing Object in DataStore checking versions.
+     *
+     * Fails if the Object with key does not exist, or on conditional failure.
+     *
+     * @throws WrongVersionException on conditional failure
+     * @throws ObjectDoesntExistException if no Object is stored under the key
+     */
+    public void update() throws ObjectDoesntExistException,
+            WrongVersionException {
+        if (this.propertyMap == null) {
+            replacePropertyMap(new HashMap<Object, Object>());
+        }
+
+        this.version = table.update(key, this.serialize(), version);
+    }
+
+    /**
+     * Remove an existing Object in DataStore.
+     *
+     * Fails if the Object with key does not exist.
+     *
+     * @throws ObjectDoesntExistException if no Object is stored under the key
+     * @throws WrongVersionException on conditional failure
+     */
+    public void delete() throws ObjectDoesntExistException,
+            WrongVersionException {
+        this.version = table.delete(key, this.version);
+    }
+
+    /** Removes the Object via {@code IKVTable.forceDelete()}, which takes no version. */
+    public void forceDelete() {
+        this.version = table.forceDelete(key);
+    }
+
+    // Factories for multi-operation handles: each wraps the matching IKVClient operation.
+    public WriteOp forceCreateOp(IKVClient client) {
+        return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
+    }
+
+    public WriteOp createOp(IKVClient client) {
+        return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
+    }
+
+    // Used by multiRead(); wrapped in a WriteOp like the write operations.
+    public WriteOp readOp(IKVClient client) {
+        return new WriteOp(client.readOp(getTableId(), getKey()), this);
+    }
+
+    public WriteOp updateOp(IKVClient client) {
+        return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
+    }
+
+    public WriteOp deleteOp(IKVClient client) {
+        return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
+    }
+
+    public WriteOp forceDeleteOp(IKVClient client) {
+        return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
+    }
+
+    /**
+     * Multi-read KVObjects.
+     *
+     * A successfully read blob is deserialized into its KVObject; a failed read resets the version to VERSION_NONEXISTENT().
+     *
+     * @param objects KVObjects to read
+     * @return true if there exist a failed read.
+     */
+    public static boolean multiRead(final List<? extends KVObject> objects) {
+        final IKVClient client = DataStoreClient.getClient();
+
+        final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
+        for (KVObject o : objects) {
+            readOps.add(o.readOp(client));
+        }
+
+        boolean failExists = client.multiRead(readOps);
+
+        for (int i = 0; i < readOps.size(); ++i) {
+            KVObject obj = objects.get(i);
+            IMultiEntryOperation entry = readOps.get(i);
+            if (entry.hasSucceeded()) {
+                if (!obj.deserialize(entry.getValue(), entry.getVersion())) {
+                    // deserialize returns true on success
+                    failExists = true;
+                    log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
+                }
+            } else {
+                log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
+                obj.version = obj.getTable().VERSION_NONEXISTENT();
+                failExists = true;
+            }
+        }
+
+        return failExists;
+    }
+
+    /**
+     * Operation on a KVObject: delegates to the wrapped client operation and
+     * mirrors {@link #setVersion(long)} into the object's version.
+     * TODO Extract common interface
+     */
+    public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
+        private final IModifiableMultiEntryOperation base;
+        private final KVObject obj;
+
+        public WriteOp(IMultiEntryOperation base, final KVObject obj) {
+            // Any operation type is accepted (reads and deletes are wrapped too);
+            // base must be modifiable or the cast throws ClassCastException.
+            this.base = (IModifiableMultiEntryOperation) base;
+            this.obj = obj;
+        }
+
+        @Override
+        public KVObject getObject() {
+            return obj;
+        }
+
+        @Deprecated
+        public OPERATION getOp() {
+            return this.getOperation();
+        }
+
+        @Override
+        public boolean hasSucceeded() {
+            return base.hasSucceeded();
+        }
+
+        @Override
+        public STATUS getStatus() {
+            return base.getStatus();
+        }
+
+        @Override
+        public IKVTableID getTableId() {
+            return base.getTableId();
+        }
+
+        @Override
+        public byte[] getKey() {
+            return base.getKey();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return base.getValue();
+        }
+
+        @Override
+        public long getVersion() {
+            return base.getVersion();
+        }
+
+        @Override
+        public OPERATION getOperation() {
+            return base.getOperation();
+        }
+
+        @Override
+        public void setStatus(STATUS status) {
+            base.setStatus(status);
+        }
+
+        @Override
+        public void setValue(byte[] value, long version) {
+            base.setValue(value, version);
+        }
+
+        @Override
+        public void setVersion(long version) {
+            base.setVersion(version);
+            this.obj.version = version; // keep the KVObject's version in sync
+        }
+
+        @Override
+        public IModifiableMultiEntryOperation getActualOperation() {
+            return base;
+        }
+    }
+
+    /**
+     * Multi-write KVObjects.
+     *
+     * @param objects write operations to execute
+     * @return result of {@code IKVClient.multiWrite()} for these operations
+     */
+    public static boolean multiWrite(final List<WriteOp> objects) {
+        final IKVClient client = DataStoreClient.getClient();
+        // Copy, since the client takes operations typed as IMultiEntryOperation.
+        final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<IMultiEntryOperation>(objects);
+        return client.multiWrite(writeOps);
+    }
+
+    /**
+     * Skeleton Iterator over all entries of a table. Sub-classes implement
+     * {@code next()} to turn each entry into an object.
+     */
+    public abstract static class AbstractObjectIterator<E extends KVObject> implements
+            Iterator<E> {
+
+        protected Iterator<IKVEntry> enumerator;
+
+        public AbstractObjectIterator(final IKVTable table) {
+            this.enumerator = table.getAllEntries().iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return enumerator.hasNext();
+        }
+
+        // Implement something similar to below to realize Iterator
+        // @Override
+        // public E next() {
+        //     IKVTable.IKVEntry o = enumerator.next();
+        //     E obj = E.createFromKey(o.getKey());
+        //     obj.deserialize(o.getValue(), o.getVersion());
+        //     return obj;
+        // }
+
+        @Deprecated
+        @Override
+        public void remove() {
+            // TODO Not implemented, as I cannot find a use-case for it.
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+    }
+}