WIP:RC Basic+Switch+Port+Link+Device+MultiRead

Change-Id: I487d3c2f07311416b3ae6e55fbc76d38a64c00e2
diff --git a/src/main/java/net/onrc/onos/datastore/RCClient.java b/src/main/java/net/onrc/onos/datastore/RCClient.java
new file mode 100644
index 0000000..df6a8ce
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCClient.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.datastore;
+
+import edu.stanford.ramcloud.JRamCloud;
+
+public class RCClient {
+
+    // FIXME come up with a proper way to retrieve configuration
+    public static final int MAX_MULTI_READS = Integer.valueOf(System
+	    .getProperty("ramcloud.max_multi_reads", "400"));
+
+    private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
+	@Override
+	protected JRamCloud initialValue() {
+	    // FIXME come up with a proper way to retrieve configuration
+	    return new JRamCloud(System.getProperty("ramcloud.coordinator",
+		    "fast+udp:host=127.0.0.1,port=12246"));
+	}
+    };
+
+    /**
+     * @return JRamCloud instance intended to be used only within the
+     *         SameThread.
+     * @note Do not store the returned instance in a member variable, etc. which
+     *       may be accessed later by another thread.
+     */
+    static JRamCloud getClient() {
+	return tlsRCClient.get();
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/RCObject.java b/src/main/java/net/onrc/onos/datastore/RCObject.java
new file mode 100644
index 0000000..5bea2c5
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCObject.java
@@ -0,0 +1,308 @@
+package net.onrc.onos.datastore;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.onrc.onos.datastore.RCTable.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Class to represent an Object represented as a single K-V pair Value blob.
+ *
+ */
+public class RCObject {
+    private static final Logger log = LoggerFactory.getLogger(RCObject.class);
+    /**
+     * Version number which represents that the object doesnot exist, or hase
+     * never read the DB before.
+     */
+    public static final long VERSION_NONEXISTENT = 0L;
+
+    // Each Object should prepare their own serializer, which has required
+    // objects registered.
+    private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    // kryo.setRegistrationRequired(true);
+	    // TODO TreeMap or just Map
+	    // kryo.register(TreeMap.class);
+	    kryo.setReferences(false);
+	    return kryo;
+	}
+    };
+
+    private final RCTable table;
+    private final byte[] key;
+    private byte[] value;
+    private long version;
+
+    private Map<Object, Object> propertyMap;
+
+    public RCObject(RCTable table, byte[] key) {
+	this(table, key, null);
+    }
+
+    public RCObject(RCTable table, byte[] key, byte[] value) {
+	if (table == null) {
+	    throw new IllegalArgumentException("table cannot be null");
+	}
+	if (key == null) {
+	    throw new IllegalArgumentException("key cannot be null");
+	}
+	this.table = table;
+	this.key = key;
+	this.value = value;
+	this.version = VERSION_NONEXISTENT;
+	this.propertyMap = new HashMap<Object, Object>();
+
+	if (this.value != null) {
+	    deserializeObjectFromValue();
+	}
+    }
+
+    public RCTable getTable() {
+	return table;
+    }
+
+    public long getTableId() {
+	return table.getTableId();
+    }
+
+    public byte[] getKey() {
+	return key;
+    }
+
+    public long getVersion() {
+	return version;
+    }
+
+    /**
+     * Return serialized Value.
+     *
+     * @note will not trigger serialization
+     * @return Will return null, if never been read, or was not serialized
+     */
+    public byte[] getSerializedValue() {
+	return value;
+    }
+
+    /**
+     * Return Object as a Map.
+     *
+     * @note Will not trigger deserialization
+     * @return Will return null, if never been set, or was not deserialized
+     */
+    protected Map<Object, Object> getObjectMap() {
+	return this.propertyMap;
+    }
+
+    protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
+	Map<Object, Object> old_map = this.propertyMap;
+	this.propertyMap = new_map;
+	return old_map;
+    }
+
+    public void serializeAndSetValue() {
+	serializeAndSetValue(defaultKryo.get(), this.propertyMap);
+    }
+
+    protected void serializeAndSetValue(Kryo kryo, Map<Object, Object> javaObject) {
+
+	// value
+	byte[] rcTemp = new byte[1024 * 1024];
+	Output output = new Output(rcTemp);
+	kryo.writeObject(output, javaObject);
+	this.value = output.toBytes();
+    }
+
+    /**
+     * Deserialize
+     *
+     * @return
+     */
+    public Map<Object, Object> deserializeObjectFromValue() {
+	return deserializeObjectFromValue(defaultKryo.get());
+    }
+
+    protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
+	return deserializeObjectFromValue(kryo, HashMap.class);
+    }
+
+    protected <T extends Map> T deserializeObjectFromValue(Kryo kryo, Class<T> type) {
+	if (this.value == null)
+	    return null;
+
+	Input input = new Input(this.value);
+	T map = kryo.readObject(input, type);
+	this.propertyMap = map;
+
+	return map;
+    }
+
+    /**
+     * Create an Object in DataStore.
+     *
+     * Fails if the Object with same key already exists.
+     *
+     * @note create an Empty Object if no object has never been set.
+     * @throws ObjectExistsException
+     */
+    public void create() throws ObjectExistsException {
+
+	if (this.propertyMap == null) {
+	    log.warn("No object map was set. Setting empty Map.");
+	    setObjectMap(new HashMap<Object, Object>());
+	}
+	serializeAndSetValue();
+
+	this.version = table.create(key, value);
+    }
+
+    /**
+     * Read an Object from DataStore.
+     *
+     * Fails if the Object with the key does not exist.
+     *
+     * @throws ObjectDoesntExistException
+     *
+     */
+    public void read() throws ObjectDoesntExistException {
+	Entry e = table.read(key);
+	this.value = e.value;
+	this.version = e.version;
+
+	// TODO should we deserialize immediately?
+	deserializeObjectFromValue();
+    }
+
+    /**
+     * Update an existing Object in DataStore checking versions.
+     *
+     * Fails if the Object with key does not exists, or conditional failure.
+     *
+     * @throws WrongVersionException
+     * @throws ObjectDoesntExistException
+     */
+    public void update() throws ObjectDoesntExistException,
+	    WrongVersionException {
+	if (this.propertyMap == null) {
+	    setObjectMap(new HashMap<Object, Object>());
+	}
+	serializeAndSetValue();
+
+	this.version = table.update(key, value, version);
+    }
+
+    /**
+     * Remove an existing Object in DataStore.
+     *
+     * Fails if the Object with key does not exists.
+     *
+     * @throws ObjectDoesntExistException
+     */
+    public void delete() throws ObjectDoesntExistException {
+	this.version = table.delete(key);
+    }
+
+    /**
+     * Multi-read RCObjects.
+     *
+     * If the blob value was read successfully, RCObject will deserialize them.
+     *
+     * @param objects
+     *            RCObjects to read
+     * @return true if there exist an failed read.
+     */
+    public static boolean multiRead(Collection<RCObject> objects) {
+	boolean fail_exists = false;
+
+	ArrayList<RCObject> req = new ArrayList<>();
+	Iterator<RCObject> it = objects.iterator();
+	while (it.hasNext()) {
+
+	    req.add(it.next());
+
+	    if (req.size() >= RCClient.MAX_MULTI_READS) {
+		// dispatch multiRead
+		fail_exists |= multiReadInternal(req);
+		req.clear();
+	    }
+	}
+
+	if (!req.isEmpty()) {
+	    // dispatch multiRead
+	    fail_exists |= multiReadInternal(req);
+	    req.clear();
+	}
+
+	return fail_exists;
+    }
+
+    private static boolean multiReadInternal(ArrayList<RCObject> req) {
+	boolean fail_exists = false;
+	JRamCloud rcClient = RCClient.getClient();
+
+	final int reqs = req.size();
+	JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
+
+	// setup multi-read operation
+	for (int i = 0; i < reqs; ++i) {
+	    RCObject obj = req.get(i);
+	    mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
+		    obj.getKey());
+	}
+
+	// execute
+	JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
+	assert (results.length <= req.size());
+
+	// reflect changes to RCObject
+	for (int i = 0; i < results.length; ++i) {
+	    RCObject obj = req.get(i);
+	    if (results[i] == null) {
+		log.error("MultiRead error, skipping {}, {}", obj.getTable(),
+		        obj);
+		fail_exists = true;
+		continue;
+	    }
+	    assert (Arrays.equals(results[i].key, obj.getKey()));
+
+	    obj.value = results[i].value;
+	    obj.version = results[i].version;
+	    if (obj.version == VERSION_NONEXISTENT) {
+		fail_exists = true;
+	    } else {
+		obj.deserializeObjectFromValue();
+	    }
+	}
+
+	return fail_exists;
+    }
+
+    /**
+     * Get All of it's kind?
+     */
+    public static Collection<? extends RCObject> getAllObjects() {
+	// TODO implement
+	throw new UnsupportedOperationException("Not implemented yet");
+	//Collection<? extends RCObject> list = new ArrayList<>();
+	//return list;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/RCTable.java b/src/main/java/net/onrc/onos/datastore/RCTable.java
new file mode 100644
index 0000000..4836f24
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/RCTable.java
@@ -0,0 +1,165 @@
+package net.onrc.onos.datastore;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Class to represent a Table in RAMCloud
+ *
+ */
+public class RCTable {
+    private static final Logger log = LoggerFactory.getLogger(RCTable.class);
+
+    private static final ConcurrentHashMap<String, RCTable> table_map = new ConcurrentHashMap<>();
+
+    /**
+     *
+     * @param table
+     *            Table to drop.
+     * @note Instance passed must not be use after successful call.
+     *
+     */
+    public static void dropTable(RCTable table) {
+	JRamCloud rcClient = RCClient.getClient();
+	// TODO mark the table instance as dropped?
+	rcClient.dropTable(table.getTableName());
+	table_map.remove(table.getTableName());
+    }
+
+    public static RCTable getTable(String tableName) {
+	RCTable table = table_map.get(tableName);
+	if (table == null) {
+	    RCTable new_table = new RCTable(tableName);
+	    RCTable existing_table = table_map
+		    .putIfAbsent(tableName, new_table);
+	    if (existing_table != null) {
+		return existing_table;
+	    } else {
+		return new_table;
+	    }
+	}
+	return table;
+    }
+
+    public static class Entry {
+	public final byte[] key;
+	public byte[] value;
+	public long version;
+
+	public Entry(byte[] key, byte[] value, long version) {
+	    this.key = key;
+	    this.value = value;
+	    this.version = version;
+	}
+    }
+
+    // finally the Table itself
+
+    private final long rcTableId;
+    private final String rcTableName;
+
+    // private boolean isDropped = false;
+
+    /**
+     *
+     * @note rcTableName must be unique cluster wide.
+     * @param rcTableName
+     */
+    private RCTable(String rcTableName) {
+	JRamCloud rcClient = RCClient.getClient();
+
+	this.rcTableName = rcTableName;
+
+	// FIXME Is it better to create table here or at getTable
+	this.rcTableId = rcClient.createTable(rcTableName);
+    }
+
+    public long getTableId() {
+	return this.rcTableId;
+    }
+
+    public String getTableName() {
+	return this.rcTableName;
+    }
+
+    // TODO: Enumerate whole table?
+
+    // Reject if exist
+    public long create(final byte[] key, final byte[] value)
+	    throws ObjectExistsException {
+
+	JRamCloud rcClient = RCClient.getClient();
+
+	RejectRules rules = rcClient.new RejectRules();
+	rules.setExists();
+
+	long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+	        rules);
+	return updated_version;
+    }
+
+    // read
+    public Entry read(final byte[] key) throws ObjectDoesntExistException {
+
+	JRamCloud rcClient = RCClient.getClient();
+
+	// FIXME underlying JRamCloud cannot detect "not exist"
+	// RejectRules rules = rcClient.new RejectRules();
+	// rules.setDoesntExists();
+	// JRamCloud.Object rcObj = rcClient.read(this.rcTableId, key, rules);
+	JRamCloud.Object rcObj = rcClient.read(this.rcTableId, key);
+
+	return new Entry(rcObj.key, rcObj.value, rcObj.version);
+    }
+
+    // Reject if version neq
+    public long update(final byte[] key, final byte[] value, final long version)
+	    throws ObjectDoesntExistException, WrongVersionException {
+
+	JRamCloud rcClient = RCClient.getClient();
+
+	RejectRules rules = rcClient.new RejectRules();
+	rules.setDoesntExists();
+	rules.setNeVersion(version);
+
+	long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+	        rules);
+	return updated_version;
+    }
+
+    // Reject if not exist
+    public long update(final byte[] key, final byte[] value)
+	    throws ObjectDoesntExistException {
+
+	JRamCloud rcClient = RCClient.getClient();
+
+	RejectRules rules = rcClient.new RejectRules();
+	rules.setDoesntExists();
+
+	long updated_version = rcClient.writeRule(this.rcTableId, key, value,
+	        rules);
+	return updated_version;
+
+    }
+
+    // Reject if not exist
+    public long delete(final byte[] key) throws ObjectDoesntExistException {
+	JRamCloud rcClient = RCClient.getClient();
+
+	// FIXME underlying JRamCloud does not support cond remove
+	RejectRules rules = rcClient.new RejectRules();
+	rules.setDoesntExists();
+
+	long removed_version = rcClient.remove(this.rcTableId, key, rules);
+	return removed_version;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java b/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
new file mode 100644
index 0000000..0e8afe4
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCDevice.java
@@ -0,0 +1,155 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.topology.RCLink.STATUS;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+public class RCDevice extends RCObject {
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(RCDevice.class);
+
+    private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    kryo.setRegistrationRequired(true);
+	    kryo.setReferences(false);
+	    kryo.register(byte[].class);
+	    kryo.register(byte[][].class);
+	    kryo.register(HashMap.class);
+	    // TODO check if we should explicitly specify EnumSerializer
+	    kryo.register(STATUS.class);
+	    return kryo;
+	}
+    };
+
+    public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
+
+    // FIXME these should be Enum or some number, not String
+    private static final String PROP_MAC = "mac";
+    private static final String PROP_PORT_IDS = "port-ids";
+
+    private final byte[] mac;
+    private TreeSet<byte[]> portIds;
+    transient private boolean isPortIdsModified;
+
+    // Assuming mac is unique cluster-wide
+    public static byte[] getDeviceID(final byte[] mac) {
+	return ByteBuffer.allocate(2 + mac.length).putChar('D').put(mac)
+	        .array();
+    }
+
+    public static byte[] getMacFromKey(byte[] key) {
+	ByteBuffer keyBuf = ByteBuffer.wrap(key);
+	if (keyBuf.getChar() != 'D') {
+	    throw new IllegalArgumentException("Invalid Device key");
+	}
+	byte[] mac = new byte[keyBuf.remaining()];
+	keyBuf.get(mac);
+	return mac;
+    }
+
+    public RCDevice(byte[] mac) {
+	super(RCTable.getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
+
+	this.mac = mac;
+	this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	this.isPortIdsModified = true;
+    }
+
+    public static RCDevice createFromKey(byte[] key) {
+	return new RCDevice(getMacFromKey(key));
+    }
+
+    public byte[] getMac() {
+	// TODO may need to clone() to be sure this object will be immutable.
+	return mac;
+    }
+
+    public byte[] getId() {
+	return getKey();
+    }
+
+    public void addPortId(byte[] portId) {
+	// TODO: Should we copy portId, or reference is OK.
+	isPortIdsModified |= portIds.add(portId);
+    }
+
+    public void removePortId(byte[] portId) {
+	isPortIdsModified |= portIds.remove(portId);
+    }
+
+    public void emptyPortIds() {
+	portIds.clear();
+	this.isPortIdsModified = true;
+    }
+
+    public void addAllToPortIds(Collection<byte[]> portIds) {
+	// TODO: Should we copy portId, or reference is OK.
+	isPortIdsModified |= this.portIds.addAll(portIds);
+    }
+
+    /**
+     *
+     * @return Unmodifiable Set view of all the PortIds;
+     */
+    public Set<byte[]> getAllPortIds() {
+	return Collections.unmodifiableSet(portIds);
+    }
+
+    @Override
+    public void serializeAndSetValue() {
+	Map<Object, Object> map = getObjectMap();
+
+	map.put(PROP_MAC, mac);
+	if (isPortIdsModified) {
+	    byte[] portIdArray[] = new byte[portIds.size()][];
+	    map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+	    isPortIdsModified = false;
+	}
+
+	serializeAndSetValue(deviceKryo.get(), map);
+    }
+
+    @Override
+    public Map<Object, Object> deserializeObjectFromValue() {
+	Map<Object, Object> map = deserializeObjectFromValue(deviceKryo.get());
+
+	if (this.portIds == null) {
+	    this.portIds = new TreeSet<>(
+		    ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	}
+	byte[] portIdArray[] = (byte[][]) map.get(PROP_PORT_IDS);
+	if (portIdArray != null) {
+	    this.portIds.clear();
+	    this.portIds.addAll(Arrays.asList(portIdArray));
+	    isPortIdsModified = false;
+	} else {
+	    // trigger write on next serialize
+	    isPortIdsModified = true;
+	}
+
+	return map;
+    }
+
+    public static void main(String[] args) {
+	// TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCLink.java b/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
new file mode 100644
index 0000000..957ef88
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCLink.java
@@ -0,0 +1,135 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+
+public class RCLink extends RCObject {
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(RCLink.class);
+
+    private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    kryo.setRegistrationRequired(true);
+	    kryo.setReferences(false);
+	    kryo.register(byte[].class);
+	    kryo.register(byte[][].class);
+	    kryo.register(HashMap.class);
+	    // TODO check if we should explicitly specify EnumSerializer
+	    kryo.register(STATUS.class);
+	    return kryo;
+	}
+    };
+
+    public static class SwitchPort {
+	public final Long dpid;
+	public final Long number;
+
+	public SwitchPort(Long dpid, Long number) {
+	    this.dpid = dpid;
+	    this.number = number;
+	}
+
+	public byte[] getPortID() {
+	    return RCPort.getPortID(dpid, number);
+	}
+
+	public byte[] getSwitchID() {
+	    return RCSwitch.getSwichID(dpid);
+	}
+    }
+
+    public static final String GLOBAL_LINK_TABLE_NAME = "G:Link";
+
+    // FIXME these should be Enum or some number, not String
+    private static final String PROP_STATUS = "status";
+    private static final String PROP_SRC_SW_ID = "src-sw-id";
+    private static final String PROP_SRC_PORT_ID = "src-port-id";
+    private static final String PROP_DST_SW_ID = "dst-sw-id";
+    private static final String PROP_DST_PORT_ID = "dst-port-id";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+	INACTIVE, ACTIVE;
+    }
+
+    private final SwitchPort src;
+    private final SwitchPort dst;
+    private STATUS status;
+
+    public static final int LINKID_BYTES = 2 + RCPort.PORTID_BYTES * 2;
+
+    public static byte[] getLinkID(Long src_dpid, Long src_port_no,
+	    Long dst_dpid, Long dst_port_no) {
+	return ByteBuffer.allocate(LINKID_BYTES).putChar('L')
+	        .put(RCPort.getPortID(src_dpid, src_port_no))
+	        .put(RCPort.getPortID(dst_dpid, dst_port_no)).array();
+    }
+
+    public RCLink(Long src_dpid, Long src_port_no, Long dst_dpid,
+	    Long dst_port_no) {
+	super(RCTable.getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
+	        src_port_no, dst_dpid, dst_port_no));
+
+	src = new SwitchPort(src_dpid, src_port_no);
+	dst = new SwitchPort(dst_dpid, dst_port_no);
+	status = STATUS.INACTIVE;
+    }
+
+    public STATUS getStatus() {
+	return status;
+    }
+
+    public void setStatus(STATUS status) {
+	this.status = status;
+	getObjectMap().put(PROP_STATUS, status);
+    }
+
+    public SwitchPort getSrc() {
+	return src;
+    }
+
+    public SwitchPort getDst() {
+	return dst;
+    }
+
+    public byte[] getId() {
+	return getKey();
+    }
+
+    @Override
+    public void serializeAndSetValue() {
+	Map<Object, Object> map = getObjectMap();
+
+	map.put(PROP_SRC_SW_ID, src.getSwitchID());
+	map.put(PROP_SRC_PORT_ID, src.getPortID());
+	map.put(PROP_DST_SW_ID, dst.getSwitchID());
+	map.put(PROP_DST_PORT_ID, dst.getPortID());
+
+	serializeAndSetValue(linkKryo.get(), map);
+    }
+
+    @Override
+    public Map<Object, Object> deserializeObjectFromValue() {
+	Map<Object, Object> map = deserializeObjectFromValue(linkKryo.get());
+
+	this.status = (STATUS) map.get(PROP_STATUS);
+	return map;
+    }
+
+    public static void main(String[] args) {
+	// TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCPort.java b/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
new file mode 100644
index 0000000..cdaf383
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCPort.java
@@ -0,0 +1,263 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+public class RCPort extends RCObject {
+    private static final Logger log = LoggerFactory.getLogger(RCPort.class);
+
+    private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    kryo.setRegistrationRequired(true);
+	    kryo.setReferences(false);
+	    kryo.register(byte[].class);
+	    kryo.register(byte[][].class);
+	    kryo.register(HashMap.class);
+	    // TODO check if we should explicitly specify EnumSerializer
+	    kryo.register(STATUS.class);
+	    return kryo;
+	}
+    };
+
+    public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
+
+    // FIXME these should be Enum or some number, not String
+    private static final String PROP_DPID = "dpid";
+    private static final String PROP_NUMBER = "number";
+    private static final String PROP_STATUS = "status";
+    private static final String PROP_LINK_IDS = "link-ids";
+    private static final String PROP_DEVICE_IDS = "device-ids";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+	INACTIVE, ACTIVE;
+    }
+
+    private final Long dpid;
+    private final Long number;
+
+    private STATUS status;
+    // XXX These 2 set of Ids can be removed from DataStore, if In-Memory cache
+    // build the indexing info from Link.
+    private TreeSet<byte[]> linkIds;
+    transient boolean isLinkIdsModified;
+    private TreeSet<byte[]> deviceIds;
+    transient boolean isDeviceIdsModified;
+
+    public static final int PORTID_BYTES = RCSwitch.SWITCHID_BYTES + 2 + 8;
+
+    public static byte[] getPortID(Long dpid, Long number) {
+	if (dpid == null) {
+	    throw new IllegalArgumentException("dpid cannot be null");
+	}
+	if (number == null) {
+	    throw new IllegalArgumentException("number cannot be null");
+	}
+	return ByteBuffer.allocate(PORTID_BYTES).putChar('S').putLong(dpid)
+	        .putChar('P').putLong(number).array();
+    }
+
+    public static long getDpidFromKey(byte[] key) {
+	ByteBuffer keyBuf = ByteBuffer.wrap(key);
+	if (keyBuf.getChar() != 'S') {
+	    throw new IllegalArgumentException("Invalid Port key");
+	}
+	return keyBuf.getLong();
+    }
+
+    public static long getNumberFromKey(byte[] key) {
+	ByteBuffer keyBuf = ByteBuffer.wrap(key);
+	keyBuf.position(RCSwitch.SWITCHID_BYTES);
+	if (keyBuf.getChar() != 'P') {
+	    throw new IllegalArgumentException("Invalid Port key");
+	}
+	return keyBuf.getLong();
+    }
+
+    // FIXME specify DPID,number here, or Should caller specify the key it self?
+    // In other words, should layer above have the control of the ID?
+    public RCPort(Long dpid, Long number) {
+	super(RCTable.getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
+
+	// TODO Auto-generated constructor stub
+
+	this.dpid = dpid;
+	this.number = number;
+	this.status = STATUS.INACTIVE;
+	this.linkIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	this.isLinkIdsModified = true;
+	this.deviceIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	this.isDeviceIdsModified = true;
+    }
+
+    public static RCPort createFromKey(byte[] key) {
+	return new RCPort(getDpidFromKey(key), getNumberFromKey(key));
+    }
+
+    public STATUS getStatus() {
+	return status;
+    }
+
+    public void setStatus(STATUS status) {
+	this.status = status;
+	getObjectMap().put(PROP_STATUS, status);
+    }
+
+    public Long getDpid() {
+	return dpid;
+    }
+
+    public Long getNumber() {
+	return number;
+    }
+
+    public byte[] getId() {
+	return getKey();
+    }
+
+    public void addLinkId(byte[] linkId) {
+	isLinkIdsModified |= linkIds.add(linkId);
+    }
+
+    public void removeLinkId(byte[] linkId) {
+	isLinkIdsModified |= linkIds.remove(linkId);
+    }
+
+    public void emptyLinkIds() {
+	linkIds.clear();
+	isLinkIdsModified = true;
+    }
+
+    public void addAllToLinkIds(Collection<byte[]> linkIds) {
+	isLinkIdsModified |= this.linkIds.addAll(linkIds);
+    }
+
+    /**
+     *
+     * @return Unmodifiable Set view of all the LinkIds;
+     */
+    public Set<byte[]> getAllLinkIds() {
+	return Collections.unmodifiableSet(linkIds);
+    }
+
+    public void addDeviceId(byte[] deviceId) {
+	isDeviceIdsModified |= deviceIds.add(deviceId);
+    }
+
+    public void removeDeviceId(byte[] deviceId) {
+	isDeviceIdsModified |= deviceIds.remove(deviceId);
+    }
+
+    public void emptyDeviceIds() {
+	deviceIds.clear();
+	isDeviceIdsModified = true;
+    }
+
+    public void addAllToDeviceIds(Collection<byte[]> deviceIds) {
+	isDeviceIdsModified |= this.deviceIds.addAll(deviceIds);
+    }
+
+    /**
+     *
+     * @return Unmodifiable Set view of all the LinkIds;
+     */
+    public Set<byte[]> getAllDeviceIds() {
+	return Collections.unmodifiableSet(deviceIds);
+    }
+
+    @Override
+    public void serializeAndSetValue() {
+	Map<Object, Object> map = getObjectMap();
+
+	map.put(PROP_DPID, this.dpid);
+	map.put(PROP_NUMBER, this.number);
+	if (isLinkIdsModified) {
+	    byte[] linkIdArray[] = new byte[linkIds.size()][];
+	    map.put(PROP_LINK_IDS, linkIds.toArray(linkIdArray));
+	    isLinkIdsModified = false;
+	}
+	if (isDeviceIdsModified) {
+	    byte[] deviceIdArray[] = new byte[deviceIds.size()][];
+	    map.put(PROP_DEVICE_IDS, deviceIds.toArray(deviceIdArray));
+	    isDeviceIdsModified = false;
+	}
+	if (log.isWarnEnabled() && (linkIds.size() * deviceIds.size()) != 0) {
+	    log.warn("Either #LinkIds:{} or #DeviceIds:{} is expected to be 0",
+		    linkIds.size(), deviceIds.size());
+	}
+
+	serializeAndSetValue(portKryo.get(), map);
+    }
+
+    @Override
+    public Map<Object, Object> deserializeObjectFromValue() {
+	Map<Object, Object> map = deserializeObjectFromValue(portKryo.get());
+
+	this.status = (STATUS) map.get(PROP_STATUS);
+
+	if (this.linkIds == null) {
+	    this.linkIds = new TreeSet<>(
+		    ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	}
+	byte[] linkIdArray[] = (byte[][]) map.get(PROP_LINK_IDS);
+	if (linkIdArray != null) {
+	    this.linkIds.clear();
+	    this.linkIds.addAll(Arrays.asList(linkIdArray));
+	    isLinkIdsModified = false;
+	} else {
+	    // trigger write on next serialize
+	    isLinkIdsModified = true;
+	}
+
+	if (this.deviceIds == null) {
+	    this.deviceIds = new TreeSet<>(
+		    ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	}
+	byte[] deviceIdArray[] = (byte[][]) map.get(PROP_DEVICE_IDS);
+	if (deviceIdArray != null) {
+	    this.deviceIds.clear();
+	    this.deviceIds.addAll(Arrays.asList(deviceIdArray));
+	    isDeviceIdsModified = false;
+	} else {
+	    // trigger write on next serialize
+	    isDeviceIdsModified = true;
+	}
+
+	if (log.isWarnEnabled() && (linkIds.size() * deviceIds.size()) != 0) {
+	    log.warn("Either #LinkIds:{} or #DeviceIds:{} is expected to be 0",
+		    linkIds.size(), deviceIds.size());
+	}
+
+	return map;
+    }
+
+    @Override
+    public String toString() {
+	// TODO OUTPUT ALL?
+	return "[RCPort 0x" + Long.toHexString(dpid) + "@" + number
+	        + " STATUS:" + status + "]";
+    }
+
+    public static void main(String[] args) {
+	// TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
new file mode 100644
index 0000000..16ff01b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
@@ -0,0 +1,351 @@
+package net.onrc.onos.datastore.topology;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+
+import net.onrc.onos.datastore.RCObject;
+import net.onrc.onos.datastore.RCTable;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
+import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
+
+/**
+ * Switch Object in RC.
+ *
+ * @note This class will not maintain invariants. e.g. It will NOT automatically
+ *       remove Ports on Switch, when deleting a Switch.
+ *
+ */
+public class RCSwitch extends RCObject {
+    private static final Logger log = LoggerFactory.getLogger(RCSwitch.class);
+
+    private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    kryo.setRegistrationRequired(true);
+	    kryo.setReferences(false);
+	    kryo.register(byte[].class);
+	    kryo.register(byte[][].class);
+	    kryo.register(HashMap.class);
+	    // TODO check if we should explicitly specify EnumSerializer
+	    kryo.register(STATUS.class);
+	    return kryo;
+	}
+    };
+
+    public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
+
+    // FIXME these should be Enum or some number, not String
+    private static final String PROP_DPID = "dpid";
+    private static final String PROP_STATUS = "status";
+    private static final String PROP_PORT_IDS = "port-ids";
+
+    // must not re-order enum members, ordinal will be sent over wire
+    public enum STATUS {
+	INACTIVE, ACTIVE;
+    }
+
+    private final Long dpid;
+    private STATUS status;
+    private TreeSet<byte[]> portIds;
+    transient private boolean isPortIdsModified;
+
+    public static final int SWITCHID_BYTES = 2 + 8;
+
+    public static byte[] getSwichID(Long dpid) {
+	if (dpid == null) {
+	    throw new IllegalArgumentException("dpid cannot be null");
+	}
+	return ByteBuffer.allocate(SWITCHID_BYTES).putChar('S').putLong(dpid)
+	        .array();
+    }
+
+    public static long getDpidFromKey(byte[] key) {
+	ByteBuffer keyBuf = ByteBuffer.wrap(key);
+	if (keyBuf.getChar() != 'S') {
+	    throw new IllegalArgumentException("Invalid Switch key");
+	}
+	return keyBuf.getLong();
+    }
+
+    // FIXME specify DPID here, or Should caller specify the key it self?
+    // In other words, should layer above have the control of the ID?
+    public RCSwitch(Long dpid) {
+	super(RCTable.getTable(GLOBAL_SWITCH_TABLE_NAME), getSwichID(dpid));
+
+	this.dpid = dpid;
+	this.status = STATUS.INACTIVE;
+	this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	this.isPortIdsModified = true;
+    }
+
+    public static RCSwitch createFromKey(byte[] key) {
+	return new RCSwitch(getDpidFromKey(key));
+    }
+
+    public STATUS getStatus() {
+	return status;
+    }
+
+    public void setStatus(STATUS status) {
+	this.status = status;
+	getObjectMap().put(PROP_STATUS, status);
+    }
+
+    public Long getDpid() {
+	return dpid;
+    }
+
+    public byte[] getId() {
+	return getKey();
+    }
+
+    public void addPortId(byte[] portId) {
+	// TODO: Should we copy portId, or reference is OK.
+	isPortIdsModified |= portIds.add(portId);
+    }
+
+    public void removePortId(byte[] portId) {
+	isPortIdsModified |= portIds.remove(portId);
+    }
+
+    public void emptyPortIds() {
+	portIds.clear();
+	this.isPortIdsModified = true;
+    }
+
+    public void addAllToPortIds(Collection<byte[]> portIds) {
+	// TODO: Should we copy portId, or reference is OK.
+	isPortIdsModified |= this.portIds.addAll(portIds);
+    }
+
+    /**
+     *
+     * @return Unmodifiable Set view of all the PortIds;
+     */
+    public Set<byte[]> getAllPortIds() {
+	return Collections.unmodifiableSet(portIds);
+    }
+
+    @Override
+    public void serializeAndSetValue() {
+	Map<Object, Object> map = getObjectMap();
+
+	map.put(PROP_DPID, this.dpid);
+	if (isPortIdsModified) {
+	    byte[] portIdArray[] = new byte[portIds.size()][];
+	    map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+	    isPortIdsModified = false;
+	}
+
+	serializeAndSetValue(switchKryo.get(), map);
+    }
+
+    @Override
+    public Map<Object, Object> deserializeObjectFromValue() {
+	Map<Object, Object> map = deserializeObjectFromValue(switchKryo.get());
+
+	this.status = (STATUS) map.get(PROP_STATUS);
+
+	if (this.portIds == null) {
+	    this.portIds = new TreeSet<>(
+		    ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	}
+	byte[] portIdArray[] = (byte[][]) map.get(PROP_PORT_IDS);
+	if (portIdArray != null) {
+	    this.portIds.clear();
+	    this.portIds.addAll(Arrays.asList(portIdArray));
+	    isPortIdsModified = false;
+	} else {
+	    // trigger write on next serialize
+	    isPortIdsModified = true;
+	}
+	return map;
+    }
+
+    @Override
+    public String toString() {
+	// TODO OUTPUT ALL?
+	return "[RCSwitch 0x" + Long.toHexString(dpid) + " STATUS:" + status
+	        + "]";
+    }
+
+    public static void main(String argv[]) {
+	// create active switch 0x1 with 2 ports
+	RCSwitch sw = new RCSwitch(0x1L);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.addPortId("SW0x0001P001".getBytes());
+	sw.addPortId("SW0x0001P002".getBytes());
+
+	try {
+	    sw.create();
+	} catch (ObjectExistsException e) {
+	    log.debug("Create Switch Failed", e);
+	    e.printStackTrace();
+	}
+
+	// read switch 0x1
+	RCSwitch swRead = new RCSwitch(0x1L);
+	try {
+	    swRead.read();
+	} catch (ObjectDoesntExistException e) {
+	    log.debug("Reading Switch Failed", e);
+	}
+	assert (swRead.getStatus() == STATUS.ACTIVE);
+	for (byte[] portId : swRead.getAllPortIds()) {
+	    // bad example code, portId is not expected to be ASCII string
+	    log.debug("PortId: {}", new String(portId));
+	}
+	assert (swRead.getAllPortIds().size() == 2);
+
+	// update 0x1
+	swRead.setStatus(STATUS.INACTIVE);
+	swRead.removePortId("SW0x0001P001".getBytes());
+	try {
+	    swRead.update();
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    log.debug("Updating Switch Failed", e);
+	}
+
+	// read 0x1 again and delete
+	RCSwitch swRead2 = new RCSwitch(0x1L);
+	try {
+	    swRead2.read();
+	} catch (ObjectDoesntExistException e) {
+	    log.debug("Reading Switch Again Failed", e);
+	}
+	assert (swRead2.getStatus() == STATUS.INACTIVE);
+	for (byte[] portId : swRead2.getAllPortIds()) {
+	    // bad example code, portId is not expected to be ASCII string
+	    log.debug("PortId: {}", new String(portId));
+	}
+	assert (swRead2.getAllPortIds().size() == 1);
+	try {
+	    swRead2.delete();
+	} catch (ObjectDoesntExistException e) {
+	    log.debug("Deleting Switch Failed", e);
+	}
+
+	RCSwitch swRead3 = new RCSwitch(0x1L);
+	try {
+	    swRead3.read();
+	} catch (ObjectDoesntExistException e) {
+	    log.debug("Switch not found as expected");
+	}
+
+	topology_setup();
+	topology_walk();
+	topology_delete();
+    }
+
+    private static void topology_setup() {
+	log.debug("topology_setup start.");
+
+	RCSwitch sw1 = new RCSwitch(0x1L);
+	sw1.setStatus(STATUS.ACTIVE);
+	try {
+	    sw1.create();
+	} catch (ObjectExistsException e) {
+	    log.error("Switch creation failed", e);
+	}
+
+	RCPort sw1p1 = new RCPort(0x1L, 1L);
+	sw1p1.setStatus(RCPort.STATUS.ACTIVE);
+	RCPort sw1p2 = new RCPort(0x1L, 2L);
+	sw1p2.setStatus(RCPort.STATUS.ACTIVE);
+	try {
+	    sw1p1.create();
+	    sw1p2.create();
+	} catch (ObjectExistsException e) {
+	    log.error("Port creation failed", e);
+	}
+
+	sw1.emptyPortIds();
+	sw1.addPortId(sw1p1.getId());
+	sw1.addPortId(sw1p2.getId());
+	try {
+	    sw1.update();
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    log.error("Switch update failed", e);
+	}
+
+	RCDevice d1 = new RCDevice(new byte[] { 0, 1, 2, 3, 4, 5, 6 });
+	d1.addPortId(sw1p1.getId());
+	try {
+	    d1.create();
+	} catch (ObjectExistsException e) {
+	    log.error("Device creation failed", e);
+	}
+
+	RCSwitch sw2 = new RCSwitch(0x2L);
+	sw2.setStatus(STATUS.ACTIVE);
+	RCPort sw2p1 = new RCPort(0x2L, 1L);
+	sw2p1.setStatus(RCPort.STATUS.ACTIVE);
+	RCPort sw2p2 = new RCPort(0x2L, 2L);
+	sw2p2.setStatus(RCPort.STATUS.ACTIVE);
+
+	sw2.addPortId(sw2p1.getId());
+	sw2.addPortId(sw2p2.getId());
+	sw2.addAllToPortIds(Arrays.asList(sw2p1.getId(), sw2p2.getId()));
+	assert (sw2.getAllPortIds().size() == 2);
+
+	RCDevice d2 = new RCDevice(new byte[] { 6, 5, 4, 3, 2, 1, 0 });
+	d2.addPortId(sw2p2.getId());
+
+	try {
+	    sw2.create();
+	    sw2p1.create();
+	    sw2p2.create();
+	    d2.create();
+	} catch (ObjectExistsException e) {
+	    log.error("One of Switch/Port/Device creation failed", e);
+	}
+
+	RCLink l1 = new RCLink(0x1L, 2L, 0x2L, 1L);
+	l1.setStatus(RCLink.STATUS.ACTIVE);
+	try {
+	    l1.create();
+	} catch (ObjectExistsException e) {
+	    log.error("Link creation failed", e);
+	}
+	log.debug("topology_setup end.");
+    }
+
+    private static void topology_walk() {
+	log.debug("topology_walk start.");
+	RCSwitch sw1 = new RCSwitch(0x1L);
+	try {
+	    sw1.read();
+	} catch (ObjectDoesntExistException e) {
+	    log.error("Reading switch failed", e);
+	}
+
+	assert (sw1.getDpid() == 0x1L);
+	assert (sw1.getStatus() == STATUS.ACTIVE);
+	assert (sw1.getAllPortIds().size() == 2);
+	for (byte[] portId : sw1.getAllPortIds()) {
+	}
+
+	log.debug("topology_walk end.");
+    }
+
+    private static void topology_delete() {
+	log.debug("topology_delete start.");
+	log.debug("topology_delete end.");
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
new file mode 100644
index 0000000..08290a3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
@@ -0,0 +1,16 @@
+package net.onrc.onos.datastore.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+public final class ByteArrayComparator implements Comparator<byte[]> {
+
+    public static final ByteArrayComparator BYTEARRAY_COMPARATOR = new ByteArrayComparator();
+
+    @Override
+    public int compare(byte[] o1, byte[] o2) {
+	final ByteBuffer b1 = ByteBuffer.wrap(o1);
+	final ByteBuffer b2 = ByteBuffer.wrap(o2);
+	return b1.compareTo(b2);
+    }
+}