blob: c83e45d7f83167fa2980a891c2dfd163f283b715 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
Jonathan Hart6df90172014-04-03 10:13:11 -070010import net.onrc.onos.core.datastore.IKVTable;
11import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.ObjectDoesntExistException;
13import net.onrc.onos.core.datastore.ObjectExistsException;
14import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070015
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070016import org.apache.commons.lang.ArrayUtils;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070017import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
19
20import com.hazelcast.core.IMap;
21import com.hazelcast.nio.ObjectDataInput;
22import com.hazelcast.nio.ObjectDataOutput;
23import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
24
25public class HZTable implements IKVTable, IKVTableID {
26 @SuppressWarnings("unused")
27 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
28
29 // not sure how strict this should be managed
Ray Milkey5c9f2db2014-04-09 10:31:21 -070030 private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070031
32 /**
33 * generate a new initial version for an entry.
Ray Milkey269ffb92014-04-03 14:43:30 -070034 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070035 * @return initial value
36 */
37 protected static long getInitialVersion() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070038 long version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070039 if (version == HZClient.VERSION_NONEXISTENT) {
40 // used up whole 64bit space?
Ray Milkey5c9f2db2014-04-09 10:31:21 -070041 version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070042 }
43 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070044 }
45
46 /**
Ray Milkey7531a342014-04-11 15:08:12 -070047 * increment version, avoiding versionNonexistant.
Ray Milkey269ffb92014-04-03 14:43:30 -070048 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070049 * @param version
50 * @return
51 */
52 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070053 long nextVersion = version + 1;
54 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
55 ++nextVersion;
56 }
57 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070058 }
59
60 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070061 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070062
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070063 private byte[] value;
64 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070065
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070066 protected VersionedValue() {
67 value = new byte[0];
68 version = HZClient.VERSION_NONEXISTENT;
69 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070070
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070071 public VersionedValue(final byte[] value, final long version) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070072 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070073 this.version = version;
74 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070075
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070076 public byte[] getValue() {
77 return value;
78 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070079
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070080 public long getVersion() {
81 return version;
82 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070083
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070084 public void setValue(final byte[] value) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070085 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070086 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070087
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070088 public void setNextVersion() {
89 this.version = getNextVersion(this.version);
90 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070091
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070092 @Override
93 public void writeData(final ObjectDataOutput out) throws IOException {
94 out.writeLong(version);
95 out.writeInt(value.length);
96 if (value.length > 0) {
97 out.write(value);
98 }
99 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700100
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700101 @Override
102 public void readData(final ObjectDataInput in) throws IOException {
103 version = in.readLong();
104 final int valueLen = in.readInt();
105 value = new byte[valueLen];
106 in.readFully(value);
107 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700108
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700109 @Override
110 public int getFactoryId() {
111 return VersionedValueSerializableFactory.FACTORY_ID;
112 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700113
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700114 @Override
115 public int getId() {
116 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
117 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700118
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 @Override
120 public int hashCode() {
121 final int prime = 31;
122 int result = 1;
123 result = prime * result + (int) (version ^ (version >>> 32));
124 result = prime * result + Arrays.hashCode(value);
125 return result;
126 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700127
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700128 @Override
129 public boolean equals(final Object obj) {
130 if (this == obj) {
131 return true;
132 }
133 if (obj == null) {
134 return false;
135 }
136 if (getClass() != obj.getClass()) {
137 return false;
138 }
139 VersionedValue other = (VersionedValue) obj;
140 if (version != other.version) {
141 return false;
142 }
143 if (!Arrays.equals(value, other.value)) {
144 return false;
145 }
146 return true;
147 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700148 }
149
150 // TODO Refactor and extract common parts
151 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700152 final byte[] key;
153 byte[] value;
154 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700155
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700156 public Entry(final byte[] key, final byte[] value, final long version) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700157 this.key = key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700158 this.setValue(value);
159 this.setVersion(version);
160 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700161
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700162 public Entry(final byte[] key) {
163 this(key, null, HZClient.VERSION_NONEXISTENT);
164 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700165
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700166 @Override
167 public byte[] getKey() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700168 return key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700169 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700170
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700171 @Override
172 public byte[] getValue() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700173 return ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700174 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700175
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 @Override
177 public long getVersion() {
178 return version;
179 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700180
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700181 void setValue(final byte[] value) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700182 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700183 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700184
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700185 void setVersion(final long version) {
186 this.version = version;
187 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700188 }
189
190
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700191 private final String mapName;
192 private final IMap<byte[], VersionedValue> map;
193
194 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700195 this.mapName = mapName;
196 this.map = map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700197 }
198
199 @Override
200 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700201 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700202 }
203
204 @Override
205 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700206 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700207 }
208
209 @Override
210 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700211 final long version = getInitialVersion();
212 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
213 if (existing != null) {
214 throw new ObjectExistsException(this, key);
215 }
216 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700217 }
218
219 @Override
220 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700221 final long version = getInitialVersion();
222 map.set(key, new VersionedValue(value, version));
223 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700224 }
225
226 @Override
227 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 final VersionedValue value = map.get(key);
229 if (value == null) {
230 throw new ObjectDoesntExistException(this, key);
231 }
232 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700233 }
234
235 @Override
236 public long update(final byte[] key, final byte[] value, final long version)
237 throws ObjectDoesntExistException, WrongVersionException {
238
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700239 try {
240 map.lock(key);
241 final VersionedValue oldValue = map.get(key);
242 if (oldValue == null) {
243 throw new ObjectDoesntExistException(this, key);
244 }
245 if (oldValue.getVersion() != version) {
246 throw new WrongVersionException(this, key, version, oldValue.getVersion());
247 }
248 final long nextVersion = getNextVersion(version);
249 map.set(key, new VersionedValue(value, nextVersion));
250 return nextVersion;
251 } finally {
252 map.unlock(key);
253 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700254 }
255
256 @Override
257 public long update(final byte[] key, final byte[] value)
258 throws ObjectDoesntExistException {
259
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700260 try {
261 map.lock(key);
262 final VersionedValue valueInMap = map.get(key);
263 if (valueInMap == null) {
264 throw new ObjectDoesntExistException(this, key);
265 }
266 valueInMap.setValue(value);
267 valueInMap.setNextVersion();
268 map.set(key, valueInMap);
269 return valueInMap.getVersion();
270 } finally {
271 map.unlock(key);
272 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700273 }
274
275 @Override
276 public long delete(final byte[] key, final long version)
277 throws ObjectDoesntExistException, WrongVersionException {
278
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700279 try {
280 map.lock(key);
281 final VersionedValue oldValue = map.get(key);
282 if (oldValue == null) {
283 throw new ObjectDoesntExistException(this, key);
284 }
285 if (oldValue.getVersion() != version) {
286 throw new WrongVersionException(this, key, version, oldValue.getVersion());
287 }
288 map.delete(key);
289 return oldValue.getVersion();
290 } finally {
291 map.unlock(key);
292 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700293 }
294
295 @Override
296 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700297 final VersionedValue valueInMap = map.remove(key);
298 if (valueInMap == null) {
299 return HZClient.VERSION_NONEXISTENT;
300 }
301 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700302 }
303
304 @Override
305 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700306 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
307 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
308 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
309 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
310 }
311 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700312 }
313
314 @Override
315 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700316 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700317 }
318
319 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700320 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700321 }
322
323 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700324 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700325 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700326 }
327}