blob: da2c102a7b1c587c42bfddfb87d7f5d95a748602 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
Yuta HIGUCHId395b932014-05-01 15:15:20 -070010import net.onrc.onos.core.datastore.DataStoreClient;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTable;
12import net.onrc.onos.core.datastore.IKVTableID;
13import net.onrc.onos.core.datastore.ObjectDoesntExistException;
14import net.onrc.onos.core.datastore.ObjectExistsException;
15import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070016
Yuta HIGUCHId395b932014-05-01 15:15:20 -070017import org.apache.commons.collections.BufferOverflowException;
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070018import org.apache.commons.lang.ArrayUtils;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070019import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import com.hazelcast.core.IMap;
23import com.hazelcast.nio.ObjectDataInput;
24import com.hazelcast.nio.ObjectDataOutput;
25import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
26
27public class HZTable implements IKVTable, IKVTableID {
28 @SuppressWarnings("unused")
29 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
30
31 // not sure how strict this should be managed
Ray Milkey5c9f2db2014-04-09 10:31:21 -070032 private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070033
34 /**
35 * generate a new initial version for an entry.
Ray Milkey269ffb92014-04-03 14:43:30 -070036 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070037 * @return initial value
38 */
39 protected static long getInitialVersion() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070040 long version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070041 if (version == HZClient.VERSION_NONEXISTENT) {
42 // used up whole 64bit space?
Ray Milkey5c9f2db2014-04-09 10:31:21 -070043 version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070044 }
45 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070046 }
47
48 /**
Ray Milkey7531a342014-04-11 15:08:12 -070049 * increment version, avoiding versionNonexistant.
Ray Milkey269ffb92014-04-03 14:43:30 -070050 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070051 * @param version
52 * @return
53 */
54 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070055 long nextVersion = version + 1;
56 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
57 ++nextVersion;
58 }
59 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070060 }
61
62 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070063 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070064
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070065 private byte[] value;
66 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 protected VersionedValue() {
69 value = new byte[0];
70 version = HZClient.VERSION_NONEXISTENT;
71 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070072
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070073 public VersionedValue(final byte[] value, final long version) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -070074 setValue(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070075 this.version = version;
76 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070077
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070078 public byte[] getValue() {
79 return value;
80 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070081
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070082 public long getVersion() {
83 return version;
84 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070085
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070086 public void setValue(final byte[] value) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -070087 if (value != null && value.length > DataStoreClient.MAX_VALUE_BYTES) {
88 throw new BufferOverflowException("Value must be smaller than 1MB");
89 }
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070090 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070091 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070092
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070093 public void setNextVersion() {
94 this.version = getNextVersion(this.version);
95 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070096
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070097 @Override
98 public void writeData(final ObjectDataOutput out) throws IOException {
99 out.writeLong(version);
100 out.writeInt(value.length);
101 if (value.length > 0) {
102 out.write(value);
103 }
104 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700105
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700106 @Override
107 public void readData(final ObjectDataInput in) throws IOException {
108 version = in.readLong();
109 final int valueLen = in.readInt();
110 value = new byte[valueLen];
111 in.readFully(value);
112 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700113
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700114 @Override
115 public int getFactoryId() {
116 return VersionedValueSerializableFactory.FACTORY_ID;
117 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700118
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 @Override
120 public int getId() {
121 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
122 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700123
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 @Override
125 public int hashCode() {
126 final int prime = 31;
127 int result = 1;
128 result = prime * result + (int) (version ^ (version >>> 32));
129 result = prime * result + Arrays.hashCode(value);
130 return result;
131 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700132
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700133 @Override
134 public boolean equals(final Object obj) {
135 if (this == obj) {
136 return true;
137 }
138 if (obj == null) {
139 return false;
140 }
141 if (getClass() != obj.getClass()) {
142 return false;
143 }
144 VersionedValue other = (VersionedValue) obj;
145 if (version != other.version) {
146 return false;
147 }
148 if (!Arrays.equals(value, other.value)) {
149 return false;
150 }
151 return true;
152 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700153 }
154
155 // TODO Refactor and extract common parts
156 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700157 final byte[] key;
158 byte[] value;
159 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700160
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700161 public Entry(final byte[] key, final byte[] value, final long version) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -0700162 if (key.length > DataStoreClient.MAX_KEY_BYTES) {
163 throw new BufferOverflowException("Key must be smaller than 64KB");
164 }
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700165 this.key = key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700166 this.setValue(value);
167 this.setVersion(version);
168 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700169
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700170 public Entry(final byte[] key) {
171 this(key, null, HZClient.VERSION_NONEXISTENT);
172 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700173
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700174 @Override
175 public byte[] getKey() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700176 return key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700177 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700178
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700179 @Override
180 public byte[] getValue() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700181 return ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700182 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700183
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 @Override
185 public long getVersion() {
186 return version;
187 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700188
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700189 void setValue(final byte[] value) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700190 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700191 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700192
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700193 void setVersion(final long version) {
194 this.version = version;
195 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700196 }
197
198
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700199 private final String mapName;
200 private final IMap<byte[], VersionedValue> map;
201
202 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700203 this.mapName = mapName;
204 this.map = map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700205 }
206
207 @Override
208 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700209 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700210 }
211
212 @Override
213 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700214 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700215 }
216
217 @Override
218 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700219 final long version = getInitialVersion();
220 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
221 if (existing != null) {
222 throw new ObjectExistsException(this, key);
223 }
224 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700225 }
226
227 @Override
228 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700229 final long version = getInitialVersion();
230 map.set(key, new VersionedValue(value, version));
231 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700232 }
233
234 @Override
235 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700236 final VersionedValue value = map.get(key);
237 if (value == null) {
238 throw new ObjectDoesntExistException(this, key);
239 }
240 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700241 }
242
243 @Override
244 public long update(final byte[] key, final byte[] value, final long version)
245 throws ObjectDoesntExistException, WrongVersionException {
246
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700247 try {
248 map.lock(key);
249 final VersionedValue oldValue = map.get(key);
250 if (oldValue == null) {
251 throw new ObjectDoesntExistException(this, key);
252 }
253 if (oldValue.getVersion() != version) {
254 throw new WrongVersionException(this, key, version, oldValue.getVersion());
255 }
256 final long nextVersion = getNextVersion(version);
257 map.set(key, new VersionedValue(value, nextVersion));
258 return nextVersion;
259 } finally {
260 map.unlock(key);
261 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700262 }
263
264 @Override
265 public long update(final byte[] key, final byte[] value)
266 throws ObjectDoesntExistException {
267
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700268 try {
269 map.lock(key);
270 final VersionedValue valueInMap = map.get(key);
271 if (valueInMap == null) {
272 throw new ObjectDoesntExistException(this, key);
273 }
274 valueInMap.setValue(value);
275 valueInMap.setNextVersion();
276 map.set(key, valueInMap);
277 return valueInMap.getVersion();
278 } finally {
279 map.unlock(key);
280 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700281 }
282
283 @Override
284 public long delete(final byte[] key, final long version)
285 throws ObjectDoesntExistException, WrongVersionException {
286
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700287 try {
288 map.lock(key);
289 final VersionedValue oldValue = map.get(key);
290 if (oldValue == null) {
291 throw new ObjectDoesntExistException(this, key);
292 }
293 if (oldValue.getVersion() != version) {
294 throw new WrongVersionException(this, key, version, oldValue.getVersion());
295 }
296 map.delete(key);
297 return oldValue.getVersion();
298 } finally {
299 map.unlock(key);
300 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700301 }
302
303 @Override
304 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700305 final VersionedValue valueInMap = map.remove(key);
306 if (valueInMap == null) {
307 return HZClient.VERSION_NONEXISTENT;
308 }
309 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700310 }
311
312 @Override
313 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700314 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
315 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
316 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
317 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
318 }
319 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700320 }
321
322 @Override
323 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700324 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700325 }
326
327 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700328 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700329 }
330
331 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700332 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700333 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700334 }
335}