Add option to use Hazelcast as datastore for development environment.
This patch sets the default backend to Hazelcast.
To use RAMCloud as the backend data store, add -Dnet.onrc.onos.datastore.backend=ramcloud to the Java options.
- ClientMode: use an existing Hazelcast instance if one exists.
- maps whose names start with datastore:// are now configured to be strongly consistent
- add main for manual testing
- follow PMD, etc. where easily possible
- make HZClient a singleton
Change-Id: Ibe2afc3bfddfd7fd567c91477c16cd679fc543d4
diff --git a/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
new file mode 100644
index 0000000..2c2b3a4
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/hazelcast/HZClient.java
@@ -0,0 +1,345 @@
+package net.onrc.onos.datastore.hazelcast;
+
+import java.io.FileNotFoundException;
+import java.util.Collection;
+import java.util.List;
+
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiEntryOperation.OPERATION;
+import net.onrc.onos.datastore.IMultiEntryOperation.STATUS;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.hazelcast.HZTable.VersionedValue;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public class HZClient implements IKVClient {
+ private static final Logger log = LoggerFactory.getLogger(HZClient.class);
+
+ // Version value passed along by createOp/forceCreateOp and reported by VERSION_NONEXISTENT().
+ static final long VERSION_NONEXISTENT = 0L;
+ // Prefix prepended to a table name to form the name of its Hazelcast map.
+ private static final String MAP_PREFIX = "datastore://";
+ // Base Hazelcast XML configuration; the path can be overridden through this system property.
+ private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
+ // Whether to try Client mode first; getHZinstance() clears it when falling back to Instance mode.
+ private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.datastore.hazelcast.clientMode", "true"));
+ // Note: xml configuration will overwrite this value if present
+ private static int backupCount = Integer.parseInt(System.getProperty("net.onrc.onos.datastore.hazelcast.backupCount", "3"));
+
+ private final HazelcastInstance hazelcastInstance;
+ // Keep this last among the static fields: the constructor reads the ones above while the class initializes.
+ private static final HZClient THE_INSTANCE = new HZClient();
+
+ public static HZClient getClient() {
+ return THE_INSTANCE; // the one shared client, created when this class is initialized
+ }
+
+ private HZClient() { // private so that THE_INSTANCE stays the only instance
+ hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
+ }
+
+ /**
+ * Obtains the Hazelcast instance backing this client. In Client mode it first tries to
+ * connect to an already running cluster as a client; if that fails, or Client mode is
+ * off, it starts an instance of its own from the base XML configuration.
+ *
+ * @param hazelcastConfigFileName path of the base Hazelcast XML configuration file
+ * @return a Hazelcast client that found cluster members, or a newly started instance
+ */
+ private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
+ final Config baseHzConfig;
+ try {
+ baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
+ } catch (FileNotFoundException e) {
+ log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
+ throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName , e);
+ }
+
+ // use xml config if present, if not use System.property
+ final MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
+ if (mapConfig != null) {
+ backupCount = mapConfig.getBackupCount();
+ }
+
+ if (useClientMode) {
+ log.info("Configuring Hazelcast datastore as Client mode");
+ final ClientConfig clientConfig = new ClientConfig();
+ final int port = baseHzConfig.getNetworkConfig().getPort();
+ final String server = System.getProperty("net.onrc.onos.datastore.hazelcast.client.server", "localhost");
+ clientConfig.addAddress(server + ":" + port);
+
+ // copy group config from base Hazelcast configuration
+ clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
+ clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
+ // TODO We probably need to figure out what else need to be
+ // derived from baseConfig
+ registerSerializer(clientConfig.getSerializationConfig());
+
+ log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
+ try {
+ final HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
+ if (!client.getCluster().getMembers().isEmpty()) {
+ log.debug("Members in cluster: {}", client.getCluster().getMembers());
+ return client;
+ }
+ log.info("Failed to find cluster member, falling back to Instance mode");
+ // Shut the unusable client down rather than leaving it running next to the new instance.
+ client.getLifecycleService().shutdown();
+ } catch (IllegalStateException e) {
+ log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
+ }
+ useClientMode = false;
+ }
+ log.info("Configuring Hazelcast datastore as Instance mode");
+
+ // To run 2 Hazelcast instances in 1 JVM, we would probably need to bump the port here,
+ // e.g. baseHzConfig.getNetworkConfig().setPort(port + 1)
+ registerSerializer(baseHzConfig.getSerializationConfig());
+ return Hazelcast.newHazelcastInstance(baseHzConfig);
+ }
+
+ /**
+ * Register serializer for VersionedValue class used to imitate value version.
+ * @param config serialization config that the VersionedValue factory class is added to
+ */
+ private static void registerSerializer(final SerializationConfig config) {
+ config.addDataSerializableFactoryClass(
+ VersionedValueSerializableFactory.FACTORY_ID,
+ VersionedValueSerializableFactory.class);
+ }
+
+ /** Returns the table backed by the Hazelcast map named MAP_PREFIX + tableName. */
+ @Override
+ public IKVTable getTable(final String tableName) {
+ final String mapName = MAP_PREFIX + tableName;
+ if (!useClientMode) {
+ // Config is only available in Instance mode; Client mode must rely on
+ // hazelcast.xml being properly configured. The config is adjusted before
+ // getMap() so the map is never obtained ahead of its settings.
+ final MapConfig config = hazelcastInstance.getConfig().getMapConfig(mapName);
+ // Strong consistency: never serve reads from backups or from a near cache.
+ if (config.isReadBackupData()) {
+ config.setReadBackupData(false);
+ }
+ if (config.isNearCacheEnabled()) {
+ config.setNearCacheConfig(null); // setMaxSize(0) would mean "unlimited", not "disabled"
+ }
+ if (config.getBackupCount() != backupCount) {
+ config.setAsyncBackupCount(0);
+ config.setBackupCount(backupCount);
+ }
+ }
+ final IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(mapName);
+ return new HZTable(tableName, map);
+ }
+
+ @Override
+ public void dropTable(final IKVTable table) {
+ ((HZTable) table).getBackendMap().clear(); // clears the backing map; nothing else is released here
+ }
+
+ /** Delegates to {@code create} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
+ throws ObjectExistsException {
+ return ((IKVTable) tableId).create(key, value);
+ }
+
+ /** Delegates to {@code forceCreate} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
+ return ((IKVTable) tableId).forceCreate(key, value);
+ }
+
+ /** Delegates to {@code read} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public IKVEntry read(final IKVTableID tableId, final byte[] key)
+ throws ObjectDoesntExistException {
+ return ((IKVTable) tableId).read(key);
+ }
+
+ /** Delegates to the versioned {@code update} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
+ final long version) throws ObjectDoesntExistException,
+ WrongVersionException {
+ return ((IKVTable) tableId).update(key, value, version);
+ }
+
+ /** Delegates to the version-less {@code update} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
+ throws ObjectDoesntExistException {
+ return ((IKVTable) tableId).update(key, value);
+ }
+
+ /** Delegates to {@code delete} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long delete(final IKVTableID tableId, final byte[] key, final long version)
+ throws ObjectDoesntExistException, WrongVersionException {
+ return ((IKVTable) tableId).delete(key, version);
+ }
+
+ /** Delegates to {@code forceDelete} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public long forceDelete(final IKVTableID tableId, final byte[] key) {
+ return ((IKVTable) tableId).forceDelete(key);
+ }
+
+ /** Delegates to {@code getAllEntries} on the table passed as {@code tableId}, which is cast to IKVTable. */
+ @Override
+ public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
+ return ((IKVTable) tableId).getAllEntries();
+ }
+
+ @Override
+ public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
+ }
+
+ @Override
+ public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ); // only builds the operation; nothing is read here
+ }
+
+ @Override
+ public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
+ }
+
+ @Override
+ public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
+ final byte[] value, final long version) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
+ }
+
+ @Override
+ public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
+ return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE); // only builds the operation; nothing is deleted here
+ }
+
+ /** Runs each DELETE or FORCE_DELETE operation in order, storing the resulting version and status on it;
+ * any other operation type throws UnsupportedOperationException. @return true if at least one operation failed */
+ @Override
+ public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
+ boolean failExists = false;
+ for (IMultiEntryOperation op : ops) {
+ final IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op; // same cast as multiWrite()/multiRead()
+ switch (mop.getOperation()) {
+ case DELETE:
+ try {
+ mop.setVersion(delete(mop.getTableId(), mop.getKey(), mop.getVersion()));
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ failExists = true;
+ }
+ break;
+ case FORCE_DELETE:
+ mop.setVersion(forceDelete(mop.getTableId(), mop.getKey()));
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return failExists;
+ }
+
+ /**
+ * Runs each CREATE, FORCE_CREATE or UPDATE operation in list order, storing the resulting
+ * version and status on the operation itself; any other type throws UnsupportedOperationException.
+ * @return true if at least one operation failed
+ */
+ @Override
+ public boolean multiWrite(final List<IMultiEntryOperation> ops) {
+ // there may be room to batch to improve performance
+ boolean anyFailed = false;
+ for (final IMultiEntryOperation rawOp : ops) {
+ final IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) rawOp;
+ switch (mop.getOperation()) {
+ case CREATE:
+ try {
+ mop.setVersion(create(mop.getTableId(), mop.getKey(), mop.getValue()));
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectExistsException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ anyFailed = true;
+ }
+ break;
+ case FORCE_CREATE:
+ mop.setVersion(forceCreate(mop.getTableId(), mop.getKey(), mop.getValue()));
+ mop.setStatus(STATUS.SUCCESS);
+ break;
+ case UPDATE:
+ try {
+ mop.setVersion(update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion()));
+ mop.setStatus(STATUS.SUCCESS);
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.error(mop + " failed.", e);
+ mop.setStatus(STATUS.FAILED);
+ anyFailed = true;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(mop.toString());
+ }
+ }
+ return anyFailed;
+ }
+
+ /**
+ * Hands every operation the future of an asynchronous get on its table's map, then queries every operation's outcome.
+ * @return true if at least one operation did not succeed
+ */
+ @Override
+ public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
+ for (final IMultiEntryOperation op : ops) {
+ final IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
+ final HZTable table = (HZTable) op.getTableId();
+ final HZMultiEntryOperation target = (HZMultiEntryOperation) mop.getActualOperation();
+ target.setFuture(table.getBackendMap().getAsync(op.getKey()));
+ }
+ boolean anyFailed = false;
+ for (final IMultiEntryOperation op : ops) {
+ // status update is already done on success; only the overall outcome is collected here
+ anyFailed |= !((IModifiableMultiEntryOperation) op).hasSucceeded();
+ }
+ return anyFailed;
+ }
+
+ @Override
+ public long VERSION_NONEXISTENT() {
+ return VERSION_NONEXISTENT; // the static constant of the same name (0L)
+ }
+
+
+}