blob: dc9d2f824bb5c29904430724e8b5d3bf250d1a64 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07007import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07008import net.onrc.onos.core.datastore.IKVClient;
9import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
14import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070015import net.onrc.onos.core.datastore.ObjectDoesntExistException;
16import net.onrc.onos.core.datastore.ObjectExistsException;
17import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070018import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
19import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070020import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import com.hazelcast.client.HazelcastClient;
26import com.hazelcast.client.config.ClientConfig;
27import com.hazelcast.config.Config;
28import com.hazelcast.config.FileSystemXmlConfig;
29import com.hazelcast.config.MapConfig;
30import com.hazelcast.config.SerializationConfig;
31import com.hazelcast.core.Hazelcast;
32import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034import com.hazelcast.core.IMap;
35
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070036/**
37 * Hazelcast implementation of datastore IKVClient.
38 */
Ray Milkey1584ec82014-04-10 11:58:30 -070039public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070040 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
41
42 static final long VERSION_NONEXISTENT = 0L;
43
44 private static final String MAP_PREFIX = "datastore://";
45
Jonathan Hartc00f5c22014-06-10 15:14:40 -070046 private static final String BASE_CONFIG_FILENAME =
47 System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070048 private static final String HAZELCAST_DEFAULT_XML = "conf/hazelcast.default.xml";
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070049
50 // XXX Remove this mode at some point
Jonathan Hartc00f5c22014-06-10 15:14:40 -070051 private static boolean useClientMode = Boolean.parseBoolean(
52 System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053
54 // Note: xml configuration will overwrite this value if present
Jonathan Hartc00f5c22014-06-10 15:14:40 -070055 private static int backupCount = Integer.parseInt(
56 System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057
58 private final HazelcastInstance hazelcastInstance;
59
60 private static final HZClient THE_INSTANCE = new HZClient();
61
Yuta HIGUCHI8447c362014-05-30 11:32:49 -070062 /**
63 * Get DataStoreClient implemented on Hazelcast.
64 *
65 * @return HZClient
66 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070069 }
70
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070071 /**
72 * Default constructor.
73 * <p/>
74 * Get or create the Hazelcast Instance to use for datastore.
75 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076 private HZClient() {
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070077 // Try to get the existing HZ instance in JVM if possible.
78 HazelcastInstance instance = Hazelcast.getHazelcastInstanceByName(HazelcastDatagrid.ONOS_HAZELCAST_INSTANCE);
79 if (instance == null) {
80 log.error("Failed to get the Hazelcast instance in JVM. "
81 + "Probably DataStoreClient was requested before "
82 + "IDatagridService was started "
83 + "or running as part of unit tests. "
84 + "Creating instance on it's own.");
85 instance = getFallbackHZinstance(BASE_CONFIG_FILENAME);
86 }
87 hazelcastInstance = instance;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070088 }
89
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070090 /**
91 * Get or create the hazelcast instance to use for datastore, when existing
92 * Hazelcast instance cannot be retrieved.
93 * <p/>
94 *
95 * @param hazelcastConfigFileName Hazelcast configuration to use when creating a
96 * @return HazelcastInstance to use for datastore
97 */
98 private static HazelcastInstance getFallbackHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070099 Config baseHzConfig = null;
100 try {
101 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
102 } catch (FileNotFoundException e) {
103 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -0700104 // Fallback mechanism to support running unit test without setup.
105 log.error("Falling back to default Hazelcast XML {}", HAZELCAST_DEFAULT_XML);
106 try {
107 baseHzConfig = new FileSystemXmlConfig(HAZELCAST_DEFAULT_XML);
108 } catch (FileNotFoundException e2) {
109 log.error("Error opening fall back Hazelcast XML configuration. "
110 + "File not found: " + HAZELCAST_DEFAULT_XML, e2);
111
112 // intentionally throwing Exception "e" thrown from non-fallback
113 // Hazelcast configuration loading.
114 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
115 }
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700117
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700118 // use xml config if present, if not use System.property
119 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
120 if (mapConfig != null) {
121 backupCount = mapConfig.getBackupCount();
122 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700123
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 HazelcastInstance instance = null;
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -0700125 // TODO Client mode should be removed at some point.
126 // we can get HZ instance used by ONOS using getHazelcastInstanceByName
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700127 if (useClientMode) {
128 log.info("Configuring Hazelcast datastore as Client mode");
129 ClientConfig clientConfig = new ClientConfig();
130 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700131
Jonathan Hart6df90172014-04-03 10:13:11 -0700132 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700133 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700134
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700135 // client mode connection limit.
136 // set to 0 for fast fall back to Instance mode.
137 String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit");
138 if (sAttempts != null) {
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700139 clientConfig.setConnectionAttemptLimit(Integer.parseInt(sAttempts));
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700140 }
141
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700142 // copy group config from base Hazelcast configuration
143 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
144 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700146 // TODO We probably need to figure out what else need to be
147 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700148
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700149 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700150
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700151 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700152
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700153 try {
154 instance = HazelcastClient.newHazelcastClient(clientConfig);
155 if (!instance.getCluster().getMembers().isEmpty()) {
156 log.debug("Members in cluster: " + instance.getCluster().getMembers());
157 return instance;
158 }
159 log.info("Failed to find cluster member, falling back to Instance mode");
160 } catch (IllegalStateException e) {
161 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
162 }
163 useClientMode = false;
164 instance = null;
165 }
166 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700167
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700168 // To run 2 Hazelcast instance in 1 JVM,
169 // we probably need to something like below
170 //int port = hazelcastConfig.getNetworkConfig().getPort();
171 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700172
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700173 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700174
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700175 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700176 }
177
178 /**
179 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700180 *
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700181 * @param config SerializationConfig to add VersionedValueSerializableFactory.
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700182 */
183 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700184 // TODO remove this function at some point.
185 // This method is no longer required, if equibalent to the following
186 // is defined in hazelcast.xml
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700187 config.addDataSerializableFactoryClass(
188 VersionedValueSerializableFactory.FACTORY_ID,
189 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700190 }
191
Yuta HIGUCHI18e0db82014-06-16 09:44:21 -0700192
193 /**
194 * Gets the HazelcastInstance object.
195 *
196 * @return HazelcastInstance
197 */
198 HazelcastInstance getHZInstance() {
199 return hazelcastInstance;
200 }
201
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700202 @Override
203 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700204 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700205
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700206 if (!useClientMode) {
207 // config only available in Instance Mode
208 // Client Mode must rely on hazelcast.xml to be properly configured.
209 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
210 // config for this map to be strong consistent
211 if (config.isReadBackupData()) {
212 config.setReadBackupData(false);
213 }
214 if (config.isNearCacheEnabled()) {
215 config.getNearCacheConfig().setMaxSize(0);
216 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700217
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700218 if (config.getBackupCount() != backupCount) {
219 config.setAsyncBackupCount(0);
220 config.setBackupCount(backupCount);
221 }
222 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700223
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700224 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700225 }
226
227 @Override
228 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700229 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700230 }
231
232 @Override
233 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
234 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 IKVTable table = (IKVTable) tableId;
236 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700237 }
238
239 @Override
240 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700241 IKVTable table = (IKVTable) tableId;
242 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700243 }
244
245 @Override
246 public IKVEntry read(final IKVTableID tableId, final byte[] key)
247 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700248 IKVTable table = (IKVTable) tableId;
249 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700250 }
251
252 @Override
253 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700254 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700255 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700256 IKVTable table = (IKVTable) tableId;
257 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700258 }
259
260 @Override
261 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
262 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700263 IKVTable table = (IKVTable) tableId;
264 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700265 }
266
267 @Override
268 public long delete(final IKVTableID tableId, final byte[] key, final long version)
269 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700270 IKVTable table = (IKVTable) tableId;
271 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700272 }
273
274 @Override
275 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700276 IKVTable table = (IKVTable) tableId;
277 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700278 }
279
280 @Override
281 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700282 IKVTable table = (IKVTable) tableId;
283 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700284 }
285
286 @Override
287 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700288 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700289 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700290 }
291
292 @Override
293 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700294 final byte[] value) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700295 return new HZMultiEntryOperation((HZTable) tableId, key, value,
296 HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700297 }
298
299 @Override
300 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700301 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700302 }
303
304 @Override
305 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700306 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700307 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700308 }
309
310 @Override
311 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700312 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700313 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700314 }
315
316 @Override
317 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700319 }
320
321 @Override
322 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700323 boolean failExists = false;
324 for (IMultiEntryOperation op : ops) {
325 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
326 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700327 case DELETE:
328 try {
329 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
330 mop.setVersion(version);
331 mop.setStatus(STATUS.SUCCESS);
332 } catch (ObjectDoesntExistException | WrongVersionException e) {
333 log.error(mop + " failed.", e);
334 mop.setStatus(STATUS.FAILED);
335 failExists = true;
336 }
337 break;
338 case FORCE_DELETE:
339 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700340 mop.setVersion(version);
341 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700342 break;
343 default:
344 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700345 }
346 }
347 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700348 }
349
350 @Override
351 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700352 // there may be room to batch to improve performance
353 boolean failExists = false;
354 for (IMultiEntryOperation op : ops) {
355 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
356 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700357 case CREATE:
358 try {
359 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
360 mop.setVersion(version);
361 mop.setStatus(STATUS.SUCCESS);
362 } catch (ObjectExistsException e) {
363 log.error(mop + " failed.", e);
364 mop.setStatus(STATUS.FAILED);
365 failExists = true;
366 }
367 break;
368 case FORCE_CREATE: {
369 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700370 mop.setVersion(version);
371 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700372 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700373 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700374 case UPDATE:
375 try {
376 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
377 mop.setVersion(version);
378 mop.setStatus(STATUS.SUCCESS);
379 } catch (ObjectDoesntExistException | WrongVersionException e) {
380 log.error(mop + " failed.", e);
381 mop.setStatus(STATUS.FAILED);
382 failExists = true;
383 }
384 break;
385 default:
386 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700387 }
388 }
389 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700390 }
391
392 @Override
393 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700394 boolean failExists = false;
395 for (IMultiEntryOperation op : ops) {
396 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
397 HZTable table = (HZTable) op.getTableId();
398 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
399 }
400 for (IMultiEntryOperation op : ops) {
401 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700402 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700403 failExists = true;
404 }
405 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700406
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700407 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700408 }
409
410 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700411 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700412 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700413 }
414
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700415 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700416 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700417 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700418 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700419 return buf.toString();
420 }
421
422 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
423 // TODO we probably want to implement some sort of caching
424 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
425 }
426
427 /**
428 * {@inheritDoc}
429 * <p />
430 * Warning: The counter is a different object from {@code key} entry on
431 * IKVTable with {@code tableId}. You cannot use table API to read/write
432 * counters.
433 *
434 * @param tableId Only getTableName() will be used.
435 * @param key tableId + key will be used as Counter name
436 */
437 @Override
438 public void createCounter(final IKVTableID tableId,
439 final byte[] key, final long initialValue)
440 throws ObjectExistsException {
441
442 IAtomicLong counter = getAtomicLong(tableId, key);
443 // Assumption here is that AtomicLong is initialized to 0L
444 final boolean success = counter.compareAndSet(0L, initialValue);
445 if (!success) {
446 throw new ObjectExistsException("Atomic counter "
447 + getCounterName(tableId, key)
448 + " already exist with value:" + counter.get());
449 }
450 }
451
452 @Override
453 public void setCounter(final IKVTableID tableId,
454 final byte[] key, final long value) {
455
456 IAtomicLong counter = getAtomicLong(tableId, key);
457 counter.set(value);
458 }
459
460 @Override
461 public long incrementCounter(final IKVTableID tableId,
462 final byte[] key, final long incrementValue) {
463
464 IAtomicLong counter = getAtomicLong(tableId, key);
465 return counter.addAndGet(incrementValue);
466 }
467
468 @Override
469 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
470 IAtomicLong counter = getAtomicLong(tableId, key);
471 counter.destroy();
472
473 }
474
475 @Override
476 public long getCounter(final IKVTableID tableId, final byte[] key)
477 throws ObjectDoesntExistException {
478
479 IAtomicLong counter = getAtomicLong(tableId, key);
480 return counter.get();
481 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700482
483}