blob: 8c34caceab7049889e1bd17d25093f16a33588a5 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07007import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07008import net.onrc.onos.core.datastore.IKVClient;
9import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
14import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070015import net.onrc.onos.core.datastore.ObjectDoesntExistException;
16import net.onrc.onos.core.datastore.ObjectExistsException;
17import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070018import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
19import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070020import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import com.hazelcast.client.HazelcastClient;
26import com.hazelcast.client.config.ClientConfig;
27import com.hazelcast.config.Config;
28import com.hazelcast.config.FileSystemXmlConfig;
29import com.hazelcast.config.MapConfig;
30import com.hazelcast.config.SerializationConfig;
31import com.hazelcast.core.Hazelcast;
32import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034import com.hazelcast.core.IMap;
35
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070036/**
37 * Hazelcast implementation of datastore IKVClient.
38 */
Ray Milkey1584ec82014-04-10 11:58:30 -070039public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070040 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
41
42 static final long VERSION_NONEXISTENT = 0L;
43
44 private static final String MAP_PREFIX = "datastore://";
45
Jonathan Hart6df90172014-04-03 10:13:11 -070046 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070047 private static final String HAZELCAST_DEFAULT_XML = "conf/hazelcast.default.xml";
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070048
49 // XXX Remove this mode at some point
Jonathan Hart6df90172014-04-03 10:13:11 -070050 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070051
52 // Note: xml configuration will overwrite this value if present
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -070053 private static int backupCount = Integer.parseInt(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070054
55 private final HazelcastInstance hazelcastInstance;
56
57 private static final HZClient THE_INSTANCE = new HZClient();
58
Yuta HIGUCHI8447c362014-05-30 11:32:49 -070059 /**
60 * Get DataStoreClient implemented on Hazelcast.
61 *
62 * @return HZClient
63 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070064 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070065 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070066 }
67
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070068 /**
69 * Default constructor.
70 * <p/>
71 * Get or create the Hazelcast Instance to use for datastore.
72 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070073 private HZClient() {
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070074 // Try to get the existing HZ instance in JVM if possible.
75 HazelcastInstance instance = Hazelcast.getHazelcastInstanceByName(HazelcastDatagrid.ONOS_HAZELCAST_INSTANCE);
76 if (instance == null) {
77 log.error("Failed to get the Hazelcast instance in JVM. "
78 + "Probably DataStoreClient was requested before "
79 + "IDatagridService was started "
80 + "or running as part of unit tests. "
81 + "Creating instance on it's own.");
82 instance = getFallbackHZinstance(BASE_CONFIG_FILENAME);
83 }
84 hazelcastInstance = instance;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070085 }
86
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070087 /**
88 * Get or create the hazelcast instance to use for datastore, when existing
89 * Hazelcast instance cannot be retrieved.
90 * <p/>
91 *
92 * @param hazelcastConfigFileName Hazelcast configuration to use when creating a
93 * @return HazelcastInstance to use for datastore
94 */
95 private static HazelcastInstance getFallbackHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070096 Config baseHzConfig = null;
97 try {
98 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
99 } catch (FileNotFoundException e) {
100 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -0700101 // Fallback mechanism to support running unit test without setup.
102 log.error("Falling back to default Hazelcast XML {}", HAZELCAST_DEFAULT_XML);
103 try {
104 baseHzConfig = new FileSystemXmlConfig(HAZELCAST_DEFAULT_XML);
105 } catch (FileNotFoundException e2) {
106 log.error("Error opening fall back Hazelcast XML configuration. "
107 + "File not found: " + HAZELCAST_DEFAULT_XML, e2);
108
109 // intentionally throwing Exception "e" thrown from non-fallback
110 // Hazelcast configuration loading.
111 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
112 }
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700113 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700114
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700115 // use xml config if present, if not use System.property
116 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
117 if (mapConfig != null) {
118 backupCount = mapConfig.getBackupCount();
119 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700120
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700121 HazelcastInstance instance = null;
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -0700122 // TODO Client mode should be removed at some point.
123 // we can get HZ instance used by ONOS using getHazelcastInstanceByName
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 if (useClientMode) {
125 log.info("Configuring Hazelcast datastore as Client mode");
126 ClientConfig clientConfig = new ClientConfig();
127 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700128
Jonathan Hart6df90172014-04-03 10:13:11 -0700129 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700130 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700131
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700132 // client mode connection limit.
133 // set to 0 for fast fall back to Instance mode.
134 String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit");
135 if (sAttempts != null) {
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700136 clientConfig.setConnectionAttemptLimit(Integer.parseInt(sAttempts));
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700137 }
138
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700139 // copy group config from base Hazelcast configuration
140 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
141 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700142
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700143 // TODO We probably need to figure out what else need to be
144 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700146 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700147
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700148 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700149
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700150 try {
151 instance = HazelcastClient.newHazelcastClient(clientConfig);
152 if (!instance.getCluster().getMembers().isEmpty()) {
153 log.debug("Members in cluster: " + instance.getCluster().getMembers());
154 return instance;
155 }
156 log.info("Failed to find cluster member, falling back to Instance mode");
157 } catch (IllegalStateException e) {
158 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
159 }
160 useClientMode = false;
161 instance = null;
162 }
163 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700164
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700165 // To run 2 Hazelcast instance in 1 JVM,
166 // we probably need to something like below
167 //int port = hazelcastConfig.getNetworkConfig().getPort();
168 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700169
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700170 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700171
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700172 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700173 }
174
175 /**
176 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700177 *
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700178 * @param config SerializationConfig to add VersionedValueSerializableFactory.
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700179 */
180 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700181 // TODO remove this function at some point.
182 // This method is no longer required, if equibalent to the following
183 // is defined in hazelcast.xml
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 config.addDataSerializableFactoryClass(
185 VersionedValueSerializableFactory.FACTORY_ID,
186 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700187 }
188
189 @Override
190 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700191 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700192
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700193 if (!useClientMode) {
194 // config only available in Instance Mode
195 // Client Mode must rely on hazelcast.xml to be properly configured.
196 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
197 // config for this map to be strong consistent
198 if (config.isReadBackupData()) {
199 config.setReadBackupData(false);
200 }
201 if (config.isNearCacheEnabled()) {
202 config.getNearCacheConfig().setMaxSize(0);
203 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700204
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700205 if (config.getBackupCount() != backupCount) {
206 config.setAsyncBackupCount(0);
207 config.setBackupCount(backupCount);
208 }
209 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700210
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700211 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700212 }
213
214 @Override
215 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700216 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700217 }
218
219 @Override
220 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
221 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700222 IKVTable table = (IKVTable) tableId;
223 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700224 }
225
226 @Override
227 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 IKVTable table = (IKVTable) tableId;
229 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700230 }
231
232 @Override
233 public IKVEntry read(final IKVTableID tableId, final byte[] key)
234 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 IKVTable table = (IKVTable) tableId;
236 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700237 }
238
239 @Override
240 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700241 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700242 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700243 IKVTable table = (IKVTable) tableId;
244 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700245 }
246
247 @Override
248 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
249 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700250 IKVTable table = (IKVTable) tableId;
251 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700252 }
253
254 @Override
255 public long delete(final IKVTableID tableId, final byte[] key, final long version)
256 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700257 IKVTable table = (IKVTable) tableId;
258 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700259 }
260
261 @Override
262 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700263 IKVTable table = (IKVTable) tableId;
264 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700265 }
266
267 @Override
268 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700269 IKVTable table = (IKVTable) tableId;
270 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700271 }
272
273 @Override
274 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700275 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700276 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700277 }
278
279 @Override
280 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700281 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700282 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700283 }
284
285 @Override
286 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700287 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700288 }
289
290 @Override
291 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700292 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700293 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700294 }
295
296 @Override
297 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700298 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700299 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700300 }
301
302 @Override
303 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700304 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700305 }
306
307 @Override
308 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700309 boolean failExists = false;
310 for (IMultiEntryOperation op : ops) {
311 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
312 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700313 case DELETE:
314 try {
315 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
316 mop.setVersion(version);
317 mop.setStatus(STATUS.SUCCESS);
318 } catch (ObjectDoesntExistException | WrongVersionException e) {
319 log.error(mop + " failed.", e);
320 mop.setStatus(STATUS.FAILED);
321 failExists = true;
322 }
323 break;
324 case FORCE_DELETE:
325 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700326 mop.setVersion(version);
327 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700328 break;
329 default:
330 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700331 }
332 }
333 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700334 }
335
336 @Override
337 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700338 // there may be room to batch to improve performance
339 boolean failExists = false;
340 for (IMultiEntryOperation op : ops) {
341 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
342 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700343 case CREATE:
344 try {
345 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
346 mop.setVersion(version);
347 mop.setStatus(STATUS.SUCCESS);
348 } catch (ObjectExistsException e) {
349 log.error(mop + " failed.", e);
350 mop.setStatus(STATUS.FAILED);
351 failExists = true;
352 }
353 break;
354 case FORCE_CREATE: {
355 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700356 mop.setVersion(version);
357 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700358 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700359 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700360 case UPDATE:
361 try {
362 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
363 mop.setVersion(version);
364 mop.setStatus(STATUS.SUCCESS);
365 } catch (ObjectDoesntExistException | WrongVersionException e) {
366 log.error(mop + " failed.", e);
367 mop.setStatus(STATUS.FAILED);
368 failExists = true;
369 }
370 break;
371 default:
372 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700373 }
374 }
375 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700376 }
377
378 @Override
379 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700380 boolean failExists = false;
381 for (IMultiEntryOperation op : ops) {
382 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
383 HZTable table = (HZTable) op.getTableId();
384 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
385 }
386 for (IMultiEntryOperation op : ops) {
387 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700388 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700389 failExists = true;
390 }
391 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700392
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700393 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700394 }
395
396 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700397 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700398 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700399 }
400
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700401 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700402 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700403 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700404 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700405 return buf.toString();
406 }
407
408 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
409 // TODO we probably want to implement some sort of caching
410 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
411 }
412
413 /**
414 * {@inheritDoc}
415 * <p />
416 * Warning: The counter is a different object from {@code key} entry on
417 * IKVTable with {@code tableId}. You cannot use table API to read/write
418 * counters.
419 *
420 * @param tableId Only getTableName() will be used.
421 * @param key tableId + key will be used as Counter name
422 */
423 @Override
424 public void createCounter(final IKVTableID tableId,
425 final byte[] key, final long initialValue)
426 throws ObjectExistsException {
427
428 IAtomicLong counter = getAtomicLong(tableId, key);
429 // Assumption here is that AtomicLong is initialized to 0L
430 final boolean success = counter.compareAndSet(0L, initialValue);
431 if (!success) {
432 throw new ObjectExistsException("Atomic counter "
433 + getCounterName(tableId, key)
434 + " already exist with value:" + counter.get());
435 }
436 }
437
438 @Override
439 public void setCounter(final IKVTableID tableId,
440 final byte[] key, final long value) {
441
442 IAtomicLong counter = getAtomicLong(tableId, key);
443 counter.set(value);
444 }
445
446 @Override
447 public long incrementCounter(final IKVTableID tableId,
448 final byte[] key, final long incrementValue) {
449
450 IAtomicLong counter = getAtomicLong(tableId, key);
451 return counter.addAndGet(incrementValue);
452 }
453
454 @Override
455 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
456 IAtomicLong counter = getAtomicLong(tableId, key);
457 counter.destroy();
458
459 }
460
461 @Override
462 public long getCounter(final IKVTableID tableId, final byte[] key)
463 throws ObjectDoesntExistException {
464
465 IAtomicLong counter = getAtomicLong(tableId, key);
466 return counter.get();
467 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700468
469}