blob: e609bce9d964c13eebd8c3d0e4662313927f2e3d [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.hazelcast;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07002
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
Pavlin Radoslavovc9bacee2014-04-11 19:02:17 -070010import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
11
Jonathan Hart6df90172014-04-03 10:13:11 -070012import net.onrc.onos.core.datastore.IKVTable;
13import net.onrc.onos.core.datastore.IKVTableID;
14import net.onrc.onos.core.datastore.ObjectDoesntExistException;
15import net.onrc.onos.core.datastore.ObjectExistsException;
16import net.onrc.onos.core.datastore.WrongVersionException;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070017
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
20
21import com.hazelcast.core.IMap;
22import com.hazelcast.nio.ObjectDataInput;
23import com.hazelcast.nio.ObjectDataOutput;
24import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
25
26public class HZTable implements IKVTable, IKVTableID {
27 @SuppressWarnings("unused")
28 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
29
30 // not sure how strict this should be managed
Ray Milkey5c9f2db2014-04-09 10:31:21 -070031 private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070032
33 /**
34 * generate a new initial version for an entry.
Ray Milkey269ffb92014-04-03 14:43:30 -070035 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070036 * @return initial value
37 */
38 protected static long getInitialVersion() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070039 long version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070040 if (version == HZClient.VERSION_NONEXISTENT) {
41 // used up whole 64bit space?
Ray Milkey5c9f2db2014-04-09 10:31:21 -070042 version = INITIAL_VERSION.incrementAndGet();
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070043 }
44 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070045 }
46
47 /**
Ray Milkey7531a342014-04-11 15:08:12 -070048 * increment version, avoiding versionNonexistant.
Ray Milkey269ffb92014-04-03 14:43:30 -070049 *
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070050 * @param version
51 * @return
52 */
53 protected static long getNextVersion(final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070054 long nextVersion = version + 1;
55 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
56 ++nextVersion;
57 }
58 return nextVersion;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070059 }
60
61 static class VersionedValue implements IdentifiedDataSerializable {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070062 private static final long serialVersionUID = -3149375966890712708L;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070063
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070064 private byte[] value;
65 private long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070066
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070067 protected VersionedValue() {
68 value = new byte[0];
69 version = HZClient.VERSION_NONEXISTENT;
70 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070071
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070072 public VersionedValue(final byte[] value, final long version) {
73 this.value = value;
74 this.version = version;
75 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070076
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070077 public byte[] getValue() {
78 return value;
79 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070080
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070081 public long getVersion() {
82 return version;
83 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070084
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070085 public void setValue(final byte[] value) {
86 this.value = value;
87 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070088
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070089 public void setNextVersion() {
90 this.version = getNextVersion(this.version);
91 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -070092
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070093 @Override
94 public void writeData(final ObjectDataOutput out) throws IOException {
95 out.writeLong(version);
96 out.writeInt(value.length);
97 if (value.length > 0) {
98 out.write(value);
99 }
100 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700101
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700102 @Override
103 public void readData(final ObjectDataInput in) throws IOException {
104 version = in.readLong();
105 final int valueLen = in.readInt();
106 value = new byte[valueLen];
107 in.readFully(value);
108 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700109
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700110 @Override
111 public int getFactoryId() {
112 return VersionedValueSerializableFactory.FACTORY_ID;
113 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700114
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700115 @Override
116 public int getId() {
117 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
118 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700119
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700120 @Override
121 public int hashCode() {
122 final int prime = 31;
123 int result = 1;
124 result = prime * result + (int) (version ^ (version >>> 32));
125 result = prime * result + Arrays.hashCode(value);
126 return result;
127 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700128
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700129 @Override
130 public boolean equals(final Object obj) {
131 if (this == obj) {
132 return true;
133 }
134 if (obj == null) {
135 return false;
136 }
137 if (getClass() != obj.getClass()) {
138 return false;
139 }
140 VersionedValue other = (VersionedValue) obj;
141 if (version != other.version) {
142 return false;
143 }
144 if (!Arrays.equals(value, other.value)) {
145 return false;
146 }
147 return true;
148 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700149 }
150
151 // TODO Refactor and extract common parts
152 public static class Entry implements IKVEntry {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700153 final byte[] key;
154 byte[] value;
155 long version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700156
Pavlin Radoslavovc9bacee2014-04-11 19:02:17 -0700157 @SuppressFBWarnings(value = "EI_EXPOSE_REP2",
158 justification = "TODO: Store a copy of the object?")
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700159 public Entry(final byte[] key, final byte[] value, final long version) {
160 this.key = key;
161 this.setValue(value);
162 this.setVersion(version);
163 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700164
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700165 public Entry(final byte[] key) {
166 this(key, null, HZClient.VERSION_NONEXISTENT);
167 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700168
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700169 @Override
Pavlin Radoslavovc9bacee2014-04-11 19:02:17 -0700170 @SuppressFBWarnings(value = "EI_EXPOSE_REP",
171 justification = "TODO: Return a copy of the object?")
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700172 public byte[] getKey() {
173 return key;
174 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700175
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 @Override
Pavlin Radoslavovc9bacee2014-04-11 19:02:17 -0700177 @SuppressFBWarnings(value = "EI_EXPOSE_REP",
178 justification = "TODO: Return a copy of the object?")
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700179 public byte[] getValue() {
180 return value;
181 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700182
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700183 @Override
184 public long getVersion() {
185 return version;
186 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700187
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700188 void setValue(final byte[] value) {
189 this.value = value;
190 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700191
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700192 void setVersion(final long version) {
193 this.version = version;
194 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700195 }
196
197
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700198 private final String mapName;
199 private final IMap<byte[], VersionedValue> map;
200
201 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700202 this.mapName = mapName;
203 this.map = map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700204 }
205
206 @Override
207 public String getTableName() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 return mapName;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700209 }
210
211 @Override
212 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700213 return this;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700214 }
215
216 @Override
217 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700218 final long version = getInitialVersion();
219 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
220 if (existing != null) {
221 throw new ObjectExistsException(this, key);
222 }
223 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700224 }
225
226 @Override
227 public long forceCreate(final byte[] key, final byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 final long version = getInitialVersion();
229 map.set(key, new VersionedValue(value, version));
230 return version;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700231 }
232
233 @Override
234 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 final VersionedValue value = map.get(key);
236 if (value == null) {
237 throw new ObjectDoesntExistException(this, key);
238 }
239 return new Entry(key, value.getValue(), value.getVersion());
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700240 }
241
242 @Override
243 public long update(final byte[] key, final byte[] value, final long version)
244 throws ObjectDoesntExistException, WrongVersionException {
245
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700246 try {
247 map.lock(key);
248 final VersionedValue oldValue = map.get(key);
249 if (oldValue == null) {
250 throw new ObjectDoesntExistException(this, key);
251 }
252 if (oldValue.getVersion() != version) {
253 throw new WrongVersionException(this, key, version, oldValue.getVersion());
254 }
255 final long nextVersion = getNextVersion(version);
256 map.set(key, new VersionedValue(value, nextVersion));
257 return nextVersion;
258 } finally {
259 map.unlock(key);
260 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700261 }
262
263 @Override
264 public long update(final byte[] key, final byte[] value)
265 throws ObjectDoesntExistException {
266
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700267 try {
268 map.lock(key);
269 final VersionedValue valueInMap = map.get(key);
270 if (valueInMap == null) {
271 throw new ObjectDoesntExistException(this, key);
272 }
273 valueInMap.setValue(value);
274 valueInMap.setNextVersion();
275 map.set(key, valueInMap);
276 return valueInMap.getVersion();
277 } finally {
278 map.unlock(key);
279 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700280 }
281
282 @Override
283 public long delete(final byte[] key, final long version)
284 throws ObjectDoesntExistException, WrongVersionException {
285
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700286 try {
287 map.lock(key);
288 final VersionedValue oldValue = map.get(key);
289 if (oldValue == null) {
290 throw new ObjectDoesntExistException(this, key);
291 }
292 if (oldValue.getVersion() != version) {
293 throw new WrongVersionException(this, key, version, oldValue.getVersion());
294 }
295 map.delete(key);
296 return oldValue.getVersion();
297 } finally {
298 map.unlock(key);
299 }
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700300 }
301
302 @Override
303 public long forceDelete(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700304 final VersionedValue valueInMap = map.remove(key);
305 if (valueInMap == null) {
306 return HZClient.VERSION_NONEXISTENT;
307 }
308 return valueInMap.getVersion();
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700309 }
310
311 @Override
312 public Iterable<IKVEntry> getAllEntries() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700313 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
314 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
315 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
316 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
317 }
318 return entryList;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700319 }
320
321 @Override
322 public String toString() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700323 return "[HZTable " + mapName + "]";
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700324 }
325
326 IMap<byte[], VersionedValue> getBackendMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700327 return this.map;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700328 }
329
330 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700331 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700332 return HZClient.VERSION_NONEXISTENT;
Yuta HIGUCHI6a643132014-03-18 22:39:27 -0700333 }
334}