Add option to use Hazelcast as datastore for development environment.
This patch will set the default backend as Hazelcast.
To use RAMCloud as backend data store, add -Dnet.onrc.onos.datastore.backend=ramcloud to java option.
- ClientMode: Use existing Hazelcast Instance if it exist.
- map name starting with datastore:// is now configured to be strong consistent
- add main for manual testing
- follow PMD,etc. where easily possible
- make HZClient Singleton
Change-Id: Ibe2afc3bfddfd7fd567c91477c16cd679fc543d4
diff --git a/conf/hazelcast.xml b/conf/hazelcast.xml
index 84c7354..cf44fc3 100644
--- a/conf/hazelcast.xml
+++ b/conf/hazelcast.xml
@@ -3,6 +3,16 @@
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <!-- Configuration of maps used as a data store. -->
+ <map name="datastore://*">
+ <!-- must use 'sync' backup to imitate other data store -->
+ <backup-count>3</backup-count>
+ <async-backup-count>0</async-backup-count>
+ <!-- must be false for strong consistency -->
+ <read-backup-data>false</read-backup-data>
+ <!-- near cache must not be used -->
+ </map>
+
<map name="*">
<!--
diff --git a/pom.xml b/pom.xml
index bfff1bc..2f4b1b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
see https://github.com/OPENNETWORKINGLAB/ONOS/pull/425
<github.global.server>github</github.global.server>
-->
+ <hazelcast.version>3.0.2</hazelcast.version>
</properties>
<build>
<plugins>
@@ -519,7 +520,12 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
- <version>3.0.2</version>
+ <version>${hazelcast.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-client</artifactId>
+ <version>${hazelcast.version}</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
diff --git a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
index 88daa8e..6aeb49e 100644
--- a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
+++ b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
@@ -1,10 +1,26 @@
package net.onrc.onos.datastore;
+import net.onrc.onos.datastore.hazelcast.HZClient;
import net.onrc.onos.datastore.ramcloud.RCClient;
+// This class probably need to be a service
public class DataStoreClient {
+ private static final String BACKEND = System.getProperty("net.onrc.onos.datastore.backend", "hazelcast");
+
+ // Suppresses default constructor, ensuring non-instantiability.
+ private DataStoreClient() {}
+
public static IKVClient getClient() {
// TODO read config and return appropriate IKVClient
- return RCClient.getClient();
+ switch (BACKEND) {
+ case "ramcloud":
+ return RCClient.getClient();
+ case "hazelcast":
+ return HZClient.getClient();
+ default:
+ return HZClient.getClient();
+ }
}
+
+
}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
new file mode 100644
index 0000000..2c2b3a4
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
@@ -0,0 +1,345 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.io.FileNotFoundException;
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiEntryOperation.OPERATION;
+import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public class HZClient implements IKVClient {
+ private static final Logger log = LoggerFactory.getLogger(HZClient.class);
+
+ static final long VERSION_NONEXISTENT = 0L;
+
+ private static final String MAP_PREFIX = "datastore://";
+
+ // make this path configurable
+ private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
+ private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.datastore.hazelcast.clientMode", "true"));
+
+ // Note: xml configuration will overwrite this value if present
+ private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.datastore.hazelcast.backupCount", "3"));
+
+ private final HazelcastInstance hazelcastInstance;
+
+ private static final HZClient THE_INSTANCE = new HZClient();
+
+ public static HZClient getClient() {
+ return THE_INSTANCE;
+ }
+
+ private HZClient() {
+ hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
+ }
+
+ private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
+ Config baseHzConfig = null;
+ try {
+ baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+ } catch (FileNotFoundException e) {
+ log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+ throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+ }
+
+ // use xml config if present, if not use System.property
+ MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+ if (mapConfig != null) {
+ backupCount = mapConfig.getBackupCount();
+ }
+
+ HazelcastInstance instance = null;
+ if (useClientMode) {
+ log.info("Configuring Hazelcast datastore as Client mode");
+ ClientConfig clientConfig = new ClientConfig();
+ final int port = baseHzConfig.getNetworkConfig().getPort();
+
+ String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
+ clientConfig.addAddress(server + ":" + port);
+
+ // copy group config from base Hazelcast configuration
+ clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+ clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+
+ // TODO We probably need to figure out what else need to be
+ // derived from baseConfig
+
+ registerSerializer(clientConfig.getSerializationConfig());
+
+ log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+
+ try {
+ instance = HazelcastClient.newHazelcastClient(clientConfig);
+ if (!instance.getCluster().getMembers().isEmpty()) {
+ log.debug("Members in cluster: " + instance.getCluster().getMembers());
+ return instance;
+ }
+ log.info("Failed to find cluster member, falling back to Instance mode");
+ } catch (IllegalStateException e) {
+ log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+ }
+ useClientMode = false;
+ instance = null;
+ }
+ log.info("Configuring Hazelcast datastore as Instance mode");
+
+ // To run 2 Hazelcast instance in 1 JVM,
+ // we probably need to something like below
+ //int port = hazelcastConfig.getNetworkConfig().getPort();
+ //hazelcastConfig.getNetworkConfig().setPort(port+1);
+
+ registerSerializer(baseHzConfig.getSerializationConfig());
+
+ return Hazelcast.newHazelcastInstance(baseHzConfig);
+ }
+
+ /**
+ * Register serializer for VersionedValue class used to imitate value version.
+ * @param config
+ */
+ private static void registerSerializer(final SerializationConfig config) {
+ config.addDataSerializableFactoryClass(
+ VersionedValueSerializableFactory.FACTORY_ID,
+ VersionedValueSerializableFactory.class);
+ }
+
+ @Override
+ public IKVTable getTable(final String tableName) {
+ IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
+
+ if (!useClientMode) {
+ // config only available in Instance Mode
+ // Client Mode must rely on hazelcast.xml to be properly configured.
+ MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
+ // config for this map to be strong consistent
+ if (config.isReadBackupData()) {
+ config.setReadBackupData(false);
+ }
+ if (config.isNearCacheEnabled()) {
+ config.getNearCacheConfig().setMaxSize(0);
+ }
+
+ if (config.getBackupCount() != backupCount) {
+ config.setAsyncBackupCount(0);
+ config.setBackupCount(backupCount);
+ }
+ }
+
+ return new HZTable(tableName, map);
+ }
+
+ @Override
+ public void dropTable(final IKVTable table) {
+ ((HZTable) table).getBackendMap().clear();
+ }
+
+ @Override
+ public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
+ throws ObjectExistsException {
+ IKVTable table = (IKVTable) tableId;
+ return table.create(key, value);
+ }
+
+ @Override
+ public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+ IKVTable table = (IKVTable) tableId;
+ return table.forceCreate(key, value);
+ }
+
+ @Override
+ public IKVEntry read(final IKVTableID tableId, final byte[] key)
+ throws ObjectDoesntExistException {
+ IKVTable table = (IKVTable) tableId;
+ return table.read(key);
+ }
+
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
+ final long version) throws ObjectDoesntExistException,
+ WrongVersionException {
+ IKVTable table = (IKVTable) tableId;
+ return table.update(key, value, version);
+ }
+
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
+ throws ObjectDoesntExistException {
+ IKVTable table = (IKVTable) tableId;
+ return table.update(key, value);
+ }
+
+ @Override
+ public long delete(final IKVTableID tableId, final byte[] key, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+ IKVTable table = (IKVTable) tableId;
+ return table.delete(key, version);
+ }
+
+ @Override
+ public long forceDelete(final IKVTableID tableId, final byte[] key) {
+ IKVTable table = (IKVTable) tableId;
+ return table.forceDelete(key);
+ }
+
+ @Override
+ public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
+ IKVTable table = (IKVTable) tableId;
+ return table.getAllEntries();
+ }
+
+ @Override
+ public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
+ }
+
+ @Override
+ public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+ }
+
+ @Override
+ public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
+ }
+
+ @Override
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
+ switch (mop.getOperation()) {
+ case DELETE:
+ try {
+ final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ case FORCE_DELETE:
+ final long version = forceDelete(mop.getTableId(), mop.getKey());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
+ }
+
+ @Override
+ public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+ // there may be room to batch to improve performance
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ switch (mop.getOperation()) {
+ case CREATE:
+ try {
+ long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectExistsException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ case FORCE_CREATE:
+ {
+ final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ }
+ case UPDATE:
+ try {
+ long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
+ }
+
+ @Override
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ HZTable table = (HZTable) op.getTableId();
+ ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
+ }
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ if (mop.hasSucceeded()) {
+ // status update is already done, nothing to do.
+ } else {
+ failExists = true;
+ }
+ }
+
+ return failExists;
+ }
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return VERSION_NONEXISTENT;
+ }
+
+
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
new file mode 100644
index 0000000..1da7f1a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.datastore.utils.ByteArrayUtil;
+
+public class HZMultiEntryOperation implements IMultiEntryOperation, IModifiableMultiEntryOperation {
+ private static final Logger log = LoggerFactory.getLogger(HZMultiEntryOperation.class);
+
+ private final HZTable table;
+ private final byte[] key;
+ protected final OPERATION operation;
+ private STATUS status;
+
+ // for read op
+ private Future<VersionedValue> future;
+ // for write op
+ private VersionedValue writeValue;
+
+ /**
+ * Constructor for Read/ForceDelete Operation.
+ * @param table
+ * @param key
+ * @param operation
+ */
+ public HZMultiEntryOperation(final HZTable table, final byte[] key, final OPERATION operation) {
+ this.table = table;
+ this.key = key;
+ this.status = STATUS.NOT_EXECUTED;
+ this.operation = operation;
+
+ this.future = null;
+ this.writeValue = null;
+ }
+
+ /**
+ * Constructor for Other Operations.
+ * @param table
+ * @param key
+ * @param value
+ * @param version
+ * @param operation
+ */
+ public HZMultiEntryOperation(final HZTable table, final byte[] key, final byte[] value, final long version, final OPERATION operation) {
+ this.table = table;
+ this.key = key;
+ this.status = STATUS.NOT_EXECUTED;
+ this.operation = operation;
+
+ this.future = null;
+ this.writeValue = new VersionedValue(value, version);
+ }
+
+ @Override
+ public boolean hasSucceeded() {
+
+ VersionedValue value = get();
+ return (value != null) && (this.status == STATUS.SUCCESS);
+ }
+
+ @Override
+ public STATUS getStatus() {
+ get();
+ return status;
+ }
+
+ @Override
+ public IKVTableID getTableId() {
+ return table;
+ }
+
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
+
+ @Override
+ public byte[] getValue() {
+ if (future != null) {
+ VersionedValue value = get();
+ return value.getValue();
+ }
+ return writeValue.getValue();
+ }
+
+ @Override
+ public long getVersion() {
+ if (future != null) {
+ VersionedValue value = get();
+ return value.getVersion();
+ }
+ return writeValue.getVersion();
+ }
+
+ @Override
+ public OPERATION getOperation() {
+ return operation;
+ }
+
+ /**
+ * Evaluate Future object and set Status and Value+Version.
+ * @return the value read or null on failure.
+ */
+ private VersionedValue get() {
+ try {
+ VersionedValue value = future.get();
+ setValue(value.getValue(), value.getVersion());
+ setStatus(STATUS.SUCCESS);
+ return value;
+ } catch (CancellationException | InterruptedException | ExecutionException e) {
+ log.error(this + " has failed.", e);
+ setStatus(STATUS.FAILED);
+ return null;
+ }
+ }
+
+ @Override
+ public void setValue(final byte[] value, final long version) {
+ writeValue = new VersionedValue(value, version);
+ setVersion(version);
+ }
+
+ @Override
+ public void setVersion(final long version) {
+ if (future != null) {
+ // no-op on read
+ }
+ if (writeValue == null) {
+ writeValue = new VersionedValue(null, version);
+ }
+ }
+
+ @Override
+ public void setStatus(final STATUS status) {
+ this.status = status;
+ }
+
+ @Override
+ public IModifiableMultiEntryOperation getActualOperation() {
+ return this;
+ }
+
+ void setFuture(final Future<VersionedValue> future) {
+ this.future = future;
+ }
+
+ @Override
+ public String toString() {
+ return "[HZMultiEntryOperation table=" + table + ", key="
+ + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
+ + ", status=" + status + ", writeValue=" + writeValue + "]";
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
new file mode 100644
index 0000000..ec98349
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
@@ -0,0 +1,325 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class HZTable implements IKVTable, IKVTableID {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(HZTable.class);
+
+ // not sure how strict this should be managed
+ private static final AtomicLong initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
+
+ /**
+ * generate a new initial version for an entry.
+ * @return initial value
+ */
+ protected static long getInitialVersion() {
+ long version = initialVersion.incrementAndGet();
+ if (version == HZClient.VERSION_NONEXISTENT) {
+ // used up whole 64bit space?
+ version = initialVersion.incrementAndGet();
+ }
+ return version;
+ }
+
+ /**
+ * increment version, avoiding VERSION_NONEXISTENT.
+ * @param version
+ * @return
+ */
+ protected static long getNextVersion(final long version) {
+ long nextVersion = version + 1;
+ if (nextVersion == HZClient.VERSION_NONEXISTENT) {
+ ++nextVersion;
+ }
+ return nextVersion;
+ }
+
+ static class VersionedValue implements IdentifiedDataSerializable {
+ private static final long serialVersionUID = -3149375966890712708L;
+
+ private byte[] value;
+ private long version;
+
+ protected VersionedValue() {
+ value = new byte[0];
+ version = HZClient.VERSION_NONEXISTENT;
+ }
+
+ public VersionedValue(final byte[] value, final long version) {
+ this.value = value;
+ this.version = version;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ public void setValue(final byte[] value) {
+ this.value = value;
+ }
+
+ public void setNextVersion() {
+ this.version = getNextVersion(this.version);
+ }
+
+ @Override
+ public void writeData(final ObjectDataOutput out) throws IOException {
+ out.writeLong(version);
+ out.writeInt(value.length);
+ if (value.length > 0) {
+ out.write(value);
+ }
+ }
+
+ @Override
+ public void readData(final ObjectDataInput in) throws IOException {
+ version = in.readLong();
+ final int valueLen = in.readInt();
+ value = new byte[valueLen];
+ in.readFully(value);
+ }
+
+ @Override
+ public int getFactoryId() {
+ return VersionedValueSerializableFactory.FACTORY_ID;
+ }
+
+ @Override
+ public int getId() {
+ return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (version ^ (version >>> 32));
+ result = prime * result + Arrays.hashCode(value);
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ VersionedValue other = (VersionedValue) obj;
+ if (version != other.version) {
+ return false;
+ }
+ if (!Arrays.equals(value, other.value)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ // TODO Refactor and extract common parts
+ public static class Entry implements IKVEntry {
+ final byte[] key;
+ byte[] value;
+ long version;
+
+ public Entry(final byte[] key, final byte[] value, final long version) {
+ this.key = key;
+ this.setValue(value);
+ this.setVersion(version);
+ }
+
+ public Entry(final byte[] key) {
+ this(key, null, HZClient.VERSION_NONEXISTENT);
+ }
+
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public long getVersion() {
+ return version;
+ }
+
+ void setValue(final byte[] value) {
+ this.value = value;
+ }
+
+ void setVersion(final long version) {
+ this.version = version;
+ }
+ }
+
+
+
+ private final String mapName;
+ private final IMap<byte[], VersionedValue> map;
+
+ public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
+ this.mapName = mapName;
+ this.map = map;
+ }
+
+ @Override
+ public String getTableName() {
+ return mapName;
+ }
+
+ @Override
+ public IKVTableID getTableId() {
+ return this;
+ }
+
+ @Override
+ public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
+ final long version = getInitialVersion();
+ VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
+ if (existing != null) {
+ throw new ObjectExistsException(this, key);
+ }
+ return version;
+ }
+
+ @Override
+ public long forceCreate(final byte[] key, final byte[] value) {
+ final long version = getInitialVersion();
+ map.set(key, new VersionedValue(value, version));
+ return version;
+ }
+
+ @Override
+ public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
+ final VersionedValue value = map.get(key);
+ if (value == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ return new Entry(key, value.getValue(), value.getVersion());
+ }
+
+ @Override
+ public long update(final byte[] key, final byte[] value, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ try {
+ map.lock(key);
+ final VersionedValue oldValue = map.get(key);
+ if (oldValue == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ if (oldValue.getVersion() != version) {
+ throw new WrongVersionException(this, key, version, oldValue.getVersion());
+ }
+ final long nextVersion = getNextVersion(version);
+ map.set(key, new VersionedValue(value, nextVersion));
+ return nextVersion;
+ } finally {
+ map.unlock(key);
+ }
+ }
+
+ @Override
+ public long update(final byte[] key, final byte[] value)
+ throws ObjectDoesntExistException {
+
+ try {
+ map.lock(key);
+ final VersionedValue valueInMap = map.get(key);
+ if (valueInMap == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ valueInMap.setValue(value);
+ valueInMap.setNextVersion();
+ map.set(key, valueInMap);
+ return valueInMap.getVersion();
+ } finally {
+ map.unlock(key);
+ }
+ }
+
+ @Override
+ public long delete(final byte[] key, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+
+ try {
+ map.lock(key);
+ final VersionedValue oldValue = map.get(key);
+ if (oldValue == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ if (oldValue.getVersion() != version) {
+ throw new WrongVersionException(this, key, version, oldValue.getVersion());
+ }
+ map.delete(key);
+ return oldValue.getVersion();
+ } finally {
+ map.unlock(key);
+ }
+ }
+
+ @Override
+ public long forceDelete(final byte[] key) {
+ final VersionedValue valueInMap = map.remove(key);
+ if (valueInMap == null) {
+ return HZClient.VERSION_NONEXISTENT;
+ }
+ return valueInMap.getVersion();
+ }
+
+ @Override
+ public Iterable<IKVEntry> getAllEntries() {
+ final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
+ List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
+ for (IMap.Entry<byte[], VersionedValue> entry : entries) {
+ entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
+ }
+ return entryList;
+ }
+
+ @Override
+ public String toString() {
+ return "[HZTable " + mapName + "]";
+ }
+
+ IMap<byte[], VersionedValue> getBackendMap() {
+ return this.map;
+ }
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return HZClient.VERSION_NONEXISTENT;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
new file mode 100644
index 0000000..a4e7fb9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.datastore.hazelcast;
+
+
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class VersionedValueSerializableFactory implements
+ DataSerializableFactory {
+ // revisit these magic numbers
+ public static final int FACTORY_ID = 1;
+
+ public static final int VERSIONED_VALUE_ID = 1;
+
+ @Override
+ public IdentifiedDataSerializable create(final int typeId) {
+ switch (typeId) {
+ case VERSIONED_VALUE_ID:
+ return new HZTable.VersionedValue();
+
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
new file mode 100644
index 0000000..a3bfab6
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/hazelcast/HZTableTest.java
@@ -0,0 +1,365 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class HZTableTest {
+ @Rule
+ public TestName name= new TestName();
+
+ static final String TEST_TABLE_NAME = "TableForUnitTest";
+ HZTable table;
+
+ @Before
+ public void setUp() throws Exception {
+ table = (HZTable) HZClient.getClient().getTable(TEST_TABLE_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ HZClient.getClient().dropTable(table);
+ }
+
+ public void assertEntryInTable(final byte[] key, final byte[] value, final long version) {
+ VersionedValue valueblob = table.getBackendMap().get(key);
+ assertNotNull(valueblob);
+ assertArrayEquals(value, valueblob.getValue());
+ assertEquals(version, valueblob.getVersion());
+ }
+
+ public void assertKeyNotInTable(final byte[] key) {
+ VersionedValue valueblob = table.getBackendMap().get(key);
+ assertNull(valueblob);
+ }
+
+ @Test
+ public void testGetInitialVersion() {
+ final long version1 = HZTable.getInitialVersion();
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+
+ final long version2 = HZTable.getInitialVersion();
+ assertNotEquals(HZClient.VERSION_NONEXISTENT, version2);
+ assertNotEquals(version1, version2);
+ }
+
+ @Test
+ public void testGetNextVersion() {
+ final long nextVersion = HZTable.getNextVersion(1);
+ assertNotEquals(nextVersion, HZClient.VERSION_NONEXISTENT);
+ assertNotEquals(nextVersion, 1);
+
+ final long nextVersion1 = HZTable.getNextVersion(Long.MAX_VALUE);
+ assertNotEquals(nextVersion1, HZClient.VERSION_NONEXISTENT);
+ assertNotEquals(nextVersion1, Long.MAX_VALUE);
+
+ final long nextVersion11 = HZTable.getNextVersion(HZClient.VERSION_NONEXISTENT-1);
+ assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT);
+ assertNotEquals(nextVersion11, HZClient.VERSION_NONEXISTENT-1);
+ }
+
+ @Ignore // nothing to test for now
+ @Test
+ public void testHZTable() {
+ fail("Not yet implemented");
+ }
+
+ @Test
+ public void testGetTableName() {
+ assertEquals(TEST_TABLE_NAME, table.getTableName());
+ }
+
+ @Test
+ public void testVERSION_NONEXISTENT() {
+ assertEquals(HZClient.VERSION_NONEXISTENT, table.VERSION_NONEXISTENT());
+ }
+
+ @Test
+ public void testGetTableId() {
+ // for Hazelcast implementation IKVTableID is table itself
+ assertEquals(table, table.getTableId());
+ }
+
+ @Test
+ public void testCreate() throws ObjectExistsException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ final long version = table.create(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+ assertEntryInTable(key, value, version);
+ }
+
+ @Test(expected = ObjectExistsException.class)
+ public void testCreateConflict() throws ObjectExistsException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ final long version = table.create(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+
+ assertEntryInTable(key, value, version);
+
+ table.create(key, value);
+ fail("Should have thrown exception");
+ }
+
+ @Test
+ public void testForceCreate() {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ final long version = table.forceCreate(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+
+
+ final long version1 = table.forceCreate(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version1);
+ assertNotEquals(version, version1);
+ assertEntryInTable(key, value, version1);
+ }
+
+ @Test
+ public void testRead() throws ObjectDoesntExistException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to read
+ final long version = table.forceCreate(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+
+ // test body
+ final IKVEntry readValue = table.read(key);
+ assertArrayEquals(key, readValue.getKey());
+ assertArrayEquals(value, readValue.getValue());
+ assertEquals(version, readValue.getVersion());
+ }
+
+
+ @Test(expected = ObjectDoesntExistException.class)
+ public void testReadNotExist() throws ObjectDoesntExistException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+ table.read(key);
+ fail("Should have thrown exception");
+ }
+
+ @Test
+ public void testUpdateByteArrayByteArrayLongSuccess() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to update
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+
+ final long version = table.update(key, value, oldVersion);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+ }
+
+ @Test(expected = ObjectDoesntExistException.class)
+ public void testUpdateByteArrayByteArrayLongFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ final long oldVersion = 0xDEAD;
+
+ final long version = table.update(key, value, oldVersion);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+ }
+
+ @Test(expected = WrongVersionException.class)
+ public void testUpdateByteArrayByteArrayLongFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to update
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+ // some one updates (from different thread/process in reality)
+ table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+ table.update(key, value, oldVersion);
+ fail("Should have thrown exception");
+ }
+
+ @Test
+ public void testUpdateByteArrayByteArraySuccess() throws ObjectDoesntExistException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to update
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+
+ final long version = table.update(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+ }
+
+ @Test(expected = ObjectDoesntExistException.class)
+ public void testUpdateByteArrayByteArrayFailNoOldValue() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ final long version = table.update(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+ fail("Should have thrown exception");
+ }
+
+ @Test
+ public void testUpdateByteArrayByteArraySuccessIgnoreVersion() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+ final byte[] value = "SomeValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to update
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+ // someone updates (from different thread/process in reality)
+ table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+
+
+ final long version = table.update(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,version);
+ assertEntryInTable(key, value, version);
+ }
+
+ @Test
+ public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to delete
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+
+ long version = table.delete(key, oldVersion);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEquals(oldVersion, version);
+ assertKeyNotInTable(key);
+ }
+
+ @Test(expected = ObjectDoesntExistException.class)
+ public void testDeleteFailNoEntry() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+
+ final long oldVersion = 0xDEAD;
+
+ try {
+ table.delete(key, oldVersion);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ assertKeyNotInTable(key);
+ throw e;
+ }
+ fail("Should have thrown exception");
+ }
+
+ @Test(expected = WrongVersionException.class)
+ public void testDeleteFailWrongVersion() throws ObjectDoesntExistException, WrongVersionException {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to delete
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+ // someone updates (from different thread/process in reality)
+ final long latestVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,latestVersion);
+
+ try {
+ table.delete(key, oldVersion);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ assertEntryInTable(key, oldValue, latestVersion);
+ throw e;
+ }
+ fail("Should have thrown exception");
+ }
+
+
+
+ @Test
+ public void testForceDelete() {
+ final byte[] key = name.getMethodName().getBytes(StandardCharsets.UTF_8);
+ final byte[] oldValue = "OldValue".getBytes(StandardCharsets.UTF_8);
+
+ // put data to delete
+ final long oldVersion = table.forceCreate(key, oldValue);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEntryInTable(key, oldValue, oldVersion);
+
+ long version = table.forceDelete(key);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT,oldVersion);
+ assertEquals(oldVersion, version);
+ assertKeyNotInTable(key);
+ }
+
+ @Test
+ public void testGetAllEntries() {
+ final int DATASETSIZE = 100;
+ final Map<byte[], VersionedValue> testdata = new TreeMap<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ for(int i = 0 ; i < DATASETSIZE ; ++i) {
+ final byte[] key = (name.getMethodName()+i).getBytes(StandardCharsets.UTF_8);
+ final byte[] value = ("Value"+i).getBytes(StandardCharsets.UTF_8);
+
+ // put data to delete
+ final long version = table.forceCreate(key, value);
+ assertNotEquals(HZClient.VERSION_NONEXISTENT, version);
+ assertEntryInTable(key, value, version);
+
+ testdata.put(key, new VersionedValue(value, version));
+ }
+
+ Iterable<IKVEntry> datastore = table.getAllEntries();
+ for( IKVEntry entry : datastore ) {
+ VersionedValue expectedValue = testdata.get(entry.getKey());
+ assertNotNull(expectedValue);
+ assertArrayEquals(expectedValue.getValue(), entry.getValue());
+ assertEquals(expectedValue.getVersion(), entry.getVersion());
+
+ testdata.remove(entry.getKey());
+ }
+
+ assertTrue(testdata.isEmpty());
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("[HZTable " + TEST_TABLE_NAME + "]", table.toString());
+ }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
new file mode 100644
index 0000000..e656596
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchNoDataStoreTest.java
@@ -0,0 +1,83 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.Test;
+
+public class KVSwitchNoDataStoreTest {
+
+ @Test
+ public void testGetDpidFromKeyByteArray() {
+ // reference bytes
+ final byte[] key = KVSwitch.getSwitchID(0x1L);
+
+ assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+ }
+
+ @Test
+ public void testGetDpidFromKeyByteBuffer() {
+ // reference bytes
+ final ByteBuffer key = ByteBuffer.wrap(KVSwitch.getSwitchID(0x1L));
+
+ assertEquals(0x1L, KVSwitch.getDpidFromKey(key));
+ }
+
+ @Test
+ public void testCreateFromKeyByteArray() {
+ // reference bytes
+ Long dpid = Long.valueOf(0x1L);
+ final byte[] key = KVSwitch.getSwitchID(dpid);
+
+ KVSwitch sw = KVSwitch.createFromKey(key);
+ assertNotNull(sw);
+ assertEquals(dpid, sw.getDpid());
+ }
+
+ @Test
+ public void testGetStatus() {
+ KVSwitch sw = new KVSwitch(0x1L);
+
+ assertEquals(STATUS.INACTIVE, sw.getStatus());
+ }
+
+ @Test
+ public void testSetStatus() {
+ KVSwitch sw = new KVSwitch(0x1L);
+ assertEquals(STATUS.INACTIVE, sw.getStatus());
+
+ sw.setStatus(STATUS.ACTIVE);
+ assertEquals(STATUS.ACTIVE, sw.getStatus());
+ }
+
+ @Test
+ public void testGetDpid() {
+ Long dpid = 0x1L;
+ KVSwitch sw = new KVSwitch(dpid);
+ assertEquals(dpid, sw.getDpid());
+ }
+
+ @Test
+ public void testGetId() {
+ // reference bytes
+ Long dpid = Long.valueOf(0x1L);
+ final byte[] key = KVSwitch.getSwitchID(dpid);
+
+ KVSwitch sw = KVSwitch.createFromKey(key);
+ assertArrayEquals(key, sw.getId());
+ }
+
+ @Test
+ public void testToString() {
+ final String expected = "[" + "KVSwitch"
+ + " 0x" + 1 + " STATUS:" + STATUS.INACTIVE + "]";
+
+ Long dpid = 0x1L;
+ KVSwitch sw = new KVSwitch(dpid);
+
+ assertEquals(expected, sw.toString());
+ }
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
new file mode 100644
index 0000000..ce73034
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVSwitchTest.java
@@ -0,0 +1,203 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.topology.KVSwitch.STATUS;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVSwitchTest {
+ IKVTable switchTable;
+ static final Long dpid1 = 0x1L;
+ KVSwitch sw1;
+
+ @Before
+ public void setUp() throws Exception {
+ switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+ sw1 = new KVSwitch(dpid1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ DataStoreClient.getClient().dropTable(switchTable);
+ }
+
+ public KVSwitch assertSwitchInDataStore(final Long dpid, final STATUS status) {
+ try {
+ final KVSwitch sw = new KVSwitch(dpid);
+ sw.read();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertEquals(dpid, sw.getDpid());
+ assertEquals(status, sw.getStatus());
+ return sw;
+ } catch (ObjectDoesntExistException e) {
+ fail("Switch was not written to datastore");
+ }
+ return null;
+ }
+ public void assertSwitchNotInDataStore(final Long dpid) {
+ final KVSwitch sw = new KVSwitch(dpid);
+ try {
+ sw.read();
+ fail("Switch was not supposed to be there in datastore");
+ } catch (ObjectDoesntExistException e) {
+ }
+ }
+
+ @Test
+ public void testGetAllSwitches() throws ObjectExistsException {
+ final int NUM_SWITCHES = 100;
+ Map<Long,KVSwitch> expected = new HashMap<>();
+ for (long dpid = 1 ; dpid <= NUM_SWITCHES ; ++dpid) {
+ KVSwitch sw = new KVSwitch(dpid);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.create();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ expected.put(sw.getDpid(), sw);
+ }
+
+ Iterable<KVSwitch> switches = KVSwitch.getAllSwitches();
+
+ for (KVSwitch sw : switches) {
+ KVSwitch expectedSw = expected.get(sw.getDpid());
+ assertNotNull(expectedSw);
+ assertEquals(expectedSw.getDpid(), sw.getDpid());
+ assertEquals(expectedSw.getStatus(), sw.getStatus());
+ assertEquals(expectedSw.getVersion(), sw.getVersion());
+
+ assertArrayEquals(expectedSw.getKey(), sw.getKey());
+ }
+ }
+
+ @Test
+ public void testCreate() throws ObjectExistsException {
+ sw1.setStatus(STATUS.ACTIVE);
+ sw1.create();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+ assertEquals(dpid1, sw1.getDpid());
+ assertEquals(STATUS.ACTIVE, sw1.getStatus());
+
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+ }
+
+ @Test(expected = ObjectExistsException.class)
+ public void testCreateFailAlreadyExist() throws ObjectExistsException {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+ sw1.setStatus(STATUS.ACTIVE);
+ sw1.create();
+ fail("Should have thrown an exception");
+ }
+
+ @Test
+ public void testForceCreate() {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.INACTIVE);
+
+
+ sw1.setStatus(STATUS.ACTIVE);
+ sw1.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+ assertEquals(dpid1, sw1.getDpid());
+ assertEquals(STATUS.ACTIVE, sw1.getStatus());
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+ }
+
+ @Test
+ public void testRead() throws ObjectDoesntExistException {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+ sw1.read();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+ assertEquals(sw.getVersion(), sw1.getVersion());
+ assertEquals(dpid1, sw1.getDpid());
+ assertEquals(STATUS.ACTIVE, sw1.getStatus());
+ }
+
+ @Test(expected = ObjectDoesntExistException.class)
+ public void testReadFailNoExist() throws ObjectDoesntExistException {
+
+ sw1.read();
+ fail("Should have thrown an exception");
+ }
+
+ @Test
+ public void testUpdate() throws ObjectDoesntExistException, WrongVersionException {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+ sw1.read();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+
+ sw1.setStatus(STATUS.INACTIVE);
+ sw1.update();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+ assertNotEquals(sw.getVersion(), sw1.getVersion());
+ assertEquals(dpid1, sw1.getDpid());
+ assertEquals(STATUS.INACTIVE, sw1.getStatus());
+ }
+
+ @Test
+ public void testDelete() throws ObjectDoesntExistException, WrongVersionException {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+ try {
+ sw1.read();
+ } catch (ObjectDoesntExistException e) {
+ fail("Failed reading switch to delete");
+ }
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+ sw1.delete();
+ assertSwitchNotInDataStore(dpid1);
+ }
+
+ @Test
+ public void testForceDelete() {
+ // setup pre-existing Switch
+ KVSwitch sw = new KVSwitch(dpid1);
+ sw.setStatus(STATUS.ACTIVE);
+ sw.forceCreate();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw.getVersion());
+ assertSwitchInDataStore(dpid1, STATUS.ACTIVE);
+
+
+ sw1.forceDelete();
+ assertNotEquals(DataStoreClient.getClient().VERSION_NONEXISTENT(), sw1.getVersion());
+ }
+
+}
diff --git a/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
new file mode 100644
index 0000000..4fe0a1b
--- /dev/null
+++ b/src/test/java/net/onrc/onos/datastore/topology/KVTopologyTest.java
@@ -0,0 +1,423 @@
+package net.onrc.onos.datastore.topology;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.utils.ByteArrayComparator;
+import net.onrc.onos.datastore.utils.KVObject;
+import net.onrc.onos.datastore.utils.KVObject.WriteOp;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVTopologyTest {
+
+ static final long VERSION_NONEXISTENT = DataStoreClient.getClient().VERSION_NONEXISTENT();
+
+ private static final byte[] DEVICE2_MAC_SW2P2 = new byte[] { 6, 5, 4, 3, 2, 1, 0 };
+
+ private static final Long SW2_PORTNO2 = 2L;
+
+ private static final Long SW2_PORTNO1 = 1L;
+
+ private static final Long DPID2 = 0x2L;
+
+ private static final byte[] DEVICE1_MAC_SW1P1 = new byte[] { 0, 1, 2, 3, 4, 5, 6 };
+
+ private static final Long SW1_PORTNO2 = 2L;
+
+ private static final Long SW1_PORTNO1 = 1L;
+
+ private static final Long DPID1 = 0x1L;
+
+ @Before
+ @After
+ public void wipeTopology() throws Exception {
+ IKVTable switchTable = DataStoreClient.getClient().getTable(KVSwitch.GLOBAL_SWITCH_TABLE_NAME);
+ DataStoreClient.getClient().dropTable(switchTable);
+
+ IKVTable portTable = DataStoreClient.getClient().getTable(KVPort.GLOBAL_PORT_TABLE_NAME);
+ DataStoreClient.getClient().dropTable(portTable);
+
+ IKVTable linkTable = DataStoreClient.getClient().getTable(KVLink.GLOBAL_LINK_TABLE_NAME);
+ DataStoreClient.getClient().dropTable(linkTable);
+
+ IKVTable deviceTable = DataStoreClient.getClient().getTable(KVDevice.GLOBAL_DEVICE_TABLE_NAME);
+ DataStoreClient.getClient().dropTable(deviceTable);
+ }
+
+ @Test
+ public void basic_switch_test() {
+ // create switch 0x1
+ try {
+ KVSwitch sw = new KVSwitch(DPID1);
+ sw.setStatus(KVSwitch.STATUS.ACTIVE);
+ sw.create();
+ assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+ assertEquals(DPID1, sw.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+ } catch (ObjectExistsException e) {
+ e.printStackTrace();
+ fail("Create Switch Failed " + e);
+ }
+
+ // read switch 0x1
+ KVSwitch swRead = new KVSwitch(DPID1);
+ try {
+ swRead.read();
+ assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+ assertEquals(DPID1, swRead.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, swRead.getStatus());
+ } catch (ObjectDoesntExistException e) {
+ e.printStackTrace();
+ fail("Reading Switch Failed " + e);
+ }
+
+ // and update 0x1
+ swRead.setStatus(KVSwitch.STATUS.INACTIVE);
+ try {
+ swRead.update();
+ assertNotEquals(VERSION_NONEXISTENT, swRead.getVersion());
+ assertEquals(DPID1, swRead.getDpid());
+ assertEquals(KVSwitch.STATUS.INACTIVE, swRead.getStatus());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Updating Switch Failed " + e);
+ }
+
+ // read 0x1 again and delete
+ KVSwitch swRead2 = new KVSwitch(DPID1);
+ try {
+ swRead2.read();
+ assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+ assertEquals(DPID1, swRead2.getDpid());
+ assertEquals(KVSwitch.STATUS.INACTIVE, swRead2.getStatus());
+ } catch (ObjectDoesntExistException e) {
+ e.printStackTrace();
+ fail("Reading Switch Again Failed " + e);
+ }
+
+ try {
+ swRead2.delete();
+ assertNotEquals(VERSION_NONEXISTENT, swRead2.getVersion());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Deleting Switch Failed " + e);
+ }
+
+ // make sure 0x1 is deleted
+ KVObject swRead3 = new KVSwitch(DPID1);
+ try {
+ swRead3.read();
+ fail(swRead3 + " was supposed to be deleted, but read succeed");
+ } catch (ObjectDoesntExistException e) {
+ System.out.println("-- " + swRead3 + " not found as expected--");
+ e.printStackTrace(System.out);
+ System.out.println("---------------------------------------");
+ }
+ }
+
+ @Test
+ public void topology_setup_and_tear_down() {
+ topology_setup();
+ topology_walk();
+ topology_delete();
+ }
+
+ private static void topology_setup() {
+
+ // d1 - s1p1 - s1 - s1p2 - s2p1 - s2 - s2p2
+
+ KVSwitch sw1 = new KVSwitch(DPID1);
+ sw1.setStatus(KVSwitch.STATUS.ACTIVE);
+ try {
+ sw1.create();
+ assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+ assertEquals(DPID1, sw1.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+ } catch (ObjectExistsException e) {
+ e.printStackTrace();
+ fail("Switch creation failed " + e);
+ }
+
+ KVPort sw1p1 = new KVPort(DPID1, SW1_PORTNO1);
+ sw1p1.setStatus(KVPort.STATUS.ACTIVE);
+ KVPort sw1p2 = new KVPort(DPID1, SW1_PORTNO2);
+ sw1p2.setStatus(KVPort.STATUS.ACTIVE);
+ try {
+ sw1p1.create();
+ assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+ assertEquals(DPID1, sw1p1.getDpid());
+ assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+
+ sw1p2.create();
+ assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+ assertEquals(DPID1, sw1p2.getDpid());
+ assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+ } catch (ObjectExistsException e) {
+ e.printStackTrace();
+ fail("Port creation failed " + e);
+ }
+
+ try {
+ sw1.update();
+ assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+ assertEquals(DPID1, sw1.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Switch update failed " + e);
+ }
+
+ KVDevice d1 = new KVDevice(DEVICE1_MAC_SW1P1);
+ d1.addPortId(sw1p1.getId());
+
+ try {
+ d1.create();
+ assertNotEquals(VERSION_NONEXISTENT, d1.getVersion());
+ assertEquals(1, d1.getAllPortIds().size());
+ assertArrayEquals(sw1p1.getId(), d1.getAllPortIds().iterator().next());
+
+ try {
+ sw1p1.update();
+ assertNotEquals(VERSION_NONEXISTENT, sw1p1.getVersion());
+ assertEquals(DPID1, sw1p1.getDpid());
+ assertEquals(SW1_PORTNO1, sw1p1.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw1p1.getStatus());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Link update failed " + e);
+ }
+ } catch (ObjectExistsException e) {
+ e.printStackTrace();
+ fail("Device creation failed " + e);
+ }
+
+ KVSwitch sw2 = new KVSwitch(DPID2);
+ sw2.setStatus(KVSwitch.STATUS.ACTIVE);
+ KVPort sw2p1 = new KVPort(DPID2, SW2_PORTNO1);
+ sw2p1.setStatus(KVPort.STATUS.ACTIVE);
+ KVPort sw2p2 = new KVPort(DPID2, SW2_PORTNO2);
+ sw2p2.setStatus(KVPort.STATUS.ACTIVE);
+
+ KVDevice d2 = new KVDevice(DEVICE2_MAC_SW2P2);
+ d2.addPortId(sw2p2.getId());
+
+ IKVClient client = DataStoreClient.getClient();
+
+ List<WriteOp> groupOp = Arrays.asList(
+ sw2.createOp(client), sw2p1.createOp(client),
+ sw2p2.createOp(client), d2.createOp(client) );
+ boolean failed = KVObject.multiWrite(groupOp);
+ if (failed) {
+ for ( WriteOp op : groupOp ) {
+ System.err.println(op);
+ }
+ fail("Some of Switch/Port/Device creation failed");
+ } else {
+ assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+ assertEquals(DPID2, sw2.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+
+ assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+ assertEquals(DPID2, sw2p1.getDpid());
+ assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+
+ assertNotEquals(VERSION_NONEXISTENT, sw2p2.getVersion());
+ assertEquals(DPID2, sw2p2.getDpid());
+ assertEquals(SW2_PORTNO2, sw2p2.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw2p2.getStatus());
+
+ assertNotEquals(VERSION_NONEXISTENT, d2.getVersion());
+ assertEquals(1, d2.getAllPortIds().size());
+ assertArrayEquals(sw2p2.getId(), d2.getAllPortIds().iterator().next());
+ }
+
+ KVLink l1 = new KVLink(DPID1, SW1_PORTNO2, DPID2, SW2_PORTNO1);
+ l1.setStatus(KVLink.STATUS.ACTIVE);
+
+ try {
+ l1.create();
+ assertNotEquals(VERSION_NONEXISTENT, l1.getVersion());
+ assertEquals(KVLink.STATUS.ACTIVE, l1.getStatus());
+ assertArrayEquals(sw1.getId(), l1.getSrc().getSwitchID());
+ assertArrayEquals(sw1p2.getId(), l1.getSrc().getPortID());
+ assertArrayEquals(sw2.getId(), l1.getDst().getSwitchID());
+ assertArrayEquals(sw2p1.getId(), l1.getDst().getPortID());
+
+ try {
+ sw1p2.update();
+ assertNotEquals(VERSION_NONEXISTENT, sw1p2.getVersion());
+ assertEquals(DPID1, sw1p2.getDpid());
+ assertEquals(SW1_PORTNO2, sw1p2.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw1p2.getStatus());
+
+ sw2p1.update();
+ assertNotEquals(VERSION_NONEXISTENT, sw2p1.getVersion());
+ assertEquals(DPID2, sw2p1.getDpid());
+ assertEquals(SW2_PORTNO1, sw2p1.getNumber());
+ assertEquals(KVPort.STATUS.ACTIVE, sw2p1.getStatus());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Port update failed " + e);
+ }
+ } catch (ObjectExistsException e) {
+ e.printStackTrace();
+ fail("Link creation failed " + e);
+ }
+ }
+
+
+ private static void topology_walk() {
+ Iterable<KVSwitch> swIt = KVSwitch.getAllSwitches();
+ List<Long> switchesExpected = new ArrayList<>(Arrays.asList(DPID1, DPID2));
+
+ System.out.println("Enumerating Switches start");
+ for (KVSwitch sw : swIt) {
+ System.out.println(sw + " @ " + sw.getVersion());
+ assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw.getStatus());
+ assertThat(sw.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+ assertThat(switchesExpected, hasItem(sw.getDpid()));
+ switchesExpected.remove(sw.getDpid());
+ }
+ System.out.println("Enumerating Switches end");
+
+ KVSwitch sw1 = new KVSwitch(DPID1);
+ try {
+ sw1.read();
+ assertNotEquals(VERSION_NONEXISTENT, sw1.getVersion());
+ assertEquals(DPID1, sw1.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw1.getStatus());
+ } catch (ObjectDoesntExistException e) {
+ e.printStackTrace();
+ fail("Reading switch failed " + e);
+ }
+
+ KVSwitch sw2 = new KVSwitch(DPID2);
+ if (KVObject.multiRead( Arrays.asList(sw2) )) {
+ fail("Failed to read switch " + sw2);
+ } else {
+ assertNotEquals(VERSION_NONEXISTENT, sw2.getVersion());
+ assertEquals(DPID2, sw2.getDpid());
+ assertEquals(KVSwitch.STATUS.ACTIVE, sw2.getStatus());
+ }
+
+
+ // DPID -> [port_no]
+ @SuppressWarnings("serial")
+ Map<Long,List<Long>> expectedPorts = new HashMap<Long,List<Long>>() {{
+ put(DPID1, new ArrayList<>(Arrays.asList(SW1_PORTNO1, SW1_PORTNO2)));
+ put(DPID2, new ArrayList<>(Arrays.asList(SW2_PORTNO1, SW2_PORTNO2)));
+ }};
+
+ for (KVPort port : KVPort.getAllPorts()) {
+ System.out.println(port + " @ " + port.getVersion());
+ assertNotEquals(VERSION_NONEXISTENT, port.getVersion());
+ assertEquals(KVPort.STATUS.ACTIVE, port.getStatus());
+ assertThat(port.getDpid(), is(anyOf(equalTo(DPID1), equalTo(DPID2))));
+ assertThat(port.getNumber(), is(anyOf(equalTo(SW1_PORTNO1), equalTo(SW1_PORTNO2))));
+
+ assertThat(expectedPorts, hasKey(port.getDpid()));
+ assertThat(expectedPorts.get(port.getDpid()), hasItem(port.getNumber()));
+ expectedPorts.get(port.getDpid()).remove(port.getNumber());
+ }
+
+ // DeviceID -> PortID
+ @SuppressWarnings("serial")
+ Map<byte[], byte[]> expectedDevice = new TreeMap<byte[], byte[]>(ByteArrayComparator.BYTEARRAY_COMPARATOR) {{
+ put(DEVICE1_MAC_SW1P1, KVPort.getPortID(DPID1, SW1_PORTNO1));
+ put(DEVICE2_MAC_SW2P2, KVPort.getPortID(DPID2, SW2_PORTNO2));
+ }};
+
+ for (KVDevice device : KVDevice.getAllDevices()) {
+ System.out.println(device + " @ " + device.getVersion());
+ assertNotEquals(VERSION_NONEXISTENT, device.getVersion());
+
+ assertThat(expectedDevice, hasKey(device.getMac()));
+ assertThat(device.getAllPortIds(), hasItem(expectedDevice.get(device.getMac())));
+ expectedDevice.remove(device.getMac());
+ }
+
+ for (KVLink link : KVLink.getAllLinks()) {
+ System.out.println(link + " @ " + link.getVersion());
+ assertNotEquals(VERSION_NONEXISTENT, link.getVersion());
+
+ // there is currently only 1 link SW1P2->SW2P1
+ assertEquals(DPID1, link.getSrc().dpid);
+ assertEquals(SW1_PORTNO2, link.getSrc().number);
+ assertEquals(DPID2, link.getDst().dpid);
+ assertEquals(SW2_PORTNO1, link.getDst().number);
+ }
+
+ }
+
+
+ private static void topology_delete() {
+
+ for (KVSwitch sw : KVSwitch.getAllSwitches()) {
+ try {
+ sw.read();
+ sw.delete();
+ assertNotEquals(VERSION_NONEXISTENT, sw.getVersion());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Delete Switch Failed " + e);
+ }
+ }
+
+ for (KVPort p : KVPort.getAllPorts()) {
+ try {
+ p.read();
+ p.delete();
+ assertNotEquals(VERSION_NONEXISTENT, p.getVersion());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Delete Port Failed " + e);
+ }
+ }
+
+ for (KVDevice d : KVDevice.getAllDevices()) {
+ d.forceDelete();
+ assertNotEquals(VERSION_NONEXISTENT, d.getVersion());
+ }
+
+ for (KVLink l : KVLink.getAllLinks()) {
+ try {
+ l.read();
+ l.delete();
+ assertNotEquals(VERSION_NONEXISTENT, l.getVersion());
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ e.printStackTrace();
+ fail("Delete Link Failed " + e);
+ }
+ }
+ }
+
+ public static void main(final String[] argv) {
+
+ topology_setup();
+ topology_walk();
+ topology_delete();
+
+ System.exit(0);
+ }
+
+}