blob: eaa647c829c46c3ce1e90759090dd388f9f3e481 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07007import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07008import net.onrc.onos.core.datastore.IKVClient;
9import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
14import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070015import net.onrc.onos.core.datastore.ObjectDoesntExistException;
16import net.onrc.onos.core.datastore.ObjectExistsException;
17import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070018import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
19import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070020import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import com.hazelcast.client.HazelcastClient;
26import com.hazelcast.client.config.ClientConfig;
27import com.hazelcast.config.Config;
28import com.hazelcast.config.FileSystemXmlConfig;
29import com.hazelcast.config.MapConfig;
30import com.hazelcast.config.SerializationConfig;
31import com.hazelcast.core.Hazelcast;
32import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034import com.hazelcast.core.IMap;
35
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070036/**
37 * Hazelcast implementation of datastore IKVClient.
38 */
Ray Milkey1584ec82014-04-10 11:58:30 -070039public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070040 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
41
42 static final long VERSION_NONEXISTENT = 0L;
43
44 private static final String MAP_PREFIX = "datastore://";
45
Jonathan Hartc00f5c22014-06-10 15:14:40 -070046 private static final String BASE_CONFIG_FILENAME =
47 System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -070048 private static final String HAZELCAST_DEFAULT_XML = "conf/hazelcast.default.xml";
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070049
50 // XXX Remove this mode at some point
Jonathan Hartc00f5c22014-06-10 15:14:40 -070051 private static boolean useClientMode = Boolean.parseBoolean(
52 System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053
54 // Note: xml configuration will overwrite this value if present
Jonathan Hartc00f5c22014-06-10 15:14:40 -070055 private static int backupCount = Integer.parseInt(
56 System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057
58 private final HazelcastInstance hazelcastInstance;
59
60 private static final HZClient THE_INSTANCE = new HZClient();
61
Yuta HIGUCHI8447c362014-05-30 11:32:49 -070062 /**
63 * Get DataStoreClient implemented on Hazelcast.
64 *
65 * @return HZClient
66 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070069 }
70
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070071 /**
72 * Default constructor.
73 * <p/>
74 * Get or create the Hazelcast Instance to use for datastore.
75 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076 private HZClient() {
Yuta HIGUCHI448bca02014-08-13 17:08:00 -070077 Config config = HazelcastDatagrid.loadHazelcastConfig(BASE_CONFIG_FILENAME);
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070078 // Try to get the existing HZ instance in JVM if possible.
Yuta HIGUCHI448bca02014-08-13 17:08:00 -070079 HazelcastInstance instance = Hazelcast.getOrCreateHazelcastInstance(config);
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070080 if (instance == null) {
81 log.error("Failed to get the Hazelcast instance in JVM. "
82 + "Probably DataStoreClient was requested before "
83 + "IDatagridService was started "
84 + "or running as part of unit tests. "
85 + "Creating instance on it's own.");
86 instance = getFallbackHZinstance(BASE_CONFIG_FILENAME);
87 }
88 hazelcastInstance = instance;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070089 }
90
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070091 /**
92 * Get or create the hazelcast instance to use for datastore, when existing
93 * Hazelcast instance cannot be retrieved.
94 * <p/>
95 *
96 * @param hazelcastConfigFileName Hazelcast configuration to use when creating a
97 * @return HazelcastInstance to use for datastore
98 */
99 private static HazelcastInstance getFallbackHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700100 Config baseHzConfig = null;
101 try {
102 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
103 } catch (FileNotFoundException e) {
104 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -0700105 // Fallback mechanism to support running unit test without setup.
106 log.error("Falling back to default Hazelcast XML {}", HAZELCAST_DEFAULT_XML);
107 try {
108 baseHzConfig = new FileSystemXmlConfig(HAZELCAST_DEFAULT_XML);
109 } catch (FileNotFoundException e2) {
110 log.error("Error opening fall back Hazelcast XML configuration. "
111 + "File not found: " + HAZELCAST_DEFAULT_XML, e2);
112
113 // intentionally throwing Exception "e" thrown from non-fallback
114 // Hazelcast configuration loading.
Ray Milkey4373cbe2014-08-12 09:58:58 -0700115 throw new IllegalStateException("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
Yuta HIGUCHI3ebc9482014-05-08 16:28:28 -0700116 }
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700117 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700118
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 // use xml config if present, if not use System.property
120 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
121 if (mapConfig != null) {
122 backupCount = mapConfig.getBackupCount();
123 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700124
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 HazelcastInstance instance = null;
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -0700126 // TODO Client mode should be removed at some point.
127 // we can get HZ instance used by ONOS using getHazelcastInstanceByName
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700128 if (useClientMode) {
129 log.info("Configuring Hazelcast datastore as Client mode");
130 ClientConfig clientConfig = new ClientConfig();
131 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700132
Jonathan Hart6df90172014-04-03 10:13:11 -0700133 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI33a04972014-06-03 13:00:15 -0700134 clientConfig.getNetworkConfig().addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700135
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700136 // client mode connection limit.
137 // set to 0 for fast fall back to Instance mode.
138 String sAttempts = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit");
139 if (sAttempts != null) {
Yuta HIGUCHI33a04972014-06-03 13:00:15 -0700140 clientConfig.getNetworkConfig().setConnectionAttemptLimit(Integer.parseInt(sAttempts));
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -0700141 }
142
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700143 // copy group config from base Hazelcast configuration
144 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
145 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700146
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700147 // TODO We probably need to figure out what else need to be
148 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700149
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700150 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700151
Yuta HIGUCHI33a04972014-06-03 13:00:15 -0700152 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getNetworkConfig().getAddresses());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700153
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700154 try {
155 instance = HazelcastClient.newHazelcastClient(clientConfig);
156 if (!instance.getCluster().getMembers().isEmpty()) {
157 log.debug("Members in cluster: " + instance.getCluster().getMembers());
158 return instance;
159 }
160 log.info("Failed to find cluster member, falling back to Instance mode");
161 } catch (IllegalStateException e) {
162 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
163 }
164 useClientMode = false;
165 instance = null;
166 }
167 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700168
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700169 // To run 2 Hazelcast instance in 1 JVM,
170 // we probably need to something like below
171 //int port = hazelcastConfig.getNetworkConfig().getPort();
172 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700173
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700174 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700175
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700177 }
178
179 /**
180 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700181 *
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700182 * @param config SerializationConfig to add VersionedValueSerializableFactory.
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700183 */
184 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI8447c362014-05-30 11:32:49 -0700185 // TODO remove this function at some point.
186 // This method is no longer required, if equibalent to the following
187 // is defined in hazelcast.xml
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700188 config.addDataSerializableFactoryClass(
189 VersionedValueSerializableFactory.FACTORY_ID,
190 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700191 }
192
Yuta HIGUCHI18e0db82014-06-16 09:44:21 -0700193
194 /**
195 * Gets the HazelcastInstance object.
196 *
197 * @return HazelcastInstance
198 */
199 HazelcastInstance getHZInstance() {
200 return hazelcastInstance;
201 }
202
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700203 @Override
204 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700205 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700206
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700207 if (!useClientMode) {
208 // config only available in Instance Mode
209 // Client Mode must rely on hazelcast.xml to be properly configured.
210 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
211 // config for this map to be strong consistent
212 if (config.isReadBackupData()) {
213 config.setReadBackupData(false);
214 }
215 if (config.isNearCacheEnabled()) {
216 config.getNearCacheConfig().setMaxSize(0);
217 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700218
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700219 if (config.getBackupCount() != backupCount) {
220 config.setAsyncBackupCount(0);
221 config.setBackupCount(backupCount);
222 }
223 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700224
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700225 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700226 }
227
228 @Override
229 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700230 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700231 }
232
233 @Override
234 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
235 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700236 IKVTable table = (IKVTable) tableId;
237 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700238 }
239
240 @Override
241 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 IKVTable table = (IKVTable) tableId;
243 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700244 }
245
246 @Override
247 public IKVEntry read(final IKVTableID tableId, final byte[] key)
248 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 IKVTable table = (IKVTable) tableId;
250 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700251 }
252
253 @Override
254 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700255 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700256 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700257 IKVTable table = (IKVTable) tableId;
258 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700259 }
260
261 @Override
262 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
263 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700264 IKVTable table = (IKVTable) tableId;
265 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700266 }
267
268 @Override
269 public long delete(final IKVTableID tableId, final byte[] key, final long version)
270 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700271 IKVTable table = (IKVTable) tableId;
272 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700273 }
274
275 @Override
276 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700277 IKVTable table = (IKVTable) tableId;
278 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700279 }
280
281 @Override
282 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700283 IKVTable table = (IKVTable) tableId;
284 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700285 }
286
287 @Override
288 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700289 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700290 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700291 }
292
293 @Override
294 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700295 final byte[] value) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700296 return new HZMultiEntryOperation((HZTable) tableId, key, value,
297 HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700298 }
299
300 @Override
301 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700302 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700303 }
304
305 @Override
306 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700307 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700308 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700309 }
310
311 @Override
312 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700313 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700314 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700315 }
316
317 @Override
318 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700319 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700320 }
321
322 @Override
323 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700324 boolean failExists = false;
325 for (IMultiEntryOperation op : ops) {
326 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
327 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700328 case DELETE:
329 try {
330 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
331 mop.setVersion(version);
332 mop.setStatus(STATUS.SUCCESS);
333 } catch (ObjectDoesntExistException | WrongVersionException e) {
334 log.error(mop + " failed.", e);
335 mop.setStatus(STATUS.FAILED);
336 failExists = true;
337 }
338 break;
339 case FORCE_DELETE:
340 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 mop.setVersion(version);
342 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700343 break;
344 default:
345 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700346 }
347 }
348 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700349 }
350
351 @Override
352 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700353 // there may be room to batch to improve performance
354 boolean failExists = false;
355 for (IMultiEntryOperation op : ops) {
356 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
357 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700358 case CREATE:
359 try {
360 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
361 mop.setVersion(version);
362 mop.setStatus(STATUS.SUCCESS);
363 } catch (ObjectExistsException e) {
364 log.error(mop + " failed.", e);
365 mop.setStatus(STATUS.FAILED);
366 failExists = true;
367 }
368 break;
369 case FORCE_CREATE: {
370 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700371 mop.setVersion(version);
372 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700373 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700374 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700375 case UPDATE:
376 try {
377 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
378 mop.setVersion(version);
379 mop.setStatus(STATUS.SUCCESS);
380 } catch (ObjectDoesntExistException | WrongVersionException e) {
381 log.error(mop + " failed.", e);
382 mop.setStatus(STATUS.FAILED);
383 failExists = true;
384 }
385 break;
386 default:
387 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700388 }
389 }
390 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700391 }
392
393 @Override
394 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700395 boolean failExists = false;
396 for (IMultiEntryOperation op : ops) {
397 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
398 HZTable table = (HZTable) op.getTableId();
399 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
400 }
401 for (IMultiEntryOperation op : ops) {
402 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700403 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700404 failExists = true;
405 }
406 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700407
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700408 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700409 }
410
411 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700412 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700413 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700414 }
415
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700416 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700417 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700418 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700419 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700420 return buf.toString();
421 }
422
423 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
424 // TODO we probably want to implement some sort of caching
425 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
426 }
427
428 /**
429 * {@inheritDoc}
430 * <p />
431 * Warning: The counter is a different object from {@code key} entry on
432 * IKVTable with {@code tableId}. You cannot use table API to read/write
433 * counters.
434 *
435 * @param tableId Only getTableName() will be used.
436 * @param key tableId + key will be used as Counter name
437 */
438 @Override
439 public void createCounter(final IKVTableID tableId,
440 final byte[] key, final long initialValue)
441 throws ObjectExistsException {
442
443 IAtomicLong counter = getAtomicLong(tableId, key);
444 // Assumption here is that AtomicLong is initialized to 0L
445 final boolean success = counter.compareAndSet(0L, initialValue);
446 if (!success) {
447 throw new ObjectExistsException("Atomic counter "
448 + getCounterName(tableId, key)
449 + " already exist with value:" + counter.get());
450 }
451 }
452
453 @Override
454 public void setCounter(final IKVTableID tableId,
455 final byte[] key, final long value) {
456
457 IAtomicLong counter = getAtomicLong(tableId, key);
458 counter.set(value);
459 }
460
461 @Override
462 public long incrementCounter(final IKVTableID tableId,
463 final byte[] key, final long incrementValue) {
464
465 IAtomicLong counter = getAtomicLong(tableId, key);
466 return counter.addAndGet(incrementValue);
467 }
468
469 @Override
470 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
471 IAtomicLong counter = getAtomicLong(tableId, key);
472 counter.destroy();
473
474 }
475
476 @Override
477 public long getCounter(final IKVTableID tableId, final byte[] key)
478 throws ObjectDoesntExistException {
479
480 IAtomicLong counter = getAtomicLong(tableId, key);
481 return counter.get();
482 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700483
484}