WIP:RC Basic+Switch+Port+Link+Device+MultiRead
Change-Id: I487d3c2f07311416b3ae6e55fbc76d38a64c00e2
diff --git a/src/main/java/net/onrc/onos/datastore/RCClient.java b/src/main/java/net/onrc/onos/datastore/RCClient.java
new file mode 100644
index 0000000..df6a8ce
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCClient.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.datastore;
+
+import edu.stanford.ramcloud.JRamCloud;
+
+public class RCClient {
+
+ // FIXME come up with a proper way to retrieve configuration
+ public static final int MAX_MULTI_READS = Integer.valueOf(System
+ .getProperty("ramcloud.max_multi_reads", "400"));
+
+ private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
+ @Override
+ protected JRamCloud initialValue() {
+ // FIXME come up with a proper way to retrieve configuration
+ return new JRamCloud(System.getProperty("ramcloud.coordinator",
+ "fast+udp:host=127.0.0.1,port=12246"));
+ }
+ };
+
+ /**
+ * @return JRamCloud instance intended to be used only within the
+ * SameThread.
+ * @note Do not store the returned instance in a member variable, etc. which
+ * may be accessed later by another thread.
+ */
+ static JRamCloud getClient() {
+ return tlsRCClient.get();
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/RCObject.java b/src/main/java/net/onrc/onos/datastore/RCObject.java
new file mode 100644
index 0000000..5bea2c5
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCObject.java
@@ -0,0 +1,308 @@
+package net.onrc.onos.datastore;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.datastore.RCTable.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Class to represent an Object represented as a single K-V pair Value blob.
+ *
+ */
+public class RCObject {
+ private static final Logger log = LoggerFactory.getLogger(RCObject.class);
+ /**
+ * Version number which represents that the object doesnot exist, or hase
+ * never read the DB before.
+ */
+ public static final long VERSION_NONEXISTENT = 0L;
+
+ // Each Object should prepare their own serializer, which has required
+ // objects registered.
+ private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ // kryo.setRegistrationRequired(true);
+ // TODO TreeMap or just Map
+ // kryo.register(TreeMap.class);
+ kryo.setReferences(false);
+ return kryo;
+ }
+ };
+
+ private final RCTable table;
+ private final byte[] key;
+ private byte[] value;
+ private long version;
+
+ private Map<Object, Object> propertyMap;
+
+ public RCObject(RCTable table, byte[] key) {
+ this(table, key, null);
+ }
+
+ public RCObject(RCTable table, byte[] key, byte[] value) {
+ if (table == null) {
+ throw new IllegalArgumentException("table cannot be null");
+ }
+ if (key == null) {
+ throw new IllegalArgumentException("key cannot be null");
+ }
+ this.table = table;
+ this.key = key;
+ this.value = value;
+ this.version = VERSION_NONEXISTENT;
+ this.propertyMap = new HashMap<Object, Object>();
+
+ if (this.value != null) {
+ deserializeObjectFromValue();
+ }
+ }
+
+ public RCTable getTable() {
+ return table;
+ }
+
+ public long getTableId() {
+ return table.getTableId();
+ }
+
+ public byte[] getKey() {
+ return key;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ /**
+ * Return serialized Value.
+ *
+ * @note will not trigger serialization
+ * @return Will return null, if never been read, or was not serialized
+ */
+ public byte[] getSerializedValue() {
+ return value;
+ }
+
+ /**
+ * Return Object as a Map.
+ *
+ * @note Will not trigger deserialization
+ * @return Will return null, if never been set, or was not deserialized
+ */
+ protected Map<Object, Object> getObjectMap() {
+ return this.propertyMap;
+ }
+
+ protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
+ Map<Object, Object> old_map = this.propertyMap;
+ this.propertyMap = new_map;
+ return old_map;
+ }
+
+ public void serializeAndSetValue() {
+ serializeAndSetValue(defaultKryo.get(), this.propertyMap);
+ }
+
+ protected void serializeAndSetValue(Kryo kryo, Map<Object, Object> javaObject) {
+
+ // value
+ byte[] rcTemp = new byte[1024 * 1024];
+ Output output = new Output(rcTemp);
+ kryo.writeObject(output, javaObject);
+ this.value = output.toBytes();
+ }
+
+ /**
+ * Deserialize
+ *
+ * @return
+ */
+ public Map<Object, Object> deserializeObjectFromValue() {
+ return deserializeObjectFromValue(defaultKryo.get());
+ }
+
+ protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
+ return deserializeObjectFromValue(kryo, HashMap.class);
+ }
+
+ protected <T extends Map> T deserializeObjectFromValue(Kryo kryo, Class<T> type) {
+ if (this.value == null)
+ return null;
+
+ Input input = new Input(this.value);
+ T map = kryo.readObject(input, type);
+ this.propertyMap = map;
+
+ return map;
+ }
+
+ /**
+ * Create an Object in DataStore.
+ *
+ * Fails if the Object with same key already exists.
+ *
+ * @note create an Empty Object if no object has never been set.
+ * @throws ObjectExistsException
+ */
+ public void create() throws ObjectExistsException {
+
+ if (this.propertyMap == null) {
+ log.warn("No object map was set. Setting empty Map.");
+ setObjectMap(new HashMap<Object, Object>());
+ }
+ serializeAndSetValue();
+
+ this.version = table.create(key, value);
+ }
+
+ /**
+ * Read an Object from DataStore.
+ *
+ * Fails if the Object with the key does not exist.
+ *
+ * @throws ObjectDoesntExistException
+ *
+ */
+ public void read() throws ObjectDoesntExistException {
+ Entry e = table.read(key);
+ this.value = e.value;
+ this.version = e.version;
+
+ // TODO should we deserialize immediately?
+ deserializeObjectFromValue();
+ }
+
+ /**
+ * Update an existing Object in DataStore checking versions.
+ *
+ * Fails if the Object with key does not exists, or conditional failure.
+ *
+ * @throws WrongVersionException
+ * @throws ObjectDoesntExistException
+ */
+ public void update() throws ObjectDoesntExistException,
+ WrongVersionException {
+ if (this.propertyMap == null) {
+ setObjectMap(new HashMap<Object, Object>());
+ }
+ serializeAndSetValue();
+
+ this.version = table.update(key, value, version);
+ }
+
+ /**
+ * Remove an existing Object in DataStore.
+ *
+ * Fails if the Object with key does not exists.
+ *
+ * @throws ObjectDoesntExistException
+ */
+ public void delete() throws ObjectDoesntExistException {
+ this.version = table.delete(key);
+ }
+
+ /**
+ * Multi-read RCObjects.
+ *
+ * If the blob value was read successfully, RCObject will deserialize them.
+ *
+ * @param objects
+ * RCObjects to read
+ * @return true if there exist an failed read.
+ */
+ public static boolean multiRead(Collection<RCObject> objects) {
+ boolean fail_exists = false;
+
+ ArrayList<RCObject> req = new ArrayList<>();
+ Iterator<RCObject> it = objects.iterator();
+ while (it.hasNext()) {
+
+ req.add(it.next());
+
+ if (req.size() >= RCClient.MAX_MULTI_READS) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
+ }
+
+ if (!req.isEmpty()) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
+
+ return fail_exists;
+ }
+
+ private static boolean multiReadInternal(ArrayList<RCObject> req) {
+ boolean fail_exists = false;
+ JRamCloud rcClient = RCClient.getClient();
+
+ final int reqs = req.size();
+ JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
+
+ // setup multi-read operation
+ for (int i = 0; i < reqs; ++i) {
+ RCObject obj = req.get(i);
+ mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
+ obj.getKey());
+ }
+
+ // execute
+ JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
+ assert (results.length <= req.size());
+
+ // reflect changes to RCObject
+ for (int i = 0; i < results.length; ++i) {
+ RCObject obj = req.get(i);
+ if (results[i] == null) {
+ log.error("MultiRead error, skipping {}, {}", obj.getTable(),
+ obj);
+ fail_exists = true;
+ continue;
+ }
+ assert (Arrays.equals(results[i].key, obj.getKey()));
+
+ obj.value = results[i].value;
+ obj.version = results[i].version;
+ if (obj.version == VERSION_NONEXISTENT) {
+ fail_exists = true;
+ } else {
+ obj.deserializeObjectFromValue();
+ }
+ }
+
+ return fail_exists;
+ }
+
+ /**
+ * Get All of it's kind?
+ */
+ public static Collection<? extends RCObject> getAllObjects() {
+ // TODO implement
+ throw new UnsupportedOperationException("Not implemented yet");
+ //Collection<? extends RCObject> list = new ArrayList<>();
+ //return list;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/RCTable.java b/src/main/java/net/onrc/onos/datastore/RCTable.java
new file mode 100644
index 0000000..4836f24
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCTable.java
@@ -0,0 +1,165 @@
+package net.onrc.onos.datastore;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Class to represent a Table in RAMCloud
+ *
+ */
+public class RCTable {
+ private static final Logger log = LoggerFactory.getLogger(RCTable.class);
+
+ private static final ConcurrentHashMap<String, RCTable> table_map = new ConcurrentHashMap<>();
+
+ /**
+ *
+ * @param table
+ * Table to drop.
+ * @note Instance passed must not be use after successful call.
+ *
+ */
+ public static void dropTable(RCTable table) {
+ JRamCloud rcClient = RCClient.getClient();
+ // TODO mark the table instance as dropped?
+ rcClient.dropTable(table.getTableName());
+ table_map.remove(table.getTableName());
+ }
+
+ public static RCTable getTable(String tableName) {
+ RCTable table = table_map.get(tableName);
+ if (table == null) {
+ RCTable new_table = new RCTable(tableName);
+ RCTable existing_table = table_map
+ .putIfAbsent(tableName, new_table);
+ if (existing_table != null) {
+ return existing_table;
+ } else {
+ return new_table;
+ }
+ }
+ return table;
+ }
+
+ public static class Entry {
+ public final byte[] key;
+ public byte[] value;
+ public long version;
+
+ public Entry(byte[] key, byte[] value, long version) {
+ this.key = key;
+ this.value = value;
+ this.version = version;
+ }
+ }
+
+ // finally the Table itself
+
+ private final long rcTableId;
+ private final String rcTableName;
+
+ // private boolean isDropped = false;
+
+ /**
+ *
+ * @note rcTableName must be unique cluster wide.
+ * @param rcTableName
+ */
+ private RCTable(String rcTableName) {
+ JRamCloud rcClient = RCClient.getClient();
+
+ this.rcTableName = rcTableName;
+
+ // FIXME Is it better to create table here or at getTable
+ this.rcTableId = rcClient.createTable(rcTableName);
+ }
+
+ public long getTableId() {
+ return this.rcTableId;
+ }
+
+ public String getTableName() {
+ return this.rcTableName;
+ }
+
+ // TODO: Enumerate whole table?
+
+ // Reject if exist
+ public long create(final byte[] key, final byte[] value)
+ throws ObjectExistsException {
+
+ JRamCloud rcClient = RCClient.getClient();
+
+ RejectRules rules = rcClient.new RejectRules();
+ rules.setExists();
+
+ long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+ rules);
+ return updated_version;
+ }
+
+ // read
+ public Entry read(final byte[] key) throws ObjectDoesntExistException {
+
+ JRamCloud rcClient = RCClient.getClient();
+
+ // FIXME underlying JRamCloud cannot detect "not exist"
+ // RejectRules rules = rcClient.new RejectRules();
+ // rules.setDoesntExists();
+ // JRamCloud.Object rcObj = rcClient.read(this.rcTableId, key, rules);
+ JRamCloud.Object rcObj = rcClient.read(this.rcTableId, key);
+
+ return new Entry(rcObj.key, rcObj.value, rcObj.version);
+ }
+
+ // Reject if version neq
+ public long update(final byte[] key, final byte[] value, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ JRamCloud rcClient = RCClient.getClient();
+
+ RejectRules rules = rcClient.new RejectRules();
+ rules.setDoesntExists();
+ rules.setNeVersion(version);
+
+ long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+ rules);
+ return updated_version;
+ }
+
+ // Reject if not exist
+ public long update(final byte[] key, final byte[] value)
+ throws ObjectDoesntExistException {
+
+ JRamCloud rcClient = RCClient.getClient();
+
+ RejectRules rules = rcClient.new RejectRules();
+ rules.setDoesntExists();
+
+ long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+ rules);
+ return updated_version;
+
+ }
+
+ // Reject if not exist
+ public long delete(final byte[] key) throws ObjectDoesntExistException {
+ JRamCloud rcClient = RCClient.getClient();
+
+ // FIXME underlying JRamCloud does not support cond remove
+ RejectRules rules = rcClient.new RejectRules();
+ rules.setDoesntExists();
+
+ long removed_version = rcClient.remove(this.rcTableId, key, rules);
+ return removed_version;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java b/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
new file mode 100644
index 0000000..0e8afe4
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
@@ -0,0 +1,155 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.topology.RCLink.STATUS;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+public class RCDevice extends RCObject {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(RCDevice.class);
+
+ private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
+
+ // FIXME these should be Enum or some number, not String
+ private static final String PROP_MAC = "mac";
+ private static final String PROP_PORT_IDS = "port-ids";
+
+ private final byte[] mac;
+ private TreeSet<byte[]> portIds;
+ transient private boolean isPortIdsModified;
+
+ // Assuming mac is unique cluster-wide
+ public static byte[] getDeviceID(final byte[] mac) {
+ return ByteBuffer.allocate(2 + mac.length).putChar('D').put(mac)
+ .array();
+ }
+
+ public static byte[] getMacFromKey(byte[] key) {
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ if (keyBuf.getChar() != 'D') {
+ throw new IllegalArgumentException("Invalid Device key");
+ }
+ byte[] mac = new byte[keyBuf.remaining()];
+ keyBuf.get(mac);
+ return mac;
+ }
+
+ public RCDevice(byte[] mac) {
+ super(RCTable.getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
+
+ this.mac = mac;
+ this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isPortIdsModified = true;
+ }
+
+ public static RCDevice createFromKey(byte[] key) {
+ return new RCDevice(getMacFromKey(key));
+ }
+
+ public byte[] getMac() {
+ // TODO may need to clone() to be sure this object will be immutable.
+ return mac;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ public void addPortId(byte[] portId) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= portIds.add(portId);
+ }
+
+ public void removePortId(byte[] portId) {
+ isPortIdsModified |= portIds.remove(portId);
+ }
+
+ public void emptyPortIds() {
+ portIds.clear();
+ this.isPortIdsModified = true;
+ }
+
+ public void addAllToPortIds(Collection<byte[]> portIds) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= this.portIds.addAll(portIds);
+ }
+
+ /**
+ *
+ * @return Unmodifiable Set view of all the PortIds;
+ */
+ public Set<byte[]> getAllPortIds() {
+ return Collections.unmodifiableSet(portIds);
+ }
+
+ @Override
+ public void serializeAndSetValue() {
+ Map<Object, Object> map = getObjectMap();
+
+ map.put(PROP_MAC, mac);
+ if (isPortIdsModified) {
+ byte[] portIdArray[] = new byte[portIds.size()][];
+ map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+ isPortIdsModified = false;
+ }
+
+ serializeAndSetValue(deviceKryo.get(), map);
+ }
+
+ @Override
+ public Map<Object, Object> deserializeObjectFromValue() {
+ Map<Object, Object> map = deserializeObjectFromValue(deviceKryo.get());
+
+ if (this.portIds == null) {
+ this.portIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[] portIdArray[] = (byte[][]) map.get(PROP_PORT_IDS);
+ if (portIdArray != null) {
+ this.portIds.clear();
+ this.portIds.addAll(Arrays.asList(portIdArray));
+ isPortIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isPortIdsModified = true;
+ }
+
+ return map;
+ }
+
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCLink.java b/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
new file mode 100644
index 0000000..957ef88
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
@@ -0,0 +1,135 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+
+public class RCLink extends RCObject {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(RCLink.class);
+
+ private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static class SwitchPort {
+ public final Long dpid;
+ public final Long number;
+
+ public SwitchPort(Long dpid, Long number) {
+ this.dpid = dpid;
+ this.number = number;
+ }
+
+ public byte[] getPortID() {
+ return RCPort.getPortID(dpid, number);
+ }
+
+ public byte[] getSwitchID() {
+ return RCSwitch.getSwichID(dpid);
+ }
+ }
+
+ public static final String GLOBAL_LINK_TABLE_NAME = "G:Link";
+
+ // FIXME these should be Enum or some number, not String
+ private static final String PROP_STATUS = "status";
+ private static final String PROP_SRC_SW_ID = "src-sw-id";
+ private static final String PROP_SRC_PORT_ID = "src-port-id";
+ private static final String PROP_DST_SW_ID = "dst-sw-id";
+ private static final String PROP_DST_PORT_ID = "dst-port-id";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final SwitchPort src;
+ private final SwitchPort dst;
+ private STATUS status;
+
+ public static final int LINKID_BYTES = 2 + RCPort.PORTID_BYTES * 2;
+
+ public static byte[] getLinkID(Long src_dpid, Long src_port_no,
+ Long dst_dpid, Long dst_port_no) {
+ return ByteBuffer.allocate(LINKID_BYTES).putChar('L')
+ .put(RCPort.getPortID(src_dpid, src_port_no))
+ .put(RCPort.getPortID(dst_dpid, dst_port_no)).array();
+ }
+
+ public RCLink(Long src_dpid, Long src_port_no, Long dst_dpid,
+ Long dst_port_no) {
+ super(RCTable.getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
+ src_port_no, dst_dpid, dst_port_no));
+
+ src = new SwitchPort(src_dpid, src_port_no);
+ dst = new SwitchPort(dst_dpid, dst_port_no);
+ status = STATUS.INACTIVE;
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(STATUS status) {
+ this.status = status;
+ getObjectMap().put(PROP_STATUS, status);
+ }
+
+ public SwitchPort getSrc() {
+ return src;
+ }
+
+ public SwitchPort getDst() {
+ return dst;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ @Override
+ public void serializeAndSetValue() {
+ Map<Object, Object> map = getObjectMap();
+
+ map.put(PROP_SRC_SW_ID, src.getSwitchID());
+ map.put(PROP_SRC_PORT_ID, src.getPortID());
+ map.put(PROP_DST_SW_ID, dst.getSwitchID());
+ map.put(PROP_DST_PORT_ID, dst.getPortID());
+
+ serializeAndSetValue(linkKryo.get(), map);
+ }
+
+ @Override
+ public Map<Object, Object> deserializeObjectFromValue() {
+ Map<Object, Object> map = deserializeObjectFromValue(linkKryo.get());
+
+ this.status = (STATUS) map.get(PROP_STATUS);
+ return map;
+ }
+
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCPort.java b/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
new file mode 100644
index 0000000..cdaf383
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
@@ -0,0 +1,263 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+public class RCPort extends RCObject {
+ private static final Logger log = LoggerFactory.getLogger(RCPort.class);
+
+ private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
+
+ // FIXME these should be Enum or some number, not String
+ private static final String PROP_DPID = "dpid";
+ private static final String PROP_NUMBER = "number";
+ private static final String PROP_STATUS = "status";
+ private static final String PROP_LINK_IDS = "link-ids";
+ private static final String PROP_DEVICE_IDS = "device-ids";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final Long dpid;
+ private final Long number;
+
+ private STATUS status;
+ // XXX These 2 set of Ids can be removed from DataStore, if In-Memory cache
+ // build the indexing info from Link.
+ private TreeSet<byte[]> linkIds;
+ transient boolean isLinkIdsModified;
+ private TreeSet<byte[]> deviceIds;
+ transient boolean isDeviceIdsModified;
+
+ public static final int PORTID_BYTES = RCSwitch.SWITCHID_BYTES + 2 + 8;
+
+ public static byte[] getPortID(Long dpid, Long number) {
+ if (dpid == null) {
+ throw new IllegalArgumentException("dpid cannot be null");
+ }
+ if (number == null) {
+ throw new IllegalArgumentException("number cannot be null");
+ }
+ return ByteBuffer.allocate(PORTID_BYTES).putChar('S').putLong(dpid)
+ .putChar('P').putLong(number).array();
+ }
+
+ public static long getDpidFromKey(byte[] key) {
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Port key");
+ }
+ return keyBuf.getLong();
+ }
+
+ public static long getNumberFromKey(byte[] key) {
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ keyBuf.position(RCSwitch.SWITCHID_BYTES);
+ if (keyBuf.getChar() != 'P') {
+ throw new IllegalArgumentException("Invalid Port key");
+ }
+ return keyBuf.getLong();
+ }
+
+ // FIXME specify DPID,number here, or Should caller specify the key it self?
+ // In other words, should layer above have the control of the ID?
+ public RCPort(Long dpid, Long number) {
+ super(RCTable.getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
+
+ // TODO Auto-generated constructor stub
+
+ this.dpid = dpid;
+ this.number = number;
+ this.status = STATUS.INACTIVE;
+ this.linkIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isLinkIdsModified = true;
+ this.deviceIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isDeviceIdsModified = true;
+ }
+
+ public static RCPort createFromKey(byte[] key) {
+ return new RCPort(getDpidFromKey(key), getNumberFromKey(key));
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(STATUS status) {
+ this.status = status;
+ getObjectMap().put(PROP_STATUS, status);
+ }
+
+ public Long getDpid() {
+ return dpid;
+ }
+
+ public Long getNumber() {
+ return number;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ public void addLinkId(byte[] linkId) {
+ isLinkIdsModified |= linkIds.add(linkId);
+ }
+
+ public void removeLinkId(byte[] linkId) {
+ isLinkIdsModified |= linkIds.remove(linkId);
+ }
+
+ public void emptyLinkIds() {
+ linkIds.clear();
+ isLinkIdsModified = true;
+ }
+
+ public void addAllToLinkIds(Collection<byte[]> linkIds) {
+ isLinkIdsModified |= this.linkIds.addAll(linkIds);
+ }
+
+ /**
+ *
+ * @return Unmodifiable Set view of all the LinkIds;
+ */
+ public Set<byte[]> getAllLinkIds() {
+ return Collections.unmodifiableSet(linkIds);
+ }
+
+ public void addDeviceId(byte[] deviceId) {
+ isDeviceIdsModified |= deviceIds.add(deviceId);
+ }
+
+ public void removeDeviceId(byte[] deviceId) {
+ isDeviceIdsModified |= deviceIds.remove(deviceId);
+ }
+
+ public void emptyDeviceIds() {
+ deviceIds.clear();
+ isDeviceIdsModified = true;
+ }
+
+ public void addAllToDeviceIds(Collection<byte[]> deviceIds) {
+ isDeviceIdsModified |= this.deviceIds.addAll(deviceIds);
+ }
+
+ /**
+ *
+ * @return Unmodifiable Set view of all the LinkIds;
+ */
+ public Set<byte[]> getAllDeviceIds() {
+ return Collections.unmodifiableSet(deviceIds);
+ }
+
+ @Override
+ public void serializeAndSetValue() {
+ Map<Object, Object> map = getObjectMap();
+
+ map.put(PROP_DPID, this.dpid);
+ map.put(PROP_NUMBER, this.number);
+ if (isLinkIdsModified) {
+ byte[] linkIdArray[] = new byte[linkIds.size()][];
+ map.put(PROP_LINK_IDS, linkIds.toArray(linkIdArray));
+ isLinkIdsModified = false;
+ }
+ if (isDeviceIdsModified) {
+ byte[] deviceIdArray[] = new byte[deviceIds.size()][];
+ map.put(PROP_DEVICE_IDS, deviceIds.toArray(deviceIdArray));
+ isDeviceIdsModified = false;
+ }
+ if (log.isWarnEnabled() && (linkIds.size() * deviceIds.size()) != 0) {
+ log.warn("Either #LinkIds:{} or #DeviceIds:{} is expected to be 0",
+ linkIds.size(), deviceIds.size());
+ }
+
+ serializeAndSetValue(portKryo.get(), map);
+ }
+
+ @Override
+ public Map<Object, Object> deserializeObjectFromValue() {
+ Map<Object, Object> map = deserializeObjectFromValue(portKryo.get());
+
+ this.status = (STATUS) map.get(PROP_STATUS);
+
+ if (this.linkIds == null) {
+ this.linkIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[] linkIdArray[] = (byte[][]) map.get(PROP_LINK_IDS);
+ if (linkIdArray != null) {
+ this.linkIds.clear();
+ this.linkIds.addAll(Arrays.asList(linkIdArray));
+ isLinkIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isLinkIdsModified = true;
+ }
+
+ if (this.deviceIds == null) {
+ this.deviceIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[] deviceIdArray[] = (byte[][]) map.get(PROP_DEVICE_IDS);
+ if (deviceIdArray != null) {
+ this.deviceIds.clear();
+ this.deviceIds.addAll(Arrays.asList(deviceIdArray));
+ isDeviceIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isDeviceIdsModified = true;
+ }
+
+ if (log.isWarnEnabled() && (linkIds.size() * deviceIds.size()) != 0) {
+ log.warn("Either #LinkIds:{} or #DeviceIds:{} is expected to be 0",
+ linkIds.size(), deviceIds.size());
+ }
+
+ return map;
+ }
+
+ @Override
+ public String toString() {
+ // TODO OUTPUT ALL?
+ return "[RCPort 0x" + Long.toHexString(dpid) + "@" + number
+ + " STATUS:" + status + "]";
+ }
+
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
new file mode 100644
index 0000000..16ff01b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
@@ -0,0 +1,351 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Switch Object in RC.
+ *
+ * @note This class will not maintain invariants. e.g. It will NOT automatically
+ * remove Ports on Switch, when deleting a Switch.
+ *
+ */
+public class RCSwitch extends RCObject {
+ private static final Logger log = LoggerFactory.getLogger(RCSwitch.class);
+
+ private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
+ };
+
+ public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
+
+ // FIXME these should be Enum or some number, not String
+ private static final String PROP_DPID = "dpid";
+ private static final String PROP_STATUS = "status";
+ private static final String PROP_PORT_IDS = "port-ids";
+
+ // must not re-order enum members, ordinal will be sent over wire
+ public enum STATUS {
+ INACTIVE, ACTIVE;
+ }
+
+ private final Long dpid;
+ private STATUS status;
+ private TreeSet<byte[]> portIds;
+ transient private boolean isPortIdsModified;
+
+ public static final int SWITCHID_BYTES = 2 + 8;
+
+ public static byte[] getSwichID(Long dpid) {
+ if (dpid == null) {
+ throw new IllegalArgumentException("dpid cannot be null");
+ }
+ return ByteBuffer.allocate(SWITCHID_BYTES).putChar('S').putLong(dpid)
+ .array();
+ }
+
+ public static long getDpidFromKey(byte[] key) {
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Switch key");
+ }
+ return keyBuf.getLong();
+ }
+
+ // FIXME specify DPID here, or Should caller specify the key it self?
+ // In other words, should layer above have the control of the ID?
+ public RCSwitch(Long dpid) {
+ super(RCTable.getTable(GLOBAL_SWITCH_TABLE_NAME), getSwichID(dpid));
+
+ this.dpid = dpid;
+ this.status = STATUS.INACTIVE;
+ this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isPortIdsModified = true;
+ }
+
+ public static RCSwitch createFromKey(byte[] key) {
+ return new RCSwitch(getDpidFromKey(key));
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(STATUS status) {
+ this.status = status;
+ getObjectMap().put(PROP_STATUS, status);
+ }
+
+ public Long getDpid() {
+ return dpid;
+ }
+
+ public byte[] getId() {
+ return getKey();
+ }
+
+ public void addPortId(byte[] portId) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= portIds.add(portId);
+ }
+
+ public void removePortId(byte[] portId) {
+ isPortIdsModified |= portIds.remove(portId);
+ }
+
+ public void emptyPortIds() {
+ portIds.clear();
+ this.isPortIdsModified = true;
+ }
+
+ public void addAllToPortIds(Collection<byte[]> portIds) {
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= this.portIds.addAll(portIds);
+ }
+
+ /**
+ *
+ * @return Unmodifiable Set view of all the PortIds;
+ */
+ public Set<byte[]> getAllPortIds() {
+ return Collections.unmodifiableSet(portIds);
+ }
+
+ @Override
+ public void serializeAndSetValue() {
+ Map<Object, Object> map = getObjectMap();
+
+ map.put(PROP_DPID, this.dpid);
+ if (isPortIdsModified) {
+ byte[] portIdArray[] = new byte[portIds.size()][];
+ map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+ isPortIdsModified = false;
+ }
+
+ serializeAndSetValue(switchKryo.get(), map);
+ }
+
+ @Override
+ public Map<Object, Object> deserializeObjectFromValue() {
+ Map<Object, Object> map = deserializeObjectFromValue(switchKryo.get());
+
+ this.status = (STATUS) map.get(PROP_STATUS);
+
+ if (this.portIds == null) {
+ this.portIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[] portIdArray[] = (byte[][]) map.get(PROP_PORT_IDS);
+ if (portIdArray != null) {
+ this.portIds.clear();
+ this.portIds.addAll(Arrays.asList(portIdArray));
+ isPortIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isPortIdsModified = true;
+ }
+ return map;
+ }
+
+ @Override
+ public String toString() {
+ // TODO OUTPUT ALL?
+ return "[RCSwitch 0x" + Long.toHexString(dpid) + " STATUS:" + status
+ + "]";
+ }
+
+ public static void main(String argv[]) {
+ // create active switch 0x1 with 2 ports
+ RCSwitch sw = new RCSwitch(0x1L);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.addPortId("SW0x0001P001".getBytes());
+ sw.addPortId("SW0x0001P002".getBytes());
+
+ try {
+ sw.create();
+ } catch (ObjectExistsException e) {
+ log.debug("Create Switch Failed", e);
+ e.printStackTrace();
+ }
+
+ // read switch 0x1
+ RCSwitch swRead = new RCSwitch(0x1L);
+ try {
+ swRead.read();
+ } catch (ObjectDoesntExistException e) {
+ log.debug("Reading Switch Failed", e);
+ }
+ assert (swRead.getStatus() == STATUS.ACTIVE);
+ for (byte[] portId : swRead.getAllPortIds()) {
+ // bad example code, portId is not expected to be ASCII string
+ log.debug("PortId: {}", new String(portId));
+ }
+ assert (swRead.getAllPortIds().size() == 2);
+
+ // update 0x1
+ swRead.setStatus(STATUS.INACTIVE);
+ swRead.removePortId("SW0x0001P001".getBytes());
+ try {
+ swRead.update();
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.debug("Updating Switch Failed", e);
+ }
+
+ // read 0x1 again and delete
+ RCSwitch swRead2 = new RCSwitch(0x1L);
+ try {
+ swRead2.read();
+ } catch (ObjectDoesntExistException e) {
+ log.debug("Reading Switch Again Failed", e);
+ }
+ assert (swRead2.getStatus() == STATUS.INACTIVE);
+ for (byte[] portId : swRead2.getAllPortIds()) {
+ // bad example code, portId is not expected to be ASCII string
+ log.debug("PortId: {}", new String(portId));
+ }
+ assert (swRead2.getAllPortIds().size() == 1);
+ try {
+ swRead2.delete();
+ } catch (ObjectDoesntExistException e) {
+ log.debug("Deleting Switch Failed", e);
+ }
+
+ RCSwitch swRead3 = new RCSwitch(0x1L);
+ try {
+ swRead3.read();
+ } catch (ObjectDoesntExistException e) {
+ log.debug("Switch not found as expected");
+ }
+
+ topology_setup();
+ topology_walk();
+ topology_delete();
+ }
+
+ private static void topology_setup() {
+ log.debug("topology_setup start.");
+
+ RCSwitch sw1 = new RCSwitch(0x1L);
+ sw1.setStatus(STATUS.ACTIVE);
+ try {
+ sw1.create();
+ } catch (ObjectExistsException e) {
+ log.error("Switch creation failed", e);
+ }
+
+ RCPort sw1p1 = new RCPort(0x1L, 1L);
+ sw1p1.setStatus(RCPort.STATUS.ACTIVE);
+ RCPort sw1p2 = new RCPort(0x1L, 2L);
+ sw1p2.setStatus(RCPort.STATUS.ACTIVE);
+ try {
+ sw1p1.create();
+ sw1p2.create();
+ } catch (ObjectExistsException e) {
+ log.error("Port creation failed", e);
+ }
+
+ sw1.emptyPortIds();
+ sw1.addPortId(sw1p1.getId());
+ sw1.addPortId(sw1p2.getId());
+ try {
+ sw1.update();
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error("Switch update failed", e);
+ }
+
+ RCDevice d1 = new RCDevice(new byte[] { 0, 1, 2, 3, 4, 5, 6 });
+ d1.addPortId(sw1p1.getId());
+ try {
+ d1.create();
+ } catch (ObjectExistsException e) {
+ log.error("Device creation failed", e);
+ }
+
+ RCSwitch sw2 = new RCSwitch(0x2L);
+ sw2.setStatus(STATUS.ACTIVE);
+ RCPort sw2p1 = new RCPort(0x2L, 1L);
+ sw2p1.setStatus(RCPort.STATUS.ACTIVE);
+ RCPort sw2p2 = new RCPort(0x2L, 2L);
+ sw2p2.setStatus(RCPort.STATUS.ACTIVE);
+
+ sw2.addPortId(sw2p1.getId());
+ sw2.addPortId(sw2p2.getId());
+ sw2.addAllToPortIds(Arrays.asList(sw2p1.getId(), sw2p2.getId()));
+ assert (sw2.getAllPortIds().size() == 2);
+
+ RCDevice d2 = new RCDevice(new byte[] { 6, 5, 4, 3, 2, 1, 0 });
+ d2.addPortId(sw2p2.getId());
+
+ try {
+ sw2.create();
+ sw2p1.create();
+ sw2p2.create();
+ d2.create();
+ } catch (ObjectExistsException e) {
+ log.error("One of Switch/Port/Device creation failed", e);
+ }
+
+ RCLink l1 = new RCLink(0x1L, 2L, 0x2L, 1L);
+ l1.setStatus(RCLink.STATUS.ACTIVE);
+ try {
+ l1.create();
+ } catch (ObjectExistsException e) {
+ log.error("Link creation failed", e);
+ }
+ log.debug("topology_setup end.");
+ }
+
+ private static void topology_walk() {
+ log.debug("topology_walk start.");
+ RCSwitch sw1 = new RCSwitch(0x1L);
+ try {
+ sw1.read();
+ } catch (ObjectDoesntExistException e) {
+ log.error("Reading switch failed", e);
+ }
+
+ assert (sw1.getDpid() == 0x1L);
+ assert (sw1.getStatus() == STATUS.ACTIVE);
+ assert (sw1.getAllPortIds().size() == 2);
+ for (byte[] portId : sw1.getAllPortIds()) {
+ }
+
+ log.debug("topology_walk end.");
+ }
+
+ private static void topology_delete() {
+ log.debug("topology_delete start.");
+ log.debug("topology_delete end.");
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
new file mode 100644
index 0000000..08290a3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
@@ -0,0 +1,16 @@
+package net.onrc.onos.datastore.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+public final class ByteArrayComparator implements Comparator<byte[]> {
+
+ public static final ByteArrayComparator BYTEARRAY_COMPARATOR = new ByteArrayComparator();
+
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ final ByteBuffer b1 = ByteBuffer.wrap(o1);
+ final ByteBuffer b2 = ByteBuffer.wrap(o2);
+ return b1.compareTo(b2);
+ }
+}