| package net.onrc.onos.core.datastore.hazelcast; |
| |
| import java.io.FileNotFoundException; |
| import java.util.Collection; |
| import java.util.List; |
| |
| import net.onrc.onos.core.datastore.IKVClient; |
| import net.onrc.onos.core.datastore.IKVTable; |
| import net.onrc.onos.core.datastore.IKVTable.IKVEntry; |
| import net.onrc.onos.core.datastore.IKVTableID; |
| import net.onrc.onos.core.datastore.IMultiEntryOperation; |
| import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION; |
| import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS; |
| import net.onrc.onos.core.datastore.ObjectDoesntExistException; |
| import net.onrc.onos.core.datastore.ObjectExistsException; |
| import net.onrc.onos.core.datastore.WrongVersionException; |
| import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue; |
| import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation; |
| import net.onrc.onos.core.datastore.utils.ByteArrayUtil; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.hazelcast.client.HazelcastClient; |
| import com.hazelcast.client.config.ClientConfig; |
| import com.hazelcast.config.Config; |
| import com.hazelcast.config.FileSystemXmlConfig; |
| import com.hazelcast.config.MapConfig; |
| import com.hazelcast.config.SerializationConfig; |
| import com.hazelcast.core.Hazelcast; |
| import com.hazelcast.core.HazelcastInstance; |
| import com.hazelcast.core.IAtomicLong; |
| import com.hazelcast.core.IMap; |
| |
| public final class HZClient implements IKVClient { |
| private static final Logger log = LoggerFactory.getLogger(HZClient.class); |
| |
| static final long VERSION_NONEXISTENT = 0L; |
| |
| private static final String MAP_PREFIX = "datastore://"; |
| |
| // make this path configurable |
| private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml"); |
| private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true")); |
| |
| // Note: xml configuration will overwrite this value if present |
| private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3")); |
| |
| private final HazelcastInstance hazelcastInstance; |
| |
| private static final HZClient THE_INSTANCE = new HZClient(); |
| |
| public static HZClient getClient() { |
| return THE_INSTANCE; |
| } |
| |
| private HZClient() { |
| hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME); |
| } |
| |
| private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) { |
| Config baseHzConfig = null; |
| try { |
| baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName); |
| } catch (FileNotFoundException e) { |
| log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e); |
| throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e); |
| } |
| |
| // use xml config if present, if not use System.property |
| MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*"); |
| if (mapConfig != null) { |
| backupCount = mapConfig.getBackupCount(); |
| } |
| |
| HazelcastInstance instance = null; |
| if (useClientMode) { |
| log.info("Configuring Hazelcast datastore as Client mode"); |
| ClientConfig clientConfig = new ClientConfig(); |
| final int port = baseHzConfig.getNetworkConfig().getPort(); |
| |
| String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost"); |
| clientConfig.addAddress(server + ":" + port); |
| |
| // client mode connection limit. |
| // set to 0 for fast fall back to Instance mode. |
| String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit"); |
| if (sAttempts != null) { |
| clientConfig.setConnectionAttemptLimit(Integer.valueOf(sAttempts).intValue()); |
| } |
| |
| // copy group config from base Hazelcast configuration |
| clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName()); |
| clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword()); |
| |
| // TODO We probably need to figure out what else need to be |
| // derived from baseConfig |
| |
| registerSerializer(clientConfig.getSerializationConfig()); |
| |
| log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList()); |
| |
| try { |
| instance = HazelcastClient.newHazelcastClient(clientConfig); |
| if (!instance.getCluster().getMembers().isEmpty()) { |
| log.debug("Members in cluster: " + instance.getCluster().getMembers()); |
| return instance; |
| } |
| log.info("Failed to find cluster member, falling back to Instance mode"); |
| } catch (IllegalStateException e) { |
| log.info("Failed to initialize HazelcastClient, falling back to Instance mode"); |
| } |
| useClientMode = false; |
| instance = null; |
| } |
| log.info("Configuring Hazelcast datastore as Instance mode"); |
| |
| // To run 2 Hazelcast instance in 1 JVM, |
| // we probably need to something like below |
| //int port = hazelcastConfig.getNetworkConfig().getPort(); |
| //hazelcastConfig.getNetworkConfig().setPort(port+1); |
| |
| registerSerializer(baseHzConfig.getSerializationConfig()); |
| |
| return Hazelcast.newHazelcastInstance(baseHzConfig); |
| } |
| |
| /** |
| * Register serializer for VersionedValue class used to imitate value version. |
| * |
| * @param config |
| */ |
| private static void registerSerializer(final SerializationConfig config) { |
| config.addDataSerializableFactoryClass( |
| VersionedValueSerializableFactory.FACTORY_ID, |
| VersionedValueSerializableFactory.class); |
| } |
| |
| @Override |
| public IKVTable getTable(final String tableName) { |
| IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName); |
| |
| if (!useClientMode) { |
| // config only available in Instance Mode |
| // Client Mode must rely on hazelcast.xml to be properly configured. |
| MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName); |
| // config for this map to be strong consistent |
| if (config.isReadBackupData()) { |
| config.setReadBackupData(false); |
| } |
| if (config.isNearCacheEnabled()) { |
| config.getNearCacheConfig().setMaxSize(0); |
| } |
| |
| if (config.getBackupCount() != backupCount) { |
| config.setAsyncBackupCount(0); |
| config.setBackupCount(backupCount); |
| } |
| } |
| |
| return new HZTable(tableName, map); |
| } |
| |
| @Override |
| public void dropTable(final IKVTable table) { |
| ((HZTable) table).getBackendMap().clear(); |
| } |
| |
| @Override |
| public long create(final IKVTableID tableId, final byte[] key, final byte[] value) |
| throws ObjectExistsException { |
| IKVTable table = (IKVTable) tableId; |
| return table.create(key, value); |
| } |
| |
| @Override |
| public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) { |
| IKVTable table = (IKVTable) tableId; |
| return table.forceCreate(key, value); |
| } |
| |
| @Override |
| public IKVEntry read(final IKVTableID tableId, final byte[] key) |
| throws ObjectDoesntExistException { |
| IKVTable table = (IKVTable) tableId; |
| return table.read(key); |
| } |
| |
| @Override |
| public long update(final IKVTableID tableId, final byte[] key, final byte[] value, |
| final long version) throws ObjectDoesntExistException, |
| WrongVersionException { |
| IKVTable table = (IKVTable) tableId; |
| return table.update(key, value, version); |
| } |
| |
| @Override |
| public long update(final IKVTableID tableId, final byte[] key, final byte[] value) |
| throws ObjectDoesntExistException { |
| IKVTable table = (IKVTable) tableId; |
| return table.update(key, value); |
| } |
| |
| @Override |
| public long delete(final IKVTableID tableId, final byte[] key, final long version) |
| throws ObjectDoesntExistException, WrongVersionException { |
| IKVTable table = (IKVTable) tableId; |
| return table.delete(key, version); |
| } |
| |
| @Override |
| public long forceDelete(final IKVTableID tableId, final byte[] key) { |
| IKVTable table = (IKVTable) tableId; |
| return table.forceDelete(key); |
| } |
| |
| @Override |
| public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) { |
| IKVTable table = (IKVTable) tableId; |
| return table.getAllEntries(); |
| } |
| |
| @Override |
| public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key, |
| final byte[] value) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE); |
| } |
| |
| @Override |
| public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key, |
| final byte[] value) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE); |
| } |
| |
| @Override |
| public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ); |
| } |
| |
| @Override |
| public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key, |
| final byte[] value, final long version) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE); |
| } |
| |
| @Override |
| public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key, |
| final byte[] value, final long version) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE); |
| } |
| |
| @Override |
| public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) { |
| return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE); |
| } |
| |
| @Override |
| public boolean multiDelete(final Collection<IMultiEntryOperation> ops) { |
| boolean failExists = false; |
| for (IMultiEntryOperation op : ops) { |
| HZMultiEntryOperation mop = (HZMultiEntryOperation) op; |
| switch (mop.getOperation()) { |
| case DELETE: |
| try { |
| final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion()); |
| mop.setVersion(version); |
| mop.setStatus(STATUS.SUCCESS); |
| } catch (ObjectDoesntExistException | WrongVersionException e) { |
| log.error(mop + " failed.", e); |
| mop.setStatus(STATUS.FAILED); |
| failExists = true; |
| } |
| break; |
| case FORCE_DELETE: |
| final long version = forceDelete(mop.getTableId(), mop.getKey()); |
| mop.setVersion(version); |
| mop.setStatus(STATUS.SUCCESS); |
| break; |
| default: |
| throw new UnsupportedOperationException(mop.toString()); |
| } |
| } |
| return failExists; |
| } |
| |
| @Override |
| public boolean multiWrite(final List<IMultiEntryOperation> ops) { |
| // there may be room to batch to improve performance |
| boolean failExists = false; |
| for (IMultiEntryOperation op : ops) { |
| IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op; |
| switch (mop.getOperation()) { |
| case CREATE: |
| try { |
| long version = create(mop.getTableId(), mop.getKey(), mop.getValue()); |
| mop.setVersion(version); |
| mop.setStatus(STATUS.SUCCESS); |
| } catch (ObjectExistsException e) { |
| log.error(mop + " failed.", e); |
| mop.setStatus(STATUS.FAILED); |
| failExists = true; |
| } |
| break; |
| case FORCE_CREATE: { |
| final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue()); |
| mop.setVersion(version); |
| mop.setStatus(STATUS.SUCCESS); |
| break; |
| } |
| case UPDATE: |
| try { |
| long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion()); |
| mop.setVersion(version); |
| mop.setStatus(STATUS.SUCCESS); |
| } catch (ObjectDoesntExistException | WrongVersionException e) { |
| log.error(mop + " failed.", e); |
| mop.setStatus(STATUS.FAILED); |
| failExists = true; |
| } |
| break; |
| default: |
| throw new UnsupportedOperationException(mop.toString()); |
| } |
| } |
| return failExists; |
| } |
| |
| @Override |
| public boolean multiRead(final Collection<IMultiEntryOperation> ops) { |
| boolean failExists = false; |
| for (IMultiEntryOperation op : ops) { |
| IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op; |
| HZTable table = (HZTable) op.getTableId(); |
| ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey())); |
| } |
| for (IMultiEntryOperation op : ops) { |
| IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op; |
| if (!mop.hasSucceeded()) { |
| failExists = true; |
| } |
| } |
| |
| return failExists; |
| } |
| |
| @Override |
| public long getVersionNonexistant() { |
| return VERSION_NONEXISTENT; |
| } |
| |
| private String getCounterName(final IKVTableID tableId, final byte[] key) { |
| StringBuilder buf = new StringBuilder(tableId.getTableName()); |
| buf.append('@'); |
| ByteArrayUtil.toHexStringBuilder(key, ":", buf); |
| return buf.toString(); |
| } |
| |
| private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) { |
| // TODO we probably want to implement some sort of caching |
| return hazelcastInstance.getAtomicLong(getCounterName(tableId, key)); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p /> |
| * Warning: The counter is a different object from {@code key} entry on |
| * IKVTable with {@code tableId}. You cannot use table API to read/write |
| * counters. |
| * |
| * @param tableId Only getTableName() will be used. |
| * @param key tableId + key will be used as Counter name |
| */ |
| @Override |
| public void createCounter(final IKVTableID tableId, |
| final byte[] key, final long initialValue) |
| throws ObjectExistsException { |
| |
| IAtomicLong counter = getAtomicLong(tableId, key); |
| // Assumption here is that AtomicLong is initialized to 0L |
| final boolean success = counter.compareAndSet(0L, initialValue); |
| if (!success) { |
| throw new ObjectExistsException("Atomic counter " |
| + getCounterName(tableId, key) |
| + " already exist with value:" + counter.get()); |
| } |
| } |
| |
| @Override |
| public void setCounter(final IKVTableID tableId, |
| final byte[] key, final long value) { |
| |
| IAtomicLong counter = getAtomicLong(tableId, key); |
| counter.set(value); |
| } |
| |
| @Override |
| public long incrementCounter(final IKVTableID tableId, |
| final byte[] key, final long incrementValue) { |
| |
| IAtomicLong counter = getAtomicLong(tableId, key); |
| return counter.addAndGet(incrementValue); |
| } |
| |
| @Override |
| public void destroyCounter(final IKVTableID tableId, final byte[] key) { |
| IAtomicLong counter = getAtomicLong(tableId, key); |
| counter.destroy(); |
| |
| } |
| |
| @Override |
| public long getCounter(final IKVTableID tableId, final byte[] key) |
| throws ObjectDoesntExistException { |
| |
| IAtomicLong counter = getAtomicLong(tableId, key); |
| return counter.get(); |
| } |
| |
| } |