blob: ec98349ca75648b3d3af95acaccb852ee9198d8a [file] [log] [blame]
Yuta HIGUCHI6a643132014-03-18 22:39:27 -07001package net.onrc.onos.datastore.hazelcast;
2
3import java.io.IOException;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.List;
7import java.util.Set;
8import java.util.concurrent.atomic.AtomicLong;
9
10import net.onrc.onos.datastore.IKVTable;
11import net.onrc.onos.datastore.IKVTableID;
12import net.onrc.onos.datastore.ObjectDoesntExistException;
13import net.onrc.onos.datastore.ObjectExistsException;
14import net.onrc.onos.datastore.WrongVersionException;
15
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
18
19import com.hazelcast.core.IMap;
20import com.hazelcast.nio.ObjectDataInput;
21import com.hazelcast.nio.ObjectDataOutput;
22import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
23
24public class HZTable implements IKVTable, IKVTableID {
25 @SuppressWarnings("unused")
26 private static final Logger log = LoggerFactory.getLogger(HZTable.class);
27
28 // not sure how strict this should be managed
29 private static final AtomicLong initialVersion = new AtomicLong(HZClient.VERSION_NONEXISTENT);
30
31 /**
32 * generate a new initial version for an entry.
33 * @return initial value
34 */
35 protected static long getInitialVersion() {
36 long version = initialVersion.incrementAndGet();
37 if (version == HZClient.VERSION_NONEXISTENT) {
38 // used up whole 64bit space?
39 version = initialVersion.incrementAndGet();
40 }
41 return version;
42 }
43
44 /**
45 * increment version, avoiding VERSION_NONEXISTENT.
46 * @param version
47 * @return
48 */
49 protected static long getNextVersion(final long version) {
50 long nextVersion = version + 1;
51 if (nextVersion == HZClient.VERSION_NONEXISTENT) {
52 ++nextVersion;
53 }
54 return nextVersion;
55 }
56
57 static class VersionedValue implements IdentifiedDataSerializable {
58 private static final long serialVersionUID = -3149375966890712708L;
59
60 private byte[] value;
61 private long version;
62
63 protected VersionedValue() {
64 value = new byte[0];
65 version = HZClient.VERSION_NONEXISTENT;
66 }
67
68 public VersionedValue(final byte[] value, final long version) {
69 this.value = value;
70 this.version = version;
71 }
72
73 public byte[] getValue() {
74 return value;
75 }
76
77 public long getVersion() {
78 return version;
79 }
80
81 public void setValue(final byte[] value) {
82 this.value = value;
83 }
84
85 public void setNextVersion() {
86 this.version = getNextVersion(this.version);
87 }
88
89 @Override
90 public void writeData(final ObjectDataOutput out) throws IOException {
91 out.writeLong(version);
92 out.writeInt(value.length);
93 if (value.length > 0) {
94 out.write(value);
95 }
96 }
97
98 @Override
99 public void readData(final ObjectDataInput in) throws IOException {
100 version = in.readLong();
101 final int valueLen = in.readInt();
102 value = new byte[valueLen];
103 in.readFully(value);
104 }
105
106 @Override
107 public int getFactoryId() {
108 return VersionedValueSerializableFactory.FACTORY_ID;
109 }
110
111 @Override
112 public int getId() {
113 return VersionedValueSerializableFactory.VERSIONED_VALUE_ID;
114 }
115
116 @Override
117 public int hashCode() {
118 final int prime = 31;
119 int result = 1;
120 result = prime * result + (int) (version ^ (version >>> 32));
121 result = prime * result + Arrays.hashCode(value);
122 return result;
123 }
124
125 @Override
126 public boolean equals(final Object obj) {
127 if (this == obj) {
128 return true;
129 }
130 if (obj == null) {
131 return false;
132 }
133 if (getClass() != obj.getClass()) {
134 return false;
135 }
136 VersionedValue other = (VersionedValue) obj;
137 if (version != other.version) {
138 return false;
139 }
140 if (!Arrays.equals(value, other.value)) {
141 return false;
142 }
143 return true;
144 }
145 }
146
147 // TODO Refactor and extract common parts
148 public static class Entry implements IKVEntry {
149 final byte[] key;
150 byte[] value;
151 long version;
152
153 public Entry(final byte[] key, final byte[] value, final long version) {
154 this.key = key;
155 this.setValue(value);
156 this.setVersion(version);
157 }
158
159 public Entry(final byte[] key) {
160 this(key, null, HZClient.VERSION_NONEXISTENT);
161 }
162
163 @Override
164 public byte[] getKey() {
165 return key;
166 }
167
168 @Override
169 public byte[] getValue() {
170 return value;
171 }
172
173 @Override
174 public long getVersion() {
175 return version;
176 }
177
178 void setValue(final byte[] value) {
179 this.value = value;
180 }
181
182 void setVersion(final long version) {
183 this.version = version;
184 }
185 }
186
187
188
189 private final String mapName;
190 private final IMap<byte[], VersionedValue> map;
191
192 public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
193 this.mapName = mapName;
194 this.map = map;
195 }
196
197 @Override
198 public String getTableName() {
199 return mapName;
200 }
201
202 @Override
203 public IKVTableID getTableId() {
204 return this;
205 }
206
207 @Override
208 public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
209 final long version = getInitialVersion();
210 VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
211 if (existing != null) {
212 throw new ObjectExistsException(this, key);
213 }
214 return version;
215 }
216
217 @Override
218 public long forceCreate(final byte[] key, final byte[] value) {
219 final long version = getInitialVersion();
220 map.set(key, new VersionedValue(value, version));
221 return version;
222 }
223
224 @Override
225 public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
226 final VersionedValue value = map.get(key);
227 if (value == null) {
228 throw new ObjectDoesntExistException(this, key);
229 }
230 return new Entry(key, value.getValue(), value.getVersion());
231 }
232
233 @Override
234 public long update(final byte[] key, final byte[] value, final long version)
235 throws ObjectDoesntExistException, WrongVersionException {
236
237 try {
238 map.lock(key);
239 final VersionedValue oldValue = map.get(key);
240 if (oldValue == null) {
241 throw new ObjectDoesntExistException(this, key);
242 }
243 if (oldValue.getVersion() != version) {
244 throw new WrongVersionException(this, key, version, oldValue.getVersion());
245 }
246 final long nextVersion = getNextVersion(version);
247 map.set(key, new VersionedValue(value, nextVersion));
248 return nextVersion;
249 } finally {
250 map.unlock(key);
251 }
252 }
253
254 @Override
255 public long update(final byte[] key, final byte[] value)
256 throws ObjectDoesntExistException {
257
258 try {
259 map.lock(key);
260 final VersionedValue valueInMap = map.get(key);
261 if (valueInMap == null) {
262 throw new ObjectDoesntExistException(this, key);
263 }
264 valueInMap.setValue(value);
265 valueInMap.setNextVersion();
266 map.set(key, valueInMap);
267 return valueInMap.getVersion();
268 } finally {
269 map.unlock(key);
270 }
271 }
272
273 @Override
274 public long delete(final byte[] key, final long version)
275 throws ObjectDoesntExistException, WrongVersionException {
276
277 try {
278 map.lock(key);
279 final VersionedValue oldValue = map.get(key);
280 if (oldValue == null) {
281 throw new ObjectDoesntExistException(this, key);
282 }
283 if (oldValue.getVersion() != version) {
284 throw new WrongVersionException(this, key, version, oldValue.getVersion());
285 }
286 map.delete(key);
287 return oldValue.getVersion();
288 } finally {
289 map.unlock(key);
290 }
291 }
292
293 @Override
294 public long forceDelete(final byte[] key) {
295 final VersionedValue valueInMap = map.remove(key);
296 if (valueInMap == null) {
297 return HZClient.VERSION_NONEXISTENT;
298 }
299 return valueInMap.getVersion();
300 }
301
302 @Override
303 public Iterable<IKVEntry> getAllEntries() {
304 final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
305 List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
306 for (IMap.Entry<byte[], VersionedValue> entry : entries) {
307 entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
308 }
309 return entryList;
310 }
311
312 @Override
313 public String toString() {
314 return "[HZTable " + mapName + "]";
315 }
316
317 IMap<byte[], VersionedValue> getBackendMap() {
318 return this.map;
319 }
320
321 @Override
322 public long VERSION_NONEXISTENT() {
323 return HZClient.VERSION_NONEXISTENT;
324 }
325}