blob: 796976e61cf37eaacdb42c7e65f1b6210f729ede [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07007import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07008import net.onrc.onos.core.datastore.IKVClient;
9import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
14import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070015import net.onrc.onos.core.datastore.ObjectDoesntExistException;
16import net.onrc.onos.core.datastore.ObjectExistsException;
17import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070018import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
19import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070020import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import com.hazelcast.client.HazelcastClient;
26import com.hazelcast.client.config.ClientConfig;
27import com.hazelcast.config.Config;
28import com.hazelcast.config.FileSystemXmlConfig;
29import com.hazelcast.config.MapConfig;
30import com.hazelcast.config.SerializationConfig;
31import com.hazelcast.core.Hazelcast;
32import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034import com.hazelcast.core.IMap;
35
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070036/**
37 * Hazelcast implementation of datastore IKVClient.
38 */
Ray Milkey1584ec82014-04-10 11:58:30 -070039public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070040 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
41
42 static final long VERSION_NONEXISTENT = 0L;
43
44 private static final String MAP_PREFIX = "datastore://";
45
Jonathan Hartc00f5c22014-06-10 15:14:40 -070046 private static final String BASE_CONFIG_FILENAME =
47 System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070048 private static final String HAZELCAST_DEFAULT_XML = "conf/hazelcast.default.xml";
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070049
50 // XXX Remove this mode at some point
Jonathan Hartc00f5c22014-06-10 15:14:40 -070051 private static boolean useClientMode = Boolean.parseBoolean(
52 System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053
54 // Note: xml configuration will overwrite this value if present
Jonathan Hartc00f5c22014-06-10 15:14:40 -070055 private static int backupCount = Integer.parseInt(
56 System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057
58 private final HazelcastInstance hazelcastInstance;
59
60 private static final HZClient THE_INSTANCE = new HZClient();
61
Yuta HIGUCHI8447c362014-05-30 11:32:49 -070062 /**
63 * Get DataStoreClient implemented on Hazelcast.
64 *
65 * @return HZClient
66 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070069 }
70
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070071 /**
72 * Default constructor.
73 * <p/>
74 * Get or create the Hazelcast Instance to use for datastore.
75 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076 private HZClient() {
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070077 // Try to get the existing HZ instance in JVM if possible.
78 HazelcastInstance instance = Hazelcast.getHazelcastInstanceByName(HazelcastDatagrid.ONOS_HAZELCAST_INSTANCE);
79 if (instance == null) {
80 log.error("Failed to get the Hazelcast instance in JVM. "
81 + "Probably DataStoreClient was requested before "
82 + "IDatagridService was started "
83 + "or running as part of unit tests. "
84 + "Creating instance on it's own.");
85 instance = getFallbackHZinstance(BASE_CONFIG_FILENAME);
86 }
87 hazelcastInstance = instance;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070088 }
89
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070090 /**
91 * Get or create the hazelcast instance to use for datastore, when existing
92 * Hazelcast instance cannot be retrieved.
93 * <p/>
94 *
95 * @param hazelcastConfigFileName Hazelcast configuration to use when creating a
96 * @return HazelcastInstance to use for datastore
97 */
98 private static HazelcastInstance getFallbackHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070099 Config baseHzConfig = null;
100 try {
101 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
102 } catch (FileNotFoundException e) {
103 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -0700104 // Fallback mechanism to support running unit test without setup.
105 log.error("Falling back to default Hazelcast XML {}", HAZELCAST_DEFAULT_XML);
106 try {
107 baseHzConfig = new FileSystemXmlConfig(HAZELCAST_DEFAULT_XML);
108 } catch (FileNotFoundException e2) {
109 log.error("Error opening fall back Hazelcast XML configuration. "
110 + "File not found: " + HAZELCAST_DEFAULT_XML, e2);
111
112 // intentionally throwing Exception "e" thrown from non-fallback
113 // Hazelcast configuration loading.
114 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
115 }
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700117
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700118 // use xml config if present, if not use System.property
119 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
120 if (mapConfig != null) {
121 backupCount = mapConfig.getBackupCount();
122 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700123
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 HazelcastInstance instance = null;
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -0700125 // TODO Client mode should be removed at some point.
126 // we can get HZ instance used by ONOS using getHazelcastInstanceByName
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700127 if (useClientMode) {
128 log.info("Configuring Hazelcast datastore as Client mode");
129 ClientConfig clientConfig = new ClientConfig();
130 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700131
Jonathan Hart6df90172014-04-03 10:13:11 -0700132 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700133 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700134
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700135 // client mode connection limit.
136 // set to 0 for fast fall back to Instance mode.
137 String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit");
138 if (sAttempts != null) {
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700139 clientConfig.setConnectionAttemptLimit(Integer.parseInt(sAttempts));
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700140 }
141
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700142 // copy group config from base Hazelcast configuration
143 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
144 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700146 // TODO We probably need to figure out what else need to be
147 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700148
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700149 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700150
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700151 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700152
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700153 try {
154 instance = HazelcastClient.newHazelcastClient(clientConfig);
155 if (!instance.getCluster().getMembers().isEmpty()) {
156 log.debug("Members in cluster: " + instance.getCluster().getMembers());
157 return instance;
158 }
159 log.info("Failed to find cluster member, falling back to Instance mode");
160 } catch (IllegalStateException e) {
161 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
162 }
163 useClientMode = false;
164 instance = null;
165 }
166 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700167
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700168 // To run 2 Hazelcast instance in 1 JVM,
169 // we probably need to something like below
170 //int port = hazelcastConfig.getNetworkConfig().getPort();
171 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700172
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700173 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700174
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700175 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700176 }
177
178 /**
179 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700180 *
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700181 * @param config SerializationConfig to add VersionedValueSerializableFactory.
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700182 */
183 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700184 // TODO remove this function at some point.
185 // This method is no longer required, if equibalent to the following
186 // is defined in hazelcast.xml
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700187 config.addDataSerializableFactoryClass(
188 VersionedValueSerializableFactory.FACTORY_ID,
189 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700190 }
191
192 @Override
193 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700194 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700195
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700196 if (!useClientMode) {
197 // config only available in Instance Mode
198 // Client Mode must rely on hazelcast.xml to be properly configured.
199 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
200 // config for this map to be strong consistent
201 if (config.isReadBackupData()) {
202 config.setReadBackupData(false);
203 }
204 if (config.isNearCacheEnabled()) {
205 config.getNearCacheConfig().setMaxSize(0);
206 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700207
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 if (config.getBackupCount() != backupCount) {
209 config.setAsyncBackupCount(0);
210 config.setBackupCount(backupCount);
211 }
212 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700213
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700214 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700215 }
216
217 @Override
218 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700219 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700220 }
221
222 @Override
223 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
224 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700225 IKVTable table = (IKVTable) tableId;
226 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700227 }
228
229 @Override
230 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700231 IKVTable table = (IKVTable) tableId;
232 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700233 }
234
235 @Override
236 public IKVEntry read(final IKVTableID tableId, final byte[] key)
237 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700238 IKVTable table = (IKVTable) tableId;
239 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700240 }
241
242 @Override
243 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700244 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700245 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700246 IKVTable table = (IKVTable) tableId;
247 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700248 }
249
250 @Override
251 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
252 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700253 IKVTable table = (IKVTable) tableId;
254 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700255 }
256
257 @Override
258 public long delete(final IKVTableID tableId, final byte[] key, final long version)
259 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700260 IKVTable table = (IKVTable) tableId;
261 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700262 }
263
264 @Override
265 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700266 IKVTable table = (IKVTable) tableId;
267 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700268 }
269
270 @Override
271 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700272 IKVTable table = (IKVTable) tableId;
273 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700274 }
275
276 @Override
277 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700278 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700279 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700280 }
281
282 @Override
283 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700284 final byte[] value) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700285 return new HZMultiEntryOperation((HZTable) tableId, key, value,
286 HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700287 }
288
289 @Override
290 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700291 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700292 }
293
294 @Override
295 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700296 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700297 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700298 }
299
300 @Override
301 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700302 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700303 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700304 }
305
306 @Override
307 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700308 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700309 }
310
311 @Override
312 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700313 boolean failExists = false;
314 for (IMultiEntryOperation op : ops) {
315 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
316 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700317 case DELETE:
318 try {
319 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
320 mop.setVersion(version);
321 mop.setStatus(STATUS.SUCCESS);
322 } catch (ObjectDoesntExistException | WrongVersionException e) {
323 log.error(mop + " failed.", e);
324 mop.setStatus(STATUS.FAILED);
325 failExists = true;
326 }
327 break;
328 case FORCE_DELETE:
329 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700330 mop.setVersion(version);
331 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700332 break;
333 default:
334 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700335 }
336 }
337 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700338 }
339
340 @Override
341 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700342 // there may be room to batch to improve performance
343 boolean failExists = false;
344 for (IMultiEntryOperation op : ops) {
345 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
346 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700347 case CREATE:
348 try {
349 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
350 mop.setVersion(version);
351 mop.setStatus(STATUS.SUCCESS);
352 } catch (ObjectExistsException e) {
353 log.error(mop + " failed.", e);
354 mop.setStatus(STATUS.FAILED);
355 failExists = true;
356 }
357 break;
358 case FORCE_CREATE: {
359 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700360 mop.setVersion(version);
361 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700362 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700363 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700364 case UPDATE:
365 try {
366 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
367 mop.setVersion(version);
368 mop.setStatus(STATUS.SUCCESS);
369 } catch (ObjectDoesntExistException | WrongVersionException e) {
370 log.error(mop + " failed.", e);
371 mop.setStatus(STATUS.FAILED);
372 failExists = true;
373 }
374 break;
375 default:
376 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700377 }
378 }
379 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700380 }
381
382 @Override
383 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700384 boolean failExists = false;
385 for (IMultiEntryOperation op : ops) {
386 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
387 HZTable table = (HZTable) op.getTableId();
388 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
389 }
390 for (IMultiEntryOperation op : ops) {
391 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700392 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700393 failExists = true;
394 }
395 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700396
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700397 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700398 }
399
400 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700401 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700402 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700403 }
404
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700405 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700406 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700407 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700408 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700409 return buf.toString();
410 }
411
412 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
413 // TODO we probably want to implement some sort of caching
414 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
415 }
416
417 /**
418 * {@inheritDoc}
419 * <p />
420 * Warning: The counter is a different object from {@code key} entry on
421 * IKVTable with {@code tableId}. You cannot use table API to read/write
422 * counters.
423 *
424 * @param tableId Only getTableName() will be used.
425 * @param key tableId + key will be used as Counter name
426 */
427 @Override
428 public void createCounter(final IKVTableID tableId,
429 final byte[] key, final long initialValue)
430 throws ObjectExistsException {
431
432 IAtomicLong counter = getAtomicLong(tableId, key);
433 // Assumption here is that AtomicLong is initialized to 0L
434 final boolean success = counter.compareAndSet(0L, initialValue);
435 if (!success) {
436 throw new ObjectExistsException("Atomic counter "
437 + getCounterName(tableId, key)
438 + " already exist with value:" + counter.get());
439 }
440 }
441
442 @Override
443 public void setCounter(final IKVTableID tableId,
444 final byte[] key, final long value) {
445
446 IAtomicLong counter = getAtomicLong(tableId, key);
447 counter.set(value);
448 }
449
450 @Override
451 public long incrementCounter(final IKVTableID tableId,
452 final byte[] key, final long incrementValue) {
453
454 IAtomicLong counter = getAtomicLong(tableId, key);
455 return counter.addAndGet(incrementValue);
456 }
457
458 @Override
459 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
460 IAtomicLong counter = getAtomicLong(tableId, key);
461 counter.destroy();
462
463 }
464
465 @Override
466 public long getCounter(final IKVTableID tableId, final byte[] key)
467 throws ObjectDoesntExistException {
468
469 IAtomicLong counter = getAtomicLong(tableId, key);
470 return counter.get();
471 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700472
473}