Eliminate tab characters from the datastore package
Change-Id: I6eea92ce72268c4ca90f1bd24bdccf7edf4cbb7f
diff --git a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
index 6aeb49e..bdf5df5 100644
--- a/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
+++ b/src/main/java/net/onrc/onos/datastore/DataStoreClient.java
@@ -11,15 +11,15 @@
private DataStoreClient() {}
public static IKVClient getClient() {
- // TODO read config and return appropriate IKVClient
- switch (BACKEND) {
- case "ramcloud":
- return RCClient.getClient();
- case "hazelcast":
- return HZClient.getClient();
- default:
- return HZClient.getClient();
- }
+ // TODO read config and return appropriate IKVClient
+ switch (BACKEND) {
+ case "ramcloud":
+ return RCClient.getClient();
+ case "hazelcast":
+ return HZClient.getClient();
+ default:
+ return HZClient.getClient();
+ }
}
diff --git a/src/main/java/net/onrc/onos/datastore/IKVClient.java b/src/main/java/net/onrc/onos/datastore/IKVClient.java
index eba804c..d2602c9 100644
--- a/src/main/java/net/onrc/onos/datastore/IKVClient.java
+++ b/src/main/java/net/onrc/onos/datastore/IKVClient.java
@@ -65,7 +65,7 @@
* @throws WrongVersionException
*/
public long update(IKVTableID tableId, byte[] key, byte[] value, long version)
- throws ObjectDoesntExistException, WrongVersionException;
+ throws ObjectDoesntExistException, WrongVersionException;
/**
* Update an existing Key-Value entry in table, without checking version.
@@ -79,7 +79,7 @@
*/
@Deprecated
public long update(IKVTableID tableId, byte[] key, byte[] value)
- throws ObjectDoesntExistException;
+ throws ObjectDoesntExistException;
// TODO Adding serialized value as parameter to this interface may
// give an option to improve performance on some backends.
@@ -95,7 +95,7 @@
* @throws WrongVersionException
*/
public long delete(IKVTableID tableId, byte[] key, long version)
- throws ObjectDoesntExistException, WrongVersionException;
+ throws ObjectDoesntExistException, WrongVersionException;
/**
* Remove a Key-Value entry in table
@@ -123,15 +123,15 @@
public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value);
public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key,
- byte[] value);
+ byte[] value);
public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key);
public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,
- long version);
+ long version);
public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,
- long version);
+ long version);
public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key);
diff --git a/src/main/java/net/onrc/onos/datastore/IKVTable.java b/src/main/java/net/onrc/onos/datastore/IKVTable.java
index 0164293..3151808 100644
--- a/src/main/java/net/onrc/onos/datastore/IKVTable.java
+++ b/src/main/java/net/onrc/onos/datastore/IKVTable.java
@@ -17,11 +17,11 @@
*/
public static interface IKVEntry {
- public byte[] getKey();
+ public byte[] getKey();
- public byte[] getValue();
+ public byte[] getValue();
- public long getVersion();
+ public long getVersion();
}
@@ -70,7 +70,7 @@
* @throws WrongVersionException
*/
public long update(byte[] key, byte[] value, long version)
- throws ObjectDoesntExistException, WrongVersionException;
+ throws ObjectDoesntExistException, WrongVersionException;
/**
* Update an existing Key-Value entry in table, without checking version.
@@ -81,7 +81,7 @@
* @throws ObjectDoesntExistException
*/
public long update(byte[] key, byte[] value)
- throws ObjectDoesntExistException;
+ throws ObjectDoesntExistException;
/**
* Remove an existing Key-Value entry in table
@@ -94,7 +94,7 @@
* @throws WrongVersionException
*/
public long delete(byte[] key, long version)
- throws ObjectDoesntExistException, WrongVersionException;
+ throws ObjectDoesntExistException, WrongVersionException;
/**
* Remove a Key-Value entry in table
diff --git a/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java
index 25d5a13..dab1aad 100644
--- a/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java
+++ b/src/main/java/net/onrc/onos/datastore/IMultiEntryOperation.java
@@ -6,11 +6,11 @@
public interface IMultiEntryOperation {
public enum STATUS {
- NOT_EXECUTED, SUCCESS, FAILED
+ NOT_EXECUTED, SUCCESS, FAILED
}
public enum OPERATION {
- CREATE, FORCE_CREATE, UPDATE, READ, DELETE, FORCE_DELETE
+ CREATE, FORCE_CREATE, UPDATE, READ, DELETE, FORCE_DELETE
}
public boolean hasSucceeded();
diff --git a/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java b/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java
index 712ba4e..030680f 100644
--- a/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java
+++ b/src/main/java/net/onrc/onos/datastore/ObjectDoesntExistException.java
@@ -9,17 +9,17 @@
private static final long serialVersionUID = 859082748533417866L;
public ObjectDoesntExistException(final String message) {
- super(message);
+ super(message);
}
public ObjectDoesntExistException(final IKVTableID tableID,
- final byte[] key, final Throwable cause) {
- super(ByteArrayUtil.toHexStringBuffer(key, ":")
- + " did not exist on table:" + tableID, cause);
+ final byte[] key, final Throwable cause) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID, cause);
}
public ObjectDoesntExistException(final IKVTableID tableID, final byte[] key) {
- super(ByteArrayUtil.toHexStringBuffer(key, ":")
- + " did not exist on table:" + tableID);
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " did not exist on table:" + tableID);
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java b/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java
index 3ac3bf8..8ce6055 100644
--- a/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java
+++ b/src/main/java/net/onrc/onos/datastore/ObjectExistsException.java
@@ -9,17 +9,17 @@
private static final long serialVersionUID = -1488647215779909457L;
public ObjectExistsException(final String message) {
- super(message);
+ super(message);
}
public ObjectExistsException(final IKVTableID tableID, final byte[] key,
- final Throwable cause) {
- super(ByteArrayUtil.toHexStringBuffer(key, ":")
- + " already exist on table:" + tableID, cause);
+ final Throwable cause) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " already exist on table:" + tableID, cause);
}
public ObjectExistsException(final IKVTableID tableID, final byte[] key) {
- super(ByteArrayUtil.toHexStringBuffer(key, ":")
- + " already exist on table:" + tableID);
+ super(ByteArrayUtil.toHexStringBuffer(key, ":")
+ + " already exist on table:" + tableID);
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/RejectRulesException.java b/src/main/java/net/onrc/onos/datastore/RejectRulesException.java
index 0bd5072..1777ef6 100644
--- a/src/main/java/net/onrc/onos/datastore/RejectRulesException.java
+++ b/src/main/java/net/onrc/onos/datastore/RejectRulesException.java
@@ -11,14 +11,14 @@
private static final long serialVersionUID = -1444683012320423530L;
public RejectRulesException(final String message) {
- super(message);
+ super(message);
}
public RejectRulesException(final String message, final Throwable cause) {
- super(message, cause);
+ super(message, cause);
}
public RejectRulesException(final Throwable cause) {
- super(cause);
+ super(cause);
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/WrongVersionException.java b/src/main/java/net/onrc/onos/datastore/WrongVersionException.java
index 9b59ff1..441409f 100644
--- a/src/main/java/net/onrc/onos/datastore/WrongVersionException.java
+++ b/src/main/java/net/onrc/onos/datastore/WrongVersionException.java
@@ -9,23 +9,23 @@
private static final long serialVersionUID = -1644202495890190823L;
public WrongVersionException(final String message) {
- super(message);
+ super(message);
}
public WrongVersionException(final IKVTableID tableID, final byte[] key,
- final long expectedVersion, final Throwable cause) {
- // It will be best if {@code cause} has actual version encountered, but
- // doesn't currently.
- super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
- + tableID + " was expected to be version:" + expectedVersion,
- cause);
+ final long expectedVersion, final Throwable cause) {
+ // It will be best if {@code cause} has actual version encountered, but
+ // doesn't currently.
+ super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
+ + tableID + " was expected to be version:" + expectedVersion,
+ cause);
}
public WrongVersionException(final IKVTableID tableID, final byte[] key,
- final long expectedVersion, final long encounteredVersion) {
- super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
- + tableID + " was expected to be version:" + expectedVersion
- + " but found:" + encounteredVersion);
+ final long expectedVersion, final long encounteredVersion) {
+ super(ByteArrayUtil.toHexStringBuffer(key, ":") + " on table:"
+ + tableID + " was expected to be version:" + expectedVersion
+ + " but found:" + encounteredVersion);
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
index 2c2b3a4..aec9f9d 100644
--- a/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
@@ -49,71 +49,71 @@
private static final HZClient THE_INSTANCE = new HZClient();
public static HZClient getClient() {
- return THE_INSTANCE;
+ return THE_INSTANCE;
}
private HZClient() {
- hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
+ hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
}
private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
- Config baseHzConfig = null;
- try {
- baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
- } catch (FileNotFoundException e) {
- log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
- throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
- }
+ Config baseHzConfig = null;
+ try {
+ baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+ } catch (FileNotFoundException e) {
+ log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+ throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+ }
- // use xml config if present, if not use System.property
- MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
- if (mapConfig != null) {
- backupCount = mapConfig.getBackupCount();
- }
+ // use xml config if present, if not use System.property
+ MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+ if (mapConfig != null) {
+ backupCount = mapConfig.getBackupCount();
+ }
- HazelcastInstance instance = null;
- if (useClientMode) {
- log.info("Configuring Hazelcast datastore as Client mode");
- ClientConfig clientConfig = new ClientConfig();
- final int port = baseHzConfig.getNetworkConfig().getPort();
+ HazelcastInstance instance = null;
+ if (useClientMode) {
+ log.info("Configuring Hazelcast datastore as Client mode");
+ ClientConfig clientConfig = new ClientConfig();
+ final int port = baseHzConfig.getNetworkConfig().getPort();
- String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
- clientConfig.addAddress(server + ":" + port);
+ String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
+ clientConfig.addAddress(server + ":" + port);
- // copy group config from base Hazelcast configuration
- clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
- clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+ // copy group config from base Hazelcast configuration
+ clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+ clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
- // TODO We probably need to figure out what else need to be
- // derived from baseConfig
+ // TODO We probably need to figure out what else need to be
+ // derived from baseConfig
- registerSerializer(clientConfig.getSerializationConfig());
+ registerSerializer(clientConfig.getSerializationConfig());
- log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+ log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
- try {
- instance = HazelcastClient.newHazelcastClient(clientConfig);
- if (!instance.getCluster().getMembers().isEmpty()) {
- log.debug("Members in cluster: " + instance.getCluster().getMembers());
- return instance;
- }
- log.info("Failed to find cluster member, falling back to Instance mode");
- } catch (IllegalStateException e) {
- log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
- }
- useClientMode = false;
- instance = null;
- }
- log.info("Configuring Hazelcast datastore as Instance mode");
+ try {
+ instance = HazelcastClient.newHazelcastClient(clientConfig);
+ if (!instance.getCluster().getMembers().isEmpty()) {
+ log.debug("Members in cluster: " + instance.getCluster().getMembers());
+ return instance;
+ }
+ log.info("Failed to find cluster member, falling back to Instance mode");
+ } catch (IllegalStateException e) {
+ log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+ }
+ useClientMode = false;
+ instance = null;
+ }
+ log.info("Configuring Hazelcast datastore as Instance mode");
- // To run 2 Hazelcast instance in 1 JVM,
- // we probably need to something like below
- //int port = hazelcastConfig.getNetworkConfig().getPort();
- //hazelcastConfig.getNetworkConfig().setPort(port+1);
+ // To run 2 Hazelcast instance in 1 JVM,
+ // we probably need to something like below
+ //int port = hazelcastConfig.getNetworkConfig().getPort();
+ //hazelcastConfig.getNetworkConfig().setPort(port+1);
- registerSerializer(baseHzConfig.getSerializationConfig());
+ registerSerializer(baseHzConfig.getSerializationConfig());
- return Hazelcast.newHazelcastInstance(baseHzConfig);
+ return Hazelcast.newHazelcastInstance(baseHzConfig);
}
/**
@@ -121,224 +121,224 @@
* @param config
*/
private static void registerSerializer(final SerializationConfig config) {
- config.addDataSerializableFactoryClass(
- VersionedValueSerializableFactory.FACTORY_ID,
- VersionedValueSerializableFactory.class);
+ config.addDataSerializableFactoryClass(
+ VersionedValueSerializableFactory.FACTORY_ID,
+ VersionedValueSerializableFactory.class);
}
@Override
public IKVTable getTable(final String tableName) {
- IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
+ IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
- if (!useClientMode) {
- // config only available in Instance Mode
- // Client Mode must rely on hazelcast.xml to be properly configured.
- MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
- // config for this map to be strong consistent
- if (config.isReadBackupData()) {
- config.setReadBackupData(false);
- }
- if (config.isNearCacheEnabled()) {
- config.getNearCacheConfig().setMaxSize(0);
- }
+ if (!useClientMode) {
+ // config only available in Instance Mode
+ // Client Mode must rely on hazelcast.xml to be properly configured.
+ MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
+ // config for this map to be strong consistent
+ if (config.isReadBackupData()) {
+ config.setReadBackupData(false);
+ }
+ if (config.isNearCacheEnabled()) {
+ config.getNearCacheConfig().setMaxSize(0);
+ }
- if (config.getBackupCount() != backupCount) {
- config.setAsyncBackupCount(0);
- config.setBackupCount(backupCount);
- }
- }
+ if (config.getBackupCount() != backupCount) {
+ config.setAsyncBackupCount(0);
+ config.setBackupCount(backupCount);
+ }
+ }
- return new HZTable(tableName, map);
+ return new HZTable(tableName, map);
}
@Override
public void dropTable(final IKVTable table) {
- ((HZTable) table).getBackendMap().clear();
+ ((HZTable) table).getBackendMap().clear();
}
@Override
public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
throws ObjectExistsException {
- IKVTable table = (IKVTable) tableId;
- return table.create(key, value);
+ IKVTable table = (IKVTable) tableId;
+ return table.create(key, value);
}
@Override
public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
- IKVTable table = (IKVTable) tableId;
- return table.forceCreate(key, value);
+ IKVTable table = (IKVTable) tableId;
+ return table.forceCreate(key, value);
}
@Override
public IKVEntry read(final IKVTableID tableId, final byte[] key)
throws ObjectDoesntExistException {
- IKVTable table = (IKVTable) tableId;
- return table.read(key);
+ IKVTable table = (IKVTable) tableId;
+ return table.read(key);
}
@Override
public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
final long version) throws ObjectDoesntExistException,
WrongVersionException {
- IKVTable table = (IKVTable) tableId;
- return table.update(key, value, version);
+ IKVTable table = (IKVTable) tableId;
+ return table.update(key, value, version);
}
@Override
public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
throws ObjectDoesntExistException {
- IKVTable table = (IKVTable) tableId;
- return table.update(key, value);
+ IKVTable table = (IKVTable) tableId;
+ return table.update(key, value);
}
@Override
public long delete(final IKVTableID tableId, final byte[] key, final long version)
throws ObjectDoesntExistException, WrongVersionException {
- IKVTable table = (IKVTable) tableId;
- return table.delete(key, version);
+ IKVTable table = (IKVTable) tableId;
+ return table.delete(key, version);
}
@Override
public long forceDelete(final IKVTableID tableId, final byte[] key) {
- IKVTable table = (IKVTable) tableId;
- return table.forceDelete(key);
+ IKVTable table = (IKVTable) tableId;
+ return table.forceDelete(key);
}
@Override
public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
- IKVTable table = (IKVTable) tableId;
- return table.getAllEntries();
+ IKVTable table = (IKVTable) tableId;
+ return table.getAllEntries();
}
@Override
public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
final byte[] value) {
- return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
}
@Override
public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
final byte[] value) {
- return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
}
@Override
public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
- return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
}
@Override
public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
final byte[] value, final long version) {
- return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
}
@Override
public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
final byte[] value, final long version) {
- return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
}
@Override
public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
- return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
}
@Override
public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
- boolean failExists = false;
- for (IMultiEntryOperation op : ops) {
- HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
- switch (mop.getOperation()) {
- case DELETE:
- try {
- final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
- mop.setVersion(version);
- mop.setStatus(STATUS.SUCCESS);
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.error(mop + " failed.", e);
- mop.setStatus(STATUS.FAILED);
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
+ switch (mop.getOperation()) {
+ case DELETE:
+ try {
+ final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
failExists = true;
- }
- break;
- case FORCE_DELETE:
- final long version = forceDelete(mop.getTableId(), mop.getKey());
- mop.setVersion(version);
- mop.setStatus(STATUS.SUCCESS);
- break;
- default:
- throw new UnsupportedOperationException(mop.toString());
- }
- }
- return failExists;
+ }
+ break;
+ case FORCE_DELETE:
+ final long version = forceDelete(mop.getTableId(), mop.getKey());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
}
@Override
public boolean multiWrite(final List<IMultiEntryOperation> ops) {
- // there may be room to batch to improve performance
- boolean failExists = false;
- for (IMultiEntryOperation op : ops) {
- IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
- switch (mop.getOperation()) {
- case CREATE:
- try {
- long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
- mop.setVersion(version);
- mop.setStatus(STATUS.SUCCESS);
- } catch (ObjectExistsException e) {
- log.error(mop + " failed.", e);
- mop.setStatus(STATUS.FAILED);
- failExists = true;
- }
- break;
- case FORCE_CREATE:
- {
- final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
- mop.setVersion(version);
- mop.setStatus(STATUS.SUCCESS);
- break;
- }
- case UPDATE:
- try {
- long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
- mop.setVersion(version);
- mop.setStatus(STATUS.SUCCESS);
- } catch (ObjectDoesntExistException | WrongVersionException e) {
- log.error(mop + " failed.", e);
- mop.setStatus(STATUS.FAILED);
- failExists = true;
- }
- break;
- default:
- throw new UnsupportedOperationException(mop.toString());
- }
- }
- return failExists;
+ // there may be room to batch to improve performance
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ switch (mop.getOperation()) {
+ case CREATE:
+ try {
+ long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectExistsException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ case FORCE_CREATE:
+ {
+ final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ }
+ case UPDATE:
+ try {
+ long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
+ mop.setVersion(version);
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
}
@Override
public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
- boolean failExists = false;
- for (IMultiEntryOperation op : ops) {
- IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
- HZTable table = (HZTable) op.getTableId();
- ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
- }
- for (IMultiEntryOperation op : ops) {
- IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
- if (mop.hasSucceeded()) {
- // status update is already done, nothing to do.
- } else {
- failExists = true;
- }
- }
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ HZTable table = (HZTable) op.getTableId();
+ ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
+ }
+ for (IMultiEntryOperation op : ops) {
+ IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ if (mop.hasSucceeded()) {
+ // status update is already done, nothing to do.
+ } else {
+ failExists = true;
+ }
+ }
- return failExists;
+ return failExists;
}
@Override
public long VERSION_NONEXISTENT() {
- return VERSION_NONEXISTENT;
+ return VERSION_NONEXISTENT;
}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
index 1da7f1a..67a868e 100644
--- a/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZMultiEntryOperation.java
@@ -33,13 +33,13 @@
* @param operation
*/
public HZMultiEntryOperation(final HZTable table, final byte[] key, final OPERATION operation) {
- this.table = table;
- this.key = key;
- this.status = STATUS.NOT_EXECUTED;
- this.operation = operation;
+ this.table = table;
+ this.key = key;
+ this.status = STATUS.NOT_EXECUTED;
+ this.operation = operation;
- this.future = null;
- this.writeValue = null;
+ this.future = null;
+ this.writeValue = null;
}
/**
@@ -51,59 +51,59 @@
* @param operation
*/
public HZMultiEntryOperation(final HZTable table, final byte[] key, final byte[] value, final long version, final OPERATION operation) {
- this.table = table;
- this.key = key;
- this.status = STATUS.NOT_EXECUTED;
- this.operation = operation;
+ this.table = table;
+ this.key = key;
+ this.status = STATUS.NOT_EXECUTED;
+ this.operation = operation;
- this.future = null;
- this.writeValue = new VersionedValue(value, version);
+ this.future = null;
+ this.writeValue = new VersionedValue(value, version);
}
@Override
public boolean hasSucceeded() {
- VersionedValue value = get();
- return (value != null) && (this.status == STATUS.SUCCESS);
+ VersionedValue value = get();
+ return (value != null) && (this.status == STATUS.SUCCESS);
}
@Override
public STATUS getStatus() {
- get();
- return status;
+ get();
+ return status;
}
@Override
public IKVTableID getTableId() {
- return table;
+ return table;
}
@Override
public byte[] getKey() {
- return key;
+ return key;
}
@Override
public byte[] getValue() {
- if (future != null) {
- VersionedValue value = get();
- return value.getValue();
- }
- return writeValue.getValue();
+ if (future != null) {
+ VersionedValue value = get();
+ return value.getValue();
+ }
+ return writeValue.getValue();
}
@Override
public long getVersion() {
- if (future != null) {
- VersionedValue value = get();
- return value.getVersion();
- }
- return writeValue.getVersion();
+ if (future != null) {
+ VersionedValue value = get();
+ return value.getVersion();
+ }
+ return writeValue.getVersion();
}
@Override
public OPERATION getOperation() {
- return operation;
+ return operation;
}
/**
@@ -125,28 +125,28 @@
@Override
public void setValue(final byte[] value, final long version) {
- writeValue = new VersionedValue(value, version);
- setVersion(version);
+ writeValue = new VersionedValue(value, version);
+ setVersion(version);
}
@Override
public void setVersion(final long version) {
- if (future != null) {
- // no-op on read
- }
- if (writeValue == null) {
- writeValue = new VersionedValue(null, version);
- }
+ if (future != null) {
+ // no-op on read
+ }
+ if (writeValue == null) {
+ writeValue = new VersionedValue(null, version);
+ }
}
@Override
public void setStatus(final STATUS status) {
- this.status = status;
+ this.status = status;
}
@Override
public IModifiableMultiEntryOperation getActualOperation() {
- return this;
+ return this;
}
void setFuture(final Future<VersionedValue> future) {
@@ -155,8 +155,8 @@
@Override
public String toString() {
- return "[HZMultiEntryOperation table=" + table + ", key="
- + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
- + ", status=" + status + ", writeValue=" + writeValue + "]";
+ return "[HZMultiEntryOperation table=" + table + ", key="
+ + ByteArrayUtil.toHexStringBuffer(key, ":") + ", operation=" + operation
+ + ", status=" + status + ", writeValue=" + writeValue + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
index ec98349..894854c 100644
--- a/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZTable.java
@@ -33,12 +33,12 @@
* @return initial value
*/
protected static long getInitialVersion() {
- long version = initialVersion.incrementAndGet();
- if (version == HZClient.VERSION_NONEXISTENT) {
- // used up whole 64bit space?
- version = initialVersion.incrementAndGet();
- }
- return version;
+ long version = initialVersion.incrementAndGet();
+ if (version == HZClient.VERSION_NONEXISTENT) {
+ // used up whole 64bit space?
+ version = initialVersion.incrementAndGet();
+ }
+ return version;
}
/**
@@ -47,141 +47,141 @@
* @return
*/
protected static long getNextVersion(final long version) {
- long nextVersion = version + 1;
- if (nextVersion == HZClient.VERSION_NONEXISTENT) {
- ++nextVersion;
- }
- return nextVersion;
+ long nextVersion = version + 1;
+ if (nextVersion == HZClient.VERSION_NONEXISTENT) {
+ ++nextVersion;
+ }
+ return nextVersion;
}
static class VersionedValue implements IdentifiedDataSerializable {
- private static final long serialVersionUID = -3149375966890712708L;
+ private static final long serialVersionUID = -3149375966890712708L;
- private byte[] value;
- private long version;
+ private byte[] value;
+ private long version;
- protected VersionedValue() {
- value = new byte[0];
- version = HZClient.VERSION_NONEXISTENT;
- }
+ protected VersionedValue() {
+ value = new byte[0];
+ version = HZClient.VERSION_NONEXISTENT;
+ }
- public VersionedValue(final byte[] value, final long version) {
- this.value = value;
- this.version = version;
- }
+ public VersionedValue(final byte[] value, final long version) {
+ this.value = value;
+ this.version = version;
+ }
- public byte[] getValue() {
- return value;
- }
+ public byte[] getValue() {
+ return value;
+ }
- public long getVersion() {
- return version;
- }
+ public long getVersion() {
+ return version;
+ }
- public void setValue(final byte[] value) {
- this.value = value;
- }
+ public void setValue(final byte[] value) {
+ this.value = value;
+ }
- public void setNextVersion() {
- this.version = getNextVersion(this.version);
- }
+ public void setNextVersion() {
+ this.version = getNextVersion(this.version);
+ }
- @Override
- public void writeData(final ObjectDataOutput out) throws IOException {
- out.writeLong(version);
- out.writeInt(value.length);
- if (value.length > 0) {
- out.write(value);
- }
- }
+ @Override
+ public void writeData(final ObjectDataOutput out) throws IOException {
+ out.writeLong(version);
+ out.writeInt(value.length);
+ if (value.length > 0) {
+ out.write(value);
+ }
+ }
- @Override
- public void readData(final ObjectDataInput in) throws IOException {
- version = in.readLong();
- final int valueLen = in.readInt();
- value = new byte[valueLen];
- in.readFully(value);
- }
+ @Override
+ public void readData(final ObjectDataInput in) throws IOException {
+ version = in.readLong();
+ final int valueLen = in.readInt();
+ value = new byte[valueLen];
+ in.readFully(value);
+ }
- @Override
- public int getFactoryId() {
- return VersionedValueSerializableFactory.FACTORY_ID;
- }
+ @Override
+ public int getFactoryId() {
+ return VersionedValueSerializableFactory.FACTORY_ID;
+ }
- @Override
- public int getId() {
- return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
- }
+ @Override
+ public int getId() {
+ return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
+ }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (version ^ (version >>> 32));
- result = prime * result + Arrays.hashCode(value);
- return result;
- }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (version ^ (version >>> 32));
+ result = prime * result + Arrays.hashCode(value);
+ return result;
+ }
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- VersionedValue other = (VersionedValue) obj;
- if (version != other.version) {
- return false;
- }
- if (!Arrays.equals(value, other.value)) {
- return false;
- }
- return true;
- }
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ VersionedValue other = (VersionedValue) obj;
+ if (version != other.version) {
+ return false;
+ }
+ if (!Arrays.equals(value, other.value)) {
+ return false;
+ }
+ return true;
+ }
}
// TODO Refactor and extract common parts
public static class Entry implements IKVEntry {
- final byte[] key;
- byte[] value;
- long version;
+ final byte[] key;
+ byte[] value;
+ long version;
- public Entry(final byte[] key, final byte[] value, final long version) {
- this.key = key;
- this.setValue(value);
- this.setVersion(version);
- }
+ public Entry(final byte[] key, final byte[] value, final long version) {
+ this.key = key;
+ this.setValue(value);
+ this.setVersion(version);
+ }
- public Entry(final byte[] key) {
- this(key, null, HZClient.VERSION_NONEXISTENT);
- }
+ public Entry(final byte[] key) {
+ this(key, null, HZClient.VERSION_NONEXISTENT);
+ }
- @Override
- public byte[] getKey() {
- return key;
- }
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
- @Override
- public byte[] getValue() {
- return value;
- }
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
- @Override
- public long getVersion() {
- return version;
- }
+ @Override
+ public long getVersion() {
+ return version;
+ }
- void setValue(final byte[] value) {
- this.value = value;
- }
+ void setValue(final byte[] value) {
+ this.value = value;
+ }
- void setVersion(final long version) {
- this.version = version;
- }
+ void setVersion(final long version) {
+ this.version = version;
+ }
}
@@ -190,136 +190,136 @@
private final IMap<byte[], VersionedValue> map;
public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
- this.mapName = mapName;
- this.map = map;
+ this.mapName = mapName;
+ this.map = map;
}
@Override
public String getTableName() {
- return mapName;
+ return mapName;
}
@Override
public IKVTableID getTableId() {
- return this;
+ return this;
}
@Override
public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
- final long version = getInitialVersion();
- VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
- if (existing != null) {
- throw new ObjectExistsException(this, key);
- }
- return version;
+ final long version = getInitialVersion();
+ VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
+ if (existing != null) {
+ throw new ObjectExistsException(this, key);
+ }
+ return version;
}
@Override
public long forceCreate(final byte[] key, final byte[] value) {
- final long version = getInitialVersion();
- map.set(key, new VersionedValue(value, version));
- return version;
+ final long version = getInitialVersion();
+ map.set(key, new VersionedValue(value, version));
+ return version;
}
@Override
public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
- final VersionedValue value = map.get(key);
- if (value == null) {
- throw new ObjectDoesntExistException(this, key);
- }
- return new Entry(key, value.getValue(), value.getVersion());
+ final VersionedValue value = map.get(key);
+ if (value == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ return new Entry(key, value.getValue(), value.getVersion());
}
@Override
public long update(final byte[] key, final byte[] value, final long version)
throws ObjectDoesntExistException, WrongVersionException {
- try {
- map.lock(key);
- final VersionedValue oldValue = map.get(key);
- if (oldValue == null) {
- throw new ObjectDoesntExistException(this, key);
- }
- if (oldValue.getVersion() != version) {
- throw new WrongVersionException(this, key, version, oldValue.getVersion());
- }
- final long nextVersion = getNextVersion(version);
- map.set(key, new VersionedValue(value, nextVersion));
- return nextVersion;
- } finally {
- map.unlock(key);
- }
+ try {
+ map.lock(key);
+ final VersionedValue oldValue = map.get(key);
+ if (oldValue == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ if (oldValue.getVersion() != version) {
+ throw new WrongVersionException(this, key, version, oldValue.getVersion());
+ }
+ final long nextVersion = getNextVersion(version);
+ map.set(key, new VersionedValue(value, nextVersion));
+ return nextVersion;
+ } finally {
+ map.unlock(key);
+ }
}
@Override
public long update(final byte[] key, final byte[] value)
throws ObjectDoesntExistException {
- try {
- map.lock(key);
- final VersionedValue valueInMap = map.get(key);
- if (valueInMap == null) {
- throw new ObjectDoesntExistException(this, key);
- }
- valueInMap.setValue(value);
- valueInMap.setNextVersion();
- map.set(key, valueInMap);
- return valueInMap.getVersion();
- } finally {
- map.unlock(key);
- }
+ try {
+ map.lock(key);
+ final VersionedValue valueInMap = map.get(key);
+ if (valueInMap == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ valueInMap.setValue(value);
+ valueInMap.setNextVersion();
+ map.set(key, valueInMap);
+ return valueInMap.getVersion();
+ } finally {
+ map.unlock(key);
+ }
}
@Override
public long delete(final byte[] key, final long version)
throws ObjectDoesntExistException, WrongVersionException {
- try {
- map.lock(key);
- final VersionedValue oldValue = map.get(key);
- if (oldValue == null) {
- throw new ObjectDoesntExistException(this, key);
- }
- if (oldValue.getVersion() != version) {
- throw new WrongVersionException(this, key, version, oldValue.getVersion());
- }
- map.delete(key);
- return oldValue.getVersion();
- } finally {
- map.unlock(key);
- }
+ try {
+ map.lock(key);
+ final VersionedValue oldValue = map.get(key);
+ if (oldValue == null) {
+ throw new ObjectDoesntExistException(this, key);
+ }
+ if (oldValue.getVersion() != version) {
+ throw new WrongVersionException(this, key, version, oldValue.getVersion());
+ }
+ map.delete(key);
+ return oldValue.getVersion();
+ } finally {
+ map.unlock(key);
+ }
}
@Override
public long forceDelete(final byte[] key) {
- final VersionedValue valueInMap = map.remove(key);
- if (valueInMap == null) {
- return HZClient.VERSION_NONEXISTENT;
- }
- return valueInMap.getVersion();
+ final VersionedValue valueInMap = map.remove(key);
+ if (valueInMap == null) {
+ return HZClient.VERSION_NONEXISTENT;
+ }
+ return valueInMap.getVersion();
}
@Override
public Iterable<IKVEntry> getAllEntries() {
- final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
- List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
- for (IMap.Entry<byte[], VersionedValue> entry : entries) {
- entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
- }
- return entryList;
+ final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
+ List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
+ for (IMap.Entry<byte[], VersionedValue> entry : entries) {
+ entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
+ }
+ return entryList;
}
@Override
public String toString() {
- return "[HZTable " + mapName + "]";
+ return "[HZTable " + mapName + "]";
}
IMap<byte[], VersionedValue> getBackendMap() {
- return this.map;
+ return this.map;
}
@Override
public long VERSION_NONEXISTENT() {
- return HZClient.VERSION_NONEXISTENT;
+ return HZClient.VERSION_NONEXISTENT;
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
index a4e7fb9..3326890 100644
--- a/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/VersionedValueSerializableFactory.java
@@ -1,6 +1,5 @@
package net.onrc.onos.datastore.hazelcast;
-
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
@@ -13,13 +12,13 @@
@Override
public IdentifiedDataSerializable create(final int typeId) {
- switch (typeId) {
- case VERSIONED_VALUE_ID:
- return new HZTable.VersionedValue();
+ switch (typeId) {
+ case VERSIONED_VALUE_ID:
+ return new HZTable.VersionedValue();
- default:
- return null;
- }
+ default:
+ return null;
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java
index 0f18d9c..a11e82f 100644
--- a/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCClient.java
@@ -47,16 +47,16 @@
// FIXME come up with a proper way to retrieve configuration
public static final int MAX_MULTI_READS = Math.max(1, Integer
- .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
+ .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
public static final int MAX_MULTI_WRITES = Math.max(1, Integer
- .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
+ .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
- @Override
- protected JRamCloud initialValue() {
- return new JRamCloud(getCoordinatorUrl(config));
- }
+ @Override
+ protected JRamCloud initialValue() {
+ return new JRamCloud(getCoordinatorUrl(config));
+ }
};
/**
@@ -66,47 +66,47 @@
* may be accessed later by another thread.
*/
static JRamCloud getJRamCloudClient() {
- return tlsRCClient.get();
+ return tlsRCClient.get();
}
// Currently RCClient is state-less
private static final RCClient theInstance= new RCClient();
public static RCClient getClient() {
- return theInstance;
+ return theInstance;
}
public static final Configuration getConfiguration() {
- final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
- return getConfiguration(configFile);
+ final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
+ return getConfiguration(configFile);
}
public static final Configuration getConfiguration(final File configFile) {
- if (configFile == null) {
- throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
- }
+ if (configFile == null) {
+ throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
+ }
- if (!configFile.isFile()) {
- throw new IllegalArgumentException("Location of configuration must be a file");
- }
+ if (!configFile.isFile()) {
+ throw new IllegalArgumentException("Location of configuration must be a file");
+ }
- try {
- return new PropertiesConfiguration(configFile);
- } catch (ConfigurationException e) {
- throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
- }
+ try {
+ return new PropertiesConfiguration(configFile);
+ } catch (ConfigurationException e) {
+ throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
+ }
}
public static String getCoordinatorUrl(final Configuration configuration) {
- final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
- final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
- final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
- return coordinatorURL;
+ final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
+ final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
+ final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
+ return coordinatorURL;
}
@Override
public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
- return RCMultiEntryOperation.create(tableId, key, value);
+ return RCMultiEntryOperation.create(tableId, key, value);
}
/**
@@ -114,459 +114,459 @@
*/
@Override
public long create(IKVTableID tableId, byte[] key, byte[] value)
- throws ObjectExistsException {
+ throws ObjectExistsException {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfExists();
+ RejectRules rules = new RejectRules();
+ rules.rejectIfExists();
- try {
- return rcClient.write(rcTableId.getTableID(), key, value, rules);
- } catch (JRamCloud.ObjectExistsException e) {
- throw new ObjectExistsException(rcTableId, key, e);
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectExistsException e) {
+ throw new ObjectExistsException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
}
@Override
public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
- return RCMultiEntryOperation.forceCreate(tableId, key, value);
+ return RCMultiEntryOperation.forceCreate(tableId, key, value);
}
@Override
public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- long updated_version = rcClient.write(rcTableId.getTableID(), key, value);
- return updated_version;
+ long updated_version = rcClient.write(rcTableId.getTableID(), key, value);
+ return updated_version;
}
@Override
public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
- return RCMultiEntryOperation.read(tableId, key);
+ return RCMultiEntryOperation.read(tableId, key);
}
@Override
public IKVEntry read(IKVTableID tableId, byte[] key)
- throws ObjectDoesntExistException {
+ throws ObjectDoesntExistException {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- try {
- JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
- return new Entry(rcObj.key, rcObj.value, rcObj.version);
- } catch (JRamCloud.ObjectDoesntExistException e) {
- throw new ObjectDoesntExistException(rcTableId, key, e);
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return null;
- }
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ try {
+ JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
+ return new Entry(rcObj.key, rcObj.value, rcObj.version);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return null;
+ }
}
@Override
public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
- return RCMultiEntryOperation.update(tableId, key, value, version);
+ return RCMultiEntryOperation.update(tableId, key, value, version);
}
@Override
public long update(IKVTableID tableId, byte[] key, byte[] value,
- long version) throws ObjectDoesntExistException,
- WrongVersionException {
+ long version) throws ObjectDoesntExistException,
+ WrongVersionException {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(version);
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(version);
- try {
- return rcClient.write(rcTableId.getTableID(), key, value, rules);
- } catch (JRamCloud.ObjectDoesntExistException e) {
- throw new ObjectDoesntExistException(rcTableId, key, e);
- } catch (JRamCloud.WrongVersionException e) {
- throw new WrongVersionException(rcTableId, key, version, e);
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.WrongVersionException e) {
+ throw new WrongVersionException(rcTableId, key, version, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
}
@Override
public long update(IKVTableID tableId, byte[] key, byte[] value)
- throws ObjectDoesntExistException {
+ throws ObjectDoesntExistException {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
- try {
- return rcClient.write(rcTableId.getTableID(), key, value, rules);
- } catch (JRamCloud.ObjectDoesntExistException e) {
- throw new ObjectDoesntExistException(rcTableId, key, e);
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
+ try {
+ return rcClient.write(rcTableId.getTableID(), key, value, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
}
@Override
public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
- return RCMultiEntryOperation.delete(tableId, key, value, version);
+ return RCMultiEntryOperation.delete(tableId, key, value, version);
}
@Override
public long delete(IKVTableID tableId, byte[] key, long version)
- throws ObjectDoesntExistException, WrongVersionException {
+ throws ObjectDoesntExistException, WrongVersionException {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(version);
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(version);
- try {
- return rcClient.remove(rcTableId.getTableID(), key, rules);
- } catch (JRamCloud.ObjectDoesntExistException e) {
- throw new ObjectDoesntExistException(rcTableId, key, e);
- } catch (JRamCloud.WrongVersionException e) {
- throw new WrongVersionException(rcTableId, key, version, e);
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Unexpected RejectRulesException", e);
- return JRamCloud.VERSION_NONEXISTENT;
- }
+ try {
+ return rcClient.remove(rcTableId.getTableID(), key, rules);
+ } catch (JRamCloud.ObjectDoesntExistException e) {
+ throw new ObjectDoesntExistException(rcTableId, key, e);
+ } catch (JRamCloud.WrongVersionException e) {
+ throw new WrongVersionException(rcTableId, key, version, e);
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Unexpected RejectRulesException", e);
+ return JRamCloud.VERSION_NONEXISTENT;
+ }
}
@Override
public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
- return RCMultiEntryOperation.forceDelete(tableId, key);
+ return RCMultiEntryOperation.forceDelete(tableId, key);
}
@Override
public long forceDelete(IKVTableID tableId, byte[] key) {
- RCTableID rcTableId = (RCTableID) tableId;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
- long removed_version = rcClient.remove(rcTableId.getTableID(), key);
- return removed_version;
+ RCTableID rcTableId = (RCTableID) tableId;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+ long removed_version = rcClient.remove(rcTableId.getTableID(), key);
+ return removed_version;
}
@Override
public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
- return new RCTableEntryIterable((RCTableID) tableId);
+ return new RCTableEntryIterable((RCTableID) tableId);
}
static class RCTableEntryIterable implements Iterable<IKVEntry> {
- private final RCTableID tableId;
+ private final RCTableID tableId;
- public RCTableEntryIterable(final RCTableID tableId) {
- this.tableId = tableId;
- }
+ public RCTableEntryIterable(final RCTableID tableId) {
+ this.tableId = tableId;
+ }
- @Override
- public Iterator<IKVEntry> iterator() {
- return new RCClient.RCTableIterator(tableId);
- }
+ @Override
+ public Iterator<IKVEntry> iterator() {
+ return new RCClient.RCTableIterator(tableId);
+ }
}
public static class RCTableIterator implements Iterator<IKVEntry> {
- private final RCTableID tableId;
- protected final TableEnumerator2 enumerator;
- private JRamCloud.Object last;
+ private final RCTableID tableId;
+ protected final TableEnumerator2 enumerator;
+ private JRamCloud.Object last;
- public RCTableIterator(final RCTableID tableId) {
- this.tableId = tableId;
- this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
- this.last = null;
- }
+ public RCTableIterator(final RCTableID tableId) {
+ this.tableId = tableId;
+ this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
+ this.last = null;
+ }
- @Override
- public boolean hasNext() {
- return this.enumerator.hasNext();
- }
+ @Override
+ public boolean hasNext() {
+ return this.enumerator.hasNext();
+ }
- @Override
- public RCTable.Entry next() {
- last = enumerator.next();
- return new RCTable.Entry(last.key, last.value, last.version);
- }
+ @Override
+ public RCTable.Entry next() {
+ last = enumerator.next();
+ return new RCTable.Entry(last.key, last.value, last.version);
+ }
- @Override
- public void remove() {
- if (last != null) {
- getJRamCloudClient();
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ @Override
+ public void remove() {
+ if (last != null) {
+ getJRamCloudClient();
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- RejectRules rules = new RejectRules();
- rules.rejectIfNeVersion(last.version);
- try {
- rcClient.remove(tableId.getTableID(), last.key, rules);
- } catch (RejectRulesException e) {
- log.trace("remove failed", e);
- }
- last = null;
- }
- }
+ RejectRules rules = new RejectRules();
+ rules.rejectIfNeVersion(last.version);
+ try {
+ rcClient.remove(tableId.getTableID(), last.key, rules);
+ } catch (RejectRulesException e) {
+ log.trace("remove failed", e);
+ }
+ last = null;
+ }
+ }
}
@Override
public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
- if ( ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
- return multiReadInternal(arrays);
- }
+ if ( ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
+ return multiReadInternal(arrays);
+ }
- boolean fail_exists = false;
+ boolean fail_exists = false;
- ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
- Iterator<IMultiEntryOperation> it = ops.iterator();
- while (it.hasNext()) {
+ ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
+ Iterator<IMultiEntryOperation> it = ops.iterator();
+ while (it.hasNext()) {
- req.add((RCMultiEntryOperation) it.next());
+ req.add((RCMultiEntryOperation) it.next());
- if (req.size() >= MAX_MULTI_READS) {
- // dispatch multiRead
- fail_exists |= multiReadInternal(req);
- req.clear();
- }
- }
+ if (req.size() >= MAX_MULTI_READS) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
+ }
- if (!req.isEmpty()) {
- // dispatch multiRead
- fail_exists |= multiReadInternal(req);
- req.clear();
- }
+ if (!req.isEmpty()) {
+ // dispatch multiRead
+ fail_exists |= multiReadInternal(req);
+ req.clear();
+ }
- return fail_exists;
+ return fail_exists;
}
@Override
public boolean multiWrite(final List<IMultiEntryOperation> ops) {
- if ( ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
- return multiWriteInternal(arrays);
- }
+ if ( ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
+ return multiWriteInternal(arrays);
+ }
- boolean fail_exists = false;
+ boolean fail_exists = false;
- ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
- Iterator<IMultiEntryOperation> it = ops.iterator();
- while (it.hasNext()) {
+ ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
+ Iterator<IMultiEntryOperation> it = ops.iterator();
+ while (it.hasNext()) {
- req.add((RCMultiEntryOperation) it.next());
+ req.add((RCMultiEntryOperation) it.next());
- if (req.size() >= MAX_MULTI_WRITES) {
- // dispatch multiWrite
- fail_exists |= multiWriteInternal(req);
- req.clear();
- }
- }
+ if (req.size() >= MAX_MULTI_WRITES) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
+ }
- if (!req.isEmpty()) {
- // dispatch multiWrite
- fail_exists |= multiWriteInternal(req);
- req.clear();
- }
+ if (!req.isEmpty()) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
- return fail_exists;
+ return fail_exists;
}
@Override
public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
- // TODO implement multiRemove JNI, etc. if we need performance
+ // TODO implement multiRemove JNI, etc. if we need performance
- boolean fail_exists = false;
- JRamCloud rcClient = getJRamCloudClient();
+ boolean fail_exists = false;
+ JRamCloud rcClient = getJRamCloudClient();
- for (IMultiEntryOperation iop : ops) {
- RCMultiEntryOperation op = (RCMultiEntryOperation)iop;
- switch (op.getOperation()) {
- case DELETE:
- RejectRules rules = new RejectRules();
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(op.getVersion());
+ for (IMultiEntryOperation iop : ops) {
+ RCMultiEntryOperation op = (RCMultiEntryOperation)iop;
+ switch (op.getOperation()) {
+ case DELETE:
+ RejectRules rules = new RejectRules();
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(op.getVersion());
- try {
- long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
- op.entry.setVersion(removed_version);
- op.status = STATUS.SUCCESS;
- } catch (JRamCloud.ObjectDoesntExistException|JRamCloud.WrongVersionException e) {
- log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
- fail_exists = true;
- op.status = STATUS.FAILED;
- } catch (JRamCloud.RejectRulesException e) {
- log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
- fail_exists = true;
- op.status = STATUS.FAILED;
- }
- break;
+ try {
+ long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
+ op.entry.setVersion(removed_version);
+ op.status = STATUS.SUCCESS;
+ } catch (JRamCloud.ObjectDoesntExistException|JRamCloud.WrongVersionException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ } catch (JRamCloud.RejectRulesException e) {
+ log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
- case FORCE_DELETE:
- long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
- if (removed_version != JRamCloud.VERSION_NONEXISTENT) {
- op.entry.setVersion(removed_version);
- op.status = STATUS.SUCCESS;
- } else {
- log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId );
- fail_exists = true;
- op.status = STATUS.FAILED;
- }
- break;
+ case FORCE_DELETE:
+ long removed_version = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
+ if (removed_version != JRamCloud.VERSION_NONEXISTENT) {
+ op.entry.setVersion(removed_version);
+ op.status = STATUS.SUCCESS;
+ } else {
+ log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ }
+ break;
- default:
- log.error("Invalid operation {} specified on multiDelete", op.getOperation() );
- fail_exists = true;
- op.status = STATUS.FAILED;
- break;
- }
- }
- return fail_exists;
+ default:
+ log.error("Invalid operation {} specified on multiDelete", op.getOperation() );
+ fail_exists = true;
+ op.status = STATUS.FAILED;
+ break;
+ }
+ }
+ return fail_exists;
}
private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
- boolean fail_exists = false;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ boolean fail_exists = false;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- final int reqs = ops.size();
+ final int reqs = ops.size();
- MultiReadObject multiReadObjects = new MultiReadObject(reqs);
+ MultiReadObject multiReadObjects = new MultiReadObject(reqs);
- // setup multi-read operation objects
- for (int i = 0; i < reqs; ++i) {
- IMultiEntryOperation op = ops.get(i);
- multiReadObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey());
- }
+ // setup multi-read operation objects
+ for (int i = 0; i < reqs; ++i) {
+ IMultiEntryOperation op = ops.get(i);
+ multiReadObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey());
+ }
- // execute
- JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
- if (results.length != reqs) {
- log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
- fail_exists = true;
- }
+ // execute
+ JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
+ if (results.length != reqs) {
+ log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+ fail_exists = true;
+ }
- for (int i = 0; i < results.length; ++i) {
- IModifiableMultiEntryOperation op = ops.get(i);
- if (results[i] == null) {
- log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
- fail_exists = true;
- op.setStatus(STATUS.FAILED);
- continue;
- }
- assert (Arrays.equals(results[i].key, op.getKey()));
+ for (int i = 0; i < results.length; ++i) {
+ IModifiableMultiEntryOperation op = ops.get(i);
+ if (results[i] == null) {
+ log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ continue;
+ }
+ assert (Arrays.equals(results[i].key, op.getKey()));
- op.setValue(results[i].value, results[i].version);
- if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
- fail_exists = true;
- op.setStatus(STATUS.FAILED);
- } else {
- op.setStatus(STATUS.SUCCESS);
- }
- }
+ op.setValue(results[i].value, results[i].version);
+ if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ } else {
+ op.setStatus(STATUS.SUCCESS);
+ }
+ }
- return fail_exists;
+ return fail_exists;
}
private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
- boolean fail_exists = false;
- JRamCloud rcClient = RCClient.getJRamCloudClient();
+ boolean fail_exists = false;
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
- final int reqs = ops.size();
+ final int reqs = ops.size();
- MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
+ MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
- for (int i = 0; i < reqs; ++i) {
+ for (int i = 0; i < reqs; ++i) {
- IModifiableMultiEntryOperation op = ops.get(i);
- RejectRules rules = new RejectRules();
+ IModifiableMultiEntryOperation op = ops.get(i);
+ RejectRules rules = new RejectRules();
- switch (op.getOperation()) {
- case CREATE:
- rules.rejectIfExists();
- break;
- case FORCE_CREATE:
- // no reject rule
- break;
- case UPDATE:
- rules.rejectIfDoesntExists();
- rules.rejectIfNeVersion(op.getVersion());
- break;
+ switch (op.getOperation()) {
+ case CREATE:
+ rules.rejectIfExists();
+ break;
+ case FORCE_CREATE:
+ // no reject rule
+ break;
+ case UPDATE:
+ rules.rejectIfDoesntExists();
+ rules.rejectIfNeVersion(op.getVersion());
+ break;
- default:
- log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation() );
- fail_exists = true;
- op.setStatus(STATUS.FAILED);
- return fail_exists;
- }
- multiWriteObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
- }
+ default:
+ log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation() );
+ fail_exists = true;
+ op.setStatus(STATUS.FAILED);
+ return fail_exists;
+ }
+ multiWriteObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
+ }
- MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
- if (results.length != reqs) {
- log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
- fail_exists = true;
- }
+ MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
+ if (results.length != reqs) {
+ log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
+ fail_exists = true;
+ }
- for (int i = 0; i < results.length; ++i) {
- IModifiableMultiEntryOperation op = ops.get(i);
+ for (int i = 0; i < results.length; ++i) {
+ IModifiableMultiEntryOperation op = ops.get(i);
- if (results[i] != null
- && results[i].getStatus() == RCClient.STATUS_OK) {
- op.setStatus(STATUS.SUCCESS);
- op.setVersion(results[i].getVersion());
- } else {
- op.setStatus(STATUS.FAILED);
- fail_exists = true;
- }
- }
+ if (results[i] != null
+ && results[i].getStatus() == RCClient.STATUS_OK) {
+ op.setStatus(STATUS.SUCCESS);
+ op.setVersion(results[i].getVersion());
+ } else {
+ op.setStatus(STATUS.FAILED);
+ fail_exists = true;
+ }
+ }
- return fail_exists;
+ return fail_exists;
}
private static final ConcurrentHashMap<String, RCTable> tables = new ConcurrentHashMap<>();
@Override
public IKVTable getTable(final String tableName) {
- RCTable table = tables.get(tableName);
- if (table == null) {
- RCTable new_table = new RCTable(tableName);
- RCTable existing_table = tables
- .putIfAbsent(tableName, new_table);
- if (existing_table != null) {
- return existing_table;
- } else {
- return new_table;
- }
- }
- return table;
+ RCTable table = tables.get(tableName);
+ if (table == null) {
+ RCTable new_table = new RCTable(tableName);
+ RCTable existing_table = tables
+ .putIfAbsent(tableName, new_table);
+ if (existing_table != null) {
+ return existing_table;
+ } else {
+ return new_table;
+ }
+ }
+ return table;
}
@Override
public void dropTable(IKVTable table) {
- JRamCloud rcClient = RCClient.getJRamCloudClient();
- rcClient.dropTable(table.getTableId().getTableName());
- tables.remove(table.getTableId().getTableName());
+ JRamCloud rcClient = RCClient.getJRamCloudClient();
+ rcClient.dropTable(table.getTableId().getTableName());
+ tables.remove(table.getTableId().getTableName());
}
static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
@Override
public long VERSION_NONEXISTENT() {
- return VERSION_NONEXISTENT;
+ return VERSION_NONEXISTENT;
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java
index 0feec0a..28c5a0b 100644
--- a/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCMultiEntryOperation.java
@@ -14,70 +14,70 @@
@Override
public boolean hasSucceeded() {
- return this.status == STATUS.SUCCESS;
+ return this.status == STATUS.SUCCESS;
}
@Override
public STATUS getStatus() {
- return status;
+ return status;
}
@Override
public IKVTableID getTableId() {
- return tableId;
+ return tableId;
}
@Override
public byte[] getKey() {
- return entry.key;
+ return entry.key;
}
@Override
public byte[] getValue() {
- return entry.value;
+ return entry.value;
}
@Override
public long getVersion() {
- return entry.version;
+ return entry.version;
}
@Override
public OPERATION getOperation() {
- return operation;
+ return operation;
}
@Override
public void setStatus(final STATUS status) {
- this.status = status;
+ this.status = status;
}
@Override
public void setValue(byte[] value, final long version) {
- this.entry.setValue(value);
- setVersion(version);
+ this.entry.setValue(value);
+ setVersion(version);
}
@Override
public void setVersion(final long version) {
- this.entry.setVersion(version);
+ this.entry.setVersion(version);
}
public RCMultiEntryOperation(final IKVTableID tableId, final Entry entry, final OPERATION operation) {
- this.tableId = (RCTableID) tableId;
- this.operation = operation;
+ this.tableId = (RCTableID) tableId;
+ this.operation = operation;
- this.entry = entry;
- this.status = STATUS.NOT_EXECUTED;
+ this.entry = entry;
+ this.status = STATUS.NOT_EXECUTED;
}
public static IMultiEntryOperation create(final IKVTableID tableId, final byte[] key, final byte[] value) {
- return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.CREATE);
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.CREATE);
}
public static IMultiEntryOperation forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
- return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.FORCE_CREATE);
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, RCClient.VERSION_NONEXISTENT), OPERATION.FORCE_CREATE);
}
/**
@@ -87,29 +87,29 @@
* @param key key of an Entry to read
*/
public static IMultiEntryOperation read(final IKVTableID tableId, final byte[] key) {
- return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.READ);
+ return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.READ);
}
public static IMultiEntryOperation update(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
- return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.UPDATE);
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.UPDATE);
}
public static IMultiEntryOperation delete(final IKVTableID tableId, final byte[] key, final byte[] value, final long version) {
- return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.DELETE);
+ return new RCMultiEntryOperation(tableId, new Entry(key,value, version), OPERATION.DELETE);
}
public static IMultiEntryOperation forceDelete(final IKVTableID tableId, final byte[] key) {
- return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.FORCE_DELETE);
+ return new RCMultiEntryOperation(tableId, new Entry(key), OPERATION.FORCE_DELETE);
}
@Override
public IModifiableMultiEntryOperation getActualOperation() {
- return this;
+ return this;
}
@Override
public String toString() {
- return "[RCMultiEntryOperation tableId=" + tableId + ", entry=" + entry
- + ", operation=" + operation + ", status=" + status + "]";
+ return "[RCMultiEntryOperation tableId=" + tableId + ", entry=" + entry
+ + ", operation=" + operation + ", status=" + status + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java
index f823eae..741df0a 100644
--- a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTable.java
@@ -17,42 +17,42 @@
private static final Logger log = LoggerFactory.getLogger(RCTable.class);
public static class Entry implements IKVEntry {
- final byte[] key;
- byte[] value;
- long version;
+ final byte[] key;
+ byte[] value;
+ long version;
- public Entry(final byte[] key, final byte[] value, final long version) {
- this.key = key;
- this.setValue(value);
- this.setVersion(version);
- }
+ public Entry(final byte[] key, final byte[] value, final long version) {
+ this.key = key;
+ this.setValue(value);
+ this.setVersion(version);
+ }
- public Entry(final byte[] key) {
- this(key, null, RCClient.VERSION_NONEXISTENT);
- }
+ public Entry(final byte[] key) {
+ this(key, null, RCClient.VERSION_NONEXISTENT);
+ }
- @Override
- public byte[] getKey() {
- return key;
- }
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
- @Override
- public byte[] getValue() {
- return value;
- }
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
- @Override
- public long getVersion() {
- return version;
- }
+ @Override
+ public long getVersion() {
+ return version;
+ }
- void setValue(byte[] value) {
- this.value = value;
- }
+ void setValue(byte[] value) {
+ this.value = value;
+ }
- void setVersion(long version) {
- this.version = version;
- }
+ void setVersion(long version) {
+ this.version = version;
+ }
}
private final RCTableID rcTableId;
@@ -63,72 +63,72 @@
* @param rcTableName RAMCloud table name
*/
RCTable(final String rcTableName) {
- this.rcTableId = new RCTableID(rcTableName);
+ this.rcTableId = new RCTableID(rcTableName);
- // Trigger RAMCloud ID allocation. If lazy allocation is OK, remove.
- this.rcTableId.getTableID();
+ // Trigger RAMCloud ID allocation. If lazy allocation is OK, remove.
+ this.rcTableId.getTableID();
}
@Override
public IKVTableID getTableId() {
- return this.rcTableId;
+ return this.rcTableId;
}
public String getTableName() {
- return this.rcTableId.getTableName();
+ return this.rcTableId.getTableName();
}
@Override
public long create(final byte[] key, final byte[] value)
- throws ObjectExistsException {
+ throws ObjectExistsException {
- return RCClient.getClient().create(this.rcTableId, key, value);
+ return RCClient.getClient().create(this.rcTableId, key, value);
}
@Override
public long forceCreate(final byte[] key, final byte[] value) {
- return RCClient.getClient().forceCreate(rcTableId, key, value);
+ return RCClient.getClient().forceCreate(rcTableId, key, value);
}
@Override
public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
- return RCClient.getClient().read(rcTableId, key);
+ return RCClient.getClient().read(rcTableId, key);
}
@Override
public long update(final byte[] key, final byte[] value, final long version)
- throws ObjectDoesntExistException, WrongVersionException {
+ throws ObjectDoesntExistException, WrongVersionException {
- return RCClient.getClient().update(rcTableId, key, value, version);
+ return RCClient.getClient().update(rcTableId, key, value, version);
}
@Override
public long update(final byte[] key, final byte[] value)
- throws ObjectDoesntExistException {
+ throws ObjectDoesntExistException {
- return RCClient.getClient().update(rcTableId, key, value);
+ return RCClient.getClient().update(rcTableId, key, value);
}
@Override
public long delete(final byte[] key, final long version)
- throws ObjectDoesntExistException, WrongVersionException {
+ throws ObjectDoesntExistException, WrongVersionException {
- return RCClient.getClient().delete(rcTableId, key, version);
+ return RCClient.getClient().delete(rcTableId, key, version);
}
@Override
public long forceDelete(final byte[] key) {
- return RCClient.getClient().forceDelete(rcTableId, key);
+ return RCClient.getClient().forceDelete(rcTableId, key);
}
@Override
public Iterable<IKVEntry> getAllEntries() {
- return RCClient.getClient().getAllEntries(this.getTableId());
+ return RCClient.getClient().getAllEntries(this.getTableId());
}
@Override
public long VERSION_NONEXISTENT() {
- return RCClient.VERSION_NONEXISTENT;
+ return RCClient.VERSION_NONEXISTENT;
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java
index 5fa56c2..0c2f6b0 100644
--- a/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java
+++ b/src/main/java/net/onrc/onos/datastore/ramcloud/RCTableID.java
@@ -9,49 +9,49 @@
private long tableID;
public RCTableID(String tableName) {
- this.tableName = tableName;
- this.tableID = 0;
+ this.tableName = tableName;
+ this.tableID = 0;
}
@Override
public String getTableName() {
- return tableName;
+ return tableName;
}
// following is RAMCloud specific
public long getTableID() {
- if ( tableID != 0) {
- return tableID;
- }
- tableID = RCClient.getJRamCloudClient().createTable(tableName);
- return tableID;
+ if ( tableID != 0) {
+ return tableID;
+ }
+ tableID = RCClient.getJRamCloudClient().createTable(tableName);
+ return tableID;
}
void resetTableID() {
- this.tableID = 0;
+ this.tableID = 0;
}
@Override
public String toString() {
- return "["+tableName + "]@" + getTableID();
+ return "["+tableName + "]@" + getTableID();
}
@Override
public int hashCode() {
- return Objects.hash(tableName, getTableID());
+ return Objects.hash(tableName, getTableID());
}
@Override
public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- RCTableID other = (RCTableID) obj;
- return Objects.equals(tableName, other.tableName)
- && Objects.equals(getTableID(), other.getTableID());
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ RCTableID other = (RCTableID) obj;
+ return Objects.equals(tableName, other.tableName)
+ && Objects.equals(getTableID(), other.getTableID());
}
}
\ No newline at end of file
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java b/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java
index 105f6f7..e076c14 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVDevice.java
@@ -32,18 +32,18 @@
private static final Logger log = LoggerFactory.getLogger(KVDevice.class);
private static final ThreadLocal<Kryo> deviceKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
};
public static final String GLOBAL_DEVICE_TABLE_NAME = "G:Device";
@@ -58,25 +58,25 @@
// Assuming mac is unique cluster-wide
public static byte[] getDeviceID(final byte[] mac) {
- return DeviceEvent.getDeviceID(mac).array();
+ return DeviceEvent.getDeviceID(mac).array();
}
public static byte[] getMacFromKey(final byte[] key) {
- ByteBuffer keyBuf = ByteBuffer.wrap(key);
- if (keyBuf.getChar() != 'D') {
- throw new IllegalArgumentException("Invalid Device key");
- }
- byte[] mac = new byte[keyBuf.remaining()];
- keyBuf.get(mac);
- return mac;
+ ByteBuffer keyBuf = ByteBuffer.wrap(key);
+ if (keyBuf.getChar() != 'D') {
+ throw new IllegalArgumentException("Invalid Device key");
+ }
+ byte[] mac = new byte[keyBuf.remaining()];
+ keyBuf.get(mac);
+ return mac;
}
public KVDevice(final byte[] mac) {
- super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
+ super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME), getDeviceID(mac));
- this.mac = mac;
- this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
- this.isPortIdsModified = true;
+ this.mac = mac;
+ this.portIds = new TreeSet<>(ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ this.isPortIdsModified = true;
}
/**
@@ -87,62 +87,62 @@
* @return
*/
public static KVDevice createFromKey(final byte[] key) {
- return new KVDevice(getMacFromKey(key));
+ return new KVDevice(getMacFromKey(key));
}
public static Iterable<KVDevice> getAllDevices() {
- return new DeviceEnumerator();
+ return new DeviceEnumerator();
}
public static class DeviceEnumerator implements Iterable<KVDevice> {
- @Override
- public Iterator<KVDevice> iterator() {
- return new DeviceIterator();
- }
+ @Override
+ public Iterator<KVDevice> iterator() {
+ return new DeviceIterator();
+ }
}
public static class DeviceIterator extends AbstractObjectIterator<KVDevice> {
- public DeviceIterator() {
- super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME));
- }
+ public DeviceIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_DEVICE_TABLE_NAME));
+ }
- @Override
- public KVDevice next() {
- IKVEntry o = enumerator.next();
- KVDevice e = KVDevice.createFromKey(o.getKey());
- e.deserialize(o.getValue(), o.getVersion());
- return e;
- }
+ @Override
+ public KVDevice next() {
+ IKVEntry o = enumerator.next();
+ KVDevice e = KVDevice.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
}
public byte[] getMac() {
- // TODO may need to clone() to be sure this object will be immutable.
- return mac;
+ // TODO may need to clone() to be sure this object will be immutable.
+ return mac;
}
public byte[] getId() {
- return getKey();
+ return getKey();
}
public void addPortId(final byte[] portId) {
- // TODO: Should we copy portId, or reference is OK.
- isPortIdsModified |= portIds.add(portId);
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= portIds.add(portId);
}
public void removePortId(final byte[] portId) {
- isPortIdsModified |= portIds.remove(portId);
+ isPortIdsModified |= portIds.remove(portId);
}
public void emptyPortIds() {
- portIds.clear();
- this.isPortIdsModified = true;
+ portIds.clear();
+ this.isPortIdsModified = true;
}
public void addAllToPortIds(final Collection<byte[]> portIds) {
- // TODO: Should we copy portId, or reference is OK.
- isPortIdsModified |= this.portIds.addAll(portIds);
+ // TODO: Should we copy portId, or reference is OK.
+ isPortIdsModified |= this.portIds.addAll(portIds);
}
/**
@@ -150,53 +150,53 @@
* @return Unmodifiable Set view of all the PortIds;
*/
public Set<byte[]> getAllPortIds() {
- return Collections.unmodifiableSet(portIds);
+ return Collections.unmodifiableSet(portIds);
}
@Override
public byte[] serialize() {
- Map<Object, Object> map = getPropertyMap();
+ Map<Object, Object> map = getPropertyMap();
- map.put(PROP_MAC, mac);
- if (isPortIdsModified) {
- byte[][] portIdArray = new byte[portIds.size()][];
- map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
- isPortIdsModified = false;
- }
+ map.put(PROP_MAC, mac);
+ if (isPortIdsModified) {
+ byte[][] portIdArray = new byte[portIds.size()][];
+ map.put(PROP_PORT_IDS, portIds.toArray(portIdArray));
+ isPortIdsModified = false;
+ }
- return serializePropertyMap(deviceKryo.get(), map);
+ return serializePropertyMap(deviceKryo.get(), map);
}
@Override
protected boolean deserialize(final byte[] bytes) {
- boolean success = deserializePropertyMap(deviceKryo.get(), bytes);
- if (!success) {
- log.error("Deserializing Link: " + this + " failed.");
- return false;
- }
- Map<Object, Object> map = this.getPropertyMap();
+ boolean success = deserializePropertyMap(deviceKryo.get(), bytes);
+ if (!success) {
+ log.error("Deserializing Link: " + this + " failed.");
+ return false;
+ }
+ Map<Object, Object> map = this.getPropertyMap();
- if (this.portIds == null) {
- this.portIds = new TreeSet<>(
- ByteArrayComparator.BYTEARRAY_COMPARATOR);
- }
- byte[][] portIdArray = (byte[][]) map.get(PROP_PORT_IDS);
- if (portIdArray != null) {
- this.portIds.clear();
- this.portIds.addAll(Arrays.asList(portIdArray));
- isPortIdsModified = false;
- } else {
- // trigger write on next serialize
- isPortIdsModified = true;
- }
+ if (this.portIds == null) {
+ this.portIds = new TreeSet<>(
+ ByteArrayComparator.BYTEARRAY_COMPARATOR);
+ }
+ byte[][] portIdArray = (byte[][]) map.get(PROP_PORT_IDS);
+ if (portIdArray != null) {
+ this.portIds.clear();
+ this.portIds.addAll(Arrays.asList(portIdArray));
+ isPortIdsModified = false;
+ } else {
+ // trigger write on next serialize
+ isPortIdsModified = true;
+ }
- return success;
+ return success;
}
@Override
public String toString() {
- // TODO output all properties?
- return "[" + this.getClass().getSimpleName()
- + " " + ByteArrayUtil.toHexStringBuffer(mac, ":") + "]";
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " " + ByteArrayUtil.toHexStringBuffer(mac, ":") + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVLink.java b/src/main/java/net/onrc/onos/datastore/topology/KVLink.java
index c9273ae..5fbf070 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/KVLink.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVLink.java
@@ -26,41 +26,41 @@
private static final Logger log = LoggerFactory.getLogger(KVLink.class);
private static final ThreadLocal<Kryo> linkKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
};
public static class SwitchPort {
- public final Long dpid;
- public final Long number;
+ public final Long dpid;
+ public final Long number;
- public SwitchPort(final Long dpid, final Long number) {
- this.dpid = dpid;
- this.number = number;
- }
+ public SwitchPort(final Long dpid, final Long number) {
+ this.dpid = dpid;
+ this.number = number;
+ }
- public byte[] getPortID() {
- return KVPort.getPortID(dpid, number);
- }
+ public byte[] getPortID() {
+ return KVPort.getPortID(dpid, number);
+ }
- public byte[] getSwitchID() {
- return KVSwitch.getSwitchID(dpid);
- }
+ public byte[] getSwitchID() {
+ return KVSwitch.getSwitchID(dpid);
+ }
- @Override
- public String toString() {
- return "(" + Long.toHexString(dpid) + "@" + number + ")";
- }
+ @Override
+ public String toString() {
+ return "(" + Long.toHexString(dpid) + "@" + number + ")";
+ }
}
@@ -68,7 +68,7 @@
// must not re-order enum members, ordinal will be sent over wire
public enum STATUS {
- INACTIVE, ACTIVE;
+ INACTIVE, ACTIVE;
}
private final SwitchPort src;
@@ -76,40 +76,40 @@
private STATUS status;
public static byte[] getLinkID(final Long src_dpid, final Long src_port_no,
- final Long dst_dpid, final Long dst_port_no) {
- return LinkEvent.getLinkID(src_dpid, src_port_no, dst_dpid,
- dst_port_no).array();
+ final Long dst_dpid, final Long dst_port_no) {
+ return LinkEvent.getLinkID(src_dpid, src_port_no, dst_dpid,
+ dst_port_no).array();
}
public static long[] getLinkTupleFromKey(final byte[] key) {
- return getLinkTupleFromKey(ByteBuffer.wrap(key));
+ return getLinkTupleFromKey(ByteBuffer.wrap(key));
}
public static long[] getLinkTupleFromKey(final ByteBuffer keyBuf) {
- long[] tuple = new long[4];
- if (keyBuf.getChar() != 'L') {
- throw new IllegalArgumentException("Invalid Link key");
- }
- long[] src_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
- keyBuf.position(2 + PortEvent.PORTID_BYTES);
- long[] dst_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
+ long[] tuple = new long[4];
+ if (keyBuf.getChar() != 'L') {
+ throw new IllegalArgumentException("Invalid Link key");
+ }
+ long[] src_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
+ keyBuf.position(2 + PortEvent.PORTID_BYTES);
+ long[] dst_port_pair = KVPort.getPortPairFromKey(keyBuf.slice());
- tuple[0] = src_port_pair[0];
- tuple[1] = src_port_pair[1];
- tuple[2] = dst_port_pair[0];
- tuple[3] = dst_port_pair[1];
+ tuple[0] = src_port_pair[0];
+ tuple[1] = src_port_pair[1];
+ tuple[2] = dst_port_pair[0];
+ tuple[3] = dst_port_pair[1];
- return tuple;
+ return tuple;
}
public KVLink(final Long src_dpid, final Long src_port_no,
- final Long dst_dpid, final Long dst_port_no) {
- super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
- src_port_no, dst_dpid, dst_port_no));
+ final Long dst_dpid, final Long dst_port_no) {
+ super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME), getLinkID(src_dpid,
+ src_port_no, dst_dpid, dst_port_no));
- src = new SwitchPort(src_dpid, src_port_no);
- dst = new SwitchPort(dst_dpid, dst_port_no);
- status = STATUS.INACTIVE;
+ src = new SwitchPort(src_dpid, src_port_no);
+ dst = new SwitchPort(dst_dpid, dst_port_no);
+ status = STATUS.INACTIVE;
}
/**
@@ -120,98 +120,98 @@
* @return KVLink instance
*/
public static KVLink createFromKey(final byte[] key) {
- long[] linkTuple = getLinkTupleFromKey(key);
- return new KVLink(linkTuple[0], linkTuple[1], linkTuple[2],
- linkTuple[3]);
+ long[] linkTuple = getLinkTupleFromKey(key);
+ return new KVLink(linkTuple[0], linkTuple[1], linkTuple[2],
+ linkTuple[3]);
}
public static Iterable<KVLink> getAllLinks() {
- return new LinkEnumerator();
+ return new LinkEnumerator();
}
public static class LinkEnumerator implements Iterable<KVLink> {
- @Override
- public Iterator<KVLink> iterator() {
- return new LinkIterator();
- }
+ @Override
+ public Iterator<KVLink> iterator() {
+ return new LinkIterator();
+ }
}
public static class LinkIterator extends AbstractObjectIterator<KVLink> {
- public LinkIterator() {
- super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME));
- }
+ public LinkIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_LINK_TABLE_NAME));
+ }
- @Override
- public KVLink next() {
- IKVEntry o = enumerator.next();
- KVLink e = KVLink.createFromKey(o.getKey());
- e.deserialize(o.getValue(), o.getVersion());
- return e;
- }
+ @Override
+ public KVLink next() {
+ IKVEntry o = enumerator.next();
+ KVLink e = KVLink.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
}
public STATUS getStatus() {
- return status;
+ return status;
}
public void setStatus(final STATUS status) {
- this.status = status;
+ this.status = status;
}
public SwitchPort getSrc() {
- return src;
+ return src;
}
public SwitchPort getDst() {
- return dst;
+ return dst;
}
public byte[] getId() {
- return getKey();
+ return getKey();
}
@Override
public byte[] serialize() {
- Map<Object, Object> map = getPropertyMap();
+ Map<Object, Object> map = getPropertyMap();
- LinkProperty.Builder link = LinkProperty.newBuilder();
- link.setSrcSwId(ByteString.copyFrom(src.getSwitchID()));
- link.setSrcPortId(ByteString.copyFrom(src.getPortID()));
- link.setDstSwId(ByteString.copyFrom(dst.getSwitchID()));
- link.setDstPortId(ByteString.copyFrom(dst.getPortID()));
- link.setStatus(status.ordinal());
+ LinkProperty.Builder link = LinkProperty.newBuilder();
+ link.setSrcSwId(ByteString.copyFrom(src.getSwitchID()));
+ link.setSrcPortId(ByteString.copyFrom(src.getPortID()));
+ link.setDstSwId(ByteString.copyFrom(dst.getSwitchID()));
+ link.setDstPortId(ByteString.copyFrom(dst.getPortID()));
+ link.setStatus(status.ordinal());
- if (!map.isEmpty()) {
- byte[] propMaps = serializePropertyMap(linkKryo.get(), map);
- link.setValue(ByteString.copyFrom(propMaps));
- }
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(linkKryo.get(), map);
+ link.setValue(ByteString.copyFrom(propMaps));
+ }
- return link.build().toByteArray();
+ return link.build().toByteArray();
}
@Override
protected boolean deserialize(final byte[] bytes) {
- try {
- boolean success = true;
+ try {
+ boolean success = true;
- LinkProperty link = LinkProperty.parseFrom(bytes);
- byte[] props = link.getValue().toByteArray();
- success &= deserializePropertyMap(linkKryo.get(), props);
- this.status = STATUS.values()[link.getStatus()];
+ LinkProperty link = LinkProperty.parseFrom(bytes);
+ byte[] props = link.getValue().toByteArray();
+ success &= deserializePropertyMap(linkKryo.get(), props);
+ this.status = STATUS.values()[link.getStatus()];
- return success;
- } catch (InvalidProtocolBufferException e) {
- log.error("Deserializing Link: " + this + " failed.", e);
- return false;
- }
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Link: " + this + " failed.", e);
+ return false;
+ }
}
@Override
public String toString() {
- // TODO output all properties?
- return "[" + this.getClass().getSimpleName()
- + " " + src + "->" + dst + " STATUS:" + status + "]";
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " " + src + "->" + dst + " STATUS:" + status + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVPort.java b/src/main/java/net/onrc/onos/datastore/topology/KVPort.java
index 3b66864..452bf94 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/KVPort.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVPort.java
@@ -30,25 +30,25 @@
private static final Logger log = LoggerFactory.getLogger(KVPort.class);
private static final ThreadLocal<Kryo> portKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
};
public static final String GLOBAL_PORT_TABLE_NAME = "G:Port";
// must not re-order enum members, ordinal will be sent over wire
public enum STATUS {
- INACTIVE, ACTIVE;
+ INACTIVE, ACTIVE;
}
private final Long dpid;
@@ -57,49 +57,49 @@
private STATUS status;
public static byte[] getPortID(final Long dpid, final Long number) {
- return PortEvent.getPortID(dpid, number).array();
+ return PortEvent.getPortID(dpid, number).array();
}
public static long[] getPortPairFromKey(final byte[] key) {
- return getPortPairFromKey(ByteBuffer.wrap(key));
+ return getPortPairFromKey(ByteBuffer.wrap(key));
}
public static long[] getPortPairFromKey(final ByteBuffer keyBuf) {
- long[] pair = new long[2];
- if (keyBuf.getChar() != 'S') {
- throw new IllegalArgumentException("Invalid Port key:" + keyBuf
- + " "
- + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
- }
- pair[0] = keyBuf.getLong();
- if (keyBuf.getChar() != 'P') {
- throw new IllegalArgumentException("Invalid Port key:" + keyBuf
- + " "
- + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
- }
- pair[1] = keyBuf.getLong();
- return pair;
+ long[] pair = new long[2];
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Port key:" + keyBuf
+ + " "
+ + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
+ }
+ pair[0] = keyBuf.getLong();
+ if (keyBuf.getChar() != 'P') {
+ throw new IllegalArgumentException("Invalid Port key:" + keyBuf
+ + " "
+ + ByteArrayUtil.toHexStringBuffer(keyBuf.array(), ":"));
+ }
+ pair[1] = keyBuf.getLong();
+ return pair;
}
public static long getDpidFromKey(final byte[] key) {
- return getPortPairFromKey(key)[0];
+ return getPortPairFromKey(key)[0];
}
public static long getNumberFromKey(final byte[] key) {
- return getPortPairFromKey(key)[1];
+ return getPortPairFromKey(key)[1];
}
// FIXME specify DPID,number here, or Should caller specify the key it self?
// In other words, should layer above have the control of the ID?
public KVPort(final Long dpid, final Long number) {
- super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
+ super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME), getPortID(dpid, number));
- // TODO Auto-generated constructor stub
+ // TODO Auto-generated constructor stub
- this.dpid = dpid;
- this.number = number;
- this.status = STATUS.INACTIVE;
+ this.dpid = dpid;
+ this.number = number;
+ this.status = STATUS.INACTIVE;
}
/**
@@ -110,96 +110,96 @@
* @return KVPort instance
*/
public static KVPort createFromKey(final byte[] key) {
- long[] pair = getPortPairFromKey(key);
- return new KVPort(pair[0], pair[1]);
+ long[] pair = getPortPairFromKey(key);
+ return new KVPort(pair[0], pair[1]);
}
public static Iterable<KVPort> getAllPorts() {
- return new PortEnumerator();
+ return new PortEnumerator();
}
public static class PortEnumerator implements Iterable<KVPort> {
- @Override
- public Iterator<KVPort> iterator() {
- return new PortIterator();
- }
+ @Override
+ public Iterator<KVPort> iterator() {
+ return new PortIterator();
+ }
}
public static class PortIterator extends AbstractObjectIterator<KVPort> {
- public PortIterator() {
- super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME));
- }
+ public PortIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_PORT_TABLE_NAME));
+ }
- @Override
- public KVPort next() {
- IKVEntry o = enumerator.next();
- KVPort e = KVPort.createFromKey(o.getKey());
- e.deserialize(o.getValue(), o.getVersion());
- return e;
- }
+ @Override
+ public KVPort next() {
+ IKVEntry o = enumerator.next();
+ KVPort e = KVPort.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
}
public STATUS getStatus() {
- return status;
+ return status;
}
public void setStatus(final STATUS status) {
- this.status = status;
+ this.status = status;
}
public Long getDpid() {
- return dpid;
+ return dpid;
}
public Long getNumber() {
- return number;
+ return number;
}
public byte[] getId() {
- return getKey();
+ return getKey();
}
@Override
public byte[] serialize() {
- Map<Object, Object> map = getPropertyMap();
+ Map<Object, Object> map = getPropertyMap();
- PortProperty.Builder port = PortProperty.newBuilder();
- port.setDpid(dpid);
- port.setNumber(number);
- port.setStatus(status.ordinal());
+ PortProperty.Builder port = PortProperty.newBuilder();
+ port.setDpid(dpid);
+ port.setNumber(number);
+ port.setStatus(status.ordinal());
- if (!map.isEmpty()) {
- byte[] propMaps = serializePropertyMap(portKryo.get(), map);
- port.setValue(ByteString.copyFrom(propMaps));
- }
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(portKryo.get(), map);
+ port.setValue(ByteString.copyFrom(propMaps));
+ }
- return port.build().toByteArray();
+ return port.build().toByteArray();
}
@Override
protected boolean deserialize(final byte[] bytes) {
- try {
- boolean success = true;
+ try {
+ boolean success = true;
- PortProperty port = PortProperty.parseFrom(bytes);
- byte[] props = port.getValue().toByteArray();
- success &= deserializePropertyMap(portKryo.get(), props);
- this.status = STATUS.values()[port.getStatus()];
+ PortProperty port = PortProperty.parseFrom(bytes);
+ byte[] props = port.getValue().toByteArray();
+ success &= deserializePropertyMap(portKryo.get(), props);
+ this.status = STATUS.values()[port.getStatus()];
- return success;
- } catch (InvalidProtocolBufferException e) {
- log.error("Deserializing Port: " + this + " failed.", e);
- return false;
- }
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Port: " + this + " failed.", e);
+ return false;
+ }
}
@Override
public String toString() {
- // TODO output all properties?
- return "[" + this.getClass().getSimpleName()
- + " 0x" + Long.toHexString(dpid) + "@" + number
- + " STATUS:" + status + "]";
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " 0x" + Long.toHexString(dpid) + "@" + number
+ + " STATUS:" + status + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java
index e915160..5f1c8fd 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/KVSwitch.java
@@ -29,52 +29,52 @@
private static final Logger log = LoggerFactory.getLogger(KVSwitch.class);
private static final ThreadLocal<Kryo> switchKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- kryo.setRegistrationRequired(true);
- kryo.setReferences(false);
- kryo.register(byte[].class);
- kryo.register(byte[][].class);
- kryo.register(HashMap.class);
- // TODO check if we should explicitly specify EnumSerializer
- kryo.register(STATUS.class);
- return kryo;
- }
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ kryo.setRegistrationRequired(true);
+ kryo.setReferences(false);
+ kryo.register(byte[].class);
+ kryo.register(byte[][].class);
+ kryo.register(HashMap.class);
+ // TODO check if we should explicitly specify EnumSerializer
+ kryo.register(STATUS.class);
+ return kryo;
+ }
};
public static final String GLOBAL_SWITCH_TABLE_NAME = "G:Switch";
// must not re-order enum members, ordinal will be sent over wire
public enum STATUS {
- INACTIVE, ACTIVE;
+ INACTIVE, ACTIVE;
}
private final Long dpid;
private STATUS status;
public static byte[] getSwitchID(final Long dpid) {
- return SwitchEvent.getSwitchID(dpid).array();
+ return SwitchEvent.getSwitchID(dpid).array();
}
public static long getDpidFromKey(final byte[] key) {
- return getDpidFromKey(ByteBuffer.wrap(key));
+ return getDpidFromKey(ByteBuffer.wrap(key));
}
public static long getDpidFromKey(final ByteBuffer keyBuf) {
- if (keyBuf.getChar() != 'S') {
- throw new IllegalArgumentException("Invalid Switch key");
- }
- return keyBuf.getLong();
+ if (keyBuf.getChar() != 'S') {
+ throw new IllegalArgumentException("Invalid Switch key");
+ }
+ return keyBuf.getLong();
}
// FIXME specify DPID here, or Should caller specify the key it self?
// In other words, should layer above have the control of the ID?
public KVSwitch(final Long dpid) {
- super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME), getSwitchID(dpid));
+ super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME), getSwitchID(dpid));
- this.dpid = dpid;
- this.status = STATUS.INACTIVE;
+ this.dpid = dpid;
+ this.status = STATUS.INACTIVE;
}
/**
@@ -85,90 +85,90 @@
* @return KVSwitch instance
*/
public static KVSwitch createFromKey(final byte[] key) {
- return new KVSwitch(getDpidFromKey(key));
+ return new KVSwitch(getDpidFromKey(key));
}
public static Iterable<KVSwitch> getAllSwitches() {
- return new SwitchEnumerator();
+ return new SwitchEnumerator();
}
public static class SwitchEnumerator implements Iterable<KVSwitch> {
- @Override
- public Iterator<KVSwitch> iterator() {
- return new SwitchIterator();
- }
+ @Override
+ public Iterator<KVSwitch> iterator() {
+ return new SwitchIterator();
+ }
}
public static class SwitchIterator extends AbstractObjectIterator<KVSwitch> {
- public SwitchIterator() {
- super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME));
- }
+ public SwitchIterator() {
+ super(DataStoreClient.getClient().getTable(GLOBAL_SWITCH_TABLE_NAME));
+ }
- @Override
- public KVSwitch next() {
- IKVEntry o = enumerator.next();
- KVSwitch e = KVSwitch.createFromKey(o.getKey());
- e.deserialize(o.getValue(), o.getVersion());
- return e;
- }
+ @Override
+ public KVSwitch next() {
+ IKVEntry o = enumerator.next();
+ KVSwitch e = KVSwitch.createFromKey(o.getKey());
+ e.deserialize(o.getValue(), o.getVersion());
+ return e;
+ }
}
public STATUS getStatus() {
- return status;
+ return status;
}
public void setStatus(final STATUS status) {
- this.status = status;
+ this.status = status;
}
public Long getDpid() {
- return dpid;
+ return dpid;
}
public byte[] getId() {
- return getKey();
+ return getKey();
}
@Override
public byte[] serialize() {
- Map<Object, Object> map = getPropertyMap();
+ Map<Object, Object> map = getPropertyMap();
- SwitchProperty.Builder sw = SwitchProperty.newBuilder();
- sw.setDpid(dpid);
- sw.setStatus(status.ordinal());
+ SwitchProperty.Builder sw = SwitchProperty.newBuilder();
+ sw.setDpid(dpid);
+ sw.setStatus(status.ordinal());
- if (!map.isEmpty()) {
- byte[] propMaps = serializePropertyMap(switchKryo.get(), map);
- sw.setValue(ByteString.copyFrom(propMaps));
- }
+ if (!map.isEmpty()) {
+ byte[] propMaps = serializePropertyMap(switchKryo.get(), map);
+ sw.setValue(ByteString.copyFrom(propMaps));
+ }
- return sw.build().toByteArray();
+ return sw.build().toByteArray();
}
@Override
protected boolean deserialize(final byte[] bytes) {
- try {
- boolean success = true;
+ try {
+ boolean success = true;
- SwitchProperty sw = SwitchProperty.parseFrom(bytes);
- byte[] props = sw.getValue().toByteArray();
- success &= deserializePropertyMap(switchKryo.get(), props);
- this.status = STATUS.values()[sw.getStatus()];
+ SwitchProperty sw = SwitchProperty.parseFrom(bytes);
+ byte[] props = sw.getValue().toByteArray();
+ success &= deserializePropertyMap(switchKryo.get(), props);
+ this.status = STATUS.values()[sw.getStatus()];
- return success;
- } catch (InvalidProtocolBufferException e) {
- log.error("Deserializing Switch: " + this + " failed.", e);
- return false;
- }
+ return success;
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Deserializing Switch: " + this + " failed.", e);
+ return false;
+ }
}
@Override
public String toString() {
- // TODO output all properties?
- return "[" + this.getClass().getSimpleName()
- + " 0x" + Long.toHexString(dpid) + " STATUS:" + status + "]";
+ // TODO output all properties?
+ return "[" + this.getClass().getSimpleName()
+ + " 0x" + Long.toHexString(dpid) + " STATUS:" + status + "]";
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
index 56b088f..fbe3ebd 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
@@ -17,8 +17,8 @@
@Override
public int compare(final byte[] o1, final byte[] o2) {
- final ByteBuffer b1 = ByteBuffer.wrap(o1);
- final ByteBuffer b2 = ByteBuffer.wrap(o2);
- return b1.compareTo(b2);
+ final ByteBuffer b1 = ByteBuffer.wrap(o1);
+ final ByteBuffer b2 = ByteBuffer.wrap(o2);
+ return b1.compareTo(b2);
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
index c13d7ea..c7e3765 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
@@ -17,8 +17,8 @@
* @return {@code bytes} converted to a StringBuffer
*/
public static StringBuffer toHexStringBuffer(final byte[] bytes,
- final String sep) {
- return toHexStringBuffer(bytes, sep, new StringBuffer());
+ final String sep) {
+ return toHexStringBuffer(bytes, sep, new StringBuffer());
}
/**
@@ -32,22 +32,22 @@
* @return {@code buf}
*/
public static StringBuffer toHexStringBuffer(final byte[] bytes,
- final String sep, final StringBuffer buf) {
- if (bytes == null) {
- return buf;
- }
+ final String sep, final StringBuffer buf) {
+ if (bytes == null) {
+ return buf;
+ }
- ByteBuffer wrap = ByteBuffer.wrap(bytes);
+ ByteBuffer wrap = ByteBuffer.wrap(bytes);
- boolean hasWritten = false;
- while (wrap.hasRemaining()) {
- if (hasWritten) {
- buf.append(sep);
- }
- buf.append(Integer.toHexString(wrap.get()));
- hasWritten = true;
- }
+ boolean hasWritten = false;
+ while (wrap.hasRemaining()) {
+ if (hasWritten) {
+ buf.append(sep);
+ }
+ buf.append(Integer.toHexString(wrap.get()));
+ hasWritten = true;
+ }
- return buf;
+ return buf;
}
}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/KVObject.java b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
index 2aac0e3..b46c9f5 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
@@ -36,13 +36,13 @@
// each sub-class should prepare their own serializer, which has required
// objects registered for better performance.
private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
- @Override
- protected Kryo initialValue() {
- Kryo kryo = new Kryo();
- // kryo.setRegistrationRequired(true);
- // kryo.setReferences(false);
- return kryo;
- }
+ @Override
+ protected Kryo initialValue() {
+ Kryo kryo = new Kryo();
+ // kryo.setRegistrationRequired(true);
+ // kryo.setReferences(false);
+ return kryo;
+ }
};
private final IKVTable table;
@@ -60,46 +60,46 @@
private Map<Object, Object> propertyMap;
public KVObject(final IKVTable table, final byte[] key) {
- this(table, key, null, table.VERSION_NONEXISTENT());
+ this(table, key, null, table.VERSION_NONEXISTENT());
}
public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
- if (table == null) {
- throw new IllegalArgumentException("table cannot be null");
- }
- if (key == null) {
- throw new IllegalArgumentException("key cannot be null");
- }
- this.table = table;
- this.key = key;
- this.version = version;
- this.propertyMap = new HashMap<Object, Object>();
+ if (table == null) {
+ throw new IllegalArgumentException("table cannot be null");
+ }
+ if (key == null) {
+ throw new IllegalArgumentException("key cannot be null");
+ }
+ this.table = table;
+ this.key = key;
+ this.version = version;
+ this.propertyMap = new HashMap<Object, Object>();
- if (value != null) {
- deserialize(value);
- }
+ if (value != null) {
+ deserialize(value);
+ }
}
protected static KVObject createFromKey(final byte[] key) {
- // Equivalent of this method is expected to be implemented by SubClasses
- throw new UnsupportedOperationException(
- "createFromKey() is not expected to be called for RCObject");
+ // Equivalent of this method is expected to be implemented by SubClasses
+ throw new UnsupportedOperationException(
+ "createFromKey() is not expected to be called for RCObject");
}
public IKVTable getTable() {
- return table;
+ return table;
}
public IKVTableID getTableId() {
- return table.getTableId();
+ return table.getTableId();
}
public byte[] getKey() {
- return key;
+ return key;
}
public long getVersion() {
- return version;
+ return version;
}
/**
@@ -109,13 +109,13 @@
* @return Will return null, if never been set, or was not deserialized
*/
protected Map<Object, Object> getPropertyMap() {
- return this.propertyMap;
+ return this.propertyMap;
}
protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
- Map<Object, Object> oldMap = this.propertyMap;
- this.propertyMap = newMap;
- return oldMap;
+ Map<Object, Object> oldMap = this.propertyMap;
+ this.propertyMap = newMap;
+ return oldMap;
}
/**
@@ -126,18 +126,18 @@
* @return serialized byte array
*/
public byte[] serialize() {
- return serializePropertyMap(defaultKryo.get(), this.propertyMap);
+ return serializePropertyMap(defaultKryo.get(), this.propertyMap);
}
protected byte[] serializePropertyMap(final Kryo kryo,
- final Map<Object, Object> propMap) {
+ final Map<Object, Object> propMap) {
- // value
- byte[] rcTemp = new byte[1024 * 1024];
- Output output = new Output(rcTemp);
- kryo.writeObject(output, propMap);
- return output.toBytes();
+ // value
+ byte[] rcTemp = new byte[1024 * 1024];
+ Output output = new Output(rcTemp);
+ kryo.writeObject(output, propMap);
+ return output.toBytes();
}
@@ -149,8 +149,8 @@
* @return true if success
*/
public boolean deserialize(final byte[] bytes, final long version) {
- this.version = version;
- return deserialize(bytes);
+ this.version = version;
+ return deserialize(bytes);
}
/**
@@ -162,8 +162,8 @@
* @return true if success
*/
protected boolean deserialize(final byte[] bytes) {
- deserializePropertyMap(defaultKryo.get(), bytes);
- return true;
+ deserializePropertyMap(defaultKryo.get(), bytes);
+ return true;
}
/**
@@ -173,26 +173,26 @@
* @return true if success
*/
protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
- @SuppressWarnings("unchecked")
- Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
- if (map == null) {
- map = new HashMap<>();
- }
- this.propertyMap = map;
- return true;
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
+ if (map == null) {
+ map = new HashMap<>();
+ }
+ this.propertyMap = map;
+ return true;
}
protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
- final byte[] bytes, final Class<T> type) {
+ final byte[] bytes, final Class<T> type) {
- if (bytes == null || bytes.length == 0) {
- return null;
- }
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
- Input input = new Input(bytes);
- T map = kryo.readObject(input, type);
+ Input input = new Input(bytes);
+ T map = kryo.readObject(input, type);
- return map;
+ return map;
}
@@ -205,22 +205,22 @@
*/
public void create() throws ObjectExistsException {
- if (this.propertyMap == null) {
- log.warn("No object map was set. Setting empty Map.");
- replacePropertyMap(new HashMap<Object, Object>());
- }
+ if (this.propertyMap == null) {
+ log.warn("No object map was set. Setting empty Map.");
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
- this.version = table.create(key, this.serialize());
+ this.version = table.create(key, this.serialize());
}
public void forceCreate() {
- if (this.propertyMap == null) {
- log.warn("No object map was set. Setting empty Map.");
- replacePropertyMap(new HashMap<Object, Object>());
- }
+ if (this.propertyMap == null) {
+ log.warn("No object map was set. Setting empty Map.");
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
- this.version = table.forceCreate(key, this.serialize());
+ this.version = table.forceCreate(key, this.serialize());
}
/**
@@ -232,8 +232,8 @@
*
*/
public void read() throws ObjectDoesntExistException {
- IKVEntry e = table.read(key);
- deserialize(e.getValue(), e.getVersion());
+ IKVEntry e = table.read(key);
+ deserialize(e.getValue(), e.getVersion());
}
/**
@@ -246,11 +246,11 @@
*/
public void update() throws ObjectDoesntExistException,
WrongVersionException {
- if (this.propertyMap == null) {
- replacePropertyMap(new HashMap<Object, Object>());
- }
+ if (this.propertyMap == null) {
+ replacePropertyMap(new HashMap<Object, Object>());
+ }
- this.version = table.update(key, this.serialize(), version);
+ this.version = table.update(key, this.serialize(), version);
}
/**
@@ -263,36 +263,36 @@
*/
public void delete() throws ObjectDoesntExistException,
WrongVersionException {
- this.version = table.delete(key, this.version);
+ this.version = table.delete(key, this.version);
}
public void forceDelete() {
- this.version = table.forceDelete(key);
+ this.version = table.forceDelete(key);
}
public WriteOp forceCreateOp(IKVClient client) {
- return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
+ return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
}
public WriteOp createOp(IKVClient client) {
- return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
+ return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
}
// this might not be needed?
public WriteOp readOp(IKVClient client) {
- return new WriteOp(client.readOp(getTableId(), getKey()), this);
+ return new WriteOp(client.readOp(getTableId(), getKey()), this);
}
public WriteOp updateOp(IKVClient client) {
- return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
+ return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
}
public WriteOp deleteOp(IKVClient client) {
- return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
+ return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
}
public WriteOp forceDeleteOp(IKVClient client) {
- return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
+ return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
}
/**
@@ -306,32 +306,32 @@
*/
public static boolean multiRead(final List<? extends KVObject> objects) {
- final IKVClient client = DataStoreClient.getClient();
+ final IKVClient client = DataStoreClient.getClient();
- final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
- for (KVObject o : objects) {
- readOps.add(o.readOp(client));
- }
+ final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
+ for (KVObject o : objects) {
+ readOps.add(o.readOp(client));
+ }
- boolean failExists = client.multiRead(readOps);
+ boolean failExists = client.multiRead(readOps);
- for (int i = 0; i < readOps.size(); ++i) {
- KVObject obj = objects.get(i);
- IMultiEntryOperation entry = readOps.get(i);
- if ( entry.hasSucceeded() ) {
- if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
- //deserialize return true on success
- failExists = true;
- log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
- }
- } else {
- log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
- obj.version = obj.getTable().VERSION_NONEXISTENT();
- failExists = true;
- }
- }
+ for (int i = 0; i < readOps.size(); ++i) {
+ KVObject obj = objects.get(i);
+ IMultiEntryOperation entry = readOps.get(i);
+ if ( entry.hasSucceeded() ) {
+ if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
+ //deserialize return true on success
+ failExists = true;
+ log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
+ }
+ } else {
+ log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
+ obj.version = obj.getTable().VERSION_NONEXISTENT();
+ failExists = true;
+ }
+ }
- return failExists;
+ return failExists;
}
/**
@@ -339,131 +339,131 @@
*/
public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
- private final IModifiableMultiEntryOperation base;
- private final KVObject obj;
+ private final IModifiableMultiEntryOperation base;
+ private final KVObject obj;
- public WriteOp(IMultiEntryOperation base, final KVObject obj) {
- this.base = (IModifiableMultiEntryOperation) base;
- this.obj = obj;
+ public WriteOp(IMultiEntryOperation base, final KVObject obj) {
+ this.base = (IModifiableMultiEntryOperation) base;
+ this.obj = obj;
- // switch (base.getOperation()) {
- // case CREATE:
- // case FORCE_CREATE:
- // case UPDATE:
- // break;
- // default:
- // throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
- // }
- }
+ // switch (base.getOperation()) {
+ // case CREATE:
+ // case FORCE_CREATE:
+ // case UPDATE:
+ // break;
+ // default:
+ // throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
+ // }
+ }
- @Override
- public KVObject getObject() {
- return obj;
- }
+ @Override
+ public KVObject getObject() {
+ return obj;
+ }
- @Deprecated
- public OPERATION getOp() {
- return this.getOperation();
- }
+ @Deprecated
+ public OPERATION getOp() {
+ return this.getOperation();
+ }
- @Override
- public boolean hasSucceeded() {
- return base.hasSucceeded();
- }
+ @Override
+ public boolean hasSucceeded() {
+ return base.hasSucceeded();
+ }
- @Override
- public STATUS getStatus() {
- return base.getStatus();
- }
+ @Override
+ public STATUS getStatus() {
+ return base.getStatus();
+ }
- @Override
- public IKVTableID getTableId() {
- return base.getTableId();
- }
+ @Override
+ public IKVTableID getTableId() {
+ return base.getTableId();
+ }
- @Override
- public byte[] getKey() {
- return base.getKey();
- }
+ @Override
+ public byte[] getKey() {
+ return base.getKey();
+ }
- @Override
- public byte[] getValue() {
- return base.getValue();
- }
+ @Override
+ public byte[] getValue() {
+ return base.getValue();
+ }
- @Override
- public long getVersion() {
- return base.getVersion();
- }
+ @Override
+ public long getVersion() {
+ return base.getVersion();
+ }
- @Override
- public OPERATION getOperation() {
- return base.getOperation();
- }
+ @Override
+ public OPERATION getOperation() {
+ return base.getOperation();
+ }
- @Override
- public void setStatus(STATUS status) {
- base.setStatus(status);
- }
+ @Override
+ public void setStatus(STATUS status) {
+ base.setStatus(status);
+ }
- @Override
- public void setValue(byte[] value, long version) {
- base.setValue(value, version);
- }
+ @Override
+ public void setValue(byte[] value, long version) {
+ base.setValue(value, version);
+ }
- @Override
- public void setVersion(long version) {
- base.setVersion(version);
- this.obj.version = version;
- }
+ @Override
+ public void setVersion(long version) {
+ base.setVersion(version);
+ this.obj.version = version;
+ }
- @Override
- public IModifiableMultiEntryOperation getActualOperation() {
- return base;
- }
+ @Override
+ public IModifiableMultiEntryOperation getActualOperation() {
+ return base;
+ }
}
public static boolean multiWrite(final List<WriteOp> objects) {
- final IKVClient client = DataStoreClient.getClient();
+ final IKVClient client = DataStoreClient.getClient();
- final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
- for (WriteOp o : objects) {
- writeOps.add(o);
- }
+ final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
+ for (WriteOp o : objects) {
+ writeOps.add(o);
+ }
- return client.multiWrite(writeOps);
+ return client.multiWrite(writeOps);
}
public abstract static class AbstractObjectIterator<E extends KVObject> implements
Iterator<E> {
- protected Iterator<IKVEntry> enumerator;
+ protected Iterator<IKVEntry> enumerator;
- public AbstractObjectIterator(final IKVTable table) {
- this.enumerator = table.getAllEntries().iterator();
- }
+ public AbstractObjectIterator(final IKVTable table) {
+ this.enumerator = table.getAllEntries().iterator();
+ }
- @Override
- public boolean hasNext() {
- return enumerator.hasNext();
- }
+ @Override
+ public boolean hasNext() {
+ return enumerator.hasNext();
+ }
- // Implement something similar to below to realize Iterator
- // @Override
- // public E next() {
- // IKVTable.IKVEntry o = enumerator.next();
- // E obj = E.createFromKey(o.getKey());
- // obj.deserialize(o.getValue(), o.getVersion());
- // return obj;
- // }
+ // Implement something similar to below to realize Iterator
+ // @Override
+ // public E next() {
+ // IKVTable.IKVEntry o = enumerator.next();
+ // E obj = E.createFromKey(o.getKey());
+ // obj.deserialize(o.getValue(), o.getVersion());
+ // return obj;
+ // }
- @Deprecated
- @Override
- public void remove() {
- // TODO Not implemented, as I cannot find a use-case for it.
- throw new UnsupportedOperationException("Not implemented yet");
- }
+ @Deprecated
+ @Override
+ public void remove() {
+ // TODO Not implemented, as I cannot find a use-case for it.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
}