Refactoring datastore package
Preparation to make datastore replacable
- Extract datastore interfaces
- Add multi Read/Write/Remove
- Add a method to walk over RCTable
- Refactor serialize/deserialize RCObject
- Localize dependency to JRAMCloud
- Separate RAMCloud specific code into ramcloud package
- Remove dependency to RAMCloud exception classes
- Remove RC prefix from non RAMCloud specific code
- Cosmetics and update sample/test code
- reflect Naoki's comment
- more cosmetic fixes
- reordered OPERATION enums
- removed no longer used code
- follow pmd, etc. where easily possible
Change-Id: I6f9153d705600447acf48a64f713c654c9f26713
diff --git a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
index 830fd47..775674e 100644
--- a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
+++ b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
@@ -497,7 +497,7 @@
{
RamCloud* ramcloud = getRamCloud(env, jRamCloud);
JByteArrayReference key(env, jKey);
- uint64_t version;
+ uint64_t version = VERSION_NONEXISTENT;
try {
ramcloud->remove(jTableId, key.pointer, key.length, NULL, &version);
} EXCEPTION_CATCHER(-1);
@@ -532,7 +532,7 @@
JByteArrayReference key(env, jKey);
RejectRules rules = {};
setRejectRules(env, jRejectRules, rules);
- uint64_t version;
+ uint64_t version = VERSION_NONEXISTENT;
try {
ramcloud->remove(jTableId, key.pointer, key.length, &rules, &version);
} EXCEPTION_CATCHER(-1);
diff --git a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
new file mode 100644
index 0000000..88daa8e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
@@ -0,0 +1,10 @@
+package net.onrc.onos.datastore;
+
+import net.onrc.onos.datastore.ramcloud.RCClient;
+
+public class DataStoreClient {
+ public static IKVClient getClient() {
+ // TODO read config and return appropriate IKVClient
+ return RCClient.getClient();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/IKVClient.java b/src/main/java/net/onrc/onos/datastore/IKVClient.java
new file mode 100644
index 0000000..eba804c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/IKVClient.java
@@ -0,0 +1,165 @@
+package net.onrc.onos.datastore;
+
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+
+/**
+ * Interface for a client class used to access the Key-Value store
+ */
+public interface IKVClient {
+
+ public IKVTable getTable(final String tableName);
+
+ /**
+ * Drop table.
+ *
+ * Behavior of IKVTable instances accessing dropped table is undefined.
+ *
+ * @param table IKVTable to drop.
+ */
+ public void dropTable(IKVTable table);
+
+ /**
+ * Create a Key-Value entry on table.
+ *
+ * @param tableId
+ * @param key
+ * @param value
+ * @return version of the created entry
+ * @throws ObjectExistsException
+ */
+ public long create(IKVTableID tableId, byte[] key, byte[] value) throws ObjectExistsException;
+
+ /**
+ * Create a Key-Value entry on table, without existence checking.
+ *
+ * @param tableId
+ * @param key
+ * @param value
+ * @return version of the created entry
+ */
+ public long forceCreate(IKVTableID tableId, byte[] key, byte[] value);
+
+ /**
+ * Read a Key-Value entry from table.
+ *
+ * @param tableId
+ * @param key
+ * @return Corresponding {@link IKVEntry}
+ * @throws ObjectDoesntExistException
+ */
+ public IKVEntry read(IKVTableID tableId, byte[] key) throws ObjectDoesntExistException;
+
+ /**
+ * Update an existing Key-Value entry in table.
+ *
+ * @param tableId
+ * @param key
+ * @param value
+ * @param version
+ * expected version in the data store
+ * @return version after update
+ * @throws ObjectDoesntExistException
+ * @throws WrongVersionException
+ */
+ public long update(IKVTableID tableId, byte[] key, byte[] value, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Update an existing Key-Value entry in table, without checking version.
+ *
+ * FIXME remove this method and use forceCreate for this purpose?
+ * @param tableId
+ * @param key
+ * @param value
+ * @return version after update
+ * @throws ObjectDoesntExistException
+ */
+ @Deprecated
+ public long update(IKVTableID tableId, byte[] key, byte[] value)
+ throws ObjectDoesntExistException;
+
+ // TODO Adding serialized value as parameter to this interface may
+ // give an option to improve performance on some backends.
+ /**
+ * Remove an existing Key-Value entry in table
+ *
+ * @param tableId
+ * @param key
+ * @param version
+ * expected version in the data store
+ * @return version of removed object
+ * @throws ObjectDoesntExistException
+ * @throws WrongVersionException
+ */
+ public long delete(IKVTableID tableId, byte[] key, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Remove a Key-Value entry in table
+ *
+ * @param tableId
+ * @param key
+ * @return version of removed object or -1, if it did not exist.
+ */
+ public long forceDelete(IKVTableID tableId, byte[] key);
+
+ /**
+ * Get all the entries in table.
+ * @param tableId
+ * @return entries in this table.
+ */
+ public Iterable<IKVEntry> getAllEntries(IKVTableID tableId);
+
+ /**
+ *
+ * @see #create(IKVTableID, byte[], byte[])
+ *
+ * @return IMultiOpEntry for this operation
+ *
+ */
+ public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value);
+
+ public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key,
+ byte[] value);
+
+ public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key);
+
+ public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,
+ long version);
+
+ public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,
+ long version);
+
+ public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key);
+
+ /**
+ * Batch delete operation
+ * @param ops delete operations
+ * @return true if failed operation exists
+ */
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops);
+
+ /**
+ * Batch write operation
+ * @param ops write operations
+ * @return true if failed operation exists
+ */
+ public boolean multiWrite(final List<IMultiEntryOperation> ops);
+
+ /**
+ * Batch read operation
+ * @param ops read operations
+ * @return true if failed operation exists
+ */
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops);
+
+ /**
+ * Version number which represents that the object does not exist, or has
+ * never been read the DB before.
+ */
+ public long VERSION_NONEXISTENT();
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/IKVTable.java b/src/main/java/net/onrc/onos/datastore/IKVTable.java
new file mode 100644
index 0000000..0164293
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/IKVTable.java
@@ -0,0 +1,112 @@
+package net.onrc.onos.datastore;
+
+
+/**
+ * Interface for a class to represent a Table in a Key-Value store
+ */
+public interface IKVTable {
+
+ /**
+ * Version number which represents that the object does not exist, or has
+ * never been read the DB before.
+ */
+ public long VERSION_NONEXISTENT();
+
+ /**
+ * Interface for a class to represent an entry in Table.
+ */
+ public static interface IKVEntry {
+
+ public byte[] getKey();
+
+ public byte[] getValue();
+
+ public long getVersion();
+
+ }
+
+ /**
+ * @return ID to identify this table.
+ */
+ public IKVTableID getTableId();
+
+ /**
+ * Create a Key-Value entry on table.
+ *
+ * @param key
+ * @param value
+ * @return version of the created entry
+ * @throws ObjectExistsException
+ */
+ public long create(byte[] key, byte[] value) throws ObjectExistsException;
+
+ /**
+ * Create a Key-Value entry on table, without existence checking.
+ *
+ * @param key
+ * @param value
+ * @return version of the created entry
+ */
+ public long forceCreate(byte[] key, byte[] value);
+
+ /**
+ * Read a Key-Value entry from table.
+ *
+ * @param key
+ * @return Corresponding {@link IKVEntry}
+ * @throws ObjectDoesntExistException
+ */
+ public IKVEntry read(byte[] key) throws ObjectDoesntExistException;
+
+ /**
+ * Update an existing Key-Value entry in table.
+ *
+ * @param key
+ * @param value
+ * @param version
+ * expected version in the data store
+ * @return version after update
+ * @throws ObjectDoesntExistException
+ * @throws WrongVersionException
+ */
+ public long update(byte[] key, byte[] value, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Update an existing Key-Value entry in table, without checking version.
+ *
+ * @param key
+ * @param value
+ * @return version after update
+ * @throws ObjectDoesntExistException
+ */
+ public long update(byte[] key, byte[] value)
+ throws ObjectDoesntExistException;
+
+ /**
+ * Remove an existing Key-Value entry in table
+ *
+ * @param key
+ * @param version
+ * expected version in the data store
+ * @return version of removed object
+ * @throws ObjectDoesntExistException
+ * @throws WrongVersionException
+ */
+ public long delete(byte[] key, long version)
+ throws ObjectDoesntExistException, WrongVersionException;
+
+ /**
+ * Remove a Key-Value entry in table
+ *
+ * @param key
+ * @return version of removed object or VERSION_NONEXISTENT, if it did not exist.
+ */
+ public long forceDelete(byte[] key);
+
+ /**
+ * Get all the entries in table.
+ * @return entries in this table.
+ */
+ public Iterable<IKVEntry> getAllEntries();
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/datastore/IKVTableID.java b/src/main/java/net/onrc/onos/datastore/IKVTableID.java
new file mode 100644
index 0000000..d66370c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/IKVTableID.java
@@ -0,0 +1,7 @@
+package net.onrc.onos.datastore;
+
+public interface IKVTableID {
+
+ public String getTableName();
+
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java
new file mode 100644
index 0000000..25d5a13
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.datastore;
+
+/**
+ * Interface for a class to specify which K-V pair to batch read/write/delete.
+ */
+public interface IMultiEntryOperation {
+
+ public enum STATUS {
+ NOT_EXECUTED, SUCCESS, FAILED
+ }
+
+ public enum OPERATION {
+ CREATE, FORCE_CREATE, UPDATE, READ, DELETE, FORCE_DELETE
+ }
+
+ public boolean hasSucceeded();
+
+ public STATUS getStatus();
+
+ public IKVTableID getTableId();
+
+ public byte[] getKey();
+
+ public byte[] getValue();
+
+ public long getVersion();
+
+ public OPERATION getOperation();
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/IMultiObjectOperation.java b/src/main/java/net/onrc/onos/datastore/IMultiObjectOperation.java
new file mode 100644
index 0000000..95e4e2a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/IMultiObjectOperation.java
@@ -0,0 +1,9 @@
+package net.onrc.onos.datastore;
+
+import net.onrc.onos.datastore.utils.KVObject;
+
+public interface IMultiObjectOperation {
+
+ public KVObject getObject();
+
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java b/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java
new file mode 100644
index 0000000..712ba4e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.datastore;
+
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+/**
+ * Exception thrown when object was expected, but not found in data store.
+ */
+public class ObjectDoesntExistException extends RejectRulesException {
+ private static final long serialVersionUID = 859082748533417866L;
+
+ public ObjectDoesntExistException(final String message) {
+ super(message);
+ }
+
+ public ObjectDoesntExistException(final IKVTableID tableID,
+ final byte[] key, final Throwable cause) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID, cause);
+ }
+
+ public ObjectDoesntExistException(final IKVTableID tableID, final byte[] key) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java b/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java
new file mode 100644
index 0000000..3ac3bf8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.datastore;
+
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+/**
+ * Exception thrown when object was not expected to be in data store.
+ */
+public class ObjectExistsException extends RejectRulesException {
+ private static final long serialVersionUID = -1488647215779909457L;
+
+ public ObjectExistsException(final String message) {
+ super(message);
+ }
+
+ public ObjectExistsException(final IKVTableID tableID, final byte[] key,
+ final Throwable cause) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " already exist on table:" + tableID, cause);
+ }
+
+ public ObjectExistsException(final IKVTableID tableID, final byte[] key) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " already exist on table:" + tableID);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/RCClient.java b/src/main/java/net/onrc/onos/datastore/RCClient.java
deleted file mode 100644
index a918f02..0000000
--- a/src/main/java/net/onrc/onos/datastore/RCClient.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package net.onrc.onos.datastore;
-
-import java.io.File;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-import edu.stanford.ramcloud.JRamCloud;
-
-public class RCClient {
-
- private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
- public static final Configuration config = getConfiguration();
-
- // Value taken from RAMCloud's Status.h
- // FIXME These constants should be defined by JRamCloud
- public static final int STATUS_OK = 0;
-
- // FIXME come up with a proper way to retrieve configuration
- public static final int MAX_MULTI_READS = Math.max(1, Integer
- .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
-
- public static final int MAX_MULTI_WRITES = Math.max(1, Integer
- .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
-
- private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
- @Override
- protected JRamCloud initialValue() {
- return new JRamCloud(getCoordinatorUrl(config));
- }
- };
-
- /**
- * @return JRamCloud instance intended to be used only within the
- * SameThread.
- * @note Do not store the returned instance in a member variable, etc. which
- * may be accessed later by another thread.
- */
- static JRamCloud getClient() {
- return tlsRCClient.get();
- }
-
- public static final Configuration getConfiguration() {
- final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
- return getConfiguration(configFile);
- }
-
- public static final Configuration getConfiguration(final File configFile) {
- if (configFile == null) {
- throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
- }
-
- if (!configFile.isFile()) {
- throw new IllegalArgumentException("Location of configuration must be a file");
- }
-
- try {
- return new PropertiesConfiguration(configFile);
- } catch (ConfigurationException e) {
- throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
- }
- }
-
- public static String getCoordinatorUrl(final Configuration configuration) {
- final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
- final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
- final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
- return coordinatorURL;
- }
-}
diff --git a/src/main/java/net/onrc/onos/datastore/RCObject.java b/src/main/java/net/onrc/onos/datastore/RCObject.java
deleted file mode 100644
index 009b584..0000000
--- a/src/main/java/net/onrc/onos/datastore/RCObject.java
+++ /dev/null
@@ -1,502 +0,0 @@
-package net.onrc.onos.datastore;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
-import net.onrc.onos.datastore.RCTable.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import edu.stanford.ramcloud.JRamCloud;
-import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
-import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
-import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
-import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
-import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
-import edu.stanford.ramcloud.JRamCloud.RejectRules;
-import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
-import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
-
-/**
- * Class to represent an Object represented as a single K-V pair Value blob.
- *
- */
-public class RCObject {
- private static final Logger log = LoggerFactory.getLogger(RCObject.class);
- /**
- * Version number which represents that the object doesnot exist, or hase
- * never read the DB before.
- */
- public static final long VERSION_NONEXISTENT = 0L;
-
- // Each Object should prepare their own serializer, which has required
- // objects registered.
- private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- // kryo.setRegistrationRequired(true);
- // TODO TreeMap or just Map
- // kryo.register(TreeMap.class);
- kryo.setReferences(false);
- return kryo;
- }
- };
-
- private final RCTable table;
- private final byte[] key;
- protected byte[] value; //FIXME should not be exposed
- private long version;
-
- private Map<Object, Object> propertyMap;
-
- public RCObject(RCTable table, byte[] key) {
- this(table, key, null, VERSION_NONEXISTENT);
- }
-
- public RCObject(RCTable table, byte[] key, byte[] value, long version) {
- if (table == null) {
- throw new IllegalArgumentException("table cannot be null");
- }
- if (key == null) {
- throw new IllegalArgumentException("key cannot be null");
- }
- this.table = table;
- this.key = key;
- this.value = value;
- this.version = version;
- this.propertyMap = new HashMap<Object, Object>();
-
- if (this.value != null) {
- deserializeObjectFromValue();
- }
- }
-
- public static <T extends RCObject> T createFromKey(byte[] key) {
- // Equivalent of this method is expected to be implemented by SubClasses
- throw new UnsupportedOperationException(
- "createFromKey() is not expected to be called for RCObject");
- }
-
- public RCTable getTable() {
- return table;
- }
-
- public long getTableId() {
- return table.getTableId();
- }
-
- public byte[] getKey() {
- return key;
- }
-
- /**
- * Get the byte array value of this object
- *
- * @note will trigger serialization, if value was null.
- * @return
- */
- protected byte[] getValue() {
- if (value == null) {
- serializeAndSetValue();
- }
- return value;
- }
-
- public long getVersion() {
- return version;
- }
-
- /**
- * Return serialized Value.
- *
- * @note will not trigger serialization
- * @return Will return null, if never been read, or was not serialized
- */
- public byte[] getSerializedValue() {
- return value;
- }
-
- /**
- * Return Object as a Map.
- *
- * @note Will not trigger deserialization
- * @return Will return null, if never been set, or was not deserialized
- */
- protected Map<Object, Object> getObjectMap() {
- return this.propertyMap;
- }
-
- protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
- Map<Object, Object> old_map = this.propertyMap;
- this.propertyMap = new_map;
- return old_map;
- }
-
- public void serializeAndSetValue() {
- serializeAndSetValue(defaultKryo.get(), this.propertyMap);
- }
-
- protected void serializeAndSetValue(Kryo kryo,
- Map<Object, Object> javaObject) {
-
-
- // value
- byte[] rcTemp = new byte[1024 * 1024];
- Output output = new Output(rcTemp);
- kryo.writeObject(output, javaObject);
- this.value = output.toBytes();
- }
-
- /**
- * Deserialize
- *
- * @return
- */
- public Map<Object, Object> deserializeObjectFromValue() {
- return deserializeObjectFromValue(defaultKryo.get());
- }
-
- protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
- return deserializeObjectFromValue(kryo, HashMap.class);
- }
-
- protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
- Class<T> type) {
- if (this.value == null)
- return null;
-
- Input input = new Input(this.value);
- T map = kryo.readObject(input, type);
- this.propertyMap = map;
-
- return map;
- }
-
- protected void setValueAndDeserialize(byte[] value, long version) {
- this.value = value;
- this.version = version;
- deserializeObjectFromValue();
- }
-
- /**
- * Create an Object in DataStore.
- *
- * Fails if the Object with same key already exists.
- *
- * @throws ObjectExistsException
- */
- public void create() throws ObjectExistsException {
-
- if (this.propertyMap == null) {
- log.warn("No object map was set. Setting empty Map.");
- setObjectMap(new HashMap<Object, Object>());
- }
- serializeAndSetValue();
-
- this.version = table.create(key, value);
- }
-
- public void forceCreate() {
-
- if (this.propertyMap == null) {
- log.warn("No object map was set. Setting empty Map.");
- setObjectMap(new HashMap<Object, Object>());
- }
- serializeAndSetValue();
-
- this.version = table.forceCreate(key, value);
- }
-
- /**
- * Read an Object from DataStore.
- *
- * Fails if the Object with the key does not exist.
- *
- * @throws ObjectDoesntExistException
- *
- */
- public void read() throws ObjectDoesntExistException {
- Entry e = table.read(key);
- // TODO should we deserialize immediately?
- setValueAndDeserialize(e.value, e.version);
- }
-
- /**
- * Update an existing Object in DataStore checking versions.
- *
- * Fails if the Object with key does not exists, or conditional failure.
- *
- * @throws WrongVersionException
- * @throws ObjectDoesntExistException
- */
- public void update() throws ObjectDoesntExistException,
- WrongVersionException {
- if (this.propertyMap == null) {
- setObjectMap(new HashMap<Object, Object>());
- }
- serializeAndSetValue();
-
- this.version = table.update(key, value, version);
- }
-
- /**
- * Remove an existing Object in DataStore.
- *
- * Fails if the Object with key does not exists.
- *
- * @throws ObjectDoesntExistException
- * @throws WrongVersionException
- */
- public void delete() throws ObjectDoesntExistException,
- WrongVersionException {
- this.version = table.delete(key, this.version);
- }
-
- public void forceDelete() {
- this.version = table.forceDelete(key);
- }
-
- /**
- * Multi-read RCObjects.
- *
- * If the blob value was read successfully, RCObject will deserialize them.
- *
- * @param objects
- * RCObjects to read
- * @return true if there exist a failed read.
- */
- public static boolean multiRead(Collection<RCObject> objects) {
- boolean fail_exists = false;
-
- ArrayList<RCObject> req = new ArrayList<>();
- Iterator<RCObject> it = objects.iterator();
- while (it.hasNext()) {
-
- req.add(it.next());
-
- if (req.size() >= RCClient.MAX_MULTI_READS) {
- // dispatch multiRead
- fail_exists |= multiReadInternal(req);
- req.clear();
- }
- }
-
- if (!req.isEmpty()) {
- // dispatch multiRead
- fail_exists |= multiReadInternal(req);
- req.clear();
- }
-
- return fail_exists;
- }
-
- private static boolean multiReadInternal(ArrayList<RCObject> req) {
- boolean fail_exists = false;
- JRamCloud rcClient = RCClient.getClient();
-
- final int reqs = req.size();
-
- MultiReadObject multiReadObjects = new MultiReadObject(req.size());
-
- // setup multi-read operation
- for (int i = 0; i < reqs; ++i) {
- RCObject obj = req.get(i);
- multiReadObjects.setObject(i, obj.getTableId(), obj.getKey());
- }
-
- // execute
- JRamCloud.Object results[] = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
- assert (results.length <= req.size());
-
- // reflect changes to RCObject
- for (int i = 0; i < results.length; ++i) {
- RCObject obj = req.get(i);
- if (results[i] == null) {
- log.error("MultiRead error, skipping {}, {}", obj.getTable(),
- obj);
- fail_exists = true;
- continue;
- }
- assert (Arrays.equals(results[i].key, obj.getKey()));
-
- obj.value = results[i].value;
- obj.version = results[i].version;
- if (obj.version == VERSION_NONEXISTENT) {
- fail_exists = true;
- } else {
- obj.deserializeObjectFromValue();
- }
- }
-
- return fail_exists;
- }
-
- public static class WriteOp {
- public enum STATUS {
- NOT_EXECUTED, SUCCESS, FAILED
- }
-
- public enum OPS {
- CREATE, UPDATE, FORCE_CREATE
- }
-
- private RCObject obj;
- private OPS op;
- private STATUS status;
-
- public static WriteOp Create(RCObject obj) {
- return new WriteOp(obj, OPS.CREATE);
- }
-
- public static WriteOp Update(RCObject obj) {
- return new WriteOp(obj, OPS.UPDATE);
- }
-
- public static WriteOp ForceCreate(RCObject obj) {
- return new WriteOp(obj, OPS.FORCE_CREATE);
- }
-
- public WriteOp(RCObject obj, OPS op) {
- this.obj = obj;
- this.op = op;
- this.status = STATUS.NOT_EXECUTED;
- }
-
- public boolean hasSucceed() {
- return status == STATUS.SUCCESS;
- }
-
- public RCObject getObject() {
- return obj;
- }
-
- public OPS getOp() {
- return op;
- }
-
- public STATUS getStatus() {
- return status;
- }
- }
-
- public static boolean multiWrite(Collection<WriteOp> objects) {
- boolean fail_exists = false;
-
- ArrayList<WriteOp> req = new ArrayList<>();
- Iterator<WriteOp> it = objects.iterator();
- while (it.hasNext()) {
-
- req.add(it.next());
-
- if (req.size() >= RCClient.MAX_MULTI_WRITES) {
- // dispatch multiWrite
- fail_exists |= multiWriteInternal(req);
- req.clear();
- }
- }
-
- if (!req.isEmpty()) {
- // dispatch multiWrite
- fail_exists |= multiWriteInternal(req);
- req.clear();
- }
-
- return fail_exists;
- }
-
- private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
-
- boolean fail_exists = false;
- MultiWriteObject multiWriteObjects = new MultiWriteObject(ops.size());
- JRamCloud rcClient = RCClient.getClient();
-
- for (int i = 0; i < ops.size(); ++i) {
- WriteOp op = ops.get(i);
- RCObject obj = op.getObject();
-
- RejectRules rules = new RejectRules();
-
- switch (op.getOp()) {
- case CREATE:
- rules.rejectIfExists();
- break;
- case FORCE_CREATE:
- // no reject rule
- break;
- case UPDATE:
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(obj.getVersion());
- break;
- }
- multiWriteObjects.setObject(i, obj.getTableId(), obj.getKey(), obj.getValue(), rules);
- }
-
- MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
- assert (results.length == ops.size());
-
- for (int i = 0; i < results.length; ++i) {
- WriteOp op = ops.get(i);
-
- if (results[i] != null
- && results[i].getStatus() == RCClient.STATUS_OK) {
- op.status = STATUS.SUCCESS;
-
- RCObject obj = op.getObject();
- obj.version = results[i].getVersion();
- } else {
- op.status = STATUS.FAILED;
- fail_exists = true;
- }
-
- }
-
- return fail_exists;
- }
-
- public static abstract class ObjectIterator<E extends RCObject> implements
- Iterator<E> {
-
- protected TableEnumerator2 enumerator;
-
- public ObjectIterator(RCTable table) {
- // FIXME workaround for JRamCloud bug. It should have been declared
- // as static class
- JRamCloud c = RCClient.getClient();
- this.enumerator = c.new TableEnumerator2(table.getTableId());
- }
-
- @Override
- public boolean hasNext() {
- return enumerator.hasNext();
- }
-
-// Implement something similar to below to realize Iterator
-// @Override
-// public E next() {
-// JRamCloud.Object o = enumerator.next();
-// E obj = E.createFromKey(o.key);
-// obj.setValueAndDeserialize(o.value, o.version);
-// return obj;
-// }
-
- @Deprecated
- @Override
- public void remove() {
- // TODO Not implemented, as I cannot find a use-case for it.
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- }
-
-}
diff --git a/src/main/java/net/onrc/onos/datastore/RCTable.java b/src/main/java/net/onrc/onos/datastore/RCTable.java
deleted file mode 100644
index 0617bf7..0000000
--- a/src/main/java/net/onrc/onos/datastore/RCTable.java
+++ /dev/null
@@ -1,257 +0,0 @@
-package net.onrc.onos.datastore;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.stanford.ramcloud.JRamCloud;
-import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
-import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
-import edu.stanford.ramcloud.JRamCloud.RejectRules;
-import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
-import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
-
-/**
- * Class to represent a Table in RAMCloud
- *
- */
-public class RCTable {
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(RCTable.class);
-
- private static final ConcurrentHashMap<String, RCTable> table_map = new ConcurrentHashMap<>();
-
- /**
- *
- * @param table
- * Table to drop.
- * @note Instance passed must not be use after successful call.
- *
- */
- public static void dropTable(RCTable table) {
- JRamCloud rcClient = RCClient.getClient();
- // TODO mark the table instance as dropped?
- rcClient.dropTable(table.getTableName());
- table_map.remove(table.getTableName());
- }
-
- public static RCTable getTable(String tableName) {
- RCTable table = table_map.get(tableName);
- if (table == null) {
- RCTable new_table = new RCTable(tableName);
- RCTable existing_table = table_map
- .putIfAbsent(tableName, new_table);
- if (existing_table != null) {
- return existing_table;
- } else {
- return new_table;
- }
- }
- return table;
- }
-
- public static class Entry {
- public final byte[] key;
- public byte[] value;
- public long version;
-
- public Entry(byte[] key, byte[] value, long version) {
- this.key = key;
- this.value = value;
- this.version = version;
- }
- }
-
- // finally the Table itself
-
- private final long rcTableId;
- private final String rcTableName;
-
- // private boolean isDropped = false;
-
- /**
- *
- * @note rcTableName must be unique cluster wide.
- * @param rcTableName
- */
- private RCTable(String rcTableName) {
- JRamCloud rcClient = RCClient.getClient();
-
- this.rcTableName = rcTableName;
-
- // FIXME Is it better to create table here or at getTable
- this.rcTableId = rcClient.createTable(rcTableName);
- }
-
- public long getTableId() {
- return this.rcTableId;
- }
-
- public String getTableName() {
- return this.rcTableName;
- }
-
- // TODO: Enumerate whole table?
-
- /**
- * Create a Key-Value entry on table.
- *
- * @param key
- * @param value
- * @return version of the created entry
- * @throws ObjectExistsException
- */
- public long create(final byte[] key, final byte[] value)
- throws ObjectExistsException {
-
- JRamCloud rcClient = RCClient.getClient();
-
- RejectRules rules = new RejectRules();
- rules.rejectIfExists();
-
- try {
- return rcClient.write(this.rcTableId, key, value, rules);
- } catch (ObjectExistsException e) {
- throw e;
- } catch (RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
- }
-
- /**
- * Create a Key-Value entry on table, without existence checking.
- *
- * @param key
- * @param value
- * @return version of the created entry
- */
- public long forceCreate(final byte[] key, final byte[] value) {
- JRamCloud rcClient = RCClient.getClient();
-
- long updated_version = rcClient.write(this.rcTableId, key, value);
- return updated_version;
-
- }
-
- /**
- * Read a Key-Value entry from table.
- *
- * @param key
- * @return Corresponding {@link Entry}
- * @throws ObjectDoesntExistException
- */
- public Entry read(final byte[] key) throws ObjectDoesntExistException {
-
- JRamCloud rcClient = RCClient.getClient();
-
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- try {
- JRamCloud.Object rcObj = rcClient.read(this.rcTableId, key, rules);
- return new Entry(rcObj.key, rcObj.value, rcObj.version);
- } catch (ObjectDoesntExistException e) {
- throw e;
- } catch (RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return null;
- }
- }
-
- /**
- * Update an existing Key-Value entry in table.
- *
- * @param key
- * @param value
- * @param version
- * expected version in the data store
- * @return version after update
- * @throws ObjectDoesntExistException
- * @throws WrongVersionException
- */
- public long update(final byte[] key, final byte[] value, final long version)
- throws ObjectDoesntExistException, WrongVersionException {
-
- JRamCloud rcClient = RCClient.getClient();
-
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(version);
-
- try {
- return rcClient.write(this.rcTableId, key, value, rules);
- } catch (ObjectDoesntExistException|WrongVersionException e) {
- throw e;
- } catch (RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
- }
-
- /**
- * Update an existing Key-Value entry in table, without checking version.
- *
- * @param key
- * @param value
- * @return version after update
- * @throws ObjectDoesntExistException
- */
- public long update(final byte[] key, final byte[] value)
- throws ObjectDoesntExistException {
-
- JRamCloud rcClient = RCClient.getClient();
-
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
-
- try {
- return rcClient.write(this.rcTableId, key, value, rules);
- } catch (ObjectDoesntExistException e) {
- throw e;
- } catch (RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
- }
-
- /**
- * Remove an existing Key-Value entry in table
- *
- * @param key
- * @param version
- * expected version in the data store
- * @return version of removed object
- * @throws ObjectDoesntExistException
- * @throws WrongVersionException
- */
- public long delete(final byte[] key, final long version)
- throws ObjectDoesntExistException, WrongVersionException {
- JRamCloud rcClient = RCClient.getClient();
-
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(version);
-
- try {
- return rcClient.remove(this.rcTableId, key, rules);
- } catch (ObjectDoesntExistException|WrongVersionException e) {
- throw e;
- } catch (RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
- }
-
- /**
- * Remove a Key-Value entry in table
- *
- * @param key
- * @return version of removed object or -1, if it did not exist.
- */
- public long forceDelete(final byte[] key) {
- JRamCloud rcClient = RCClient.getClient();
- long removed_version = rcClient.remove(this.rcTableId, key);
- return removed_version;
- }
-}
diff --git a/src/main/java/net/onrc/onos/datastore/RejectRulesException.java b/src/main/java/net/onrc/onos/datastore/RejectRulesException.java
new file mode 100644
index 0000000..0bd5072
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RejectRulesException.java
@@ -0,0 +1,24 @@
+package net.onrc.onos.datastore;
+
+//
+// Not sure if we really need this base class.
+// Just copied hierarchy from RAMCloud.
+//
+/**
+ * Base exception class for conditional write, etc. failure.
+ */
+public class RejectRulesException extends Exception {
+ private static final long serialVersionUID = -1444683012320423530L;
+
+ public RejectRulesException(final String message) {
+ super(message);
+ }
+
+ public RejectRulesException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public RejectRulesException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/WrongVersionException.java b/src/main/java/net/onrc/onos/datastore/WrongVersionException.java
new file mode 100644
index 0000000..9b59ff1
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/WrongVersionException.java
@@ -0,0 +1,31 @@
+package net.onrc.onos.datastore;
+
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+/**
+ * Exception thrown when conditional operation failed due to version mismatch.
+ */
+public class WrongVersionException extends RejectRulesException {
+ private static final long serialVersionUID = -1644202495890190823L;
+
+ public WrongVersionException(final String message) {
+ super(message);
+ }
+
+ public WrongVersionException(final IKVTableID tableID, final byte[] key,
+ final long expectedVersion, final Throwable cause) {
+ // It will be best if {@code cause} has actual version encountered, but
+ // doesn't currently.
+ super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
+ + tableID + " was expected to be version:" + expectedVersion,
+ cause);
+ }
+
+ public WrongVersionException(final IKVTableID tableID, final byte[] key,
+ final long expectedVersion, final long encounteredVersion) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
+ + tableID + " was expected to be version:" + expectedVersion
+ + " but found:" + encounteredVersion);
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/internal/IModifiableMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/internal/IModifiableMultiEntryOperation.java
new file mode 100644
index 0000000..8a23330
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/internal/IModifiableMultiEntryOperation.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.datastore.internal;
+
+import net.onrc.onos.datastore.IMultiEntryOperation;
+
+/**
+ * Interface for backend to realize IMultiEntryOperation.
+ *
+ * Backend implementation must use these interfaces to update IMultiEntryOperation
+ * in order to support KVObject.
+ */
+public interface IModifiableMultiEntryOperation extends IMultiEntryOperation {
+
+ /**
+ * Set value and version.
+ *
+ * Expected to be called on multiRead implementations.
+ * @param value
+ * @param version
+ */
+ public void setValue(final byte[] value, final long version);
+
+ /**
+ * Update version of the value.
+ *
+ * Expected to be called on multiWrite, multiRead implementations.
+ * @param version
+ */
+ public void setVersion(long version);
+
+ /**
+ * Update status.
+ *
+ * Backend implementation is expected to update to SUCCESS or FAILED after
+ * datastore operation.
+ * @param status
+ */
+ public void setStatus(STATUS status);
+
+ /**
+ * Return actual IModifiableMultiEntryOperation if is a wrapper, this otherwise.
+ * @return actual IModifiableMultiEntryOperation directly interact with data store
+ */
+ public IModifiableMultiEntryOperation getActualOperation();
+}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java
new file mode 100644
index 0000000..0f18d9c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java
@@ -0,0 +1,572 @@
+package net.onrc.onos.datastore.ramcloud;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.datastore.ramcloud.RCTable.Entry;
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
+import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
+import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
+
+public class RCClient implements IKVClient {
+ private static final Logger log = LoggerFactory.getLogger(RCClient.class);
+
+ private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
+ public static final Configuration config = getConfiguration();
+
+ // Value taken from RAMCloud's Status.h
+ // FIXME These constants should be defined by JRamCloud
+ public static final int STATUS_OK = 0;
+
+ // FIXME come up with a proper way to retrieve configuration
+ public static final int MAX_MULTI_READS = Math.max(1, Integer
+ .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
+
+ public static final int MAX_MULTI_WRITES = Math.max(1, Integer
+ .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
+
+ private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
+ @Override
+ protected JRamCloud initialValue() {
+ return new JRamCloud(getCoordinatorUrl(config));
+ }
+ };
+
+ /**
+ * @return JRamCloud instance intended to be used only within the
+ * SameThread.
+ * @note Do not store the returned instance in a member variable, etc. which
+ * may be accessed later by another thread.
+ */
+ static JRamCloud getJRamCloudClient() {
+ return tlsRCClient.get();
+ }
+
+ // Currently RCClient is state-less
+ private static final RCClient theInstance= new RCClient();
+
+ public static RCClient getClient() {
+ return theInstance;
+ }
+
+ public static final Configuration getConfiguration() {
+ final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
+ return getConfiguration(configFile);
+ }
+
+ public static final Configuration getConfiguration(final File configFile) {
+ if (configFile == null) {
+ throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
+ }
+
+ if (!configFile.isFile()) {
+ throw new IllegalArgumentException("Location of configuration must be a file");
+ }
+
+ try {
+ return new PropertiesConfiguration(configFile);
+ } catch (ConfigurationException e) {
+ throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
+ }
+ }
+
+ public static String getCoordinatorUrl(final Configuration configuration) {
+ final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
+ final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
+ final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
+ return coordinatorURL;
+ }
+
+ @Override
+ public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
+ return RCMultiEntryOperation.create(tableId, key, value);
+ }
+
+ /**
+ * @param tableId RCTableID instance
+ */
+ @Override
+ public long create(IKVTableID tableId, byte[] key, byte[] value)
+ throws ObjectExistsException {
+
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfExists();
+
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectExistsException e) {
+ throw new ObjectExistsException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
+ }
+
+ @Override
+ public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
+ return RCMultiEntryOperation.forceCreate(tableId, key, value);
+ }
+
+ @Override
+ public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ long updated_version = rcClient.write(rcTableId.getTableID(), key, value);
+ return updated_version;
+ }
+
+ @Override
+ public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
+ return RCMultiEntryOperation.read(tableId, key);
+ }
+
+ @Override
+ public IKVEntry read(IKVTableID tableId, byte[] key)
+ throws ObjectDoesntExistException {
+
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ try {
+ JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
+ return new Entry(rcObj.key, rcObj.value, rcObj.version);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return null;
+ }
+ }
+
+ @Override
+ public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
+ return RCMultiEntryOperation.update(tableId, key, value, version);
+ }
+
+ @Override
+ public long update(IKVTableID tableId, byte[] key, byte[] value,
+ long version) throws ObjectDoesntExistException,
+ WrongVersionException {
+
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(version);
+
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.WrongVersionException e) {
+ throw new WrongVersionException(rcTableId, key, version, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
+ }
+
+
+ @Override
+ public long update(IKVTableID tableId, byte[] key, byte[] value)
+ throws ObjectDoesntExistException {
+
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
+ }
+
+ @Override
+ public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
+ return RCMultiEntryOperation.delete(tableId, key, value, version);
+ }
+
+ @Override
+ public long delete(IKVTableID tableId, byte[] key, long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(version);
+
+ try {
+ return rcClient.remove(rcTableId.getTableID(), key, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.WrongVersionException e) {
+ throw new WrongVersionException(rcTableId, key, version, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
+ }
+
+ @Override
+ public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
+ return RCMultiEntryOperation.forceDelete(tableId, key);
+ }
+
+ @Override
+ public long forceDelete(IKVTableID tableId, byte[] key) {
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+ long removed_version = rcClient.remove(rcTableId.getTableID(), key);
+ return removed_version;
+ }
+
+ @Override
+ public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
+ return new RCTableEntryIterable((RCTableID) tableId);
+ }
+
+ static class RCTableEntryIterable implements Iterable<IKVEntry> {
+ private final RCTableID tableId;
+
+ public RCTableEntryIterable(final RCTableID tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public Iterator<IKVEntry> iterator() {
+ return new RCClient.RCTableIterator(tableId);
+ }
+ }
+
+ public static class RCTableIterator implements Iterator<IKVEntry> {
+ private final RCTableID tableId;
+ protected final TableEnumerator2 enumerator;
+ private JRamCloud.Object last;
+
+ public RCTableIterator(final RCTableID tableId) {
+ this.tableId = tableId;
+ this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
+ this.last = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.enumerator.hasNext();
+ }
+
+ @Override
+ public RCTable.Entry next() {
+ last = enumerator.next();
+ return new RCTable.Entry(last.key, last.value, last.version);
+ }
+
+ @Override
+ public void remove() {
+ if (last != null) {
+ getJRamCloudClient();
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ RejectRules rules = new RejectRules();
+ rules.rejectIfNeVersion(last.version);
+ try {
+ rcClient.remove(tableId.getTableID(), last.key, rules);
+ } catch (RejectRulesException e) {
+ log.trace("remove failed", e);
+ }
+ last = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+
+ if ( ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
+ return multiReadInternal(arrays);
+ }
+
+ boolean fail_exists = false;
+
+ ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
+ Iterator<IMultiEntryOperation> it = ops.iterator();
+ while (it.hasNext()) {
+
+ req.add((RCMultiEntryOperation) it.next());
+
+ if (req.size() >= MAX_MULTI_READS) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
+ }
+
+ if (!req.isEmpty()) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
+
+ return fail_exists;
+ }
+
+ @Override
+ public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+
+ if ( ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
+ return multiWriteInternal(arrays);
+ }
+
+ boolean fail_exists = false;
+
+ ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
+ Iterator<IMultiEntryOperation> it = ops.iterator();
+ while (it.hasNext()) {
+
+ req.add((RCMultiEntryOperation) it.next());
+
+ if (req.size() >= MAX_MULTI_WRITES) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
+ }
+
+ if (!req.isEmpty()) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
+
+ return fail_exists;
+ }
+
+ @Override
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+
+ // TODO implement multiRemove JNI, etc. if we need performance
+
+ boolean fail_exists = false;
+ JRamCloud rcClient = getJRamCloudClient();
+
+ for (IMultiEntryOperation iop : ops) {
+ RCMultiEntryOperation op = (RCMultiEntryOperation)iop;
+ switch (op.getOperation()) {
+ case DELETE:
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(op.getVersion());
+
+ try {
+ long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
+ op.entry.setVersion(removed_version);
+ op.status = STATUS.SUCCESS;
+ } catch (JRamCloud.ObjectDoesntExistException|JRamCloud.WrongVersionException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
+
+ case FORCE_DELETE:
+ long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
+ if (removed_version != JRamCloud.VERSION_NONEXISTENT) {
+ op.entry.setVersion(removed_version);
+ op.status = STATUS.SUCCESS;
+ } else {
+ log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
+
+ default:
+ log.error("Invalid operation {} specified on multiDelete", op.getOperation() );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ break;
+ }
+ }
+ return fail_exists;
+ }
+
+ private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
+ boolean fail_exists = false;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ final int reqs = ops.size();
+
+ MultiReadObject multiReadObjects = new MultiReadObject(reqs);
+
+ // setup multi-read operation objects
+ for (int i = 0; i < reqs; ++i) {
+ IMultiEntryOperation op = ops.get(i);
+ multiReadObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey());
+ }
+
+ // execute
+ JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
+ if (results.length != reqs) {
+ log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+ fail_exists = true;
+ }
+
+ for (int i = 0; i < results.length; ++i) {
+ IModifiableMultiEntryOperation op = ops.get(i);
+ if (results[i] == null) {
+ log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ continue;
+ }
+ assert (Arrays.equals(results[i].key, op.getKey()));
+
+ op.setValue(results[i].value, results[i].version);
+ if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ } else {
+ op.setStatus(STATUS.SUCCESS);
+ }
+ }
+
+ return fail_exists;
+ }
+
+ private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
+ boolean fail_exists = false;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+ final int reqs = ops.size();
+
+ MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
+
+ for (int i = 0; i < reqs; ++i) {
+
+ IModifiableMultiEntryOperation op = ops.get(i);
+ RejectRules rules = new RejectRules();
+
+ switch (op.getOperation()) {
+ case CREATE:
+ rules.rejectIfExists();
+ break;
+ case FORCE_CREATE:
+ // no reject rule
+ break;
+ case UPDATE:
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(op.getVersion());
+ break;
+
+ default:
+ log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation() );
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ return fail_exists;
+ }
+ multiWriteObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
+ }
+
+ MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
+ if (results.length != reqs) {
+ log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+ fail_exists = true;
+ }
+
+ for (int i = 0; i < results.length; ++i) {
+ IModifiableMultiEntryOperation op = ops.get(i);
+
+ if (results[i] != null
+ && results[i].getStatus() == RCClient.STATUS_OK) {
+ op.setStatus(STATUS.SUCCESS);
+ op.setVersion(results[i].getVersion());
+ } else {
+ op.setStatus(STATUS.FAILED);
+ fail_exists = true;
+ }
+ }
+
+ return fail_exists;
+ }
+
+ private static final ConcurrentHashMap<String, RCTable> tables = new ConcurrentHashMap<>();
+
+ @Override
+ public IKVTable getTable(final String tableName) {
+ RCTable table = tables.get(tableName);
+ if (table == null) {
+ RCTable new_table = new RCTable(tableName);
+ RCTable existing_table = tables
+ .putIfAbsent(tableName, new_table);
+ if (existing_table != null) {
+ return existing_table;
+ } else {
+ return new_table;
+ }
+ }
+ return table;
+ }
+
+ @Override
+ public void dropTable(IKVTable table) {
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+ rcClient.dropTable(table.getTableId().getTableName());
+ tables.remove(table.getTableId().getTableName());
+ }
+
+ static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return VERSION_NONEXISTENT;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java
new file mode 100644
index 0000000..0feec0a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java
@@ -0,0 +1,115 @@
+package net.onrc.onos.datastore.ramcloud;
+
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.datastore.ramcloud.RCTable.Entry;
+
+// FIXME move or extract this
+public class RCMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+ protected final RCTableID tableId;
+ protected final Entry entry;
+ protected final OPERATION operation;
+ protected STATUS status;
+
+ @Override
+ public boolean hasSucceeded() {
+ return this.status == STATUS.SUCCESS;
+ }
+
+ @Override
+ public STATUS getStatus() {
+ return status;
+ }
+
+ @Override
+ public IKVTableID getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public byte[] getKey() {
+ return entry.key;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return entry.value;
+ }
+
+ @Override
+ public long getVersion() {
+ return entry.version;
+ }
+
+ @Override
+ public OPERATION getOperation() {
+ return operation;
+ }
+
+ @Override
+ public void setStatus(final STATUS status) {
+ this.status = status;
+ }
+
+ @Override
+ public void setValue(byte[] value, final long version) {
+ this.entry.setValue(value);
+ setVersion(version);
+ }
+
+ @Override
+ public void setVersion(final long version) {
+ this.entry.setVersion(version);
+ }
+
+
+ public RCMultiEntryOperation(final IKVTableID tableId, final Entry entry, final OPERATION operation) {
+ this.tableId = (RCTableID) tableId;
+ this.operation = operation;
+
+ this.entry = entry;
+ this.status = STATUS.NOT_EXECUTED;
+ }
+
+ public static IMultiEntryOperation create(final IKVTableID tableId, final byte[] key, final byte[] value) {
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.CREATE);
+ }
+
+ public static IMultiEntryOperation forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.FORCE_CREATE);
+ }
+
+ /**
+ * Constructor for READ operation.
+ *
+ * @param tableId table to read from
+ * @param key key of an Entry to read
+ */
+ public static IMultiEntryOperation read(final IKVTableID tableId, final byte[] key) {
+ return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.READ);
+ }
+
+ public static IMultiEntryOperation update(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.UPDATE);
+ }
+
+ public static IMultiEntryOperation delete(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.DELETE);
+ }
+
+ public static IMultiEntryOperation forceDelete(final IKVTableID tableId, final byte[] key) {
+ return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.FORCE_DELETE);
+ }
+
+ @Override
+ public IModifiableMultiEntryOperation getActualOperation() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "[RCMultiEntryOperation tableId=" + tableId + ", entry=" + entry
+ + ", operation=" + operation + ", status=" + status + "]";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java
new file mode 100644
index 0000000..f823eae
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java
@@ -0,0 +1,134 @@
+package net.onrc.onos.datastore.ramcloud;
+
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to represent a Table in RAMCloud
+ */
+public class RCTable implements IKVTable {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(RCTable.class);
+
+ public static class Entry implements IKVEntry {
+ final byte[] key;
+ byte[] value;
+ long version;
+
+ public Entry(final byte[] key, final byte[] value, final long version) {
+ this.key = key;
+ this.setValue(value);
+ this.setVersion(version);
+ }
+
+ public Entry(final byte[] key) {
+ this(key, null, RCClient.VERSION_NONEXISTENT);
+ }
+
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public long getVersion() {
+ return version;
+ }
+
+ void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ void setVersion(long version) {
+ this.version = version;
+ }
+ }
+
+ private final RCTableID rcTableId;
+
+ /**
+ *
+ * {@code rcTableName} must be unique cluster wide.
+ * @param rcTableName RAMCloud table name
+ */
+ RCTable(final String rcTableName) {
+ this.rcTableId = new RCTableID(rcTableName);
+
+ // Trigger RAMCloud ID allocation. If lazy allocation is OK, remove.
+ this.rcTableId.getTableID();
+ }
+
+ @Override
+ public IKVTableID getTableId() {
+ return this.rcTableId;
+ }
+
+ public String getTableName() {
+ return this.rcTableId.getTableName();
+ }
+
+ @Override
+ public long create(final byte[] key, final byte[] value)
+ throws ObjectExistsException {
+
+ return RCClient.getClient().create(this.rcTableId, key, value);
+ }
+
+ @Override
+ public long forceCreate(final byte[] key, final byte[] value) {
+ return RCClient.getClient().forceCreate(rcTableId, key, value);
+ }
+
+ @Override
+ public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+ return RCClient.getClient().read(rcTableId, key);
+ }
+
+ @Override
+ public long update(final byte[] key, final byte[] value, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ return RCClient.getClient().update(rcTableId, key, value, version);
+ }
+
+ @Override
+ public long update(final byte[] key, final byte[] value)
+ throws ObjectDoesntExistException {
+
+ return RCClient.getClient().update(rcTableId, key, value);
+ }
+
+ @Override
+ public long delete(final byte[] key, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ return RCClient.getClient().delete(rcTableId, key, version);
+ }
+
+ @Override
+ public long forceDelete(final byte[] key) {
+ return RCClient.getClient().forceDelete(rcTableId, key);
+ }
+
+ @Override
+ public Iterable<IKVEntry> getAllEntries() {
+ return RCClient.getClient().getAllEntries(this.getTableId());
+ }
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return RCClient.VERSION_NONEXISTENT;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java
new file mode 100644
index 0000000..5fa56c2
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java
@@ -0,0 +1,57 @@
+package net.onrc.onos.datastore.ramcloud;
+
+import java.util.Objects;
+
+import net.onrc.onos.datastore.IKVTableID;
+
+public class RCTableID implements IKVTableID {
+ private final String tableName;
+ private long tableID;
+
+ public RCTableID(String tableName) {
+ this.tableName = tableName;
+ this.tableID = 0;
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ // following is RAMCloud specific
+
+ public long getTableID() {
+ if ( tableID != 0) {
+ return tableID;
+ }
+ tableID = RCClient.getJRamCloudClient().createTable(tableName);
+ return tableID;
+ }
+
+ void resetTableID() {
+ this.tableID = 0;
+ }
+
+ @Override
+ public String toString() {
+ return "["+tableName + "]@" + getTableID();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, getTableID());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ RCTableID other = (RCTableID) obj;
+ return Objects.equals(tableName, other.tableName)
+ && Objects.equals(getTableID(), other.getTableID());
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java b/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java
new file mode 100644
index 0000000..105f6f7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java
@@ -0,0 +1,202 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.topology.KVLink.STATUS;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.DeviceEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+/**
+ * Device object.
+ *
+ * TODO switch to ProtoBuf, etc.
+ */
+public class KVDevice extends KVObject {
+ private static final Logger log = LoggerFactory.getLogger(KVDevice.class);
+
+ private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
+
+ // FIXME these should be Enum or some number, not String
+ private static final String PROP_MAC = "mac";
+ private static final String PROP_PORT_IDS = "port-ids";
+
+ private final byte[] mac;
+ private TreeSet<byte[]> portIds;
+ private transient boolean isPortIdsModified;
+
+ // Assuming mac is unique cluster-wide
+ public static byte[] getDeviceID(final byte[] mac) {
+ return DeviceEvent.getDeviceID(mac).array();
+ }
+
+ public static byte[] getMacFromKey(final byte[] key) {
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ if (keyBuf.getChar() != 'D') {
+ throw new IllegalArgumentException("Invalid Device key");
+ }
+ byte[] mac = new byte[keyBuf.remaining()];
+ keyBuf.get(mac);
+ return mac;
+ }
+
+ public KVDevice(final byte[] mac) {
+ super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
+
+ this.mac = mac;
+ this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isPortIdsModified = true;
+ }
+
+ /**
+ * Get an instance from Key.
+ *
+ * @note You need to call `read()` to get the DB content.
+ * @param key
+ * @return
+ */
+ public static KVDevice createFromKey(final byte[] key) {
+ return new KVDevice(getMacFromKey(key));
+ }
+
+ public static Iterable<KVDevice> getAllDevices() {
+ return new DeviceEnumerator();
+ }
+
+ public static class DeviceEnumerator implements Iterable<KVDevice> {
+
+ @Override
+ public Iterator<KVDevice> iterator() {
+ return new DeviceIterator();
+ }
+ }
+
+ public static class DeviceIterator extends AbstractObjectIterator<KVDevice> {
+
+ public DeviceIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME));
+ }
+
+ @Override
+ public KVDevice next() {
+ IKVEntry o = enumerator.next();
+ KVDevice e = KVDevice.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
+ }
+
+ public byte[] getMac() {
+ // TODO may need to clone() to be sure this object will be immutable.
+ return mac;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ public void addPortId(final byte[] portId) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= portIds.add(portId);
+ }
+
+ public void removePortId(final byte[] portId) {
+ isPortIdsModified |= portIds.remove(portId);
+ }
+
+ public void emptyPortIds() {
+ portIds.clear();
+ this.isPortIdsModified = true;
+ }
+
+ public void addAllToPortIds(final Collection<byte[]> portIds) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= this.portIds.addAll(portIds);
+ }
+
+ /**
+ *
+ * @return Unmodifiable Set view of all the PortIds;
+ */
+ public Set<byte[]> getAllPortIds() {
+ return Collections.unmodifiableSet(portIds);
+ }
+
+ @Override
+ public byte[] serialize() {
+ Map<Object, Object> map = getPropertyMap();
+
+ map.put(PROP_MAC, mac);
+ if (isPortIdsModified) {
+ byte[][] portIdArray = new byte[portIds.size()][];
+ map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+ isPortIdsModified = false;
+ }
+
+ return serializePropertyMap(deviceKryo.get(), map);
+ }
+
+ @Override
+ protected boolean deserialize(final byte[] bytes) {
+ boolean success = deserializePropertyMap(deviceKryo.get(), bytes);
+ if (!success) {
+ log.error("Deserializing Link: " + this + " failed.");
+ return false;
+ }
+ Map<Object, Object> map = this.getPropertyMap();
+
+ if (this.portIds == null) {
+ this.portIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[][] portIdArray = (byte[][]) map.get(PROP_PORT_IDS);
+ if (portIdArray != null) {
+ this.portIds.clear();
+ this.portIds.addAll(Arrays.asList(portIdArray));
+ isPortIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isPortIdsModified = true;
+ }
+
+ return success;
+ }
+
+ @Override
+ public String toString() {
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " " + ByteArrayUtil.toHexStringBuffer(mac, ":") + "]";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVLink.java b/src/main/java/net/onrc/onos/datastore/topology/KVLink.java
new file mode 100644
index 0000000..c9273ae
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVLink.java
@@ -0,0 +1,217 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.RCProtos.LinkProperty;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Link object in data store.
+ */
+public class KVLink extends KVObject {
+ private static final Logger log = LoggerFactory.getLogger(KVLink.class);
+
+ private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static class SwitchPort {
+ public final Long dpid;
+ public final Long number;
+
+ public SwitchPort(final Long dpid, final Long number) {
+ this.dpid = dpid;
+ this.number = number;
+ }
+
+ public byte[] getPortID() {
+ return KVPort.getPortID(dpid, number);
+ }
+
+ public byte[] getSwitchID() {
+ return KVSwitch.getSwitchID(dpid);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Long.toHexString(dpid) + "@" + number + ")";
+ }
+
+ }
+
+ public static final String GLOBAL_LINK_TABLE_NAME = "G:Link";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final SwitchPort src;
+ private final SwitchPort dst;
+ private STATUS status;
+
+ public static byte[] getLinkID(final Long src_dpid, final Long src_port_no,
+ final Long dst_dpid, final Long dst_port_no) {
+ return LinkEvent.getLinkID(src_dpid, src_port_no, dst_dpid,
+ dst_port_no).array();
+ }
+
+ public static long[] getLinkTupleFromKey(final byte[] key) {
+ return getLinkTupleFromKey(ByteBuffer.wrap(key));
+ }
+
+ public static long[] getLinkTupleFromKey(final ByteBuffer keyBuf) {
+ long[] tuple = new long[4];
+ if (keyBuf.getChar() != 'L') {
+ throw new IllegalArgumentException("Invalid Link key");
+ }
+ long[] src_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
+ keyBuf.position(2 + PortEvent.PORTID_BYTES);
+ long[] dst_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
+
+ tuple[0] = src_port_pair[0];
+ tuple[1] = src_port_pair[1];
+ tuple[2] = dst_port_pair[0];
+ tuple[3] = dst_port_pair[1];
+
+ return tuple;
+ }
+
+ public KVLink(final Long src_dpid, final Long src_port_no,
+ final Long dst_dpid, final Long dst_port_no) {
+ super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
+ src_port_no, dst_dpid, dst_port_no));
+
+ src = new SwitchPort(src_dpid, src_port_no);
+ dst = new SwitchPort(dst_dpid, dst_port_no);
+ status = STATUS.INACTIVE;
+ }
+
+ /**
+ * Get an instance from Key.
+ *
+ * @note You need to call `read()` to get the DB content.
+ * @param key
+ * @return KVLink instance
+ */
+ public static KVLink createFromKey(final byte[] key) {
+ long[] linkTuple = getLinkTupleFromKey(key);
+ return new KVLink(linkTuple[0], linkTuple[1], linkTuple[2],
+ linkTuple[3]);
+ }
+
+ public static Iterable<KVLink> getAllLinks() {
+ return new LinkEnumerator();
+ }
+
+ public static class LinkEnumerator implements Iterable<KVLink> {
+
+ @Override
+ public Iterator<KVLink> iterator() {
+ return new LinkIterator();
+ }
+ }
+
+ public static class LinkIterator extends AbstractObjectIterator<KVLink> {
+
+ public LinkIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME));
+ }
+
+ @Override
+ public KVLink next() {
+ IKVEntry o = enumerator.next();
+ KVLink e = KVLink.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(final STATUS status) {
+ this.status = status;
+ }
+
+ public SwitchPort getSrc() {
+ return src;
+ }
+
+ public SwitchPort getDst() {
+ return dst;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ @Override
+ public byte[] serialize() {
+ Map<Object, Object> map = getPropertyMap();
+
+ LinkProperty.Builder link = LinkProperty.newBuilder();
+ link.setSrcSwId(ByteString.copyFrom(src.getSwitchID()));
+ link.setSrcPortId(ByteString.copyFrom(src.getPortID()));
+ link.setDstSwId(ByteString.copyFrom(dst.getSwitchID()));
+ link.setDstPortId(ByteString.copyFrom(dst.getPortID()));
+ link.setStatus(status.ordinal());
+
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(linkKryo.get(), map);
+ link.setValue(ByteString.copyFrom(propMaps));
+ }
+
+ return link.build().toByteArray();
+ }
+
+ @Override
+ protected boolean deserialize(final byte[] bytes) {
+ try {
+ boolean success = true;
+
+ LinkProperty link = LinkProperty.parseFrom(bytes);
+ byte[] props = link.getValue().toByteArray();
+ success &= deserializePropertyMap(linkKryo.get(), props);
+ this.status = STATUS.values()[link.getStatus()];
+
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Link: " + this + " failed.", e);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " " + src + "->" + dst + " STATUS:" + status + "]";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVPort.java b/src/main/java/net/onrc/onos/datastore/topology/KVPort.java
new file mode 100644
index 0000000..3b66864
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVPort.java
@@ -0,0 +1,205 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.RCProtos.PortProperty;
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Port object in data store.
+ *
+ * Note: This class will not maintain invariants.
+ * e.g., It will NOT automatically remove Links or Devices on Port,
+ * when deleting a Port.
+ */
+public class KVPort extends KVObject {
+ private static final Logger log = LoggerFactory.getLogger(KVPort.class);
+
+ private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final Long dpid;
+ private final Long number;
+
+ private STATUS status;
+
+ public static byte[] getPortID(final Long dpid, final Long number) {
+ return PortEvent.getPortID(dpid, number).array();
+ }
+
+ public static long[] getPortPairFromKey(final byte[] key) {
+ return getPortPairFromKey(ByteBuffer.wrap(key));
+ }
+
+ public static long[] getPortPairFromKey(final ByteBuffer keyBuf) {
+ long[] pair = new long[2];
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Port key:" + keyBuf
+ + " "
+ + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
+ }
+ pair[0] = keyBuf.getLong();
+ if (keyBuf.getChar() != 'P') {
+ throw new IllegalArgumentException("Invalid Port key:" + keyBuf
+ + " "
+ + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
+ }
+ pair[1] = keyBuf.getLong();
+ return pair;
+
+ }
+
+ public static long getDpidFromKey(final byte[] key) {
+ return getPortPairFromKey(key)[0];
+ }
+
+ public static long getNumberFromKey(final byte[] key) {
+ return getPortPairFromKey(key)[1];
+ }
+
+ // FIXME specify DPID,number here, or Should caller specify the key it self?
+ // In other words, should layer above have the control of the ID?
+ public KVPort(final Long dpid, final Long number) {
+ super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
+
+ // TODO Auto-generated constructor stub
+
+ this.dpid = dpid;
+ this.number = number;
+ this.status = STATUS.INACTIVE;
+ }
+
+ /**
+ * Get an instance from Key.
+ *
+ * @note You need to call `read()` to get the DB content.
+ * @param key
+ * @return KVPort instance
+ */
+ public static KVPort createFromKey(final byte[] key) {
+ long[] pair = getPortPairFromKey(key);
+ return new KVPort(pair[0], pair[1]);
+ }
+
+ public static Iterable<KVPort> getAllPorts() {
+ return new PortEnumerator();
+ }
+
+ public static class PortEnumerator implements Iterable<KVPort> {
+
+ @Override
+ public Iterator<KVPort> iterator() {
+ return new PortIterator();
+ }
+ }
+
+ public static class PortIterator extends AbstractObjectIterator<KVPort> {
+
+ public PortIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME));
+ }
+
+ @Override
+ public KVPort next() {
+ IKVEntry o = enumerator.next();
+ KVPort e = KVPort.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(final STATUS status) {
+ this.status = status;
+ }
+
+ public Long getDpid() {
+ return dpid;
+ }
+
+ public Long getNumber() {
+ return number;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ @Override
+ public byte[] serialize() {
+ Map<Object, Object> map = getPropertyMap();
+
+ PortProperty.Builder port = PortProperty.newBuilder();
+ port.setDpid(dpid);
+ port.setNumber(number);
+ port.setStatus(status.ordinal());
+
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(portKryo.get(), map);
+ port.setValue(ByteString.copyFrom(propMaps));
+ }
+
+ return port.build().toByteArray();
+ }
+
+ @Override
+ protected boolean deserialize(final byte[] bytes) {
+ try {
+ boolean success = true;
+
+ PortProperty port = PortProperty.parseFrom(bytes);
+ byte[] props = port.getValue().toByteArray();
+ success &= deserializePropertyMap(portKryo.get(), props);
+ this.status = STATUS.values()[port.getStatus()];
+
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Port: " + this + " failed.", e);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " 0x" + Long.toHexString(dpid) + "@" + number
+ + " STATUS:" + status + "]";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java
new file mode 100644
index 0000000..e915160
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java
@@ -0,0 +1,174 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.RCProtos.SwitchProperty;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.ofcontroller.networkgraph.SwitchEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Switch object in data store.
+ *
+ * Note: This class will not maintain invariants.
+ * e.g., It will NOT automatically remove Ports on Switch,
+ * when deleting a Switch.
+ */
+public class KVSwitch extends KVObject {
+ private static final Logger log = LoggerFactory.getLogger(KVSwitch.class);
+
+ private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final Long dpid;
+ private STATUS status;
+
+ public static byte[] getSwitchID(final Long dpid) {
+ return SwitchEvent.getSwitchID(dpid).array();
+ }
+
+ public static long getDpidFromKey(final byte[] key) {
+ return getDpidFromKey(ByteBuffer.wrap(key));
+ }
+
+ public static long getDpidFromKey(final ByteBuffer keyBuf) {
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Switch key");
+ }
+ return keyBuf.getLong();
+ }
+
+ // FIXME specify DPID here, or Should caller specify the key it self?
+ // In other words, should layer above have the control of the ID?
+ public KVSwitch(final Long dpid) {
+ super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME), getSwitchID(dpid));
+
+ this.dpid = dpid;
+ this.status = STATUS.INACTIVE;
+ }
+
+ /**
+ * Get an instance from Key.
+ *
+ * @note You need to call `read()` to get the DB content.
+ * @param key
+ * @return KVSwitch instance
+ */
+ public static KVSwitch createFromKey(final byte[] key) {
+ return new KVSwitch(getDpidFromKey(key));
+ }
+
+ public static Iterable<KVSwitch> getAllSwitches() {
+ return new SwitchEnumerator();
+ }
+
+ public static class SwitchEnumerator implements Iterable<KVSwitch> {
+
+ @Override
+ public Iterator<KVSwitch> iterator() {
+ return new SwitchIterator();
+ }
+ }
+
+ public static class SwitchIterator extends AbstractObjectIterator<KVSwitch> {
+
+ public SwitchIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME));
+ }
+
+ @Override
+ public KVSwitch next() {
+ IKVEntry o = enumerator.next();
+ KVSwitch e = KVSwitch.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(final STATUS status) {
+ this.status = status;
+ }
+
+ public Long getDpid() {
+ return dpid;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ @Override
+ public byte[] serialize() {
+ Map<Object, Object> map = getPropertyMap();
+
+ SwitchProperty.Builder sw = SwitchProperty.newBuilder();
+ sw.setDpid(dpid);
+ sw.setStatus(status.ordinal());
+
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(switchKryo.get(), map);
+ sw.setValue(ByteString.copyFrom(propMaps));
+ }
+
+ return sw.build().toByteArray();
+ }
+
+ @Override
+ protected boolean deserialize(final byte[] bytes) {
+ try {
+ boolean success = true;
+
+ SwitchProperty sw = SwitchProperty.parseFrom(bytes);
+ byte[] props = sw.getValue().toByteArray();
+ success &= deserializePropertyMap(switchKryo.get(), props);
+ this.status = STATUS.values()[sw.getStatus()];
+
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Switch: " + this + " failed.", e);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " 0x" + Long.toHexString(dpid) + " STATUS:" + status + "]";
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java b/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
deleted file mode 100644
index bb8a1f2..0000000
--- a/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package net.onrc.onos.datastore.topology;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import edu.stanford.ramcloud.JRamCloud;
-import net.onrc.onos.datastore.RCObject;
-import net.onrc.onos.datastore.RCTable;
-import net.onrc.onos.datastore.topology.RCLink.STATUS;
-import net.onrc.onos.datastore.utils.ByteArrayComparator;
-import net.onrc.onos.datastore.utils.ByteArrayUtil;
-import net.onrc.onos.ofcontroller.networkgraph.DeviceEvent;
-
-public class RCDevice extends RCObject {
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(RCDevice.class);
-
- private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
- };
-
- public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
-
- // FIXME these should be Enum or some number, not String
- private static final String PROP_MAC = "mac";
- private static final String PROP_PORT_IDS = "port-ids";
-
- private final byte[] mac;
- private TreeSet<byte[]> portIds;
- transient private boolean isPortIdsModified;
-
- // Assuming mac is unique cluster-wide
- public static byte[] getDeviceID(final byte[] mac) {
- return DeviceEvent.getDeviceID(mac).array();
- }
-
- public static StringBuilder keysToSB(Collection<byte[]> keys) {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- boolean hasWritten = false;
- for (byte[] key : keys) {
- if (hasWritten) {
- sb.append(", ");
- }
- sb.append(keyToString(key));
- hasWritten = true;
- }
- sb.append("]");
- return sb;
- }
-
- public static String keyToString(byte[] key) {
- // For debug log
- byte[] mac = getMacFromKey(key);
- return "D" + ByteArrayUtil.toHexStringBuffer(mac, ":");
- }
-
- public static byte[] getMacFromKey(byte[] key) {
- ByteBuffer keyBuf = ByteBuffer.wrap(key);
- if (keyBuf.getChar() != 'D') {
- throw new IllegalArgumentException("Invalid Device key");
- }
- byte[] mac = new byte[keyBuf.remaining()];
- keyBuf.get(mac);
- return mac;
- }
-
- public RCDevice(byte[] mac) {
- super(RCTable.getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
-
- this.mac = mac;
- this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
- this.isPortIdsModified = true;
- }
-
- /**
- * Get an instance from Key.
- *
- * @note You need to call `read()` to get the DB content.
- * @param key
- * @return
- */
- public static <D extends RCObject> D createFromKey(byte[] key) {
- @SuppressWarnings("unchecked")
- D d = (D) new RCDevice(getMacFromKey(key));
- return d;
- }
-
- public static Iterable<RCDevice> getAllDevices() {
- return new DeviceEnumerator();
- }
-
- public static class DeviceEnumerator implements Iterable<RCDevice> {
-
- @Override
- public Iterator<RCDevice> iterator() {
- return new DeviceIterator();
- }
- }
-
- public static class DeviceIterator extends ObjectIterator<RCDevice> {
-
- public DeviceIterator() {
- super(RCTable.getTable(GLOBAL_DEVICE_TABLE_NAME));
- }
-
- @Override
- public RCDevice next() {
- JRamCloud.Object o = enumerator.next();
- RCDevice e = RCDevice.createFromKey(o.key);
- e.setValueAndDeserialize(o.value, o.version);
- return e;
- }
- }
-
- public byte[] getMac() {
- // TODO may need to clone() to be sure this object will be immutable.
- return mac;
- }
-
- public byte[] getId() {
- return getKey();
- }
-
- public void addPortId(byte[] portId) {
- // TODO: Should we copy portId, or reference is OK.
- isPortIdsModified |= portIds.add(portId);
- }
-
- public void removePortId(byte[] portId) {
- isPortIdsModified |= portIds.remove(portId);
- }
-
- public void emptyPortIds() {
- portIds.clear();
- this.isPortIdsModified = true;
- }
-
- public void addAllToPortIds(Collection<byte[]> portIds) {
- // TODO: Should we copy portId, or reference is OK.
- isPortIdsModified |= this.portIds.addAll(portIds);
- }
-
- /**
- *
- * @return Unmodifiable Set view of all the PortIds;
- */
- public Set<byte[]> getAllPortIds() {
- return Collections.unmodifiableSet(portIds);
- }
-
- @Override
- public void serializeAndSetValue() {
- Map<Object, Object> map = getObjectMap();
-
- map.put(PROP_MAC, mac);
- if (isPortIdsModified) {
- byte[] portIdArray[] = new byte[portIds.size()][];
- map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
- isPortIdsModified = false;
- }
-
- serializeAndSetValue(deviceKryo.get(), map);
- }
-
- @Override
- public Map<Object, Object> deserializeObjectFromValue() {
- Map<Object, Object> map = deserializeObjectFromValue(deviceKryo.get());
-
- if (this.portIds == null) {
- this.portIds = new TreeSet<>(
- ByteArrayComparator.BYTEARRAY_COMPARATOR);
- }
- byte[] portIdArray[] = (byte[][]) map.get(PROP_PORT_IDS);
- if (portIdArray != null) {
- this.portIds.clear();
- this.portIds.addAll(Arrays.asList(portIdArray));
- isPortIdsModified = false;
- } else {
- // trigger write on next serialize
- isPortIdsModified = true;
- }
-
- return map;
- }
-
- @Override
- public String toString() {
- // TODO OUTPUT ALL?
- return "[RCDevice " + ByteArrayUtil.toHexStringBuffer(mac, ":") + "]";
- }
-
- public static void main(String[] args) {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCLink.java b/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
deleted file mode 100644
index e3edff0..0000000
--- a/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package net.onrc.onos.datastore.topology;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import edu.stanford.ramcloud.JRamCloud;
-import net.onrc.onos.datastore.RCProtos.LinkProperty;
-import net.onrc.onos.datastore.RCObject;
-import net.onrc.onos.datastore.RCTable;
-import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
-import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
-
-public class RCLink extends RCObject {
- private static final Logger log = LoggerFactory.getLogger(RCLink.class);
-
- private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
- };
-
- public static class SwitchPort {
- public final Long dpid;
- public final Long number;
-
- public SwitchPort(Long dpid, Long number) {
- this.dpid = dpid;
- this.number = number;
- }
-
- public byte[] getPortID() {
- return RCPort.getPortID(dpid, number);
- }
-
- public byte[] getSwitchID() {
- return RCSwitch.getSwitchID(dpid);
- }
-
- @Override
- public String toString() {
- return "(" + Long.toHexString(dpid) + "@" + number + ")";
- }
-
- }
-
- public static final String GLOBAL_LINK_TABLE_NAME = "G:Link";
-
- // must not re-order enum members, ordinal will be sent over wire
- public enum STATUS {
- INACTIVE, ACTIVE;
- }
-
- private final SwitchPort src;
- private final SwitchPort dst;
- private STATUS status;
-
- public static byte[] getLinkID(Long src_dpid, Long src_port_no,
- Long dst_dpid, Long dst_port_no) {
- return LinkEvent.getLinkID(src_dpid, src_port_no, dst_dpid,
- dst_port_no).array();
- }
-
- public static StringBuilder keysToSB(Collection<byte[]> keys) {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- boolean hasWritten = false;
- for (byte[] key : keys) {
- if (hasWritten) {
- sb.append(", ");
- }
- sb.append(keyToString(key));
- hasWritten = true;
- }
- sb.append("]");
- return sb;
- }
-
- public static String keyToString(byte[] key) {
- // For debug log
- long[] tuple = getLinkTupleFromKey(key);
- return "L" + "S" + HexString.toHexString(tuple[0]) + "P" + tuple[1]
- + "S" + HexString.toHexString(tuple[2]) + "P" + tuple[3];
- }
-
- public static long[] getLinkTupleFromKey(byte[] key) {
- return getLinkTupleFromKey(ByteBuffer.wrap(key));
- }
-
- public static long[] getLinkTupleFromKey(ByteBuffer keyBuf) {
- long tuple[] = new long[4];
- if (keyBuf.getChar() != 'L') {
- throw new IllegalArgumentException("Invalid Link key");
- }
- long src_port_pair[] = RCPort.getPortPairFromKey(keyBuf.slice());
- keyBuf.position(2 + PortEvent.PORTID_BYTES);
- long dst_port_pair[] = RCPort.getPortPairFromKey(keyBuf.slice());
-
- tuple[0] = src_port_pair[0];
- tuple[1] = src_port_pair[1];
- tuple[2] = dst_port_pair[0];
- tuple[3] = dst_port_pair[1];
-
- return tuple;
- }
-
- public RCLink(Long src_dpid, Long src_port_no, Long dst_dpid,
- Long dst_port_no) {
- super(RCTable.getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
- src_port_no, dst_dpid, dst_port_no));
-
- src = new SwitchPort(src_dpid, src_port_no);
- dst = new SwitchPort(dst_dpid, dst_port_no);
- status = STATUS.INACTIVE;
- }
-
- /**
- * Get an instance from Key.
- *
- * @note You need to call `read()` to get the DB content.
- * @param key
- * @return RCLink instance
- */
- public static <L extends RCObject> L createFromKey(byte[] key) {
- long linkTuple[] = getLinkTupleFromKey(key);
- @SuppressWarnings("unchecked")
- L l = (L) new RCLink(linkTuple[0], linkTuple[1], linkTuple[2],
- linkTuple[3]);
- return l;
- }
-
- public static Iterable<RCLink> getAllLinks() {
- return new LinkEnumerator();
- }
-
- public static class LinkEnumerator implements Iterable<RCLink> {
-
- @Override
- public Iterator<RCLink> iterator() {
- return new LinkIterator();
- }
- }
-
- public static class LinkIterator extends ObjectIterator<RCLink> {
-
- public LinkIterator() {
- super(RCTable.getTable(GLOBAL_LINK_TABLE_NAME));
- }
-
- @Override
- public RCLink next() {
- JRamCloud.Object o = enumerator.next();
- RCLink e = RCLink.createFromKey(o.key);
- e.setValueAndDeserialize(o.value, o.version);
- return e;
- }
- }
-
- public STATUS getStatus() {
- return status;
- }
-
- public void setStatus(STATUS status) {
- this.status = status;
- }
-
- public SwitchPort getSrc() {
- return src;
- }
-
- public SwitchPort getDst() {
- return dst;
- }
-
- public byte[] getId() {
- return getKey();
- }
-
- @Override
- public void serializeAndSetValue() {
- Map<Object, Object> map = getObjectMap();
-
- LinkProperty.Builder link = LinkProperty.newBuilder();
- link.setSrcSwId(ByteString.copyFrom(src.getSwitchID()));
- link.setSrcPortId(ByteString.copyFrom(src.getPortID()));
- link.setDstSwId(ByteString.copyFrom(dst.getSwitchID()));
- link.setDstPortId(ByteString.copyFrom(dst.getPortID()));
- link.setStatus(status.ordinal());
-
- if (!map.isEmpty()) {
- serializeAndSetValue(linkKryo.get(), map);
- link.setValue(ByteString.copyFrom(this.getSerializedValue()));
- }
-
- this.value = link.build().toByteArray();
- }
-
- @Override
- public Map<Object, Object> deserializeObjectFromValue() {
- LinkProperty link = null;
- Map<Object, Object> map = null;
- try {
- link = LinkProperty.parseFrom(this.value);
- this.value = link.getValue().toByteArray();
- if (this.value.length >= 1) {
- map = deserializeObjectFromValue(linkKryo.get());
- } else {
- map = new HashMap<>();
- }
- this.status = STATUS.values()[link.getStatus()];
- return map;
- } catch (InvalidProtocolBufferException e) {
- log.error("{" + toString() + "}: Read Link: ", e);
- return null;
- }
- }
-
- @Override
- public String toString() {
- return "[RCLink " + src + "->" + dst + " STATUS:" + status + "]";
- }
-
- public static void main(String[] args) {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCPort.java b/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
deleted file mode 100644
index 0b88b60..0000000
--- a/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package net.onrc.onos.datastore.topology;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import edu.stanford.ramcloud.JRamCloud;
-import net.onrc.onos.datastore.RCProtos.PortProperty;
-import net.onrc.onos.datastore.RCObject;
-import net.onrc.onos.datastore.RCTable;
-import net.onrc.onos.datastore.utils.ByteArrayUtil;
-import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
-
-public class RCPort extends RCObject {
- private static final Logger log = LoggerFactory.getLogger(RCPort.class);
-
- private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
- };
-
- public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
-
- // must not re-order enum members, ordinal will be sent over wire
- public enum STATUS {
- INACTIVE, ACTIVE;
- }
-
- private final Long dpid;
- private final Long number;
-
- private STATUS status;
-
- public static byte[] getPortID(Long dpid, Long number) {
- return PortEvent.getPortID(dpid, number).array();
- }
-
- public static StringBuilder keysToSB(Collection<byte[]> keys) {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- boolean hasWritten = false;
- for (byte[] key : keys) {
- if (hasWritten) {
- sb.append(", ");
- }
- sb.append(keyToString(key));
- hasWritten = true;
- }
- sb.append("]");
- return sb;
- }
-
- public static String keyToString(byte[] key) {
- // For debug log
- long[] pair = getPortPairFromKey(key);
- return "S" + HexString.toHexString(pair[0]) + "P" + pair[1];
- }
-
- public static long[] getPortPairFromKey(byte[] key) {
- return getPortPairFromKey(ByteBuffer.wrap(key));
-
- }
-
- public static long[] getPortPairFromKey(ByteBuffer keyBuf) {
- long[] pair = new long[2];
- if (keyBuf.getChar() != 'S') {
- throw new IllegalArgumentException("Invalid Port key:" + keyBuf
- + " "
- + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
- }
- pair[0] = keyBuf.getLong();
- if (keyBuf.getChar() != 'P') {
- throw new IllegalArgumentException("Invalid Port key:" + keyBuf
- + " "
- + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
- }
- pair[1] = keyBuf.getLong();
- return pair;
-
- }
-
- public static long getDpidFromKey(byte[] key) {
- return getPortPairFromKey(key)[0];
- }
-
- public static long getNumberFromKey(byte[] key) {
- return getPortPairFromKey(key)[1];
- }
-
- // FIXME specify DPID,number here, or Should caller specify the key it self?
- // In other words, should layer above have the control of the ID?
- public RCPort(Long dpid, Long number) {
- super(RCTable.getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
-
- // TODO Auto-generated constructor stub
-
- this.dpid = dpid;
- this.number = number;
- this.status = STATUS.INACTIVE;
- }
-
- /**
- * Get an instance from Key.
- *
- * @note You need to call `read()` to get the DB content.
- * @param key
- * @return RCPort instance
- */
- public static <P extends RCObject> P createFromKey(byte[] key) {
- long[] pair = getPortPairFromKey(key);
- @SuppressWarnings("unchecked")
- P p = (P) new RCPort(pair[0], pair[1]);
- return p;
- }
-
- public static Iterable<RCPort> getAllPorts() {
- return new PortEnumerator();
- }
-
- public static class PortEnumerator implements Iterable<RCPort> {
-
- @Override
- public Iterator<RCPort> iterator() {
- return new PortIterator();
- }
- }
-
- public static class PortIterator extends ObjectIterator<RCPort> {
-
- public PortIterator() {
- super(RCTable.getTable(GLOBAL_PORT_TABLE_NAME));
- }
-
- @Override
- public RCPort next() {
- JRamCloud.Object o = enumerator.next();
- RCPort e = RCPort.createFromKey(o.key);
- e.setValueAndDeserialize(o.value, o.version);
- return e;
- }
- }
-
- public STATUS getStatus() {
- return status;
- }
-
- public void setStatus(STATUS status) {
- this.status = status;
- }
-
- public Long getDpid() {
- return dpid;
- }
-
- public Long getNumber() {
- return number;
- }
-
- public byte[] getId() {
- return getKey();
- }
-
- @Override
- public void serializeAndSetValue() {
- Map<Object, Object> map = getObjectMap();
-
- PortProperty.Builder port = PortProperty.newBuilder();
- port.setDpid(dpid);
- port.setNumber(number);
- port.setStatus(status.ordinal());
-
- if (!map.isEmpty()) {
- serializeAndSetValue(portKryo.get(), map);
- port.setValue(ByteString.copyFrom(this.getSerializedValue()));
- }
-
- this.value = port.build().toByteArray();
- }
-
- @Override
- public Map<Object, Object> deserializeObjectFromValue() {
- PortProperty port = null;
- Map<Object, Object> map = null;
- try {
- port = PortProperty.parseFrom(this.value);
- this.value = port.getValue().toByteArray();
- if (this.value.length >= 1) {
- map = deserializeObjectFromValue(portKryo.get());
- } else {
- map = new HashMap<>();
- }
- this.status = STATUS.values()[port.getStatus()];
- return map;
- } catch (InvalidProtocolBufferException e) {
- log.error("{" + toString() + "}: Read Port: ", e);
- return null;
- }
- }
-
- @Override
- public String toString() {
- // TODO OUTPUT ALL?
- return "[RCPort 0x" + Long.toHexString(dpid) + "@" + number
- + " STATUS:" + status + "]";
- }
-
- public static void main(String[] args) {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
deleted file mode 100644
index 19b9801..0000000
--- a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
+++ /dev/null
@@ -1,482 +0,0 @@
-package net.onrc.onos.datastore.topology;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
-import net.onrc.onos.datastore.RCObject;
-import net.onrc.onos.datastore.RCTable;
-import net.onrc.onos.ofcontroller.networkgraph.SwitchEvent;
-
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import edu.stanford.ramcloud.JRamCloud;
-import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
-import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
-import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
-import net.onrc.onos.datastore.RCProtos.SwitchProperty;
-
-/**
- * Switch Object in RC.
- *
- * @note This class will not maintain invariants. e.g. It will NOT automatically
- * remove Ports on Switch, when deleting a Switch.
- *
- */
-public class RCSwitch extends RCObject {
- private static final Logger log = LoggerFactory.getLogger(RCSwitch.class);
-
- private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
- };
-
- public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
-
- // must not re-order enum members, ordinal will be sent over wire
- public enum STATUS {
- INACTIVE, ACTIVE;
- }
-
- private final Long dpid;
- private STATUS status;
-
- public static byte[] getSwitchID(Long dpid) {
- return SwitchEvent.getSwitchID(dpid).array();
- }
-
- public static StringBuilder keysToSB(Collection<byte[]> keys) {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- boolean hasWritten = false;
- for (byte[] key : keys) {
- if (hasWritten) {
- sb.append(", ");
- }
- sb.append(keyToString(key));
- hasWritten = true;
- }
- sb.append("]");
- return sb;
- }
-
- public static String keyToString(byte[] key) {
- // For debug log
- return "S" + HexString.toHexString(getDpidFromKey(key));
- }
-
- public static long getDpidFromKey(byte[] key) {
- return getDpidFromKey(ByteBuffer.wrap(key));
- }
-
- public static long getDpidFromKey(ByteBuffer keyBuf) {
- if (keyBuf.getChar() != 'S') {
- throw new IllegalArgumentException("Invalid Switch key");
- }
- return keyBuf.getLong();
- }
-
- // FIXME specify DPID here, or Should caller specify the key it self?
- // In other words, should layer above have the control of the ID?
- public RCSwitch(Long dpid) {
- super(RCTable.getTable(GLOBAL_SWITCH_TABLE_NAME), getSwitchID(dpid));
-
- this.dpid = dpid;
- this.status = STATUS.INACTIVE;
- }
-
- /**
- * Get an instance from Key.
- *
- * @note You need to call `read()` to get the DB content.
- * @param key
- * @return RCSwitch instance
- */
- public static <SW extends RCObject> SW createFromKey(byte[] key) {
- @SuppressWarnings("unchecked")
- SW sw = (SW) new RCSwitch(getDpidFromKey(key));
- return sw;
- }
-
- public static Iterable<RCSwitch> getAllSwitches() {
- return new SwitchEnumerator();
- }
-
- public static class SwitchEnumerator implements Iterable<RCSwitch> {
-
- @Override
- public Iterator<RCSwitch> iterator() {
- return new SwitchIterator();
- }
- }
-
- public static class SwitchIterator extends ObjectIterator<RCSwitch> {
-
- public SwitchIterator() {
- super(RCTable.getTable(GLOBAL_SWITCH_TABLE_NAME));
- }
-
- @Override
- public RCSwitch next() {
- JRamCloud.Object o = enumerator.next();
- RCSwitch e = RCSwitch.createFromKey(o.key);
- e.setValueAndDeserialize(o.value, o.version);
- return e;
- }
- }
-
- public STATUS getStatus() {
- return status;
- }
-
- public void setStatus(STATUS status) {
- this.status = status;
- }
-
- public Long getDpid() {
- return dpid;
- }
-
- public byte[] getId() {
- return getKey();
- }
-
- @Override
- public void serializeAndSetValue() {
- Map<Object, Object> map = getObjectMap();
-
- SwitchProperty.Builder sw = SwitchProperty.newBuilder();
- sw.setDpid(dpid);
- sw.setStatus(status.ordinal());
-
- if (!map.isEmpty()) {
- serializeAndSetValue(switchKryo.get(), map);
- sw.setValue(ByteString.copyFrom(this.getSerializedValue()));
- }
-
- this.value = sw.build().toByteArray();
- }
-
- @Override
- public Map<Object, Object> deserializeObjectFromValue() {
- SwitchProperty sw = null;
- Map<Object, Object> map = null;
- try {
- sw = SwitchProperty.parseFrom(this.value);
- this.value = sw.getValue().toByteArray();
- if (this.value.length >= 1) {
- map = deserializeObjectFromValue(switchKryo.get());
- } else {
- map = new HashMap<>();
- }
- this.status = STATUS.values()[sw.getStatus()];
- return map;
- } catch (InvalidProtocolBufferException e) {
- log.error("{" + toString() + "}: Read Switch: ", e);
- return null;
- }
- }
-
- @Override
- public String toString() {
- // TODO OUTPUT ALL?
- return "[RCSwitch 0x" + Long.toHexString(dpid) + " STATUS:" + status
- + "]";
- }
-
- public static void main(String argv[]) {
- // create active switch 0x1 with 2 ports
- RCSwitch sw = new RCSwitch(0x1L);
- sw.setStatus(STATUS.ACTIVE);
-
- try {
- sw.create();
- } catch (ObjectExistsException e) {
- log.debug("Create Switch Failed", e);
- e.printStackTrace();
- }
-
- // read switch 0x1
- RCSwitch swRead = new RCSwitch(0x1L);
- try {
- swRead.read();
- } catch (ObjectDoesntExistException e) {
- log.debug("Reading Switch Failed", e);
- }
- assert (swRead.getStatus() == STATUS.ACTIVE);
-
- // update 0x1
- swRead.setStatus(STATUS.INACTIVE);
- try {
- swRead.update();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Updating Switch Failed", e);
- }
-
- // read 0x1 again and delete
- RCSwitch swRead2 = new RCSwitch(0x1L);
- try {
- swRead2.read();
- } catch (ObjectDoesntExistException e) {
- log.debug("Reading Switch Again Failed", e);
- }
- assert (swRead2.getStatus() == STATUS.INACTIVE);
- try {
- swRead2.delete();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Deleting Switch Failed", e);
- }
-
- RCSwitch swRead3 = new RCSwitch(0x1L);
- try {
- swRead3.read();
- } catch (ObjectDoesntExistException e) {
- log.debug("Switch not found as expected");
- }
-
- topology_setup();
- topology_walk();
- topology_delete();
- }
-
- @Deprecated
- private static void topology_setup() {
- log.debug("topology_setup start.");
-
- // d1 - s1p1 - s1 - s1p2 - s2p1 - s2 - s2p2
-
- RCSwitch sw1 = new RCSwitch(0x1L);
- sw1.setStatus(STATUS.ACTIVE);
- try {
- sw1.create();
- log.debug("Create {}", sw1);
- } catch (ObjectExistsException e) {
- log.error("Switch creation failed", e);
- }
-
- RCPort sw1p1 = new RCPort(0x1L, 1L);
- sw1p1.setStatus(RCPort.STATUS.ACTIVE);
- RCPort sw1p2 = new RCPort(0x1L, 2L);
- sw1p2.setStatus(RCPort.STATUS.ACTIVE);
- try {
- sw1p1.create();
- log.debug("Create {}", sw1p1);
- sw1p2.create();
- log.debug("Create {}", sw1p2);
- } catch (ObjectExistsException e) {
- log.error("Port creation failed", e);
- }
-
- try {
- sw1.update();
- log.debug("Update {}", sw1);
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.error("Switch update failed", e);
- }
-
- RCDevice d1 = new RCDevice(new byte[] { 0, 1, 2, 3, 4, 5, 6 });
- d1.addPortId(sw1p1.getId());
-
- try {
- d1.create();
- log.debug("Create {}", d1);
- try {
- sw1p1.update();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.error("Link update failed", e);
- }
- log.debug("Create {}", sw1p1);
- } catch (ObjectExistsException e) {
- log.error("Device creation failed", e);
- }
-
- RCSwitch sw2 = new RCSwitch(0x2L);
- sw2.setStatus(STATUS.ACTIVE);
- RCPort sw2p1 = new RCPort(0x2L, 1L);
- sw2p1.setStatus(RCPort.STATUS.ACTIVE);
- RCPort sw2p2 = new RCPort(0x2L, 2L);
- sw2p2.setStatus(RCPort.STATUS.ACTIVE);
-
- RCDevice d2 = new RCDevice(new byte[] { 6, 5, 4, 3, 2, 1, 0 });
- d2.addPortId(sw2p2.getId());
-
- // XXX Collection created by Arrays.asList needs to be stored, so that
- // which operation failed
- Collection<WriteOp> groupOp = Arrays.asList(
- RCObject.WriteOp.Create(sw2), RCObject.WriteOp.Create(sw2p1),
- RCObject.WriteOp.Create(sw2p2), RCObject.WriteOp.Create(d2));
- boolean failed = RCObject.multiWrite(groupOp);
- if (failed) {
- log.error("Some of Switch/Port/Device creation failed");
- for ( WriteOp op : groupOp ) {
- log.debug("{} - Result:{}", op.getObject(), op.getStatus() );
- }
- } else {
- log.debug("Create {} Version:{}", sw2, sw2.getVersion());
- log.debug("Create {} Version:{}", sw2p1, sw2p1.getVersion());
- log.debug("Create {} Version:{}", sw2p2, sw2p2.getVersion());
- log.debug("Create {} Version:{}", d2, d2.getVersion());
- }
-
- RCLink l1 = new RCLink(0x1L, 2L, 0x2L, 1L);
- l1.setStatus(RCLink.STATUS.ACTIVE);
-
- try {
- l1.create();
- log.debug("Create {}", l1);
- try {
- sw1p2.update();
- log.debug("Update {}", sw1p2);
- sw2p1.update();
- log.debug("Update {}", sw2p1);
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.error("Port update failed", e);
- }
- } catch (ObjectExistsException e) {
- log.error("Link creation failed", e);
- }
-
- log.debug("topology_setup end.");
- }
-
- @Deprecated
- private static void topology_walk() {
- log.debug("topology_walk start.");
-
- Iterable<RCSwitch> swIt = RCSwitch.getAllSwitches();
- log.debug("Enumerating Switches start");
- for (RCSwitch sw : swIt) {
- log.debug("{}", sw);
- }
- log.debug("Enumerating Switches end");
-
- RCSwitch sw1 = new RCSwitch(0x1L);
- try {
- sw1.read();
- log.debug("{}", sw1);
- } catch (ObjectDoesntExistException e) {
- log.error("Reading switch failed", e);
- }
-
- assert (sw1.getDpid() == 0x1L);
- assert (sw1.getStatus() == STATUS.ACTIVE);
- for (RCPort port : RCPort.getAllPorts()) {
- if (port.getDpid() != 0x1L) {
- continue;
- }
- log.debug("{}", port);
-
- for (RCDevice device : RCDevice.getAllDevices()) {
- if (!device.getAllPortIds().contains(port.getId())) {
- continue;
- }
- log.debug("{} - PortIDs:{}", device,
- RCPort.keysToSB(device.getAllPortIds()));
- }
-
- for (RCLink link : RCLink.getAllLinks()) {
- if (!Arrays.equals(link.getSrc().getPortID(), port.getId())) {
- continue;
- }
- log.debug("Link {}", link);
- }
- }
-
- RCSwitch sw2 = new RCSwitch(0x2L);
- try {
- sw2.read();
- log.debug("{}", sw2);
- } catch (ObjectDoesntExistException e) {
- log.error("Reading switch failed", e);
- }
-
- assert (sw2.getDpid() == 0x2L);
- assert (sw2.getStatus() == STATUS.ACTIVE);
- for (RCPort port : RCPort.getAllPorts()) {
- if (port.getDpid() != 0x2L) {
- continue;
- }
- log.debug("{}", port);
-
- for (RCDevice device : RCDevice.getAllDevices()) {
- if (!device.getAllPortIds().contains(port.getId())) {
- continue;
- }
- log.debug("{} - PortIDs:{}", device,
- RCPort.keysToSB(device.getAllPortIds()));
- }
-
- for (RCLink link : RCLink.getAllLinks()) {
- if (!Arrays.equals(link.getSrc().getPortID(), port.getId())) {
- continue;
- }
- log.debug("Link {}", link);
- }
-
- }
-
- log.debug("topology_walk end.");
- }
-
- @Deprecated
- private static void topology_delete() {
- log.debug("topology_delete start.");
-
- for (RCSwitch sw : RCSwitch.getAllSwitches()) {
- try {
- sw.read();
- sw.delete();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Delete Switch Failed", e);
- }
- }
-
- for (RCPort p : RCPort.getAllPorts()) {
- try {
- p.read();
- p.delete();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Delete Port Failed", e);
- }
- }
-
- for (RCDevice d : RCDevice.getAllDevices()) {
- try {
- d.read();
- d.delete();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Delete Device Failed", e);
- }
- }
-
- for (RCLink l : RCLink.getAllLinks()) {
- try {
- l.read();
- l.delete();
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.debug("Delete Link Failed", e);
- }
- }
-
- log.debug("topology_delete end.");
- }
-
-}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
index 08290a3..56b088f 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
@@ -3,12 +3,20 @@
import java.nio.ByteBuffer;
import java.util.Comparator;
+/**
+ * Comparator which will compares the content of byte[].
+ *
+ * Expected to be used with TreeMap, etc. when you want to use byte[] as a key.
+ */
public final class ByteArrayComparator implements Comparator<byte[]> {
+ /**
+ * Instance which can be used, if you want to avoid instantiation per Map.
+ */
public static final ByteArrayComparator BYTEARRAY_COMPARATOR = new ByteArrayComparator();
@Override
- public int compare(byte[] o1, byte[] o2) {
+ public int compare(final byte[] o1, final byte[] o2) {
final ByteBuffer b1 = ByteBuffer.wrap(o1);
final ByteBuffer b2 = ByteBuffer.wrap(o2);
return b1.compareTo(b2);
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
index 96f8743..c13d7ea 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
@@ -4,13 +4,35 @@
public class ByteArrayUtil {
+ // Suppresses default constructor, ensuring non-instantiability.
+ private ByteArrayUtil() {}
+
+ /**
+ * Returns a StringBuffer with each byte in {@code bytes}
+ * converted to a String with {@link Integer#toHexString(int)},
+ * separated by {@code sep}.
+ *
+ * @param bytes byte array to convert
+ * @param sep separator between each bytes
+ * @return {@code bytes} converted to a StringBuffer
+ */
public static StringBuffer toHexStringBuffer(final byte[] bytes,
final String sep) {
return toHexStringBuffer(bytes, sep, new StringBuffer());
}
+ /**
+ * Returns a StringBuffer with each byte in {@code bytes}
+ * converted to a String with {@link Integer#toHexString(int)},
+ * separated by {@code sep}.
+ *
+ * @param bytes byte array to convert
+ * @param sep separator between each bytes
+ * @param buf StringBuffer to append to.
+ * @return {@code buf}
+ */
public static StringBuffer toHexStringBuffer(final byte[] bytes,
- final String sep, StringBuffer buf) {
+ final String sep, final StringBuffer buf) {
if (bytes == null) {
return buf;
}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/KVObject.java b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
new file mode 100644
index 0000000..2aac0e3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
@@ -0,0 +1,470 @@
+package net.onrc.onos.datastore.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiObjectOperation;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Class to represent an Object represented as a single K-V pair Value blob.
+ *
+ */
+public class KVObject {
+ private static final Logger log = LoggerFactory.getLogger(KVObject.class);
+
+ // Default Kryo serializer.
+ // each sub-class should prepare their own serializer, which has required
+ // objects registered for better performance.
+ private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ // kryo.setRegistrationRequired(true);
+ // kryo.setReferences(false);
+ return kryo;
+ }
+ };
+
+ private final IKVTable table;
+ private final byte[] key;
+
+ /**
+ * serialized value version stored on data store or
+ * {@link IKVTable.VERSION_NONEXISTENT()} if is a new object.
+ */
+ private long version;
+
+ /**
+ * Map to store user-defined properties
+ */
+ private Map<Object, Object> propertyMap;
+
+ public KVObject(final IKVTable table, final byte[] key) {
+ this(table, key, null, table.VERSION_NONEXISTENT());
+ }
+
+ public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
+ if (table == null) {
+ throw new IllegalArgumentException("table cannot be null");
+ }
+ if (key == null) {
+ throw new IllegalArgumentException("key cannot be null");
+ }
+ this.table = table;
+ this.key = key;
+ this.version = version;
+ this.propertyMap = new HashMap<Object, Object>();
+
+ if (value != null) {
+ deserialize(value);
+ }
+ }
+
+ protected static KVObject createFromKey(final byte[] key) {
+ // Equivalent of this method is expected to be implemented by SubClasses
+ throw new UnsupportedOperationException(
+ "createFromKey() is not expected to be called for RCObject");
+ }
+
+ public IKVTable getTable() {
+ return table;
+ }
+
+ public IKVTableID getTableId() {
+ return table.getTableId();
+ }
+
+ public byte[] getKey() {
+ return key;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ /**
+ * Return user-defined object properties.
+ *
+ * @note Will not trigger deserialization
+ * @return Will return null, if never been set, or was not deserialized
+ */
+ protected Map<Object, Object> getPropertyMap() {
+ return this.propertyMap;
+ }
+
+ protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
+ Map<Object, Object> oldMap = this.propertyMap;
+ this.propertyMap = newMap;
+ return oldMap;
+ }
+
+ /**
+ * Serialize object.
+ *
+ * sub-classes should override this method to customize serialization.
+ *
+ * @return serialized byte array
+ */
+ public byte[] serialize() {
+ return serializePropertyMap(defaultKryo.get(), this.propertyMap);
+ }
+
+ protected byte[] serializePropertyMap(final Kryo kryo,
+ final Map<Object, Object> propMap) {
+
+
+ // value
+ byte[] rcTemp = new byte[1024 * 1024];
+ Output output = new Output(rcTemp);
+ kryo.writeObject(output, propMap);
+ return output.toBytes();
+ }
+
+
+ /**
+ * Deserialize using value and version stored in data store.
+ *
+ * @param bytes serialized bytes
+ * @param version version of this {@code bytes}
+ * @return true if success
+ */
+ public boolean deserialize(final byte[] bytes, final long version) {
+ this.version = version;
+ return deserialize(bytes);
+ }
+
+ /**
+ * Deserialize object.
+ *
+ * sub-classes should override this method to customize deserialization.
+ *
+ * @param bytes serialized byte array
+ * @return true if success
+ */
+ protected boolean deserialize(final byte[] bytes) {
+ deserializePropertyMap(defaultKryo.get(), bytes);
+ return true;
+ }
+
+ /**
+ * Deserialize and set {@link propertyMap}.
+ * @param kryo serializer to use
+ * @param bytes Kryo serialized Map object
+ * @return true if success
+ */
+ protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
+ if (map == null) {
+ map = new HashMap<>();
+ }
+ this.propertyMap = map;
+ return true;
+ }
+
+ protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
+ final byte[] bytes, final Class<T> type) {
+
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+
+ Input input = new Input(bytes);
+ T map = kryo.readObject(input, type);
+
+ return map;
+ }
+
+
+ /**
+ * Create an Object in DataStore.
+ *
+ * Fails if the Object with same key already exists.
+ *
+ * @throws ObjectExistsException
+ */
+ public void create() throws ObjectExistsException {
+
+ if (this.propertyMap == null) {
+ log.warn("No object map was set. Setting empty Map.");
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
+
+ this.version = table.create(key, this.serialize());
+ }
+
+ public void forceCreate() {
+
+ if (this.propertyMap == null) {
+ log.warn("No object map was set. Setting empty Map.");
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
+
+ this.version = table.forceCreate(key, this.serialize());
+ }
+
+ /**
+ * Read an Object from DataStore.
+ *
+ * Fails if the Object with the key does not exist.
+ *
+ * @throws ObjectDoesntExistException
+ *
+ */
+ public void read() throws ObjectDoesntExistException {
+ IKVEntry e = table.read(key);
+ deserialize(e.getValue(), e.getVersion());
+ }
+
+ /**
+ * Update an existing Object in DataStore checking versions.
+ *
+ * Fails if the Object with key does not exists, or conditional failure.
+ *
+ * @throws WrongVersionException
+ * @throws ObjectDoesntExistException
+ */
+ public void update() throws ObjectDoesntExistException,
+ WrongVersionException {
+ if (this.propertyMap == null) {
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
+
+ this.version = table.update(key, this.serialize(), version);
+ }
+
+ /**
+ * Remove an existing Object in DataStore.
+ *
+ * Fails if the Object with key does not exists.
+ *
+ * @throws ObjectDoesntExistException
+ * @throws WrongVersionException
+ */
+ public void delete() throws ObjectDoesntExistException,
+ WrongVersionException {
+ this.version = table.delete(key, this.version);
+ }
+
+ public void forceDelete() {
+ this.version = table.forceDelete(key);
+ }
+
+ public WriteOp forceCreateOp(IKVClient client) {
+ return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
+ }
+
+ public WriteOp createOp(IKVClient client) {
+ return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
+ }
+
+ // this might not be needed?
+ public WriteOp readOp(IKVClient client) {
+ return new WriteOp(client.readOp(getTableId(), getKey()), this);
+ }
+
+ public WriteOp updateOp(IKVClient client) {
+ return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
+ }
+
+ public WriteOp deleteOp(IKVClient client) {
+ return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
+ }
+
+ public WriteOp forceDeleteOp(IKVClient client) {
+ return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
+ }
+
+ /**
+ * Multi-read RCObjects.
+ *
+ * If the blob value was read successfully, RCObject will deserialize them.
+ *
+ * @param objects
+ * RCObjects to read
+ * @return true if there exist a failed read.
+ */
+ public static boolean multiRead(final List<? extends KVObject> objects) {
+
+ final IKVClient client = DataStoreClient.getClient();
+
+ final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
+ for (KVObject o : objects) {
+ readOps.add(o.readOp(client));
+ }
+
+ boolean failExists = client.multiRead(readOps);
+
+ for (int i = 0; i < readOps.size(); ++i) {
+ KVObject obj = objects.get(i);
+ IMultiEntryOperation entry = readOps.get(i);
+ if ( entry.hasSucceeded() ) {
+ if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
+ //deserialize return true on success
+ failExists = true;
+ log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
+ }
+ } else {
+ log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
+ obj.version = obj.getTable().VERSION_NONEXISTENT();
+ failExists = true;
+ }
+ }
+
+ return failExists;
+ }
+
+ /**
+ * TODO Extract common interface
+ */
+ public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
+
+ private final IModifiableMultiEntryOperation base;
+ private final KVObject obj;
+
+ public WriteOp(IMultiEntryOperation base, final KVObject obj) {
+ this.base = (IModifiableMultiEntryOperation) base;
+ this.obj = obj;
+
+ // switch (base.getOperation()) {
+ // case CREATE:
+ // case FORCE_CREATE:
+ // case UPDATE:
+ // break;
+ // default:
+ // throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
+ // }
+ }
+
+ @Override
+ public KVObject getObject() {
+ return obj;
+ }
+
+ @Deprecated
+ public OPERATION getOp() {
+ return this.getOperation();
+ }
+
+ @Override
+ public boolean hasSucceeded() {
+ return base.hasSucceeded();
+ }
+
+ @Override
+ public STATUS getStatus() {
+ return base.getStatus();
+ }
+
+ @Override
+ public IKVTableID getTableId() {
+ return base.getTableId();
+ }
+
+ @Override
+ public byte[] getKey() {
+ return base.getKey();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return base.getValue();
+ }
+
+ @Override
+ public long getVersion() {
+ return base.getVersion();
+ }
+
+ @Override
+ public OPERATION getOperation() {
+ return base.getOperation();
+ }
+
+ @Override
+ public void setStatus(STATUS status) {
+ base.setStatus(status);
+ }
+
+ @Override
+ public void setValue(byte[] value, long version) {
+ base.setValue(value, version);
+ }
+
+ @Override
+ public void setVersion(long version) {
+ base.setVersion(version);
+ this.obj.version = version;
+ }
+
+ @Override
+ public IModifiableMultiEntryOperation getActualOperation() {
+ return base;
+ }
+ }
+
+ public static boolean multiWrite(final List<WriteOp> objects) {
+
+ final IKVClient client = DataStoreClient.getClient();
+
+ final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
+ for (WriteOp o : objects) {
+ writeOps.add(o);
+ }
+
+ return client.multiWrite(writeOps);
+ }
+
+ public abstract static class AbstractObjectIterator<E extends KVObject> implements
+ Iterator<E> {
+
+ protected Iterator<IKVEntry> enumerator;
+
+ public AbstractObjectIterator(final IKVTable table) {
+ this.enumerator = table.getAllEntries().iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return enumerator.hasNext();
+ }
+
+ // Implement something similar to below to realize Iterator
+ // @Override
+ // public E next() {
+ // IKVTable.IKVEntry o = enumerator.next();
+ // E obj = E.createFromKey(o.getKey());
+ // obj.deserialize(o.getValue(), o.getVersion());
+ // return obj;
+ // }
+
+ @Deprecated
+ @Override
+ public void remove() {
+ // TODO Not implemented, as I cannot find a use-case for it.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
index 602762b..a5af79b 100755
--- a/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
+++ b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
@@ -6,20 +6,24 @@
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
-import edu.stanford.ramcloud.JRamCloud;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
+
import net.onrc.onos.datagrid.web.IntentResource;
-import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectExistsException;
import net.onrc.onos.intent.IntentOperationList;
import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import net.onrc.onos.registry.controller.IControllerRegistryService;
import net.onrc.onos.registry.controller.IdBlock;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,24 +38,24 @@
NetworkGraph graph = null;
private final static String intentJournal = "G:IntentJournal";
private final static int valueStoreLimit = 1024 * 1024;
- private RCTable table;
+ private IKVTable table;
private Kryo kryo;
private ByteArrayOutputStream stream;
private Output output = null;
private AtomicLong nextId = null;
private long rangeEnd;
private IdBlock idBlock = null;
-
-
+
+
public PersistIntent(final IControllerRegistryService controllerRegistry, INetworkGraphService ng) {
this.controllerRegistry = controllerRegistry;
this.graph = ng.getNetworkGraph();
- table = RCTable.getTable(intentJournal);
+ table = DataStoreClient.getClient().getTable(intentJournal);
stream = new ByteArrayOutputStream(1024);
output = new Output(stream);
kryo = (new KryoFactory()).newKryo();
}
-
+
public long getKey() {
long key;
if (idBlock == null) {
@@ -64,15 +68,15 @@
}
return key;
}
-
+
private long getNextBlock() {
- // XXX This method is not thread safe, may lose allocated IdBlock
+ // XXX This method is not thread safe, may lose allocated IdBlock
idBlock = controllerRegistry.allocateUniqueIdBlock(range);
nextId = new AtomicLong(idBlock.getStart());
rangeEnd = idBlock.getEnd();
return nextId.get();
}
-
+
public boolean persistIfLeader(long key, IntentOperationList operations) {
boolean leader = true;
boolean ret = false;
@@ -114,7 +118,7 @@
stream.close();
log.debug("persist operations to ramcloud size of operations: {}", operations.size());
ret = true;
- } catch (JRamCloud.ObjectExistsException ex) {
+ } catch (ObjectExistsException ex) {
log.warn("Failed to store intent journal with key " + key);
} catch (IOException ex) {
log.error("Failed to close the stream");
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
index a3ab24a..30b2f73 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
@@ -4,12 +4,14 @@
import java.util.List;
import java.util.Collection;
-import net.onrc.onos.datastore.RCObject;
-import net.onrc.onos.datastore.RCObject.WriteOp;
-import net.onrc.onos.datastore.topology.RCLink;
-import net.onrc.onos.datastore.topology.RCPort;
-import net.onrc.onos.datastore.topology.RCPort.STATUS;
-import net.onrc.onos.datastore.topology.RCSwitch;
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.topology.KVLink;
+import net.onrc.onos.datastore.topology.KVPort;
+import net.onrc.onos.datastore.topology.KVPort.STATUS;
+import net.onrc.onos.datastore.topology.KVSwitch;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.datastore.utils.KVObject.WriteOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,22 +47,24 @@
log.debug("Adding switch {}", sw);
ArrayList<WriteOp> groupOp = new ArrayList<>();
- RCSwitch rcSwitch = new RCSwitch(sw.getDpid());
- rcSwitch.setStatus(RCSwitch.STATUS.ACTIVE);
+ KVSwitch rcSwitch = new KVSwitch(sw.getDpid());
+ rcSwitch.setStatus(KVSwitch.STATUS.ACTIVE);
+
+ IKVClient client = DataStoreClient.getClient();
// XXX Is ForceCreating Switch on DB OK here?
// If ForceCreating, who ever is calling this method needs
// to assure that DPID is unique cluster-wide, etc.
- groupOp.add(WriteOp.ForceCreate(rcSwitch));
+ groupOp.add(rcSwitch.forceCreateOp(client));
for (PortEvent portEvent : portEvents) {
- RCPort rcPort = new RCPort(sw.getDpid(), portEvent.getNumber());
- rcPort.setStatus(RCPort.STATUS.ACTIVE);
+ KVPort rcPort = new KVPort(sw.getDpid(), portEvent.getNumber());
+ rcPort.setStatus(KVPort.STATUS.ACTIVE);
- groupOp.add(WriteOp.ForceCreate(rcPort));
+ groupOp.add(rcPort.forceCreateOp(client));
}
- boolean failed = RCObject.multiWrite(groupOp);
+ boolean failed = KVObject.multiWrite(groupOp);
if (failed) {
log.error("Adding Switch {} and its ports failed.", sw.getDpid());
@@ -84,21 +88,23 @@
public boolean deactivateSwitch(SwitchEvent sw,
Collection<PortEvent> portEvents) {
log.debug("Deactivating switch {}", sw);
- RCSwitch rcSwitch = new RCSwitch(sw.getDpid());
+ KVSwitch rcSwitch = new KVSwitch(sw.getDpid());
+
+ IKVClient client = DataStoreClient.getClient();
List<WriteOp> groupOp = new ArrayList<>();
- rcSwitch.setStatus(RCSwitch.STATUS.INACTIVE);
+ rcSwitch.setStatus(KVSwitch.STATUS.INACTIVE);
- groupOp.add(WriteOp.ForceCreate(rcSwitch));
+ groupOp.add(rcSwitch.forceCreateOp(client));
for (PortEvent portEvent : portEvents) {
- RCPort rcPort = new RCPort(sw.getDpid(), portEvent.getNumber());
- rcPort.setStatus(RCPort.STATUS.INACTIVE);
+ KVPort rcPort = new KVPort(sw.getDpid(), portEvent.getNumber());
+ rcPort.setStatus(KVPort.STATUS.INACTIVE);
- groupOp.add(WriteOp.ForceCreate(rcPort));
+ groupOp.add(rcPort.forceCreateOp(client));
}
- boolean failed = RCObject.multiWrite(groupOp);
+ boolean failed = KVObject.multiWrite(groupOp);
return !failed;
}
@@ -112,10 +118,10 @@
public boolean addPort(PortEvent port) {
log.debug("Adding port {}", port);
- RCPort rcPort = new RCPort(port.getDpid(), port.getNumber());
- rcPort.setStatus(RCPort.STATUS.ACTIVE);
+ KVPort rcPort = new KVPort(port.getDpid(), port.getNumber());
+ rcPort.setStatus(KVPort.STATUS.ACTIVE);
rcPort.forceCreate();
- // TODO add description into RCPort
+ // TODO add description into KVPort
//rcPort.setDescription(port.getDescription());
return true;
@@ -130,7 +136,7 @@
public boolean deactivatePort(PortEvent port) {
log.debug("Deactivating port {}", port);
- RCPort rcPort = new RCPort(port.getDpid(), port.getNumber());
+ KVPort rcPort = new KVPort(port.getDpid(), port.getNumber());
rcPort.setStatus(STATUS.INACTIVE);
rcPort.forceCreate();
@@ -147,7 +153,7 @@
public boolean addLink(LinkEvent link) {
log.debug("Adding link {}", link);
- RCLink rcLink = new RCLink(link.getSrc().getDpid(),
+ KVLink rcLink = new KVLink(link.getSrc().getDpid(),
link.getSrc().getNumber(),
link.getDst().getDpid(),
link.getDst().getNumber());
@@ -157,7 +163,7 @@
// so we can force write here
//
// TODO: We need to check for errors
- rcLink.setStatus(RCLink.STATUS.ACTIVE);
+ rcLink.setStatus(KVLink.STATUS.ACTIVE);
rcLink.forceCreate();
return true; // Success
@@ -166,7 +172,7 @@
public boolean removeLink(LinkEvent linkEvent) {
log.debug("Removing link {}", linkEvent);
- RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
+ KVLink rcLink = new KVLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
rcLink.forceDelete();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
index 3f3af2d..a323726 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
@@ -17,9 +17,9 @@
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.datagrid.IEventChannel;
import net.onrc.onos.datagrid.IEventChannelListener;
-import net.onrc.onos.datastore.topology.RCLink;
-import net.onrc.onos.datastore.topology.RCPort;
-import net.onrc.onos.datastore.topology.RCSwitch;
+import net.onrc.onos.datastore.topology.KVLink;
+import net.onrc.onos.datastore.topology.KVPort;
+import net.onrc.onos.datastore.topology.KVSwitch;
import net.onrc.onos.ofcontroller.networkgraph.PortEvent.SwitchPort;
import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.registry.controller.IControllerRegistryService;
@@ -1141,8 +1141,8 @@
// how we initially subscribe to replication events
// Add all active switches
- for (RCSwitch sw : RCSwitch.getAllSwitches()) {
- if (sw.getStatus() != RCSwitch.STATUS.ACTIVE) {
+ for (KVSwitch sw : KVSwitch.getAllSwitches()) {
+ if (sw.getStatus() != KVSwitch.STATUS.ACTIVE) {
continue;
}
@@ -1155,8 +1155,8 @@
}
// Add all active ports
- for (RCPort p : RCPort.getAllPorts()) {
- if (p.getStatus() != RCPort.STATUS.ACTIVE) {
+ for (KVPort p : KVPort.getAllPorts()) {
+ if (p.getStatus() != KVPort.STATUS.ACTIVE) {
continue;
}
@@ -1169,14 +1169,14 @@
}
// TODO Is Device going to be in DB? If so, read from DB.
- // for (RCDevice d : RCDevice.getAllDevices()) {
+ // for (KVDevice d : KVDevice.getAllDevices()) {
// DeviceEvent devEvent = new DeviceEvent( MACAddress.valueOf(d.getMac()) );
// for (byte[] portId : d.getAllPortIds() ) {
- // devEvent.addAttachmentPoint( new SwitchPort( RCPort.getDpidFromKey(portId), RCPort.getNumberFromKey(portId) ));
+ // devEvent.addAttachmentPoint( new SwitchPort( KVPort.getDpidFromKey(portId), KVPort.getNumberFromKey(portId) ));
// }
// }
- for (RCLink l : RCLink.getAllLinks()) {
+ for (KVLink l : KVLink.getAllLinks()) {
LinkEvent linkEvent = new LinkEvent(l.getSrc().dpid,
l.getSrc().number,
l.getDst().dpid,
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudLinksResource.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudLinksResource.java
index abd1831..ad5059c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudLinksResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudLinksResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.ofcontroller.networkgraph.web;
-import net.onrc.onos.datastore.topology.RCLink;
+import net.onrc.onos.datastore.topology.KVLink;
import org.restlet.resource.Get;
import org.restlet.resource.ServerResource;
@@ -8,7 +8,7 @@
public class RamcloudLinksResource extends ServerResource {
@Get("json")
- public Iterable<RCLink> retrieve() {
- return RCLink.getAllLinks();
+ public Iterable<KVLink> retrieve() {
+ return KVLink.getAllLinks();
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudPortsResource.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudPortsResource.java
index b2f88dd..bac73b3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudPortsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudPortsResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.ofcontroller.networkgraph.web;
-import net.onrc.onos.datastore.topology.RCPort;
+import net.onrc.onos.datastore.topology.KVPort;
import org.restlet.resource.Get;
import org.restlet.resource.ServerResource;
@@ -8,7 +8,7 @@
public class RamcloudPortsResource extends ServerResource {
@Get("json")
- public Iterable<RCPort> retrieve() {
- return RCPort.getAllPorts();
+ public Iterable<KVPort> retrieve() {
+ return KVPort.getAllPorts();
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudSwitchesResource.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudSwitchesResource.java
index e8998b0..b8a8fe6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudSwitchesResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/web/RamcloudSwitchesResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.ofcontroller.networkgraph.web;
-import net.onrc.onos.datastore.topology.RCSwitch;
+import net.onrc.onos.datastore.topology.KVSwitch;
import org.restlet.resource.Get;
import org.restlet.resource.ServerResource;
@@ -8,8 +8,8 @@
public class RamcloudSwitchesResource extends ServerResource {
@Get("json")
- public Iterable<RCSwitch> retrieve() {
- return RCSwitch.getAllSwitches();
+ public Iterable<KVSwitch> retrieve() {
+ return KVSwitch.getAllSwitches();
}
}