blob: bb7442263be0000e7a494e423802f3dcf1561dda [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07007import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07008import net.onrc.onos.core.datastore.IKVClient;
9import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
14import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070015import net.onrc.onos.core.datastore.ObjectDoesntExistException;
16import net.onrc.onos.core.datastore.ObjectExistsException;
17import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070018import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
19import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070020import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import com.hazelcast.client.HazelcastClient;
26import com.hazelcast.client.config.ClientConfig;
27import com.hazelcast.config.Config;
28import com.hazelcast.config.FileSystemXmlConfig;
29import com.hazelcast.config.MapConfig;
30import com.hazelcast.config.SerializationConfig;
31import com.hazelcast.core.Hazelcast;
32import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034import com.hazelcast.core.IMap;
35
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070036/**
37 * Hazelcast implementation of datastore IKVClient.
38 */
Ray Milkey1584ec82014-04-10 11:58:30 -070039public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070040 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
41
42 static final long VERSION_NONEXISTENT = 0L;
43
44 private static final String MAP_PREFIX = "datastore://";
45
Jonathan Hart6df90172014-04-03 10:13:11 -070046 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070047 private static final String HAZELCAST_DEFAULT_XML = "conf/hazelcast.default.xml";
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070048
49 // XXX Remove this mode at some point
Jonathan Hart6df90172014-04-03 10:13:11 -070050 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070051
52 // Note: xml configuration will overwrite this value if present
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -070053 private static int backupCount = Integer.parseInt(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070054
55 private final HazelcastInstance hazelcastInstance;
56
57 private static final HZClient THE_INSTANCE = new HZClient();
58
59 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070060 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070061 }
62
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070063 /**
64 * Default constructor.
65 * <p/>
66 * Get or create the Hazelcast Instance to use for datastore.
67 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070068 private HZClient() {
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070069 // Try to get the existing HZ instance in JVM if possible.
70 HazelcastInstance instance = Hazelcast.getHazelcastInstanceByName(HazelcastDatagrid.ONOS_HAZELCAST_INSTANCE);
71 if (instance == null) {
72 log.error("Failed to get the Hazelcast instance in JVM. "
73 + "Probably DataStoreClient was requested before "
74 + "IDatagridService was started "
75 + "or running as part of unit tests. "
76 + "Creating instance on it's own.");
77 instance = getFallbackHZinstance(BASE_CONFIG_FILENAME);
78 }
79 hazelcastInstance = instance;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070080 }
81
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070082 /**
83 * Get or create the hazelcast instance to use for datastore, when existing
84 * Hazelcast instance cannot be retrieved.
85 * <p/>
86 *
87 * @param hazelcastConfigFileName Hazelcast configuration to use when creating a
88 * @return HazelcastInstance to use for datastore
89 */
90 private static HazelcastInstance getFallbackHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070091 Config baseHzConfig = null;
92 try {
93 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
94 } catch (FileNotFoundException e) {
95 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070096 // Fallback mechanism to support running unit test without setup.
97 log.error("Falling back to default Hazelcast XML {}", HAZELCAST_DEFAULT_XML);
98 try {
99 baseHzConfig = new FileSystemXmlConfig(HAZELCAST_DEFAULT_XML);
100 } catch (FileNotFoundException e2) {
101 log.error("Error opening fall back Hazelcast XML configuration. "
102 + "File not found: " + HAZELCAST_DEFAULT_XML, e2);
103
104 // intentionally throwing Exception "e" thrown from non-fallback
105 // Hazelcast configuration loading.
106 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
107 }
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700108 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700109
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700110 // use xml config if present, if not use System.property
111 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
112 if (mapConfig != null) {
113 backupCount = mapConfig.getBackupCount();
114 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 HazelcastInstance instance = null;
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -0700117 // TODO Client mode should be removed at some point.
118 // we can get HZ instance used by ONOS using getHazelcastInstanceByName
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 if (useClientMode) {
120 log.info("Configuring Hazelcast datastore as Client mode");
121 ClientConfig clientConfig = new ClientConfig();
122 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700123
Jonathan Hart6df90172014-04-03 10:13:11 -0700124 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700126
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700127 // client mode connection limit.
128 // set to 0 for fast fall back to Instance mode.
129 String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit");
130 if (sAttempts != null) {
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700131 clientConfig.setConnectionAttemptLimit(Integer.parseInt(sAttempts));
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700132 }
133
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700134 // copy group config from base Hazelcast configuration
135 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
136 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700137
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700138 // TODO We probably need to figure out what else need to be
139 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700140
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700141 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700142
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700143 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700144
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700145 try {
146 instance = HazelcastClient.newHazelcastClient(clientConfig);
147 if (!instance.getCluster().getMembers().isEmpty()) {
148 log.debug("Members in cluster: " + instance.getCluster().getMembers());
149 return instance;
150 }
151 log.info("Failed to find cluster member, falling back to Instance mode");
152 } catch (IllegalStateException e) {
153 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
154 }
155 useClientMode = false;
156 instance = null;
157 }
158 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700159
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700160 // To run 2 Hazelcast instance in 1 JVM,
161 // we probably need to something like below
162 //int port = hazelcastConfig.getNetworkConfig().getPort();
163 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700164
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700165 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700166
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700167 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700168 }
169
170 /**
171 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700172 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700173 * @param config
174 */
175 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 config.addDataSerializableFactoryClass(
177 VersionedValueSerializableFactory.FACTORY_ID,
178 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700179 }
180
181 @Override
182 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700183 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700184
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700185 if (!useClientMode) {
186 // config only available in Instance Mode
187 // Client Mode must rely on hazelcast.xml to be properly configured.
188 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
189 // config for this map to be strong consistent
190 if (config.isReadBackupData()) {
191 config.setReadBackupData(false);
192 }
193 if (config.isNearCacheEnabled()) {
194 config.getNearCacheConfig().setMaxSize(0);
195 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700196
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700197 if (config.getBackupCount() != backupCount) {
198 config.setAsyncBackupCount(0);
199 config.setBackupCount(backupCount);
200 }
201 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700202
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700203 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700204 }
205
206 @Override
207 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700209 }
210
211 @Override
212 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
213 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700214 IKVTable table = (IKVTable) tableId;
215 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700216 }
217
218 @Override
219 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700220 IKVTable table = (IKVTable) tableId;
221 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700222 }
223
224 @Override
225 public IKVEntry read(final IKVTableID tableId, final byte[] key)
226 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700227 IKVTable table = (IKVTable) tableId;
228 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700229 }
230
231 @Override
232 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700233 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700234 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 IKVTable table = (IKVTable) tableId;
236 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700237 }
238
239 @Override
240 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
241 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 IKVTable table = (IKVTable) tableId;
243 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700244 }
245
246 @Override
247 public long delete(final IKVTableID tableId, final byte[] key, final long version)
248 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 IKVTable table = (IKVTable) tableId;
250 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700251 }
252
253 @Override
254 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700255 IKVTable table = (IKVTable) tableId;
256 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700257 }
258
259 @Override
260 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700261 IKVTable table = (IKVTable) tableId;
262 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700263 }
264
265 @Override
266 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700267 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700268 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700269 }
270
271 @Override
272 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700273 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700274 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700275 }
276
277 @Override
278 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700279 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700280 }
281
282 @Override
283 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700284 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700285 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700286 }
287
288 @Override
289 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700290 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700291 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700292 }
293
294 @Override
295 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700296 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700297 }
298
299 @Override
300 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700301 boolean failExists = false;
302 for (IMultiEntryOperation op : ops) {
303 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
304 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700305 case DELETE:
306 try {
307 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
308 mop.setVersion(version);
309 mop.setStatus(STATUS.SUCCESS);
310 } catch (ObjectDoesntExistException | WrongVersionException e) {
311 log.error(mop + " failed.", e);
312 mop.setStatus(STATUS.FAILED);
313 failExists = true;
314 }
315 break;
316 case FORCE_DELETE:
317 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 mop.setVersion(version);
319 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700320 break;
321 default:
322 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700323 }
324 }
325 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700326 }
327
328 @Override
329 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700330 // there may be room to batch to improve performance
331 boolean failExists = false;
332 for (IMultiEntryOperation op : ops) {
333 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
334 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700335 case CREATE:
336 try {
337 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
338 mop.setVersion(version);
339 mop.setStatus(STATUS.SUCCESS);
340 } catch (ObjectExistsException e) {
341 log.error(mop + " failed.", e);
342 mop.setStatus(STATUS.FAILED);
343 failExists = true;
344 }
345 break;
346 case FORCE_CREATE: {
347 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700348 mop.setVersion(version);
349 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700350 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700351 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700352 case UPDATE:
353 try {
354 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
355 mop.setVersion(version);
356 mop.setStatus(STATUS.SUCCESS);
357 } catch (ObjectDoesntExistException | WrongVersionException e) {
358 log.error(mop + " failed.", e);
359 mop.setStatus(STATUS.FAILED);
360 failExists = true;
361 }
362 break;
363 default:
364 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700365 }
366 }
367 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700368 }
369
370 @Override
371 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700372 boolean failExists = false;
373 for (IMultiEntryOperation op : ops) {
374 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
375 HZTable table = (HZTable) op.getTableId();
376 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
377 }
378 for (IMultiEntryOperation op : ops) {
379 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700380 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700381 failExists = true;
382 }
383 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700384
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700385 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700386 }
387
388 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700389 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700390 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700391 }
392
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700393 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700394 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700395 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700396 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700397 return buf.toString();
398 }
399
400 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
401 // TODO we probably want to implement some sort of caching
402 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
403 }
404
405 /**
406 * {@inheritDoc}
407 * <p />
408 * Warning: The counter is a different object from {@code key} entry on
409 * IKVTable with {@code tableId}. You cannot use table API to read/write
410 * counters.
411 *
412 * @param tableId Only getTableName() will be used.
413 * @param key tableId + key will be used as Counter name
414 */
415 @Override
416 public void createCounter(final IKVTableID tableId,
417 final byte[] key, final long initialValue)
418 throws ObjectExistsException {
419
420 IAtomicLong counter = getAtomicLong(tableId, key);
421 // Assumption here is that AtomicLong is initialized to 0L
422 final boolean success = counter.compareAndSet(0L, initialValue);
423 if (!success) {
424 throw new ObjectExistsException("Atomic counter "
425 + getCounterName(tableId, key)
426 + " already exist with value:" + counter.get());
427 }
428 }
429
430 @Override
431 public void setCounter(final IKVTableID tableId,
432 final byte[] key, final long value) {
433
434 IAtomicLong counter = getAtomicLong(tableId, key);
435 counter.set(value);
436 }
437
438 @Override
439 public long incrementCounter(final IKVTableID tableId,
440 final byte[] key, final long incrementValue) {
441
442 IAtomicLong counter = getAtomicLong(tableId, key);
443 return counter.addAndGet(incrementValue);
444 }
445
446 @Override
447 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
448 IAtomicLong counter = getAtomicLong(tableId, key);
449 counter.destroy();
450
451 }
452
453 @Override
454 public long getCounter(final IKVTableID tableId, final byte[] key)
455 throws ObjectDoesntExistException {
456
457 IAtomicLong counter = getAtomicLong(tableId, key);
458 return counter.get();
459 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700460
461}