blob: f45483c6881f7f00fa66416c7246759248938dd1 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.FileNotFoundException;
4import java.util.Collection;
5import java.util.List;
6
Jonathan Hart6df90172014-04-03 10:13:11 -07007import net.onrc.onos.core.datastore.IKVClient;
8import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -07009import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070010import net.onrc.onos.core.datastore.IKVTableID;
11import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070012import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
13import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070014import net.onrc.onos.core.datastore.ObjectDoesntExistException;
15import net.onrc.onos.core.datastore.ObjectExistsException;
16import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070017import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
18import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070019import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070020
21import org.slf4j.Logger;
22import org.slf4j.LoggerFactory;
23
24import com.hazelcast.client.HazelcastClient;
25import com.hazelcast.client.config.ClientConfig;
26import com.hazelcast.config.Config;
27import com.hazelcast.config.FileSystemXmlConfig;
28import com.hazelcast.config.MapConfig;
29import com.hazelcast.config.SerializationConfig;
30import com.hazelcast.core.Hazelcast;
31import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070032import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070033import com.hazelcast.core.IMap;
34
Ray Milkey1584ec82014-04-10 11:58:30 -070035public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070036 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
37
38 static final long VERSION_NONEXISTENT = 0L;
39
40 private static final String MAP_PREFIX = "datastore://";
41
42 // make this path configurable
Jonathan Hart6df90172014-04-03 10:13:11 -070043 private static final String BASE_CONFIG_FILENAME = System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
44 private static boolean useClientMode = Boolean.parseBoolean(System.getProperty("net.onrc.onos.core.datastore.hazelcast.clientMode", "true"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070045
46 // Note: xml configuration will overwrite this value if present
Jonathan Hart6df90172014-04-03 10:13:11 -070047 private static int backupCount = Integer.valueOf(System.getProperty("net.onrc.onos.core.datastore.hazelcast.backupCount", "3"));
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070048
49 private final HazelcastInstance hazelcastInstance;
50
51 private static final HZClient THE_INSTANCE = new HZClient();
52
53 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070054 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070055 }
56
/**
 * Private constructor (singleton); obtains the Hazelcast instance
 * described by {@code BASE_CONFIG_FILENAME}.
 */
private HZClient() {
    this.hazelcastInstance = getHZinstance(BASE_CONFIG_FILENAME);
}
60
61 private static HazelcastInstance getHZinstance(final String hazelcastConfigFileName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070062 Config baseHzConfig = null;
63 try {
64 baseHzConfig = new FileSystemXmlConfig(hazelcastConfigFileName);
65 } catch (FileNotFoundException e) {
66 log.error("Error opening Hazelcast XML configuration. File not found: " + hazelcastConfigFileName, e);
Ray Milkey269ffb92014-04-03 14:43:30 -070067 throw new Error("Cannot find Hazelcast configuration: " + hazelcastConfigFileName, e);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070069
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070070 // use xml config if present, if not use System.property
71 MapConfig mapConfig = baseHzConfig.getMapConfigs().get(MAP_PREFIX + "*");
72 if (mapConfig != null) {
73 backupCount = mapConfig.getBackupCount();
74 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070075
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070076 HazelcastInstance instance = null;
77 if (useClientMode) {
78 log.info("Configuring Hazelcast datastore as Client mode");
79 ClientConfig clientConfig = new ClientConfig();
80 final int port = baseHzConfig.getNetworkConfig().getPort();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070081
Jonathan Hart6df90172014-04-03 10:13:11 -070082 String server = System.getProperty("net.onrc.onos.core.datastore.hazelcast.client.server", "localhost");
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070083 clientConfig.addAddress(server + ":" + port);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070084
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070085 // copy group config from base Hazelcast configuration
86 clientConfig.getGroupConfig().setName(baseHzConfig.getGroupConfig().getName());
87 clientConfig.getGroupConfig().setPassword(baseHzConfig.getGroupConfig().getPassword());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070088
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070089 // TODO We probably need to figure out what else need to be
90 // derived from baseConfig
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070091
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070092 registerSerializer(clientConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070093
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070094 log.info("Starting Hazelcast datastore client for [{}]", clientConfig.getAddressList());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070095
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070096 try {
97 instance = HazelcastClient.newHazelcastClient(clientConfig);
98 if (!instance.getCluster().getMembers().isEmpty()) {
99 log.debug("Members in cluster: " + instance.getCluster().getMembers());
100 return instance;
101 }
102 log.info("Failed to find cluster member, falling back to Instance mode");
103 } catch (IllegalStateException e) {
104 log.info("Failed to initialize HazelcastClient, falling back to Instance mode");
105 }
106 useClientMode = false;
107 instance = null;
108 }
109 log.info("Configuring Hazelcast datastore as Instance mode");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700110
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700111 // To run 2 Hazelcast instance in 1 JVM,
112 // we probably need to something like below
113 //int port = hazelcastConfig.getNetworkConfig().getPort();
114 //hazelcastConfig.getNetworkConfig().setPort(port+1);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 registerSerializer(baseHzConfig.getSerializationConfig());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700117
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700118 return Hazelcast.newHazelcastInstance(baseHzConfig);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700119 }
120
121 /**
122 * Register serializer for VersionedValue class used to imitate value version.
Ray Milkey269ffb92014-04-03 14:43:30 -0700123 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700124 * @param config
125 */
126 private static void registerSerializer(final SerializationConfig config) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700127 config.addDataSerializableFactoryClass(
128 VersionedValueSerializableFactory.FACTORY_ID,
129 VersionedValueSerializableFactory.class);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700130 }
131
132 @Override
133 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700134 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700135
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700136 if (!useClientMode) {
137 // config only available in Instance Mode
138 // Client Mode must rely on hazelcast.xml to be properly configured.
139 MapConfig config = hazelcastInstance.getConfig().getMapConfig(MAP_PREFIX + tableName);
140 // config for this map to be strong consistent
141 if (config.isReadBackupData()) {
142 config.setReadBackupData(false);
143 }
144 if (config.isNearCacheEnabled()) {
145 config.getNearCacheConfig().setMaxSize(0);
146 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700147
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700148 if (config.getBackupCount() != backupCount) {
149 config.setAsyncBackupCount(0);
150 config.setBackupCount(backupCount);
151 }
152 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700153
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700154 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700155 }
156
157 @Override
158 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700159 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700160 }
161
162 @Override
163 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
164 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700165 IKVTable table = (IKVTable) tableId;
166 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700167 }
168
169 @Override
170 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700171 IKVTable table = (IKVTable) tableId;
172 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700173 }
174
175 @Override
176 public IKVEntry read(final IKVTableID tableId, final byte[] key)
177 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700178 IKVTable table = (IKVTable) tableId;
179 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700180 }
181
182 @Override
183 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700184 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700185 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700186 IKVTable table = (IKVTable) tableId;
187 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700188 }
189
190 @Override
191 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
192 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700193 IKVTable table = (IKVTable) tableId;
194 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700195 }
196
197 @Override
198 public long delete(final IKVTableID tableId, final byte[] key, final long version)
199 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700200 IKVTable table = (IKVTable) tableId;
201 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700202 }
203
204 @Override
205 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700206 IKVTable table = (IKVTable) tableId;
207 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700208 }
209
210 @Override
211 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700212 IKVTable table = (IKVTable) tableId;
213 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700214 }
215
216 @Override
217 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700218 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700219 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700220 }
221
222 @Override
223 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700224 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700225 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700226 }
227
228 @Override
229 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700230 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700231 }
232
233 @Override
234 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700235 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700236 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700237 }
238
239 @Override
240 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700241 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700243 }
244
245 @Override
246 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700247 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700248 }
249
250 @Override
251 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700252 boolean failExists = false;
253 for (IMultiEntryOperation op : ops) {
254 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
255 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700256 case DELETE:
257 try {
258 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
259 mop.setVersion(version);
260 mop.setStatus(STATUS.SUCCESS);
261 } catch (ObjectDoesntExistException | WrongVersionException e) {
262 log.error(mop + " failed.", e);
263 mop.setStatus(STATUS.FAILED);
264 failExists = true;
265 }
266 break;
267 case FORCE_DELETE:
268 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700269 mop.setVersion(version);
270 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700271 break;
272 default:
273 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700274 }
275 }
276 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700277 }
278
279 @Override
280 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700281 // there may be room to batch to improve performance
282 boolean failExists = false;
283 for (IMultiEntryOperation op : ops) {
284 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
285 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700286 case CREATE:
287 try {
288 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
289 mop.setVersion(version);
290 mop.setStatus(STATUS.SUCCESS);
291 } catch (ObjectExistsException e) {
292 log.error(mop + " failed.", e);
293 mop.setStatus(STATUS.FAILED);
294 failExists = true;
295 }
296 break;
297 case FORCE_CREATE: {
298 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700299 mop.setVersion(version);
300 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700301 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700302 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700303 case UPDATE:
304 try {
305 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
306 mop.setVersion(version);
307 mop.setStatus(STATUS.SUCCESS);
308 } catch (ObjectDoesntExistException | WrongVersionException e) {
309 log.error(mop + " failed.", e);
310 mop.setStatus(STATUS.FAILED);
311 failExists = true;
312 }
313 break;
314 default:
315 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700316 }
317 }
318 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700319 }
320
321 @Override
322 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700323 boolean failExists = false;
324 for (IMultiEntryOperation op : ops) {
325 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
326 HZTable table = (HZTable) op.getTableId();
327 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
328 }
329 for (IMultiEntryOperation op : ops) {
330 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700331 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700332 failExists = true;
333 }
334 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700335
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700336 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700337 }
338
339 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700340 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700342 }
343
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700344 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700345 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700346 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700347 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700348 return buf.toString();
349 }
350
351 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
352 // TODO we probably want to implement some sort of caching
353 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
354 }
355
356 /**
357 * {@inheritDoc}
358 * <p />
359 * Warning: The counter is a different object from {@code key} entry on
360 * IKVTable with {@code tableId}. You cannot use table API to read/write
361 * counters.
362 *
363 * @param tableId Only getTableName() will be used.
364 * @param key tableId + key will be used as Counter name
365 */
366 @Override
367 public void createCounter(final IKVTableID tableId,
368 final byte[] key, final long initialValue)
369 throws ObjectExistsException {
370
371 IAtomicLong counter = getAtomicLong(tableId, key);
372 // Assumption here is that AtomicLong is initialized to 0L
373 final boolean success = counter.compareAndSet(0L, initialValue);
374 if (!success) {
375 throw new ObjectExistsException("Atomic counter "
376 + getCounterName(tableId, key)
377 + " already exist with value:" + counter.get());
378 }
379 }
380
381 @Override
382 public void setCounter(final IKVTableID tableId,
383 final byte[] key, final long value) {
384
385 IAtomicLong counter = getAtomicLong(tableId, key);
386 counter.set(value);
387 }
388
389 @Override
390 public long incrementCounter(final IKVTableID tableId,
391 final byte[] key, final long incrementValue) {
392
393 IAtomicLong counter = getAtomicLong(tableId, key);
394 return counter.addAndGet(incrementValue);
395 }
396
397 @Override
398 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
399 IAtomicLong counter = getAtomicLong(tableId, key);
400 counter.destroy();
401
402 }
403
404 @Override
405 public long getCounter(final IKVTableID tableId, final byte[] key)
406 throws ObjectDoesntExistException {
407
408 IAtomicLong counter = getAtomicLong(tableId, key);
409 return counter.get();
410 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700411
412}