Add option to use Hazelcast as datastore for development environment.

This patch will set the default backend as Hazelcast.
To use RAMCloud as backend data store, add -Dnet.onrc.onos.datastore.backend=ramcloud to java option.

- ClientMode: Use existing Hazelcast Instance if it exist.
  - map name starting with datastore:// is now configured to be strong consistent

- add main for manual testing
- follow PMD,etc. where easily possible

- make HZClient Singleton

Change-Id: Ibe2afc3bfddfd7fd567c91477c16cd679fc543d4
diff --git a/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
new file mode 100644
index 0000000..a3bfab6
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
@@ -0,0 +1,365 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class HZTableTest {
+    @Rule
+    public TestName name= new TestName();
+
+    static final String TEST_TABLE_NAME = "TableForUnitTest";
+    HZTable table;
+
+    @Before
+    public void setUp() throws Exception {
+	table = (HZTable) HZClient.getClient().getTable(TEST_TABLE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+	HZClient.getClient().dropTable(table);
+    }
+
+    public void assertEntryInTable(final byte[] key, final byte[] value, final long version) {
+	VersionedValue valueblob = table.getBackendMap().get(key);
+	assertNotNull(valueblob);
+	assertArrayEquals(value, valueblob.getValue());
+	assertEquals(version, valueblob.getVersion());
+    }
+
+    public void assertKeyNotInTable(final byte[] key) {
+	VersionedValue valueblob = table.getBackendMap().get(key);
+	assertNull(valueblob);
+    }
+
+    @Test
+    public void testGetInitialVersion() {
+	final long version1 = HZTable.getInitialVersion();
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+
+	final long version2 = HZTable.getInitialVersion();
+	assertNotEquals(HZClient.VERSION_NONEXISTENT, version2);
+	assertNotEquals(version1, version2);
+    }
+
+    @Test
+    public void testGetNextVersion() {
+	final long nextVersion = HZTable.getNextVersion(1);
+	assertNotEquals(nextVersion, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion, 1);
+
+	final long nextVersion1 = HZTable.getNextVersion(Long.MAX_VALUE);
+	assertNotEquals(nextVersion1, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion1, Long.MAX_VALUE);
+
+	final long nextVersion11 = HZTable.getNextVersion(HZClient.VERSION_NONEXISTENT-1);
+	assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT-1);
+    }
+
+    @Ignore // nothing to test for now
+    @Test
+    public void testHZTable() {
+	fail("Not yet implemented");
+    }
+
+    @Test
+    public void testGetTableName() {
+	assertEquals(TEST_TABLE_NAME, table.getTableName());
+    }
+
+    @Test
+    public void testVERSION_NONEXISTENT() {
+	assertEquals(HZClient.VERSION_NONEXISTENT, table.VERSION_NONEXISTENT());
+    }
+
+    @Test
+    public void testGetTableId() {
+	// for Hazelcast implementation IKVTableID is table itself
+	assertEquals(table, table.getTableId());
+    }
+
+    @Test
+    public void testCreate() throws ObjectExistsException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.create(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectExistsException.class)
+    public void testCreateConflict() throws ObjectExistsException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.create(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+	assertEntryInTable(key, value, version);
+
+	table.create(key, value);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testForceCreate() {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+
+
+	final long version1 = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+	assertNotEquals(version, version1);
+	assertEntryInTable(key, value, version1);
+    }
+
+    @Test
+    public void testRead() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to read
+	final long version = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+
+	// test body
+	final IKVEntry readValue = table.read(key);
+	assertArrayEquals(key, readValue.getKey());
+	assertArrayEquals(value, readValue.getValue());
+	assertEquals(version, readValue.getVersion());
+    }
+
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testReadNotExist() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+	table.read(key);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArrayLongSuccess() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	final long version = table.update(key, value, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testUpdateByteArrayByteArrayLongFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long oldVersion = 0xDEAD;
+
+	final long version = table.update(key, value, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = WrongVersionException.class)
+    public void testUpdateByteArrayByteArrayLongFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// some one updates (from different thread/process in reality)
+	table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+	table.update(key, value, oldVersion);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArraySuccess() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testUpdateByteArrayByteArrayFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArraySuccessIgnoreVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// someone updates (from different thread/process in reality)
+	table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test
+    public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	long version = table.delete(key, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEquals(oldVersion, version);
+	assertKeyNotInTable(key);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testDeleteFailNoEntry() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+	final long oldVersion = 0xDEAD;
+
+	try {
+	    table.delete(key, oldVersion);
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    assertKeyNotInTable(key);
+	    throw e;
+	}
+	fail("Should have thrown exception");
+    }
+
+    @Test(expected = WrongVersionException.class)
+    public void testDeleteFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// someone updates (from different thread/process in reality)
+	final long latestVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,latestVersion);
+
+	try {
+	    table.delete(key, oldVersion);
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    assertEntryInTable(key, oldValue, latestVersion);
+	    throw e;
+	}
+	fail("Should have thrown exception");
+    }
+
+
+
+    @Test
+    public void testForceDelete() {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	long version = table.forceDelete(key);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEquals(oldVersion, version);
+	assertKeyNotInTable(key);
+    }
+
+    @Test
+    public void testGetAllEntries() {
+	final int DATASETSIZE = 100;
+	final Map<byte[], VersionedValue> testdata = new TreeMap<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	for(int i = 0 ; i < DATASETSIZE ; ++i) {
+	    final byte[] key = (name.getMethodName()+i).getBytes(StandardCharsets.UTF_8);
+	    final byte[] value = ("Value"+i).getBytes(StandardCharsets.UTF_8);
+
+	    // put data to delete
+	    final long version = table.forceCreate(key, value);
+	    assertNotEquals(HZClient.VERSION_NONEXISTENT, version);
+	    assertEntryInTable(key, value, version);
+
+	    testdata.put(key, new VersionedValue(value, version));
+	}
+
+	Iterable<IKVEntry> datastore = table.getAllEntries();
+	for( IKVEntry entry : datastore ) {
+	    VersionedValue expectedValue = testdata.get(entry.getKey());
+	    assertNotNull(expectedValue);
+	    assertArrayEquals(expectedValue.getValue(), entry.getValue());
+	    assertEquals(expectedValue.getVersion(), entry.getVersion());
+
+	    testdata.remove(entry.getKey());
+	}
+
+	assertTrue(testdata.isEmpty());
+    }
+
+    @Test
+    public void testToString() {
+	assertEquals("[HZTable " + TEST_TABLE_NAME + "]", table.toString());
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
new file mode 100644
index 0000000..e656596
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
@@ -0,0 +1,83 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.Test;
+
+public class KVSwitchNoDataStoreTest {
+
+    @Test
+    public void testGetDpidFromKeyByteArray() {
+	// reference bytes
+	final byte[] key = KVSwitch.getSwitchID(0x1L);
+
+	assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+    }
+
+    @Test
+    public void testGetDpidFromKeyByteBuffer() {
+	// reference bytes
+	final ByteBuffer key = ByteBuffer.wrap(KVSwitch.getSwitchID(0x1L));
+
+	assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+    }
+
+    @Test
+    public void testCreateFromKeyByteArray() {
+	// reference bytes
+	Long dpid = Long.valueOf(0x1L);
+	final byte[] key = KVSwitch.getSwitchID(dpid);
+
+	KVSwitch sw = KVSwitch.createFromKey(key);
+	assertNotNull(sw);
+	assertEquals(dpid, sw.getDpid());
+    }
+
+    @Test
+    public void testGetStatus() {
+	KVSwitch sw = new KVSwitch(0x1L);
+
+	assertEquals(STATUS.INACTIVE, sw.getStatus());
+    }
+
+    @Test
+    public void testSetStatus() {
+	KVSwitch sw = new KVSwitch(0x1L);
+	assertEquals(STATUS.INACTIVE, sw.getStatus());
+
+	sw.setStatus(STATUS.ACTIVE);
+	assertEquals(STATUS.ACTIVE, sw.getStatus());
+    }
+
+    @Test
+    public void testGetDpid() {
+	Long dpid = 0x1L;
+	KVSwitch sw = new KVSwitch(dpid);
+	assertEquals(dpid, sw.getDpid());
+    }
+
+    @Test
+    public void testGetId() {
+	// reference bytes
+	Long dpid = Long.valueOf(0x1L);
+	final byte[] key = KVSwitch.getSwitchID(dpid);
+
+	KVSwitch sw = KVSwitch.createFromKey(key);
+	assertArrayEquals(key, sw.getId());
+    }
+
+    @Test
+    public void testToString() {
+	final String expected = "[" + "KVSwitch"
+		+ " 0x" + 1 + " STATUS:" + STATUS.INACTIVE + "]";
+
+	Long dpid = 0x1L;
+	KVSwitch sw = new KVSwitch(dpid);
+
+	assertEquals(expected, sw.toString());
+    }
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
new file mode 100644
index 0000000..ce73034
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
@@ -0,0 +1,203 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVSwitchTest {
+    IKVTable switchTable;
+    static final Long dpid1 = 0x1L;
+    KVSwitch sw1;
+
+    @Before
+    public void setUp() throws Exception {
+	switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+	sw1 = new KVSwitch(dpid1);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+	DataStoreClient.getClient().dropTable(switchTable);
+    }
+
+    public KVSwitch assertSwitchInDataStore(final Long dpid, final STATUS status) {
+	try {
+	    final KVSwitch sw = new KVSwitch(dpid);
+	    sw.read();
+	    assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	    assertEquals(dpid, sw.getDpid());
+	    assertEquals(status, sw.getStatus());
+	    return sw;
+	} catch (ObjectDoesntExistException e) {
+	    fail("Switch was not written to datastore");
+	}
+	return null;
+    }
+    public void assertSwitchNotInDataStore(final Long dpid) {
+	final KVSwitch sw = new KVSwitch(dpid);
+	try {
+	    sw.read();
+	    fail("Switch was not supposed to be there in datastore");
+	} catch (ObjectDoesntExistException e) {
+	}
+    }
+
+    @Test
+    public void testGetAllSwitches() throws ObjectExistsException {
+	final int NUM_SWITCHES = 100;
+	Map<Long,KVSwitch> expected = new HashMap<>();
+	for (long dpid = 1 ; dpid <= NUM_SWITCHES ; ++dpid) {
+	    KVSwitch sw = new KVSwitch(dpid);
+	    sw.setStatus(STATUS.ACTIVE);
+	    sw.create();
+	    assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	    expected.put(sw.getDpid(), sw);
+	}
+
+	Iterable<KVSwitch> switches = KVSwitch.getAllSwitches();
+
+	for (KVSwitch sw : switches) {
+	    KVSwitch expectedSw = expected.get(sw.getDpid());
+	    assertNotNull(expectedSw);
+	    assertEquals(expectedSw.getDpid(), sw.getDpid());
+	    assertEquals(expectedSw.getStatus(), sw.getStatus());
+	    assertEquals(expectedSw.getVersion(), sw.getVersion());
+
+	    assertArrayEquals(expectedSw.getKey(), sw.getKey());
+	}
+    }
+
+    @Test
+    public void testCreate() throws ObjectExistsException {
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.create();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+    }
+
+    @Test(expected = ObjectExistsException.class)
+    public void testCreateFailAlreadyExist() throws ObjectExistsException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.create();
+	fail("Should have thrown an exception");
+    }
+
+    @Test
+    public void testForceCreate() {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+    }
+
+    @Test
+    public void testRead() throws ObjectDoesntExistException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+	sw1.read();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	assertEquals(sw.getVersion(), sw1.getVersion());
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testReadFailNoExist() throws ObjectDoesntExistException {
+
+	sw1.read();
+	fail("Should have thrown an exception");
+    }
+
+    @Test
+    public void testUpdate() throws ObjectDoesntExistException, WrongVersionException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	sw1.read();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	sw1.setStatus(STATUS.INACTIVE);
+	sw1.update();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	assertNotEquals(sw.getVersion(), sw1.getVersion());
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.INACTIVE, sw1.getStatus());
+    }
+
+    @Test
+    public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	try {
+	    sw1.read();
+	} catch (ObjectDoesntExistException e) {
+	    fail("Failed reading switch to delete");
+	}
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	sw1.delete();
+	assertSwitchNotInDataStore(dpid1);
+    }
+
+    @Test
+    public void testForceDelete() {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	sw1.forceDelete();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
new file mode 100644
index 0000000..4fe0a1b
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
@@ -0,0 +1,423 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.datastore.utils.KVObject.WriteOp;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVTopologyTest {
+
+    static final long VERSION_NONEXISTENT = DataStoreClient.getClient().VERSION_NONEXISTENT();
+
+    private static final byte[] DEVICE2_MAC_SW2P2 = new byte[] { 6, 5, 4, 3, 2, 1, 0 };
+
+    private static final Long SW2_PORTNO2 = 2L;
+
+    private static final Long SW2_PORTNO1 = 1L;
+
+    private static final Long DPID2 = 0x2L;
+
+    private static final byte[] DEVICE1_MAC_SW1P1 = new byte[] { 0, 1, 2, 3, 4, 5, 6 };
+
+    private static final Long SW1_PORTNO2 = 2L;
+
+    private static final Long SW1_PORTNO1 = 1L;
+
+    private static final Long DPID1 = 0x1L;
+
+    @Before
+    @After
+    public void wipeTopology() throws Exception {
+	IKVTable switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(switchTable);
+
+	IKVTable portTable = DataStoreClient.getClient().getTable(KVPort.GLOBAL_PORT_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(portTable);
+
+	IKVTable linkTable = DataStoreClient.getClient().getTable(KVLink.GLOBAL_LINK_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(linkTable);
+
+	IKVTable deviceTable = DataStoreClient.getClient().getTable(KVDevice.GLOBAL_DEVICE_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(deviceTable);
+    }
+
+    @Test
+    public void basic_switch_test() {
+	// create switch 0x1
+	try {
+	    KVSwitch sw = new KVSwitch(DPID1);
+	    sw.setStatus(KVSwitch.STATUS.ACTIVE);
+	    sw.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    assertEquals(DPID1, sw.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Create Switch Failed " + e);
+	}
+
+	// read switch 0x1
+	KVSwitch swRead = new KVSwitch(DPID1);
+	try {
+	    swRead.read();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+	    assertEquals(DPID1, swRead.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, swRead.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading Switch Failed " + e);
+	}
+
+	// and update 0x1
+	swRead.setStatus(KVSwitch.STATUS.INACTIVE);
+	try {
+	    swRead.update();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+	    assertEquals(DPID1, swRead.getDpid());
+	    assertEquals(KVSwitch.STATUS.INACTIVE, swRead.getStatus());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Updating Switch Failed " + e);
+	}
+
+	// read 0x1 again and delete
+	KVSwitch swRead2 = new KVSwitch(DPID1);
+	try {
+	    swRead2.read();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+	    assertEquals(DPID1, swRead2.getDpid());
+	    assertEquals(KVSwitch.STATUS.INACTIVE, swRead2.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading Switch Again Failed " + e);
+	}
+
+	try {
+	    swRead2.delete();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Deleting Switch Failed " + e);
+	}
+
+	// make sure 0x1 is deleted
+	KVObject swRead3 = new KVSwitch(DPID1);
+	try {
+	    swRead3.read();
+	    fail(swRead3 + " was supposed to be deleted, but read succeed");
+	} catch (ObjectDoesntExistException e) {
+	    System.out.println("-- " + swRead3 + " not found as expected--");
+	    e.printStackTrace(System.out);
+	    System.out.println("---------------------------------------");
+	}
+    }
+
+    @Test
+    public void topology_setup_and_tear_down() {
+	topology_setup();
+	topology_walk();
+	topology_delete();
+    }
+
+    private static void topology_setup() {
+
+	// d1 - s1p1 - s1 - s1p2 - s2p1 - s2 - s2p2
+
+	KVSwitch sw1 = new KVSwitch(DPID1);
+	sw1.setStatus(KVSwitch.STATUS.ACTIVE);
+	try {
+	    sw1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Switch creation failed " + e);
+	}
+
+	KVPort sw1p1 = new KVPort(DPID1, SW1_PORTNO1);
+	sw1p1.setStatus(KVPort.STATUS.ACTIVE);
+	KVPort sw1p2 = new KVPort(DPID1, SW1_PORTNO2);
+	sw1p2.setStatus(KVPort.STATUS.ACTIVE);
+	try {
+	    sw1p1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+	    assertEquals(DPID1, sw1p1.getDpid());
+	    assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+
+	    sw1p2.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+	    assertEquals(DPID1, sw1p2.getDpid());
+	    assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Port creation failed " + e);
+	}
+
+	try {
+	    sw1.update();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Switch update failed " + e);
+	}
+
+	KVDevice d1 = new KVDevice(DEVICE1_MAC_SW1P1);
+	d1.addPortId(sw1p1.getId());
+
+	try {
+	    d1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, d1.getVersion());
+	    assertEquals(1, d1.getAllPortIds().size());
+	    assertArrayEquals(sw1p1.getId(), d1.getAllPortIds().iterator().next());
+
+	    try {
+		sw1p1.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+		assertEquals(DPID1, sw1p1.getDpid());
+		assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Link update failed " + e);
+	    }
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Device creation failed " + e);
+	}
+
+	KVSwitch sw2 = new KVSwitch(DPID2);
+	sw2.setStatus(KVSwitch.STATUS.ACTIVE);
+	KVPort sw2p1 = new KVPort(DPID2, SW2_PORTNO1);
+	sw2p1.setStatus(KVPort.STATUS.ACTIVE);
+	KVPort sw2p2 = new KVPort(DPID2, SW2_PORTNO2);
+	sw2p2.setStatus(KVPort.STATUS.ACTIVE);
+
+	KVDevice d2 = new KVDevice(DEVICE2_MAC_SW2P2);
+	d2.addPortId(sw2p2.getId());
+
+	IKVClient client = DataStoreClient.getClient();
+
+	List<WriteOp> groupOp = Arrays.asList(
+		sw2.createOp(client), sw2p1.createOp(client),
+		sw2p2.createOp(client), d2.createOp(client) );
+	boolean failed = KVObject.multiWrite(groupOp);
+	if (failed) {
+	    for ( WriteOp op : groupOp ) {
+		System.err.println(op);
+	    }
+	    fail("Some of Switch/Port/Device creation failed");
+	} else {
+	    assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+	    assertEquals(DPID2, sw2.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+	    assertEquals(DPID2, sw2p1.getDpid());
+	    assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, sw2p2.getVersion());
+	    assertEquals(DPID2, sw2p2.getDpid());
+	    assertEquals(SW2_PORTNO2, sw2p2.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw2p2.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, d2.getVersion());
+	    assertEquals(1, d2.getAllPortIds().size());
+	    assertArrayEquals(sw2p2.getId(), d2.getAllPortIds().iterator().next());
+	}
+
+	KVLink l1 = new KVLink(DPID1, SW1_PORTNO2, DPID2, SW2_PORTNO1);
+	l1.setStatus(KVLink.STATUS.ACTIVE);
+
+	try {
+	    l1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, l1.getVersion());
+	    assertEquals(KVLink.STATUS.ACTIVE, l1.getStatus());
+	    assertArrayEquals(sw1.getId(), l1.getSrc().getSwitchID());
+	    assertArrayEquals(sw1p2.getId(), l1.getSrc().getPortID());
+	    assertArrayEquals(sw2.getId(), l1.getDst().getSwitchID());
+	    assertArrayEquals(sw2p1.getId(), l1.getDst().getPortID());
+
+	    try {
+		sw1p2.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+		assertEquals(DPID1, sw1p2.getDpid());
+		assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+
+		sw2p1.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+		assertEquals(DPID2, sw2p1.getDpid());
+		assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Port update failed " + e);
+	    }
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Link creation failed " + e);
+	}
+    }
+
+
+    private static void topology_walk() {
+	Iterable<KVSwitch> swIt = KVSwitch.getAllSwitches();
+	List<Long> switchesExpected = new ArrayList<>(Arrays.asList(DPID1, DPID2));
+
+	System.out.println("Enumerating Switches start");
+	for (KVSwitch sw : swIt) {
+	    System.out.println(sw + " @ " + sw.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+	    assertThat(sw.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+	    assertThat(switchesExpected, hasItem(sw.getDpid()));
+	    switchesExpected.remove(sw.getDpid());
+	}
+	System.out.println("Enumerating Switches end");
+
+	KVSwitch sw1 = new KVSwitch(DPID1);
+	try {
+	    sw1.read();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading switch failed " + e);
+	}
+
+	KVSwitch sw2 = new KVSwitch(DPID2);
+	if (KVObject.multiRead( Arrays.asList(sw2) )) {
+	    fail("Failed to read switch " + sw2);
+	} else {
+	    assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+	    assertEquals(DPID2, sw2.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+	}
+
+
+	// DPID -> [port_no]
+	@SuppressWarnings("serial")
+	Map<Long,List<Long>> expectedPorts = new HashMap<Long,List<Long>>() {{
+	    put(DPID1, new ArrayList<>(Arrays.asList(SW1_PORTNO1, SW1_PORTNO2)));
+	    put(DPID2, new ArrayList<>(Arrays.asList(SW2_PORTNO1, SW2_PORTNO2)));
+	}};
+
+	for (KVPort port : KVPort.getAllPorts()) {
+	    System.out.println(port + " @ " + port.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, port.getVersion());
+	    assertEquals(KVPort.STATUS.ACTIVE, port.getStatus());
+	    assertThat(port.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+	    assertThat(port.getNumber(), is(anyOf(equalTo(SW1_PORTNO1), equalTo(SW1_PORTNO2))));
+
+	    assertThat(expectedPorts, hasKey(port.getDpid()));
+	    assertThat(expectedPorts.get(port.getDpid()), hasItem(port.getNumber()));
+	    expectedPorts.get(port.getDpid()).remove(port.getNumber());
+	}
+
+	// DeviceID -> PortID
+	@SuppressWarnings("serial")
+	Map<byte[], byte[]> expectedDevice = new TreeMap<byte[], byte[]>(ByteArrayComparator.BYTEARRAY_COMPARATOR) {{
+	    put(DEVICE1_MAC_SW1P1, KVPort.getPortID(DPID1, SW1_PORTNO1));
+	    put(DEVICE2_MAC_SW2P2, KVPort.getPortID(DPID2, SW2_PORTNO2));
+	}};
+
+	for (KVDevice device : KVDevice.getAllDevices()) {
+	    System.out.println(device + " @ " + device.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, device.getVersion());
+
+	    assertThat(expectedDevice, hasKey(device.getMac()));
+	    assertThat(device.getAllPortIds(), hasItem(expectedDevice.get(device.getMac())));
+	    expectedDevice.remove(device.getMac());
+	}
+
+	for (KVLink link : KVLink.getAllLinks()) {
+	    System.out.println(link + " @ " + link.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, link.getVersion());
+
+	    // there is currently only 1 link SW1P2->SW2P1
+	    assertEquals(DPID1, link.getSrc().dpid);
+	    assertEquals(SW1_PORTNO2, link.getSrc().number);
+	    assertEquals(DPID2, link.getDst().dpid);
+	    assertEquals(SW2_PORTNO1, link.getDst().number);
+	}
+
+    }
+
+
+    private static void topology_delete() {
+
+	for (KVSwitch sw : KVSwitch.getAllSwitches()) {
+	    try {
+		sw.read();
+		sw.delete();
+		assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Switch Failed " + e);
+	    }
+	}
+
+	for (KVPort p : KVPort.getAllPorts()) {
+	    try {
+		p.read();
+		p.delete();
+		assertNotEquals(VERSION_NONEXISTENT, p.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Port Failed " + e);
+	    }
+	}
+
+	for (KVDevice d : KVDevice.getAllDevices()) {
+	    d.forceDelete();
+	    assertNotEquals(VERSION_NONEXISTENT, d.getVersion());
+	}
+
+	for (KVLink l : KVLink.getAllLinks()) {
+	    try {
+		l.read();
+		l.delete();
+		assertNotEquals(VERSION_NONEXISTENT, l.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Link Failed " + e);
+	    }
+	}
+    }
+
+    public static void main(final String[] argv) {
+
+	topology_setup();
+	topology_walk();
+	topology_delete();
+
+	System.exit(0);
+    }
+
+}