blob: f071ecc94a749c1fcf0a350551ee6bafb8f963f6 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
Jonathan Hart6df90172014-04-03 10:13:11 -070010import net.onrc.onos.core.datastore.IKVTable;
11import net.onrc.onos.core.datastore.IKVTableID;
12import net.onrc.onos.core.datastore.ObjectDoesntExistException;
13import net.onrc.onos.core.datastore.ObjectExistsException;
14import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070015
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
18
19import com.hazelcast.core.IMap;
20import com.hazelcast.nio.ObjectDataInput;
21import com.hazelcast.nio.ObjectDataOutput;
22import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
23
24public class HZTable implements IKVTable, IKVTableID {
// Class logger; nothing in this class references it, hence the "unused" suppression.
25 @SuppressWarnings("unused")
26 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
27
// Shared counter advanced by getInitialVersion() to hand out initial entry versions.
// It starts at HZClient.VERSION_NONEXISTENT, so the first version handed out is that value plus one.
28 // not sure how strict this should be managed
29 private static final AtomicLong initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
30
31 /**
32 * generate a new initial version for an entry.
33 * @return initial value
34 */
35 protected static long getInitialVersion() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070036 long version = initialVersion.incrementAndGet();
37 if (version == HZClient.VERSION_NONEXISTENT) {
38 // used up whole 64bit space?
39 version = initialVersion.incrementAndGet();
40 }
41 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070042 }
43
44 /**
45 * increment version, avoiding VERSION_NONEXISTENT.
46 * @param version
47 * @return
48 */
49 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070050 long nextVersion = version + 1;
51 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
52 ++nextVersion;
53 }
54 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070055 }
56
57 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070058 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070059
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070060 private byte[] value;
61 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070062
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070063 protected VersionedValue() {
64 value = new byte[0];
65 version = HZClient.VERSION_NONEXISTENT;
66 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070067
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070068 public VersionedValue(final byte[] value, final long version) {
69 this.value = value;
70 this.version = version;
71 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070072
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070073 public byte[] getValue() {
74 return value;
75 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070077 public long getVersion() {
78 return version;
79 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070080
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070081 public void setValue(final byte[] value) {
82 this.value = value;
83 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070084
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070085 public void setNextVersion() {
86 this.version = getNextVersion(this.version);
87 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070088
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070089 @Override
90 public void writeData(final ObjectDataOutput out) throws IOException {
91 out.writeLong(version);
92 out.writeInt(value.length);
93 if (value.length > 0) {
94 out.write(value);
95 }
96 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070097
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070098 @Override
99 public void readData(final ObjectDataInput in) throws IOException {
100 version = in.readLong();
101 final int valueLen = in.readInt();
102 value = new byte[valueLen];
103 in.readFully(value);
104 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700105
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700106 @Override
107 public int getFactoryId() {
108 return VersionedValueSerializableFactory.FACTORY_ID;
109 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700110
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700111 @Override
112 public int getId() {
113 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
114 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 @Override
117 public int hashCode() {
118 final int prime = 31;
119 int result = 1;
120 result = prime * result + (int) (version ^ (version >>> 32));
121 result = prime * result + Arrays.hashCode(value);
122 return result;
123 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700124
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 @Override
126 public boolean equals(final Object obj) {
127 if (this == obj) {
128 return true;
129 }
130 if (obj == null) {
131 return false;
132 }
133 if (getClass() != obj.getClass()) {
134 return false;
135 }
136 VersionedValue other = (VersionedValue) obj;
137 if (version != other.version) {
138 return false;
139 }
140 if (!Arrays.equals(value, other.value)) {
141 return false;
142 }
143 return true;
144 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700145 }
146
147 // TODO Refactor and extract common parts
148 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700149 final byte[] key;
150 byte[] value;
151 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700152
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700153 public Entry(final byte[] key, final byte[] value, final long version) {
154 this.key = key;
155 this.setValue(value);
156 this.setVersion(version);
157 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700158
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700159 public Entry(final byte[] key) {
160 this(key, null, HZClient.VERSION_NONEXISTENT);
161 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700162
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700163 @Override
164 public byte[] getKey() {
165 return key;
166 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700167
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700168 @Override
169 public byte[] getValue() {
170 return value;
171 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700172
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700173 @Override
174 public long getVersion() {
175 return version;
176 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700177
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700178 void setValue(final byte[] value) {
179 this.value = value;
180 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700181
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700182 void setVersion(final long version) {
183 this.version = version;
184 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700185 }
186
187
188
/** Name reported by {@code getTableName()} and {@code toString()}. */
private final String mapName;

/** Map in which this table stores its versioned values. */
private final IMap<byte[], VersionedValue> map;

/**
 * Creates a table backed by the given map.
 *
 * @param mapName the name to report as the table name
 * @param map the map that stores the table's entries
 */
public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
    this.map = map;
    this.mapName = mapName;
}
196
197 @Override
198 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700199 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700200 }
201
202 @Override
203 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700204 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700205 }
206
207 @Override
208 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700209 final long version = getInitialVersion();
210 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
211 if (existing != null) {
212 throw new ObjectExistsException(this, key);
213 }
214 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700215 }
216
217 @Override
218 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700219 final long version = getInitialVersion();
220 map.set(key, new VersionedValue(value, version));
221 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700222 }
223
224 @Override
225 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700226 final VersionedValue value = map.get(key);
227 if (value == null) {
228 throw new ObjectDoesntExistException(this, key);
229 }
230 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700231 }
232
233 @Override
234 public long update(final byte[] key, final byte[] value, final long version)
235 throws ObjectDoesntExistException, WrongVersionException {
236
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700237 try {
238 map.lock(key);
239 final VersionedValue oldValue = map.get(key);
240 if (oldValue == null) {
241 throw new ObjectDoesntExistException(this, key);
242 }
243 if (oldValue.getVersion() != version) {
244 throw new WrongVersionException(this, key, version, oldValue.getVersion());
245 }
246 final long nextVersion = getNextVersion(version);
247 map.set(key, new VersionedValue(value, nextVersion));
248 return nextVersion;
249 } finally {
250 map.unlock(key);
251 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700252 }
253
254 @Override
255 public long update(final byte[] key, final byte[] value)
256 throws ObjectDoesntExistException {
257
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700258 try {
259 map.lock(key);
260 final VersionedValue valueInMap = map.get(key);
261 if (valueInMap == null) {
262 throw new ObjectDoesntExistException(this, key);
263 }
264 valueInMap.setValue(value);
265 valueInMap.setNextVersion();
266 map.set(key, valueInMap);
267 return valueInMap.getVersion();
268 } finally {
269 map.unlock(key);
270 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700271 }
272
273 @Override
274 public long delete(final byte[] key, final long version)
275 throws ObjectDoesntExistException, WrongVersionException {
276
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700277 try {
278 map.lock(key);
279 final VersionedValue oldValue = map.get(key);
280 if (oldValue == null) {
281 throw new ObjectDoesntExistException(this, key);
282 }
283 if (oldValue.getVersion() != version) {
284 throw new WrongVersionException(this, key, version, oldValue.getVersion());
285 }
286 map.delete(key);
287 return oldValue.getVersion();
288 } finally {
289 map.unlock(key);
290 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700291 }
292
293 @Override
294 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700295 final VersionedValue valueInMap = map.remove(key);
296 if (valueInMap == null) {
297 return HZClient.VERSION_NONEXISTENT;
298 }
299 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700300 }
301
302 @Override
303 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700304 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
305 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
306 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
307 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
308 }
309 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700310 }
311
312 @Override
313 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700314 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700315 }
316
317 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700319 }
320
321 @Override
322 public long VERSION_NONEXISTENT() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700323 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700324 }
325}