blob: 492204e0937c4950f00dffeea5802426f1d1cb69 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
Yuta HIGUCHI5bb0a292014-07-23 16:51:24 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static com.google.common.base.Preconditions.checkArgument;
5
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07006import java.io.IOException;
7import java.util.ArrayList;
8import java.util.Arrays;
9import java.util.List;
10import java.util.Set;
11import java.util.concurrent.atomic.AtomicLong;
12
Yuta HIGUCHId395b932014-05-01 15:15:20 -070013import net.onrc.onos.core.datastore.DataStoreClient;
Jonathan Hart6df90172014-04-03 10:13:11 -070014import net.onrc.onos.core.datastore.IKVTable;
15import net.onrc.onos.core.datastore.IKVTableID;
16import net.onrc.onos.core.datastore.ObjectDoesntExistException;
17import net.onrc.onos.core.datastore.ObjectExistsException;
18import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHIf148aac2014-05-05 14:59:06 -070019import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI8f182192014-08-02 18:47:42 -070020import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070021
Yuta HIGUCHI498e1532014-08-20 21:30:28 -070022import org.apache.commons.lang3.ArrayUtils;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070023import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import com.hazelcast.core.IMap;
27import com.hazelcast.nio.ObjectDataInput;
28import com.hazelcast.nio.ObjectDataOutput;
29import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
30
31public class HZTable implements IKVTable, IKVTableID {
32 @SuppressWarnings("unused")
33 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
34
35 // not sure how strict this should be managed
Ray Milkey5c9f2db2014-04-09 10:31:21 -070036 private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070037
38 /**
39 * generate a new initial version for an entry.
Ray Milkey269ffb92014-04-03 14:43:30 -070040 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070041 * @return initial value
42 */
43 protected static long getInitialVersion() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070044 long version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070045 if (version == HZClient.VERSION_NONEXISTENT) {
46 // used up whole 64bit space?
Ray Milkey5c9f2db2014-04-09 10:31:21 -070047 version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070048 }
49 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070050 }
51
52 /**
Ray Milkey7531a342014-04-11 15:08:12 -070053 * increment version, avoiding versionNonexistant.
Ray Milkey269ffb92014-04-03 14:43:30 -070054 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070055 * @param version
Jonathan Hart99ff20a2014-06-15 16:53:00 -070056 * @return the next version number
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070057 */
58 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070059 long nextVersion = version + 1;
60 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
61 ++nextVersion;
62 }
63 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070064 }
65
66 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070067 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070068
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070069 private byte[] value;
70 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070071
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070072 protected VersionedValue() {
73 value = new byte[0];
74 version = HZClient.VERSION_NONEXISTENT;
75 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070077 public VersionedValue(final byte[] value, final long version) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -070078 setValue(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070079 this.version = version;
80 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070081
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070082 public byte[] getValue() {
83 return value;
84 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070085
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070086 public long getVersion() {
87 return version;
88 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070089
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070090 public void setValue(final byte[] value) {
Yuta HIGUCHI5bb0a292014-07-23 16:51:24 -070091 checkArgument(value == null ||
92 value.length <= DataStoreClient.MAX_VALUE_BYTES,
93 "Value must be smaller than 1MB");
94
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070095 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070096 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070097
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070098 public void setNextVersion() {
99 this.version = getNextVersion(this.version);
100 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700101
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700102 @Override
103 public void writeData(final ObjectDataOutput out) throws IOException {
104 out.writeLong(version);
105 out.writeInt(value.length);
106 if (value.length > 0) {
107 out.write(value);
108 }
109 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700110
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700111 @Override
112 public void readData(final ObjectDataInput in) throws IOException {
113 version = in.readLong();
114 final int valueLen = in.readInt();
115 value = new byte[valueLen];
116 in.readFully(value);
117 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700118
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 @Override
120 public int getFactoryId() {
Yuta HIGUCHI8f182192014-08-02 18:47:42 -0700121 return HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700122 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700123
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 @Override
125 public int getId() {
Yuta HIGUCHI8f182192014-08-02 18:47:42 -0700126 return HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700127 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700128
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700129 @Override
130 public int hashCode() {
131 final int prime = 31;
132 int result = 1;
133 result = prime * result + (int) (version ^ (version >>> 32));
134 result = prime * result + Arrays.hashCode(value);
135 return result;
136 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700137
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700138 @Override
139 public boolean equals(final Object obj) {
140 if (this == obj) {
141 return true;
142 }
143 if (obj == null) {
144 return false;
145 }
146 if (getClass() != obj.getClass()) {
147 return false;
148 }
149 VersionedValue other = (VersionedValue) obj;
150 if (version != other.version) {
151 return false;
152 }
153 if (!Arrays.equals(value, other.value)) {
154 return false;
155 }
156 return true;
157 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700158 }
159
160 // TODO Refactor and extract common parts
161 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700162 final byte[] key;
163 byte[] value;
164 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700165
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700166 public Entry(final byte[] key, final byte[] value, final long version) {
Yuta HIGUCHI5bb0a292014-07-23 16:51:24 -0700167 checkNotNull(key);
168 checkArgument(
169 key.length <= DataStoreClient.MAX_KEY_BYTES,
170 "Key must be smaller than 64KB");
171
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700172 this.key = key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700173 this.setValue(value);
174 this.setVersion(version);
175 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700176
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700177 public Entry(final byte[] key) {
178 this(key, null, HZClient.VERSION_NONEXISTENT);
179 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700180
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700181 @Override
182 public byte[] getKey() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700183 return key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700185
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700186 @Override
187 public byte[] getValue() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700188 return ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700189 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700190
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700191 @Override
192 public long getVersion() {
193 return version;
194 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700195
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700196 void setValue(final byte[] value) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700197 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700198 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700199
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700200 void setVersion(final long version) {
201 this.version = version;
202 }
Yuta HIGUCHIf148aac2014-05-05 14:59:06 -0700203
204 @Override
205 public String toString() {
206 return "[Entry key=" + ByteArrayUtil.toHexStringBuilder(key, ":") + ", value="
207 + ByteArrayUtil.toHexStringBuilder(value, ":") + ", version=" + version + "]";
208 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700209 }
210
211
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700212 private final String mapName;
213 private final IMap<byte[], VersionedValue> map;
214
215 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700216 this.mapName = mapName;
217 this.map = map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700218 }
219
220 @Override
221 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700222 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700223 }
224
225 @Override
226 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700227 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700228 }
229
230 @Override
231 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700232 final long version = getInitialVersion();
233 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
234 if (existing != null) {
235 throw new ObjectExistsException(this, key);
236 }
237 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700238 }
239
240 @Override
241 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 final long version = getInitialVersion();
243 map.set(key, new VersionedValue(value, version));
244 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700245 }
246
247 @Override
248 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 final VersionedValue value = map.get(key);
250 if (value == null) {
251 throw new ObjectDoesntExistException(this, key);
252 }
253 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700254 }
255
256 @Override
257 public long update(final byte[] key, final byte[] value, final long version)
258 throws ObjectDoesntExistException, WrongVersionException {
259
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700260 try {
261 map.lock(key);
262 final VersionedValue oldValue = map.get(key);
263 if (oldValue == null) {
264 throw new ObjectDoesntExistException(this, key);
265 }
266 if (oldValue.getVersion() != version) {
267 throw new WrongVersionException(this, key, version, oldValue.getVersion());
268 }
269 final long nextVersion = getNextVersion(version);
270 map.set(key, new VersionedValue(value, nextVersion));
271 return nextVersion;
272 } finally {
273 map.unlock(key);
274 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700275 }
276
277 @Override
278 public long update(final byte[] key, final byte[] value)
279 throws ObjectDoesntExistException {
280
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700281 try {
282 map.lock(key);
283 final VersionedValue valueInMap = map.get(key);
284 if (valueInMap == null) {
285 throw new ObjectDoesntExistException(this, key);
286 }
287 valueInMap.setValue(value);
288 valueInMap.setNextVersion();
289 map.set(key, valueInMap);
290 return valueInMap.getVersion();
291 } finally {
292 map.unlock(key);
293 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700294 }
295
296 @Override
297 public long delete(final byte[] key, final long version)
298 throws ObjectDoesntExistException, WrongVersionException {
299
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700300 try {
301 map.lock(key);
302 final VersionedValue oldValue = map.get(key);
303 if (oldValue == null) {
304 throw new ObjectDoesntExistException(this, key);
305 }
306 if (oldValue.getVersion() != version) {
307 throw new WrongVersionException(this, key, version, oldValue.getVersion());
308 }
309 map.delete(key);
310 return oldValue.getVersion();
311 } finally {
312 map.unlock(key);
313 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700314 }
315
316 @Override
317 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 final VersionedValue valueInMap = map.remove(key);
319 if (valueInMap == null) {
320 return HZClient.VERSION_NONEXISTENT;
321 }
322 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700323 }
324
325 @Override
326 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700327 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
328 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
329 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
330 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
331 }
332 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700333 }
334
335 @Override
336 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700337 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700338 }
339
340 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700342 }
343
344 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700345 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700346 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700347 }
348}