blob: 5398eb861ca0264d55e0ee2eff0dcadafd0f5c9d [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07003import java.util.Collection;
4import java.util.List;
5
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -07006import net.onrc.onos.core.datagrid.HazelcastDatagrid;
Jonathan Hart6df90172014-04-03 10:13:11 -07007import net.onrc.onos.core.datastore.IKVClient;
8import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -07009import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070010import net.onrc.onos.core.datastore.IKVTableID;
11import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070012import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
13import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070014import net.onrc.onos.core.datastore.ObjectDoesntExistException;
15import net.onrc.onos.core.datastore.ObjectExistsException;
16import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070017import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
18import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070019import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070020import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070023import com.hazelcast.config.Config;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070024import com.hazelcast.core.Hazelcast;
25import com.hazelcast.core.HazelcastInstance;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070026import com.hazelcast.core.IAtomicLong;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070027import com.hazelcast.core.IMap;
28
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070029/**
30 * Hazelcast implementation of datastore IKVClient.
31 */
Ray Milkey1584ec82014-04-10 11:58:30 -070032public final class HZClient implements IKVClient {
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070033 private static final Logger log = LoggerFactory.getLogger(HZClient.class);
34
35 static final long VERSION_NONEXISTENT = 0L;
36
37 private static final String MAP_PREFIX = "datastore://";
38
Jonathan Hartc00f5c22014-06-10 15:14:40 -070039 private static final String BASE_CONFIG_FILENAME =
40 System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070041
42 private final HazelcastInstance hazelcastInstance;
43
44 private static final HZClient THE_INSTANCE = new HZClient();
45
Yuta HIGUCHI8447c362014-05-30 11:32:49 -070046 /**
47 * Get DataStoreClient implemented on Hazelcast.
48 *
49 * @return HZClient
50 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070051 public static HZClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070052 return THE_INSTANCE;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070053 }
54
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070055 /**
56 * Default constructor.
57 * <p/>
58 * Get or create the Hazelcast Instance to use for datastore.
59 */
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070060 private HZClient() {
Yuta HIGUCHI448bca02014-08-13 17:08:00 -070061 Config config = HazelcastDatagrid.loadHazelcastConfig(BASE_CONFIG_FILENAME);
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070062 // Try to get the existing HZ instance in JVM if possible.
Yuta HIGUCHIfde382d2014-08-20 17:22:17 -070063 hazelcastInstance = Hazelcast.getOrCreateHazelcastInstance(config);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070064 }
65
Yuta HIGUCHI6dfba392014-05-28 15:45:44 -070066 /**
Yuta HIGUCHI18e0db82014-06-16 09:44:21 -070067 * Gets the HazelcastInstance object.
68 *
69 * @return HazelcastInstance
70 */
71 HazelcastInstance getHZInstance() {
72 return hazelcastInstance;
73 }
74
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070075 @Override
76 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070077 IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070078
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070079 return new HZTable(tableName, map);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070080 }
81
82 @Override
83 public void dropTable(final IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070084 ((HZTable) table).getBackendMap().clear();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070085 }
86
87 @Override
88 public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
89 throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070090 IKVTable table = (IKVTable) tableId;
91 return table.create(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070092 }
93
94 @Override
95 public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070096 IKVTable table = (IKVTable) tableId;
97 return table.forceCreate(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070098 }
99
100 @Override
101 public IKVEntry read(final IKVTableID tableId, final byte[] key)
102 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700103 IKVTable table = (IKVTable) tableId;
104 return table.read(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700105 }
106
107 @Override
108 public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700109 final long version) throws ObjectDoesntExistException,
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700110 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700111 IKVTable table = (IKVTable) tableId;
112 return table.update(key, value, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700113 }
114
115 @Override
116 public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
117 throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700118 IKVTable table = (IKVTable) tableId;
119 return table.update(key, value);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700120 }
121
122 @Override
123 public long delete(final IKVTableID tableId, final byte[] key, final long version)
124 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 IKVTable table = (IKVTable) tableId;
126 return table.delete(key, version);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700127 }
128
129 @Override
130 public long forceDelete(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700131 IKVTable table = (IKVTable) tableId;
132 return table.forceDelete(key);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700133 }
134
135 @Override
136 public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700137 IKVTable table = (IKVTable) tableId;
138 return table.getAllEntries();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700139 }
140
141 @Override
142 public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700143 final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700144 return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145 }
146
147 @Override
148 public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700149 final byte[] value) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700150 return new HZMultiEntryOperation((HZTable) tableId, key, value,
151 HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700152 }
153
154 @Override
155 public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700156 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700157 }
158
159 @Override
160 public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700161 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700162 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700163 }
164
165 @Override
166 public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
Ray Milkey269ffb92014-04-03 14:43:30 -0700167 final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700168 return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700169 }
170
171 @Override
172 public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700173 return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700174 }
175
176 @Override
177 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700178 boolean failExists = false;
179 for (IMultiEntryOperation op : ops) {
180 HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
181 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700182 case DELETE:
183 try {
184 final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
185 mop.setVersion(version);
186 mop.setStatus(STATUS.SUCCESS);
187 } catch (ObjectDoesntExistException | WrongVersionException e) {
188 log.error(mop + " failed.", e);
189 mop.setStatus(STATUS.FAILED);
190 failExists = true;
191 }
192 break;
193 case FORCE_DELETE:
194 final long version = forceDelete(mop.getTableId(), mop.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700195 mop.setVersion(version);
196 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700197 break;
198 default:
199 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700200 }
201 }
202 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700203 }
204
205 @Override
206 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700207 // there may be room to batch to improve performance
208 boolean failExists = false;
209 for (IMultiEntryOperation op : ops) {
210 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
211 switch (mop.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700212 case CREATE:
213 try {
214 long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
215 mop.setVersion(version);
216 mop.setStatus(STATUS.SUCCESS);
217 } catch (ObjectExistsException e) {
218 log.error(mop + " failed.", e);
219 mop.setStatus(STATUS.FAILED);
220 failExists = true;
221 }
222 break;
223 case FORCE_CREATE: {
224 final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700225 mop.setVersion(version);
226 mop.setStatus(STATUS.SUCCESS);
Ray Milkey269ffb92014-04-03 14:43:30 -0700227 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700229 case UPDATE:
230 try {
231 long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
232 mop.setVersion(version);
233 mop.setStatus(STATUS.SUCCESS);
234 } catch (ObjectDoesntExistException | WrongVersionException e) {
235 log.error(mop + " failed.", e);
236 mop.setStatus(STATUS.FAILED);
237 failExists = true;
238 }
239 break;
240 default:
241 throw new UnsupportedOperationException(mop.toString());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 }
243 }
244 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700245 }
246
247 @Override
248 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 boolean failExists = false;
250 for (IMultiEntryOperation op : ops) {
251 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
252 HZTable table = (HZTable) op.getTableId();
253 ((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
254 }
255 for (IMultiEntryOperation op : ops) {
256 IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
Ray Milkey1aa71f82014-04-08 16:23:24 -0700257 if (!mop.hasSucceeded()) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700258 failExists = true;
259 }
260 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700261
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700262 return failExists;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700263 }
264
265 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700266 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700267 return VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700268 }
269
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700270 private String getCounterName(final IKVTableID tableId, final byte[] key) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700271 StringBuilder buf = new StringBuilder(tableId.getTableName());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700272 buf.append('@');
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700273 ByteArrayUtil.toHexStringBuilder(key, ":", buf);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700274 return buf.toString();
275 }
276
277 private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
278 // TODO we probably want to implement some sort of caching
279 return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
280 }
281
282 /**
283 * {@inheritDoc}
284 * <p />
285 * Warning: The counter is a different object from {@code key} entry on
286 * IKVTable with {@code tableId}. You cannot use table API to read/write
287 * counters.
288 *
289 * @param tableId Only getTableName() will be used.
290 * @param key tableId + key will be used as Counter name
291 */
292 @Override
293 public void createCounter(final IKVTableID tableId,
294 final byte[] key, final long initialValue)
295 throws ObjectExistsException {
296
297 IAtomicLong counter = getAtomicLong(tableId, key);
298 // Assumption here is that AtomicLong is initialized to 0L
299 final boolean success = counter.compareAndSet(0L, initialValue);
300 if (!success) {
301 throw new ObjectExistsException("Atomic counter "
302 + getCounterName(tableId, key)
303 + " already exist with value:" + counter.get());
304 }
305 }
306
307 @Override
308 public void setCounter(final IKVTableID tableId,
309 final byte[] key, final long value) {
310
311 IAtomicLong counter = getAtomicLong(tableId, key);
312 counter.set(value);
313 }
314
315 @Override
316 public long incrementCounter(final IKVTableID tableId,
317 final byte[] key, final long incrementValue) {
318
319 IAtomicLong counter = getAtomicLong(tableId, key);
320 return counter.addAndGet(incrementValue);
321 }
322
323 @Override
324 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
325 IAtomicLong counter = getAtomicLong(tableId, key);
326 counter.destroy();
327
328 }
329
330 @Override
331 public long getCounter(final IKVTableID tableId, final byte[] key)
332 throws ObjectDoesntExistException {
333
334 IAtomicLong counter = getAtomicLong(tableId, key);
335 return counter.get();
336 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700337
338}