blob: e7182a76870570acd2c212c1fcadbe468afef61b [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
Yuta HIGUCHId395b932014-05-01 15:15:20 -070010import net.onrc.onos.core.datastore.DataStoreClient;
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVTable;
12import net.onrc.onos.core.datastore.IKVTableID;
13import net.onrc.onos.core.datastore.ObjectDoesntExistException;
14import net.onrc.onos.core.datastore.ObjectExistsException;
15import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHIf148aac2014-05-05 14:59:06 -070016import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070017
Yuta HIGUCHId395b932014-05-01 15:15:20 -070018import org.apache.commons.collections.BufferOverflowException;
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070019import org.apache.commons.lang.ArrayUtils;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070020import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import com.hazelcast.core.IMap;
24import com.hazelcast.nio.ObjectDataInput;
25import com.hazelcast.nio.ObjectDataOutput;
26import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
27
28public class HZTable implements IKVTable, IKVTableID {
29 @SuppressWarnings("unused")
30 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
31
32 // not sure how strict this should be managed
Ray Milkey5c9f2db2014-04-09 10:31:21 -070033 private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070034
35 /**
36 * generate a new initial version for an entry.
Ray Milkey269ffb92014-04-03 14:43:30 -070037 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070038 * @return initial value
39 */
40 protected static long getInitialVersion() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070041 long version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070042 if (version == HZClient.VERSION_NONEXISTENT) {
43 // used up whole 64bit space?
Ray Milkey5c9f2db2014-04-09 10:31:21 -070044 version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070045 }
46 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070047 }
48
49 /**
Ray Milkey7531a342014-04-11 15:08:12 -070050 * increment version, avoiding versionNonexistant.
Ray Milkey269ffb92014-04-03 14:43:30 -070051 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070052 * @param version
Jonathan Hart99ff20a2014-06-15 16:53:00 -070053 * @return the next version number
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070054 */
55 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070056 long nextVersion = version + 1;
57 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
58 ++nextVersion;
59 }
60 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070061 }
62
63 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070064 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070065
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070066 private byte[] value;
67 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070068
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070069 protected VersionedValue() {
70 value = new byte[0];
71 version = HZClient.VERSION_NONEXISTENT;
72 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070073
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070074 public VersionedValue(final byte[] value, final long version) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -070075 setValue(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070076 this.version = version;
77 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070078
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070079 public byte[] getValue() {
80 return value;
81 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070082
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070083 public long getVersion() {
84 return version;
85 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070086
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070087 public void setValue(final byte[] value) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -070088 if (value != null && value.length > DataStoreClient.MAX_VALUE_BYTES) {
89 throw new BufferOverflowException("Value must be smaller than 1MB");
90 }
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -070091 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070092 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070093
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070094 public void setNextVersion() {
95 this.version = getNextVersion(this.version);
96 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070097
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070098 @Override
99 public void writeData(final ObjectDataOutput out) throws IOException {
100 out.writeLong(version);
101 out.writeInt(value.length);
102 if (value.length > 0) {
103 out.write(value);
104 }
105 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700106
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700107 @Override
108 public void readData(final ObjectDataInput in) throws IOException {
109 version = in.readLong();
110 final int valueLen = in.readInt();
111 value = new byte[valueLen];
112 in.readFully(value);
113 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700114
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700115 @Override
116 public int getFactoryId() {
117 return VersionedValueSerializableFactory.FACTORY_ID;
118 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700119
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700120 @Override
121 public int getId() {
122 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
123 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700124
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 @Override
126 public int hashCode() {
127 final int prime = 31;
128 int result = 1;
129 result = prime * result + (int) (version ^ (version >>> 32));
130 result = prime * result + Arrays.hashCode(value);
131 return result;
132 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700133
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700134 @Override
135 public boolean equals(final Object obj) {
136 if (this == obj) {
137 return true;
138 }
139 if (obj == null) {
140 return false;
141 }
142 if (getClass() != obj.getClass()) {
143 return false;
144 }
145 VersionedValue other = (VersionedValue) obj;
146 if (version != other.version) {
147 return false;
148 }
149 if (!Arrays.equals(value, other.value)) {
150 return false;
151 }
152 return true;
153 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700154 }
155
156 // TODO Refactor and extract common parts
157 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700158 final byte[] key;
159 byte[] value;
160 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700161
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700162 public Entry(final byte[] key, final byte[] value, final long version) {
Yuta HIGUCHId395b932014-05-01 15:15:20 -0700163 if (key.length > DataStoreClient.MAX_KEY_BYTES) {
164 throw new BufferOverflowException("Key must be smaller than 64KB");
165 }
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700166 this.key = key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700167 this.setValue(value);
168 this.setVersion(version);
169 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700170
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700171 public Entry(final byte[] key) {
172 this(key, null, HZClient.VERSION_NONEXISTENT);
173 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700174
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700175 @Override
176 public byte[] getKey() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700177 return key.clone();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700178 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700179
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700180 @Override
181 public byte[] getValue() {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700182 return ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700183 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700184
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700185 @Override
186 public long getVersion() {
187 return version;
188 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700189
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700190 void setValue(final byte[] value) {
Yuta HIGUCHIce7e7f82014-04-15 21:37:38 -0700191 this.value = ArrayUtils.clone(value);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700192 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700193
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700194 void setVersion(final long version) {
195 this.version = version;
196 }
Yuta HIGUCHIf148aac2014-05-05 14:59:06 -0700197
198 @Override
199 public String toString() {
200 return "[Entry key=" + ByteArrayUtil.toHexStringBuilder(key, ":") + ", value="
201 + ByteArrayUtil.toHexStringBuilder(value, ":") + ", version=" + version + "]";
202 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700203 }
204
205
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700206 private final String mapName;
207 private final IMap<byte[], VersionedValue> map;
208
209 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700210 this.mapName = mapName;
211 this.map = map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700212 }
213
214 @Override
215 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700216 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700217 }
218
219 @Override
220 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700221 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700222 }
223
224 @Override
225 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700226 final long version = getInitialVersion();
227 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
228 if (existing != null) {
229 throw new ObjectExistsException(this, key);
230 }
231 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700232 }
233
234 @Override
235 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700236 final long version = getInitialVersion();
237 map.set(key, new VersionedValue(value, version));
238 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700239 }
240
241 @Override
242 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700243 final VersionedValue value = map.get(key);
244 if (value == null) {
245 throw new ObjectDoesntExistException(this, key);
246 }
247 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700248 }
249
250 @Override
251 public long update(final byte[] key, final byte[] value, final long version)
252 throws ObjectDoesntExistException, WrongVersionException {
253
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700254 try {
255 map.lock(key);
256 final VersionedValue oldValue = map.get(key);
257 if (oldValue == null) {
258 throw new ObjectDoesntExistException(this, key);
259 }
260 if (oldValue.getVersion() != version) {
261 throw new WrongVersionException(this, key, version, oldValue.getVersion());
262 }
263 final long nextVersion = getNextVersion(version);
264 map.set(key, new VersionedValue(value, nextVersion));
265 return nextVersion;
266 } finally {
267 map.unlock(key);
268 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700269 }
270
271 @Override
272 public long update(final byte[] key, final byte[] value)
273 throws ObjectDoesntExistException {
274
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700275 try {
276 map.lock(key);
277 final VersionedValue valueInMap = map.get(key);
278 if (valueInMap == null) {
279 throw new ObjectDoesntExistException(this, key);
280 }
281 valueInMap.setValue(value);
282 valueInMap.setNextVersion();
283 map.set(key, valueInMap);
284 return valueInMap.getVersion();
285 } finally {
286 map.unlock(key);
287 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700288 }
289
290 @Override
291 public long delete(final byte[] key, final long version)
292 throws ObjectDoesntExistException, WrongVersionException {
293
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700294 try {
295 map.lock(key);
296 final VersionedValue oldValue = map.get(key);
297 if (oldValue == null) {
298 throw new ObjectDoesntExistException(this, key);
299 }
300 if (oldValue.getVersion() != version) {
301 throw new WrongVersionException(this, key, version, oldValue.getVersion());
302 }
303 map.delete(key);
304 return oldValue.getVersion();
305 } finally {
306 map.unlock(key);
307 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700308 }
309
310 @Override
311 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700312 final VersionedValue valueInMap = map.remove(key);
313 if (valueInMap == null) {
314 return HZClient.VERSION_NONEXISTENT;
315 }
316 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700317 }
318
319 @Override
320 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700321 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
322 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
323 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
324 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
325 }
326 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700327 }
328
329 @Override
330 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700331 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700332 }
333
334 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700335 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700336 }
337
338 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700339 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700340 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700341 }
342}