Add option to use Hazelcast as datastore for development environment.

This patch will set the default backend as Hazelcast.
To use RAMCloud as backend data store, add -Dnet.onrc.onos.datastore.backend=ramcloud to java option.

- ClientMode: Use existing Hazelcast Instance if it exist.
  - map name starting with datastore:// is now configured to be strong consistent

- add main for manual testing
- follow PMD,etc. where easily possible

- make HZClient Singleton

Change-Id: Ibe2afc3bfddfd7fd567c91477c16cd679fc543d4
diff --git a/conf/hazelcast.xml b/conf/hazelcast.xml
index 84c7354..cf44fc3 100644
--- a/conf/hazelcast.xml
+++ b/conf/hazelcast.xml
@@ -3,6 +3,16 @@
 	xmlns="http://www.hazelcast.com/schema/config"
 	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 
+  <!-- Configuration of maps used as a data store. -->
+  <map name="datastore://*">
+    <!-- must use 'sync' backup to imitate other data store -->
+    <backup-count>3</backup-count>
+    <async-backup-count>0</async-backup-count>
+    <!-- must be false for strong consistency -->
+    <read-backup-data>false</read-backup-data>
+    <!-- near cache must not be used -->
+  </map>
+
   <map name="*">
 
     <!--
diff --git a/pom.xml b/pom.xml
index bfff1bc..2f4b1b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
      see https://github.com/OPENNETWORKINGLAB/ONOS/pull/425
     <github.global.server>github</github.global.server>
      -->
+    <hazelcast.version>3.0.2</hazelcast.version>
   </properties>
   <build>
     <plugins>
@@ -519,7 +520,12 @@
     <dependency>
       <groupId>com.hazelcast</groupId>
       <artifactId>hazelcast</artifactId>
-      <version>3.0.2</version>
+      <version>${hazelcast.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.hazelcast</groupId>
+      <artifactId>hazelcast-client</artifactId>
+      <version>${hazelcast.version}</version>
     </dependency>
     <dependency>
       <groupId>net.sf.json-lib</groupId>
diff --git a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
index 88daa8e..6aeb49e 100644
--- a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
+++ b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
@@ -1,10 +1,26 @@
 package net.onrc.onos.datastore;
 
+import net.onrc.onos.datastore.hazelcast.HZClient;
 import net.onrc.onos.datastore.ramcloud.RCClient;
 
+// This class probably need to be a service
 public class DataStoreClient {
+    private static final String BACKEND = System.getProperty("net.onrc.onos.datastore.backend", "hazelcast");
+
+    // Suppresses default constructor, ensuring non-instantiability.
+    private DataStoreClient() {}
+
     public static IKVClient getClient() {
 	// TODO read config and return appropriate IKVClient
-	return RCClient.getClient();
+	switch (BACKEND) {
+	case "ramcloud":
+	    return RCClient.getClient();
+	case "hazelcast":
+	    return HZClient.getClient();
+	default:
+	    return HZClient.getClient();
+	}
     }
+
+
 }
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
new file mode 100644
index 0000000..2c2b3a4
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
@@ -0,0 +1,345 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.io.FileNotFoundException;
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiEntryOperation.OPERATION;
+import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public class HZClient implements IKVClient {
+    private static final Logger log = LoggerFactory.getLogger(HZClient.class);
+
+    static final long VERSION_NONEXISTENT = 0L;
+
+    private static final String MAP_PREFIX = "datastore://";
+
+    // make this path configurable
+    private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
+    private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.datastore.hazelcast.clientMode", "true"));
+
+    // Note: xml configuration will overwrite this value if present
+    private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.datastore.hazelcast.backupCount", "3"));
+
+    private final HazelcastInstance hazelcastInstance;
+
+    private static final HZClient THE_INSTANCE = new HZClient();
+
+    public static HZClient getClient() {
+	return THE_INSTANCE;
+    }
+
+    private HZClient() {
+	hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
+    }
+
+    private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
+	Config baseHzConfig = null;
+	try {
+	    baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+	} catch (FileNotFoundException e) {
+	    log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+	    throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+	}
+
+	// use xml config if present, if not use System.property
+	MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+	if (mapConfig != null) {
+	    backupCount = mapConfig.getBackupCount();
+	}
+
+	HazelcastInstance instance = null;
+	if (useClientMode) {
+	    log.info("Configuring Hazelcast datastore as Client mode");
+	    ClientConfig clientConfig = new ClientConfig();
+	    final int port = baseHzConfig.getNetworkConfig().getPort();
+
+	    String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
+	    clientConfig.addAddress(server + ":" + port);
+
+	    // copy group config from base Hazelcast configuration
+	    clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+	    clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+
+	    // TODO We probably need to figure out what else need to be
+	    // derived from baseConfig
+
+	    registerSerializer(clientConfig.getSerializationConfig());
+
+	    log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+
+	    try {
+		instance = HazelcastClient.newHazelcastClient(clientConfig);
+		if (!instance.getCluster().getMembers().isEmpty()) {
+		    log.debug("Members in cluster: " + instance.getCluster().getMembers());
+		    return instance;
+		}
+		log.info("Failed to find cluster member, falling back to Instance mode");
+	    } catch (IllegalStateException e) {
+		log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+	    }
+	    useClientMode = false;
+	    instance = null;
+	}
+	log.info("Configuring Hazelcast datastore as Instance mode");
+
+	// To run 2 Hazelcast instance in 1 JVM,
+	// we probably need to something like below
+	//int port = hazelcastConfig.getNetworkConfig().getPort();
+	//hazelcastConfig.getNetworkConfig().setPort(port+1);
+
+	registerSerializer(baseHzConfig.getSerializationConfig());
+
+	return Hazelcast.newHazelcastInstance(baseHzConfig);
+    }
+
+    /**
+     * Register serializer for VersionedValue class used to imitate value version.
+     * @param config
+     */
+    private static void registerSerializer(final SerializationConfig config) {
+	config.addDataSerializableFactoryClass(
+		    VersionedValueSerializableFactory.FACTORY_ID,
+		    VersionedValueSerializableFactory.class);
+    }
+
+    @Override
+    public IKVTable getTable(final String tableName) {
+	IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
+
+	if (!useClientMode) {
+	    // config only available in Instance Mode
+	    // Client Mode must rely on hazelcast.xml to be properly configured.
+	    MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
+	    // config for this map to be strong consistent
+	    if (config.isReadBackupData()) {
+		config.setReadBackupData(false);
+	    }
+	    if (config.isNearCacheEnabled()) {
+		config.getNearCacheConfig().setMaxSize(0);
+	    }
+
+	    if (config.getBackupCount() != backupCount) {
+		config.setAsyncBackupCount(0);
+		config.setBackupCount(backupCount);
+	    }
+	}
+
+	return new HZTable(tableName, map);
+    }
+
+    @Override
+    public void dropTable(final IKVTable table) {
+	((HZTable) table).getBackendMap().clear();
+    }
+
+    @Override
+    public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
+            throws ObjectExistsException {
+	IKVTable table = (IKVTable) tableId;
+	return table.create(key, value);
+    }
+
+    @Override
+    public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+	IKVTable table = (IKVTable) tableId;
+	return table.forceCreate(key, value);
+    }
+
+    @Override
+    public IKVEntry read(final IKVTableID tableId, final byte[] key)
+            throws ObjectDoesntExistException {
+	IKVTable table = (IKVTable) tableId;
+	return table.read(key);
+    }
+
+    @Override
+    public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
+            final long version) throws ObjectDoesntExistException,
+            WrongVersionException {
+	IKVTable table = (IKVTable) tableId;
+	return table.update(key, value, version);
+    }
+
+    @Override
+    public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
+            throws ObjectDoesntExistException {
+	IKVTable table = (IKVTable) tableId;
+	return table.update(key, value);
+    }
+
+    @Override
+    public long delete(final IKVTableID tableId, final byte[] key, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+	IKVTable table = (IKVTable) tableId;
+	return table.delete(key, version);
+    }
+
+    @Override
+    public long forceDelete(final IKVTableID tableId, final byte[] key) {
+	IKVTable table = (IKVTable) tableId;
+	return table.forceDelete(key);
+    }
+
+    @Override
+    public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
+	IKVTable table = (IKVTable) tableId;
+	return table.getAllEntries();
+    }
+
+    @Override
+    public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+    }
+
+    @Override
+    public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+    }
+
+    @Override
+    public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
+    }
+
+    @Override
+    public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value, final long version) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+    }
+
+    @Override
+    public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
+            final byte[] value, final long version) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+    }
+
+    @Override
+    public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
+	return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
+    }
+
+    @Override
+    public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+	boolean failExists = false;
+	for (IMultiEntryOperation op : ops) {
+	    HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
+	    switch (mop.getOperation()) {
+	    case DELETE:
+		try {
+		    final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
+		    mop.setVersion(version);
+		    mop.setStatus(STATUS.SUCCESS);
+		} catch (ObjectDoesntExistException | WrongVersionException e) {
+		    log.error(mop + " failed.", e);
+		    mop.setStatus(STATUS.FAILED);
+                    failExists = true;
+		}
+		break;
+	    case FORCE_DELETE:
+		final long version = forceDelete(mop.getTableId(), mop.getKey());
+		mop.setVersion(version);
+		mop.setStatus(STATUS.SUCCESS);
+		break;
+	    default:
+		throw new UnsupportedOperationException(mop.toString());
+	    }
+	}
+	return failExists;
+    }
+
+    @Override
+    public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+	// there may be room to batch to improve performance
+	boolean failExists = false;
+	for (IMultiEntryOperation op : ops) {
+	    IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+	    switch (mop.getOperation()) {
+	    case CREATE:
+		try {
+		    long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
+		    mop.setVersion(version);
+		    mop.setStatus(STATUS.SUCCESS);
+		} catch (ObjectExistsException e) {
+		    log.error(mop + " failed.", e);
+		    mop.setStatus(STATUS.FAILED);
+		    failExists = true;
+		}
+		break;
+	    case FORCE_CREATE:
+	    {
+		final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
+		mop.setVersion(version);
+		mop.setStatus(STATUS.SUCCESS);
+		break;
+	    }
+	    case UPDATE:
+		try {
+		    long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
+		    mop.setVersion(version);
+		    mop.setStatus(STATUS.SUCCESS);
+		} catch (ObjectDoesntExistException | WrongVersionException e) {
+		    log.error(mop + " failed.", e);
+		    mop.setStatus(STATUS.FAILED);
+		    failExists = true;
+		}
+		break;
+	    default:
+		throw new UnsupportedOperationException(mop.toString());
+	    }
+	}
+	return failExists;
+    }
+
+    @Override
+    public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+	boolean failExists = false;
+	for (IMultiEntryOperation op : ops) {
+	    IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+	    HZTable table = (HZTable) op.getTableId();
+	    ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
+	}
+	for (IMultiEntryOperation op : ops) {
+	    IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+	    if (mop.hasSucceeded()) {
+		// status update is already done, nothing to do.
+	    } else {
+		failExists = true;
+	    }
+	}
+
+	return failExists;
+    }
+
+    @Override
+    public long VERSION_NONEXISTENT() {
+	return VERSION_NONEXISTENT;
+    }
+
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
new file mode 100644
index 0000000..1da7f1a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+public class HZMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+    private static final Logger log = LoggerFactory.getLogger(HZMultiEntryOperation.class);
+
+    private final HZTable table;
+    private final byte[] key;
+    protected final OPERATION operation;
+    private STATUS status;
+
+    // for read op
+    private Future<VersionedValue> future;
+    // for write op
+    private VersionedValue writeValue;
+
+    /**
+     * Constructor for Read/ForceDelete Operation.
+     * @param table
+     * @param key
+     * @param operation
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final OPERATION operation) {
+	this.table = table;
+	this.key = key;
+	this.status = STATUS.NOT_EXECUTED;
+	this.operation = operation;
+
+	this.future = null;
+	this.writeValue = null;
+    }
+
+    /**
+     * Constructor for Other Operations.
+     * @param table
+     * @param key
+     * @param value
+     * @param version
+     * @param operation
+     */
+    public HZMultiEntryOperation(final HZTable table, final byte[] key, final byte[] value, final long version, final OPERATION operation) {
+	this.table = table;
+	this.key = key;
+	this.status = STATUS.NOT_EXECUTED;
+	this.operation = operation;
+
+	this.future = null;
+	this.writeValue = new VersionedValue(value, version);
+    }
+
+    @Override
+    public boolean hasSucceeded() {
+
+	VersionedValue value = get();
+	return (value != null) && (this.status == STATUS.SUCCESS);
+    }
+
+    @Override
+    public STATUS getStatus() {
+	get();
+	return status;
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+	return table;
+    }
+
+    @Override
+    public byte[] getKey() {
+	return key;
+    }
+
+    @Override
+    public byte[] getValue() {
+	if (future != null) {
+	    VersionedValue value = get();
+	    return value.getValue();
+	}
+	return writeValue.getValue();
+    }
+
+    @Override
+    public long getVersion() {
+	if (future != null) {
+	    VersionedValue value = get();
+	    return value.getVersion();
+	}
+	return writeValue.getVersion();
+    }
+
+    @Override
+    public OPERATION getOperation() {
+	return operation;
+    }
+
+    /**
+     * Evaluate Future object and set Status and Value+Version.
+     * @return the value read or null on failure.
+     */
+    private VersionedValue get() {
+        try {
+            VersionedValue value = future.get();
+            setValue(value.getValue(), value.getVersion());
+            setStatus(STATUS.SUCCESS);
+            return value;
+        } catch (CancellationException | InterruptedException | ExecutionException e) {
+            log.error(this + " has failed.", e);
+            setStatus(STATUS.FAILED);
+            return null;
+        }
+    }
+
+    @Override
+    public void setValue(final byte[] value, final long version) {
+	writeValue = new VersionedValue(value, version);
+	setVersion(version);
+    }
+
+    @Override
+    public void setVersion(final long version) {
+	if (future != null) {
+	    // no-op on read
+	}
+	if (writeValue == null) {
+	    writeValue = new VersionedValue(null, version);
+	}
+    }
+
+    @Override
+    public void setStatus(final STATUS status) {
+	this.status = status;
+    }
+
+    @Override
+    public IModifiableMultiEntryOperation getActualOperation() {
+	return this;
+    }
+
+    void setFuture(final Future<VersionedValue> future) {
+        this.future = future;
+    }
+
+    @Override
+    public String toString() {
+	return "[HZMultiEntryOperation table=" + table + ", key="
+	        + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
+	        + ", status=" + status + ", writeValue=" + writeValue + "]";
+    }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
new file mode 100644
index 0000000..ec98349
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
@@ -0,0 +1,325 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class HZTable implements IKVTable, IKVTableID {
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(HZTable.class);
+
+    // not sure how strict this should be managed
+    private static final AtomicLong  initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
+
+    /**
+     * generate a new initial version for an entry.
+     * @return initial value
+     */
+    protected static long getInitialVersion() {
+	long version = initialVersion.incrementAndGet();
+	if (version == HZClient.VERSION_NONEXISTENT) {
+	    // used up whole 64bit space?
+	    version = initialVersion.incrementAndGet();
+	}
+	return version;
+    }
+
+    /**
+     * increment version, avoiding VERSION_NONEXISTENT.
+     * @param version
+     * @return
+     */
+    protected static long getNextVersion(final long version) {
+	long nextVersion = version + 1;
+	if (nextVersion == HZClient.VERSION_NONEXISTENT) {
+	    ++nextVersion;
+	}
+	return nextVersion;
+    }
+
+    static class VersionedValue implements IdentifiedDataSerializable {
+	private static final long serialVersionUID = -3149375966890712708L;
+
+	private byte[] value;
+	private long version;
+
+	protected VersionedValue() {
+	    value = new byte[0];
+	    version = HZClient.VERSION_NONEXISTENT;
+	}
+
+	public VersionedValue(final byte[] value, final long version) {
+	    this.value = value;
+	    this.version = version;
+	}
+
+	public byte[] getValue() {
+	    return value;
+	}
+
+	public long getVersion() {
+	    return version;
+	}
+
+	public void setValue(final byte[] value) {
+	    this.value = value;
+	}
+
+	public void setNextVersion() {
+	    this.version = getNextVersion(this.version);
+	}
+
+	@Override
+	public void writeData(final ObjectDataOutput out) throws IOException {
+	    out.writeLong(version);
+	    out.writeInt(value.length);
+	    if (value.length > 0) {
+		out.write(value);
+	    }
+	}
+
+	@Override
+	public void readData(final ObjectDataInput in) throws IOException {
+	    version = in.readLong();
+	    final int valueLen = in.readInt();
+	    value = new byte[valueLen];
+	    in.readFully(value);
+	}
+
+	@Override
+	public int getFactoryId() {
+	    return VersionedValueSerializableFactory.FACTORY_ID;
+	}
+
+	@Override
+	public int getId() {
+	    return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+	}
+
+	@Override
+	public int hashCode() {
+	    final int prime = 31;
+	    int result = 1;
+	    result = prime * result + (int) (version ^ (version >>> 32));
+	    result = prime * result + Arrays.hashCode(value);
+	    return result;
+	}
+
+	@Override
+	public boolean equals(final Object obj) {
+	    if (this == obj) {
+		return true;
+	    }
+	    if (obj == null) {
+		return false;
+	    }
+	    if (getClass() != obj.getClass()) {
+		return false;
+	    }
+	    VersionedValue other = (VersionedValue) obj;
+	    if (version != other.version) {
+		return false;
+	    }
+	    if (!Arrays.equals(value, other.value)) {
+		return false;
+	    }
+	    return true;
+	}
+    }
+
+    // TODO Refactor and extract common parts
+    public static class Entry implements IKVEntry {
+	final byte[] key;
+	byte[] value;
+	long version;
+
+	public Entry(final byte[] key, final byte[] value, final long version) {
+	    this.key = key;
+	    this.setValue(value);
+	    this.setVersion(version);
+	}
+
+	public Entry(final byte[] key) {
+	    this(key, null, HZClient.VERSION_NONEXISTENT);
+	}
+
+	@Override
+	public byte[] getKey() {
+	    return key;
+	}
+
+	@Override
+	public byte[] getValue() {
+	    return value;
+	}
+
+	@Override
+	public long getVersion() {
+	    return version;
+	}
+
+	void setValue(final byte[] value) {
+	    this.value = value;
+	}
+
+	void setVersion(final long version) {
+	    this.version = version;
+	}
+    }
+
+
+
+    private final String mapName;
+    private final IMap<byte[], VersionedValue> map;
+
+    public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
+	this.mapName = mapName;
+	this.map = map;
+    }
+
+    @Override
+    public String getTableName() {
+	return mapName;
+    }
+
+    @Override
+    public IKVTableID getTableId() {
+	return this;
+    }
+
+    @Override
+    public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
+	final long version = getInitialVersion();
+	VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
+	if (existing != null) {
+	    throw new ObjectExistsException(this, key);
+	}
+	return version;
+    }
+
+    @Override
+    public long forceCreate(final byte[] key, final byte[] value) {
+	final long version = getInitialVersion();
+	map.set(key, new VersionedValue(value, version));
+	return version;
+    }
+
+    @Override
+    public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+	final VersionedValue value = map.get(key);
+	if (value == null) {
+	    throw new ObjectDoesntExistException(this, key);
+	}
+	return new Entry(key, value.getValue(), value.getVersion());
+    }
+
+    @Override
+    public long update(final byte[] key, final byte[] value, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+
+	try {
+	    map.lock(key);
+	    final VersionedValue oldValue = map.get(key);
+	    if (oldValue == null) {
+		throw new ObjectDoesntExistException(this, key);
+	    }
+	    if (oldValue.getVersion() != version) {
+		throw new WrongVersionException(this, key, version, oldValue.getVersion());
+	    }
+	    final long nextVersion = getNextVersion(version);
+	    map.set(key, new VersionedValue(value, nextVersion));
+	    return nextVersion;
+	} finally {
+	    map.unlock(key);
+	}
+    }
+
+    @Override
+    public long update(final byte[] key, final byte[] value)
+            throws ObjectDoesntExistException {
+
+	try {
+	    map.lock(key);
+	    final VersionedValue valueInMap = map.get(key);
+	    if (valueInMap == null) {
+		throw new ObjectDoesntExistException(this, key);
+	    }
+	    valueInMap.setValue(value);
+	    valueInMap.setNextVersion();
+	    map.set(key, valueInMap);
+	    return valueInMap.getVersion();
+	} finally {
+	    map.unlock(key);
+	}
+    }
+
+    @Override
+    public long delete(final byte[] key, final long version)
+            throws ObjectDoesntExistException, WrongVersionException {
+
+	try {
+	    map.lock(key);
+	    final VersionedValue oldValue = map.get(key);
+	    if (oldValue == null) {
+		throw new ObjectDoesntExistException(this, key);
+	    }
+	    if (oldValue.getVersion() != version) {
+		throw new WrongVersionException(this, key, version, oldValue.getVersion());
+	    }
+	    map.delete(key);
+	    return oldValue.getVersion();
+	} finally {
+	    map.unlock(key);
+	}
+    }
+
+    @Override
+    public long forceDelete(final byte[] key) {
+	final VersionedValue valueInMap = map.remove(key);
+	if (valueInMap == null) {
+	    return HZClient.VERSION_NONEXISTENT;
+	}
+	return valueInMap.getVersion();
+    }
+
+    @Override
+    public Iterable<IKVEntry> getAllEntries() {
+	final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
+	List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
+	for (IMap.Entry<byte[], VersionedValue> entry : entries) {
+	    entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
+	}
+	return entryList;
+    }
+
+    @Override
+    public String toString() {
+	return "[HZTable " + mapName + "]";
+    }
+
+    IMap<byte[], VersionedValue> getBackendMap() {
+	return this.map;
+    }
+
+    @Override
+    public long VERSION_NONEXISTENT() {
+	return HZClient.VERSION_NONEXISTENT;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
new file mode 100644
index 0000000..a4e7fb9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.datastore.hazelcast;
+
+
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class VersionedValueSerializableFactory implements
+        DataSerializableFactory {
+    // revisit these magic numbers
+    public static final int FACTORY_ID = 1;
+
+    public static final int VERSIONED_VALUE_ID = 1;
+
+    @Override
+    public IdentifiedDataSerializable create(final int typeId) {
+	switch (typeId) {
+	case VERSIONED_VALUE_ID:
+	    return new HZTable.VersionedValue();
+
+	default:
+		return null;
+	}
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
new file mode 100644
index 0000000..a3bfab6
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
@@ -0,0 +1,365 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class HZTableTest {
+    @Rule
+    public TestName name= new TestName();
+
+    static final String TEST_TABLE_NAME = "TableForUnitTest";
+    HZTable table;
+
+    @Before
+    public void setUp() throws Exception {
+	table = (HZTable) HZClient.getClient().getTable(TEST_TABLE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+	HZClient.getClient().dropTable(table);
+    }
+
+    public void assertEntryInTable(final byte[] key, final byte[] value, final long version) {
+	VersionedValue valueblob = table.getBackendMap().get(key);
+	assertNotNull(valueblob);
+	assertArrayEquals(value, valueblob.getValue());
+	assertEquals(version, valueblob.getVersion());
+    }
+
+    public void assertKeyNotInTable(final byte[] key) {
+	VersionedValue valueblob = table.getBackendMap().get(key);
+	assertNull(valueblob);
+    }
+
+    @Test
+    public void testGetInitialVersion() {
+	final long version1 = HZTable.getInitialVersion();
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+
+	final long version2 = HZTable.getInitialVersion();
+	assertNotEquals(HZClient.VERSION_NONEXISTENT, version2);
+	assertNotEquals(version1, version2);
+    }
+
+    @Test
+    public void testGetNextVersion() {
+	final long nextVersion = HZTable.getNextVersion(1);
+	assertNotEquals(nextVersion, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion, 1);
+
+	final long nextVersion1 = HZTable.getNextVersion(Long.MAX_VALUE);
+	assertNotEquals(nextVersion1, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion1, Long.MAX_VALUE);
+
+	final long nextVersion11 = HZTable.getNextVersion(HZClient.VERSION_NONEXISTENT-1);
+	assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT);
+	assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT-1);
+    }
+
+    @Ignore // nothing to test for now
+    @Test
+    public void testHZTable() {
+	fail("Not yet implemented");
+    }
+
+    @Test
+    public void testGetTableName() {
+	assertEquals(TEST_TABLE_NAME, table.getTableName());
+    }
+
+    @Test
+    public void testVERSION_NONEXISTENT() {
+	assertEquals(HZClient.VERSION_NONEXISTENT, table.VERSION_NONEXISTENT());
+    }
+
+    @Test
+    public void testGetTableId() {
+	// for Hazelcast implementation IKVTableID is table itself
+	assertEquals(table, table.getTableId());
+    }
+
+    @Test
+    public void testCreate() throws ObjectExistsException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.create(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectExistsException.class)
+    public void testCreateConflict() throws ObjectExistsException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.create(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+	assertEntryInTable(key, value, version);
+
+	table.create(key, value);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testForceCreate() {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+
+
+	final long version1 = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+	assertNotEquals(version, version1);
+	assertEntryInTable(key, value, version1);
+    }
+
+    @Test
+    public void testRead() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to read
+	final long version = table.forceCreate(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+
+	// test body
+	final IKVEntry readValue = table.read(key);
+	assertArrayEquals(key, readValue.getKey());
+	assertArrayEquals(value, readValue.getValue());
+	assertEquals(version, readValue.getVersion());
+    }
+
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testReadNotExist() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+	table.read(key);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArrayLongSuccess() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	final long version = table.update(key, value, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testUpdateByteArrayByteArrayLongFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long oldVersion = 0xDEAD;
+
+	final long version = table.update(key, value, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = WrongVersionException.class)
+    public void testUpdateByteArrayByteArrayLongFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// some one updates (from different thread/process in reality)
+	table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+	table.update(key, value, oldVersion);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArraySuccess() throws ObjectDoesntExistException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testUpdateByteArrayByteArrayFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+	fail("Should have thrown exception");
+    }
+
+    @Test
+    public void testUpdateByteArrayByteArraySuccessIgnoreVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+	final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to update
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// someone updates (from different thread/process in reality)
+	table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+	final long version = table.update(key, value);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+	assertEntryInTable(key, value, version);
+    }
+
+    @Test
+    public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	long version = table.delete(key, oldVersion);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEquals(oldVersion, version);
+	assertKeyNotInTable(key);
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testDeleteFailNoEntry() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+	final long oldVersion = 0xDEAD;
+
+	try {
+	    table.delete(key, oldVersion);
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    assertKeyNotInTable(key);
+	    throw e;
+	}
+	fail("Should have thrown exception");
+    }
+
+    @Test(expected = WrongVersionException.class)
+    public void testDeleteFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+	// someone updates (from different thread/process in reality)
+	final long latestVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,latestVersion);
+
+	try {
+	    table.delete(key, oldVersion);
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    assertEntryInTable(key, oldValue, latestVersion);
+	    throw e;
+	}
+	fail("Should have thrown exception");
+    }
+
+
+
+    @Test
+    public void testForceDelete() {
+	final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+	final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+	// put data to delete
+	final long oldVersion = table.forceCreate(key, oldValue);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEntryInTable(key, oldValue, oldVersion);
+
+	long version = table.forceDelete(key);
+	assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+	assertEquals(oldVersion, version);
+	assertKeyNotInTable(key);
+    }
+
+    @Test
+    public void testGetAllEntries() {
+	final int DATASETSIZE = 100;
+	final Map<byte[], VersionedValue> testdata = new TreeMap<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+	for(int i = 0 ; i < DATASETSIZE ; ++i) {
+	    final byte[] key = (name.getMethodName()+i).getBytes(StandardCharsets.UTF_8);
+	    final byte[] value = ("Value"+i).getBytes(StandardCharsets.UTF_8);
+
+	    // put data to delete
+	    final long version = table.forceCreate(key, value);
+	    assertNotEquals(HZClient.VERSION_NONEXISTENT, version);
+	    assertEntryInTable(key, value, version);
+
+	    testdata.put(key, new VersionedValue(value, version));
+	}
+
+	Iterable<IKVEntry> datastore = table.getAllEntries();
+	for( IKVEntry entry : datastore ) {
+	    VersionedValue expectedValue = testdata.get(entry.getKey());
+	    assertNotNull(expectedValue);
+	    assertArrayEquals(expectedValue.getValue(), entry.getValue());
+	    assertEquals(expectedValue.getVersion(), entry.getVersion());
+
+	    testdata.remove(entry.getKey());
+	}
+
+	assertTrue(testdata.isEmpty());
+    }
+
+    @Test
+    public void testToString() {
+	assertEquals("[HZTable " + TEST_TABLE_NAME + "]", table.toString());
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
new file mode 100644
index 0000000..e656596
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
@@ -0,0 +1,83 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.Test;
+
+public class KVSwitchNoDataStoreTest {
+
+    @Test
+    public void testGetDpidFromKeyByteArray() {
+	// reference bytes
+	final byte[] key = KVSwitch.getSwitchID(0x1L);
+
+	assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+    }
+
+    @Test
+    public void testGetDpidFromKeyByteBuffer() {
+	// reference bytes
+	final ByteBuffer key = ByteBuffer.wrap(KVSwitch.getSwitchID(0x1L));
+
+	assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+    }
+
+    @Test
+    public void testCreateFromKeyByteArray() {
+	// reference bytes
+	Long dpid = Long.valueOf(0x1L);
+	final byte[] key = KVSwitch.getSwitchID(dpid);
+
+	KVSwitch sw = KVSwitch.createFromKey(key);
+	assertNotNull(sw);
+	assertEquals(dpid, sw.getDpid());
+    }
+
+    @Test
+    public void testGetStatus() {
+	KVSwitch sw = new KVSwitch(0x1L);
+
+	assertEquals(STATUS.INACTIVE, sw.getStatus());
+    }
+
+    @Test
+    public void testSetStatus() {
+	KVSwitch sw = new KVSwitch(0x1L);
+	assertEquals(STATUS.INACTIVE, sw.getStatus());
+
+	sw.setStatus(STATUS.ACTIVE);
+	assertEquals(STATUS.ACTIVE, sw.getStatus());
+    }
+
+    @Test
+    public void testGetDpid() {
+	Long dpid = 0x1L;
+	KVSwitch sw = new KVSwitch(dpid);
+	assertEquals(dpid, sw.getDpid());
+    }
+
+    @Test
+    public void testGetId() {
+	// reference bytes
+	Long dpid = Long.valueOf(0x1L);
+	final byte[] key = KVSwitch.getSwitchID(dpid);
+
+	KVSwitch sw = KVSwitch.createFromKey(key);
+	assertArrayEquals(key, sw.getId());
+    }
+
+    @Test
+    public void testToString() {
+	final String expected = "[" + "KVSwitch"
+		+ " 0x" + 1 + " STATUS:" + STATUS.INACTIVE + "]";
+
+	Long dpid = 0x1L;
+	KVSwitch sw = new KVSwitch(dpid);
+
+	assertEquals(expected, sw.toString());
+    }
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
new file mode 100644
index 0000000..ce73034
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
@@ -0,0 +1,203 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVSwitchTest {
+    IKVTable switchTable;
+    static final Long dpid1 = 0x1L;
+    KVSwitch sw1;
+
+    @Before
+    public void setUp() throws Exception {
+	switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+	sw1 = new KVSwitch(dpid1);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+	DataStoreClient.getClient().dropTable(switchTable);
+    }
+
+    public KVSwitch assertSwitchInDataStore(final Long dpid, final STATUS status) {
+	try {
+	    final KVSwitch sw = new KVSwitch(dpid);
+	    sw.read();
+	    assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	    assertEquals(dpid, sw.getDpid());
+	    assertEquals(status, sw.getStatus());
+	    return sw;
+	} catch (ObjectDoesntExistException e) {
+	    fail("Switch was not written to datastore");
+	}
+	return null;
+    }
+    public void assertSwitchNotInDataStore(final Long dpid) {
+	final KVSwitch sw = new KVSwitch(dpid);
+	try {
+	    sw.read();
+	    fail("Switch was not supposed to be there in datastore");
+	} catch (ObjectDoesntExistException e) {
+	}
+    }
+
+    @Test
+    public void testGetAllSwitches() throws ObjectExistsException {
+	final int NUM_SWITCHES = 100;
+	Map<Long,KVSwitch> expected = new HashMap<>();
+	for (long dpid = 1 ; dpid <= NUM_SWITCHES ; ++dpid) {
+	    KVSwitch sw = new KVSwitch(dpid);
+	    sw.setStatus(STATUS.ACTIVE);
+	    sw.create();
+	    assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	    expected.put(sw.getDpid(), sw);
+	}
+
+	Iterable<KVSwitch> switches = KVSwitch.getAllSwitches();
+
+	for (KVSwitch sw : switches) {
+	    KVSwitch expectedSw = expected.get(sw.getDpid());
+	    assertNotNull(expectedSw);
+	    assertEquals(expectedSw.getDpid(), sw.getDpid());
+	    assertEquals(expectedSw.getStatus(), sw.getStatus());
+	    assertEquals(expectedSw.getVersion(), sw.getVersion());
+
+	    assertArrayEquals(expectedSw.getKey(), sw.getKey());
+	}
+    }
+
+    @Test
+    public void testCreate() throws ObjectExistsException {
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.create();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+    }
+
+    @Test(expected = ObjectExistsException.class)
+    public void testCreateFailAlreadyExist() throws ObjectExistsException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.create();
+	fail("Should have thrown an exception");
+    }
+
+    @Test
+    public void testForceCreate() {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+
+	sw1.setStatus(STATUS.ACTIVE);
+	sw1.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+    }
+
+    @Test
+    public void testRead() throws ObjectDoesntExistException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+	sw1.read();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	assertEquals(sw.getVersion(), sw1.getVersion());
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.ACTIVE, sw1.getStatus());
+    }
+
+    @Test(expected = ObjectDoesntExistException.class)
+    public void testReadFailNoExist() throws ObjectDoesntExistException {
+
+	sw1.read();
+	fail("Should have thrown an exception");
+    }
+
+    @Test
+    public void testUpdate() throws ObjectDoesntExistException, WrongVersionException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	sw1.read();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+	sw1.setStatus(STATUS.INACTIVE);
+	sw1.update();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	assertNotEquals(sw.getVersion(), sw1.getVersion());
+	assertEquals(dpid1, sw1.getDpid());
+	assertEquals(STATUS.INACTIVE, sw1.getStatus());
+    }
+
+    @Test
+    public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	try {
+	    sw1.read();
+	} catch (ObjectDoesntExistException e) {
+	    fail("Failed reading switch to delete");
+	}
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+	sw1.delete();
+	assertSwitchNotInDataStore(dpid1);
+    }
+
+    @Test
+    public void testForceDelete() {
+	// setup pre-existing Switch
+	KVSwitch sw = new KVSwitch(dpid1);
+	sw.setStatus(STATUS.ACTIVE);
+	sw.forceCreate();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+	assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+	sw1.forceDelete();
+	assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+    }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
new file mode 100644
index 0000000..4fe0a1b
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
@@ -0,0 +1,423 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.datastore.utils.KVObject.WriteOp;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVTopologyTest {
+
+    static final long VERSION_NONEXISTENT = DataStoreClient.getClient().VERSION_NONEXISTENT();
+
+    private static final byte[] DEVICE2_MAC_SW2P2 = new byte[] { 6, 5, 4, 3, 2, 1, 0 };
+
+    private static final Long SW2_PORTNO2 = 2L;
+
+    private static final Long SW2_PORTNO1 = 1L;
+
+    private static final Long DPID2 = 0x2L;
+
+    private static final byte[] DEVICE1_MAC_SW1P1 = new byte[] { 0, 1, 2, 3, 4, 5, 6 };
+
+    private static final Long SW1_PORTNO2 = 2L;
+
+    private static final Long SW1_PORTNO1 = 1L;
+
+    private static final Long DPID1 = 0x1L;
+
+    @Before
+    @After
+    public void wipeTopology() throws Exception {
+	IKVTable switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(switchTable);
+
+	IKVTable portTable = DataStoreClient.getClient().getTable(KVPort.GLOBAL_PORT_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(portTable);
+
+	IKVTable linkTable = DataStoreClient.getClient().getTable(KVLink.GLOBAL_LINK_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(linkTable);
+
+	IKVTable deviceTable = DataStoreClient.getClient().getTable(KVDevice.GLOBAL_DEVICE_TABLE_NAME);
+	DataStoreClient.getClient().dropTable(deviceTable);
+    }
+
+    @Test
+    public void basic_switch_test() {
+	// create switch 0x1
+	try {
+	    KVSwitch sw = new KVSwitch(DPID1);
+	    sw.setStatus(KVSwitch.STATUS.ACTIVE);
+	    sw.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    assertEquals(DPID1, sw.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Create Switch Failed " + e);
+	}
+
+	// read switch 0x1
+	KVSwitch swRead = new KVSwitch(DPID1);
+	try {
+	    swRead.read();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+	    assertEquals(DPID1, swRead.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, swRead.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading Switch Failed " + e);
+	}
+
+	// and update 0x1
+	swRead.setStatus(KVSwitch.STATUS.INACTIVE);
+	try {
+	    swRead.update();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+	    assertEquals(DPID1, swRead.getDpid());
+	    assertEquals(KVSwitch.STATUS.INACTIVE, swRead.getStatus());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Updating Switch Failed " + e);
+	}
+
+	// read 0x1 again and delete
+	KVSwitch swRead2 = new KVSwitch(DPID1);
+	try {
+	    swRead2.read();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+	    assertEquals(DPID1, swRead2.getDpid());
+	    assertEquals(KVSwitch.STATUS.INACTIVE, swRead2.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading Switch Again Failed " + e);
+	}
+
+	try {
+	    swRead2.delete();
+	    assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Deleting Switch Failed " + e);
+	}
+
+	// make sure 0x1 is deleted
+	KVObject swRead3 = new KVSwitch(DPID1);
+	try {
+	    swRead3.read();
+	    fail(swRead3 + " was supposed to be deleted, but read succeed");
+	} catch (ObjectDoesntExistException e) {
+	    System.out.println("-- " + swRead3 + " not found as expected--");
+	    e.printStackTrace(System.out);
+	    System.out.println("---------------------------------------");
+	}
+    }
+
+    @Test
+    public void topology_setup_and_tear_down() {
+	topology_setup();
+	topology_walk();
+	topology_delete();
+    }
+
+    private static void topology_setup() {
+
+	// d1 - s1p1 - s1 - s1p2 - s2p1 - s2 - s2p2
+
+	KVSwitch sw1 = new KVSwitch(DPID1);
+	sw1.setStatus(KVSwitch.STATUS.ACTIVE);
+	try {
+	    sw1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Switch creation failed " + e);
+	}
+
+	KVPort sw1p1 = new KVPort(DPID1, SW1_PORTNO1);
+	sw1p1.setStatus(KVPort.STATUS.ACTIVE);
+	KVPort sw1p2 = new KVPort(DPID1, SW1_PORTNO2);
+	sw1p2.setStatus(KVPort.STATUS.ACTIVE);
+	try {
+	    sw1p1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+	    assertEquals(DPID1, sw1p1.getDpid());
+	    assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+
+	    sw1p2.create();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+	    assertEquals(DPID1, sw1p2.getDpid());
+	    assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Port creation failed " + e);
+	}
+
+	try {
+	    sw1.update();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectDoesntExistException | WrongVersionException e) {
+	    e.printStackTrace();
+	    fail("Switch update failed " + e);
+	}
+
+	KVDevice d1 = new KVDevice(DEVICE1_MAC_SW1P1);
+	d1.addPortId(sw1p1.getId());
+
+	try {
+	    d1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, d1.getVersion());
+	    assertEquals(1, d1.getAllPortIds().size());
+	    assertArrayEquals(sw1p1.getId(), d1.getAllPortIds().iterator().next());
+
+	    try {
+		sw1p1.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+		assertEquals(DPID1, sw1p1.getDpid());
+		assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Link update failed " + e);
+	    }
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Device creation failed " + e);
+	}
+
+	KVSwitch sw2 = new KVSwitch(DPID2);
+	sw2.setStatus(KVSwitch.STATUS.ACTIVE);
+	KVPort sw2p1 = new KVPort(DPID2, SW2_PORTNO1);
+	sw2p1.setStatus(KVPort.STATUS.ACTIVE);
+	KVPort sw2p2 = new KVPort(DPID2, SW2_PORTNO2);
+	sw2p2.setStatus(KVPort.STATUS.ACTIVE);
+
+	KVDevice d2 = new KVDevice(DEVICE2_MAC_SW2P2);
+	d2.addPortId(sw2p2.getId());
+
+	IKVClient client = DataStoreClient.getClient();
+
+	List<WriteOp> groupOp = Arrays.asList(
+		sw2.createOp(client), sw2p1.createOp(client),
+		sw2p2.createOp(client), d2.createOp(client) );
+	boolean failed = KVObject.multiWrite(groupOp);
+	if (failed) {
+	    for ( WriteOp op : groupOp ) {
+		System.err.println(op);
+	    }
+	    fail("Some of Switch/Port/Device creation failed");
+	} else {
+	    assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+	    assertEquals(DPID2, sw2.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+	    assertEquals(DPID2, sw2p1.getDpid());
+	    assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, sw2p2.getVersion());
+	    assertEquals(DPID2, sw2p2.getDpid());
+	    assertEquals(SW2_PORTNO2, sw2p2.getNumber());
+	    assertEquals(KVPort.STATUS.ACTIVE, sw2p2.getStatus());
+
+	    assertNotEquals(VERSION_NONEXISTENT, d2.getVersion());
+	    assertEquals(1, d2.getAllPortIds().size());
+	    assertArrayEquals(sw2p2.getId(), d2.getAllPortIds().iterator().next());
+	}
+
+	KVLink l1 = new KVLink(DPID1, SW1_PORTNO2, DPID2, SW2_PORTNO1);
+	l1.setStatus(KVLink.STATUS.ACTIVE);
+
+	try {
+	    l1.create();
+	    assertNotEquals(VERSION_NONEXISTENT, l1.getVersion());
+	    assertEquals(KVLink.STATUS.ACTIVE, l1.getStatus());
+	    assertArrayEquals(sw1.getId(), l1.getSrc().getSwitchID());
+	    assertArrayEquals(sw1p2.getId(), l1.getSrc().getPortID());
+	    assertArrayEquals(sw2.getId(), l1.getDst().getSwitchID());
+	    assertArrayEquals(sw2p1.getId(), l1.getDst().getPortID());
+
+	    try {
+		sw1p2.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+		assertEquals(DPID1, sw1p2.getDpid());
+		assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+
+		sw2p1.update();
+		assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+		assertEquals(DPID2, sw2p1.getDpid());
+		assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+		assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Port update failed " + e);
+	    }
+	} catch (ObjectExistsException e) {
+	    e.printStackTrace();
+	    fail("Link creation failed " + e);
+	}
+    }
+
+
+    private static void topology_walk() {
+	Iterable<KVSwitch> swIt = KVSwitch.getAllSwitches();
+	List<Long> switchesExpected = new ArrayList<>(Arrays.asList(DPID1, DPID2));
+
+	System.out.println("Enumerating Switches start");
+	for (KVSwitch sw : swIt) {
+	    System.out.println(sw + " @ " + sw.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+	    assertThat(sw.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+	    assertThat(switchesExpected, hasItem(sw.getDpid()));
+	    switchesExpected.remove(sw.getDpid());
+	}
+	System.out.println("Enumerating Switches end");
+
+	KVSwitch sw1 = new KVSwitch(DPID1);
+	try {
+	    sw1.read();
+	    assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+	    assertEquals(DPID1, sw1.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+	} catch (ObjectDoesntExistException e) {
+	    e.printStackTrace();
+	    fail("Reading switch failed " + e);
+	}
+
+	KVSwitch sw2 = new KVSwitch(DPID2);
+	if (KVObject.multiRead( Arrays.asList(sw2) )) {
+	    fail("Failed to read switch " + sw2);
+	} else {
+	    assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+	    assertEquals(DPID2, sw2.getDpid());
+	    assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+	}
+
+
+	// DPID -> [port_no]
+	@SuppressWarnings("serial")
+	Map<Long,List<Long>> expectedPorts = new HashMap<Long,List<Long>>() {{
+	    put(DPID1, new ArrayList<>(Arrays.asList(SW1_PORTNO1, SW1_PORTNO2)));
+	    put(DPID2, new ArrayList<>(Arrays.asList(SW2_PORTNO1, SW2_PORTNO2)));
+	}};
+
+	for (KVPort port : KVPort.getAllPorts()) {
+	    System.out.println(port + " @ " + port.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, port.getVersion());
+	    assertEquals(KVPort.STATUS.ACTIVE, port.getStatus());
+	    assertThat(port.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+	    assertThat(port.getNumber(), is(anyOf(equalTo(SW1_PORTNO1), equalTo(SW1_PORTNO2))));
+
+	    assertThat(expectedPorts, hasKey(port.getDpid()));
+	    assertThat(expectedPorts.get(port.getDpid()), hasItem(port.getNumber()));
+	    expectedPorts.get(port.getDpid()).remove(port.getNumber());
+	}
+
+	// DeviceID -> PortID
+	@SuppressWarnings("serial")
+	Map<byte[], byte[]> expectedDevice = new TreeMap<byte[], byte[]>(ByteArrayComparator.BYTEARRAY_COMPARATOR) {{
+	    put(DEVICE1_MAC_SW1P1, KVPort.getPortID(DPID1, SW1_PORTNO1));
+	    put(DEVICE2_MAC_SW2P2, KVPort.getPortID(DPID2, SW2_PORTNO2));
+	}};
+
+	for (KVDevice device : KVDevice.getAllDevices()) {
+	    System.out.println(device + " @ " + device.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, device.getVersion());
+
+	    assertThat(expectedDevice, hasKey(device.getMac()));
+	    assertThat(device.getAllPortIds(), hasItem(expectedDevice.get(device.getMac())));
+	    expectedDevice.remove(device.getMac());
+	}
+
+	for (KVLink link : KVLink.getAllLinks()) {
+	    System.out.println(link + " @ " + link.getVersion());
+	    assertNotEquals(VERSION_NONEXISTENT, link.getVersion());
+
+	    // there is currently only 1 link SW1P2->SW2P1
+	    assertEquals(DPID1, link.getSrc().dpid);
+	    assertEquals(SW1_PORTNO2, link.getSrc().number);
+	    assertEquals(DPID2, link.getDst().dpid);
+	    assertEquals(SW2_PORTNO1, link.getDst().number);
+	}
+
+    }
+
+
+    private static void topology_delete() {
+
+	for (KVSwitch sw : KVSwitch.getAllSwitches()) {
+	    try {
+		sw.read();
+		sw.delete();
+		assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Switch Failed " + e);
+	    }
+	}
+
+	for (KVPort p : KVPort.getAllPorts()) {
+	    try {
+		p.read();
+		p.delete();
+		assertNotEquals(VERSION_NONEXISTENT, p.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Port Failed " + e);
+	    }
+	}
+
+	for (KVDevice d : KVDevice.getAllDevices()) {
+	    d.forceDelete();
+	    assertNotEquals(VERSION_NONEXISTENT, d.getVersion());
+	}
+
+	for (KVLink l : KVLink.getAllLinks()) {
+	    try {
+		l.read();
+		l.delete();
+		assertNotEquals(VERSION_NONEXISTENT, l.getVersion());
+	    } catch (ObjectDoesntExistException | WrongVersionException e) {
+		e.printStackTrace();
+		fail("Delete Link Failed " + e);
+	    }
+	}
+    }
+
+    public static void main(final String[] argv) {
+
+	topology_setup();
+	topology_walk();
+	topology_delete();
+
+	System.exit(0);
+    }
+
+}