blob: 7edeb44963378bdcd0b69e2921d8d63878956424 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani94c23532015-02-05 17:40:01 -080022import java.util.Map;
23import java.util.Map.Entry;
Madan Jampanib5d72d52015-04-03 16:53:50 -070024import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053025import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080026import java.util.Set;
27
Madan Jampani393e0f02015-02-12 07:35:39 +053028import org.apache.commons.lang3.tuple.Pair;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053031import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampani346d4f52015-05-04 11:09:39 -070033
Madan Jampanibff6d8f2015-03-31 16:53:47 -070034import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053035import com.google.common.collect.ImmutableList;
36import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070037import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053038
Madan Jampani94c23532015-02-05 17:40:01 -080039import net.kuujo.copycat.state.Initializer;
40import net.kuujo.copycat.state.StateContext;
41
42/**
43 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080044 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070045public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080046 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070047 private Map<String, AtomicLong> counters;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070048 private Map<String, Map<String, Versioned<byte[]>>> tables;
49
50 /**
51 * This locks map has a structure similar to the "tables" map above and
52 * holds all the provisional updates made during a transaction's prepare phase.
53 * The entry value is represented as the tuple: (transactionId, newValue)
54 * If newValue == null that signifies this update is attempting to
55 * delete the existing value.
56 * This map also serves as a lock on the entries that are being updated.
57 * The presence of a entry in this map indicates that element is
58 * participating in a transaction and is currently locked for updates.
59 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070060 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080061
62 @Initializer
63 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070064 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070065 counters = context.get("counters");
66 if (counters == null) {
67 counters = Maps.newConcurrentMap();
68 context.put("counters", counters);
69 }
Madan Jampani94c23532015-02-05 17:40:01 -080070 tables = context.get("tables");
71 if (tables == null) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070072 tables = Maps.newConcurrentMap();
Madan Jampani94c23532015-02-05 17:40:01 -080073 context.put("tables", tables);
74 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070075 locks = context.get("locks");
76 if (locks == null) {
77 locks = Maps.newConcurrentMap();
78 context.put("locks", locks);
79 }
Madan Jampani94c23532015-02-05 17:40:01 -080080 nextVersion = context.get("nextVersion");
81 if (nextVersion == null) {
82 nextVersion = new Long(0);
83 context.put("nextVersion", nextVersion);
84 }
85 }
86
Madan Jampani94c23532015-02-05 17:40:01 -080087 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070088 public Set<String> tableNames() {
89 return new HashSet<>(tables.keySet());
90 }
91
92 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070093 public Map<String, Long> counters() {
94 Map<String, Long> counterMap = Maps.newHashMap();
95 counters.forEach((k, v) -> counterMap.put(k, v.get()));
96 return counterMap;
97 }
98
99 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800100 public int size(String tableName) {
101 return getTableMap(tableName).size();
102 }
103
104 @Override
105 public boolean isEmpty(String tableName) {
106 return getTableMap(tableName).isEmpty();
107 }
108
109 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700110 public boolean containsKey(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800111 return getTableMap(tableName).containsKey(key);
112 }
113
114 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700115 public boolean containsValue(String tableName, byte[] value) {
116 return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800117 }
118
119 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700120 public Versioned<byte[]> get(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800121 return getTableMap(tableName).get(key);
122 }
123
124 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700125 public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
126 return isLockedForUpdates(tableName, key)
127 ? Result.locked()
128 : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
Madan Jampani94c23532015-02-05 17:40:01 -0800129 }
130
131 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700132 public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName,
133 String key,
134 byte[] value) {
135 if (isLockedForUpdates(tableName, key)) {
136 return Result.locked();
137 } else {
138 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
139 Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
140 return Result.ok(new UpdateResult<>(true, oldValue, newValue));
141 }
142 }
143
144 @Override
145 public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName,
146 String key,
147 byte[] value) {
148 if (isLockedForUpdates(tableName, key)) {
149 return Result.locked();
150 }
151 Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
152 if (currentValue != null) {
153 return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
154 } else {
155 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
156 getTableMap(tableName).put(key, newValue);
157 return Result.ok(new UpdateResult<>(true, null, newValue));
158 }
159 }
160
161 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700162 public Result<Versioned<byte[]>> remove(String tableName, String key) {
163 return isLockedForUpdates(tableName, key)
164 ? Result.locked()
165 : Result.ok(getTableMap(tableName).remove(key));
Madan Jampani94c23532015-02-05 17:40:01 -0800166 }
167
168 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700169 public Result<Void> clear(String tableName) {
170 if (areTransactionsInProgress(tableName)) {
171 return Result.locked();
172 }
Madan Jampani94c23532015-02-05 17:40:01 -0800173 getTableMap(tableName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700174 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800175 }
176
177 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700178 public Set<String> keySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530179 return ImmutableSet.copyOf(getTableMap(tableName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800180 }
181
182 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700183 public Collection<Versioned<byte[]>> values(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530184 return ImmutableList.copyOf(getTableMap(tableName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800185 }
186
187 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700188 public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530189 return ImmutableSet.copyOf(getTableMap(tableName)
190 .entrySet()
191 .stream()
192 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
193 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800194 }
195
196 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700197 public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
198 if (isLockedForUpdates(tableName, key)) {
199 return Result.locked();
Madan Jampani94c23532015-02-05 17:40:01 -0800200 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700201 Versioned<byte[]> existingValue = get(tableName, key);
202 Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
203 return Result.ok(currentValue);
Madan Jampani94c23532015-02-05 17:40:01 -0800204 }
205
206 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700207 public Result<Boolean> remove(String tableName, String key, byte[] value) {
208 if (isLockedForUpdates(tableName, key)) {
209 return Result.locked();
210 }
211 Versioned<byte[]> existing = get(tableName, key);
212 if (existing != null && Arrays.equals(existing.value(), value)) {
213 getTableMap(tableName).remove(key);
214 return Result.ok(true);
215 }
216 return Result.ok(false);
217 }
218
219 @Override
220 public Result<Boolean> remove(String tableName, String key, long version) {
221 if (isLockedForUpdates(tableName, key)) {
222 return Result.locked();
223 }
224 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800225 if (existing != null && existing.version() == version) {
226 remove(tableName, key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700227 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800228 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700229 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800230 }
231
232 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700233 public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
234 if (isLockedForUpdates(tableName, key)) {
235 return Result.locked();
236 }
237 Versioned<byte[]> existing = get(tableName, key);
238 if (existing != null && Arrays.equals(existing.value(), oldValue)) {
Madan Jampani94c23532015-02-05 17:40:01 -0800239 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700240 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800241 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700242 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800243 }
244
245 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700246 public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
247 if (isLockedForUpdates(tableName, key)) {
248 return Result.locked();
249 }
250 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800251 if (existing != null && existing.version() == oldVersion) {
252 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700253 return Result.ok(true);
254 }
255 return Result.ok(false);
256 }
257
258 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700259 public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
260 String tableName, String key, long oldVersion, byte[] newValue) {
261 if (isLockedForUpdates(tableName, key)) {
262 return Result.locked();
263 }
264 boolean updated = false;
265 Versioned<byte[]> previous = get(tableName, key);
266 Versioned<byte[]> current = previous;
267 if (previous != null && previous.version() == oldVersion) {
268 current = new Versioned<>(newValue, ++nextVersion);
269 getTableMap(tableName).put(key, current);
270 updated = true;
271 }
272 return Result.ok(new UpdateResult<>(updated, previous, current));
273 }
274
275 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700276 public Long counterAddAndGet(String counterName, long delta) {
277 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700278 }
279
280 @Override
281 public Long counterGetAndAdd(String counterName, long delta) {
282 return getCounter(counterName).getAndAdd(delta);
283 }
284
285 @Override
286 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700287 return getCounter(counterName).get();
288 }
289
290 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700291 public boolean prepareAndCommit(Transaction transaction) {
292 if (prepare(transaction)) {
293 return commit(transaction);
294 }
295 return false;
296 }
297
298 @Override
299 public boolean prepare(Transaction transaction) {
300 if (transaction.updates().stream().anyMatch(update ->
301 isLockedByAnotherTransaction(update.tableName(),
302 update.key(),
303 transaction.id()))) {
304 return false;
305 }
306
307 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
308 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800309 return true;
310 }
311 return false;
312 }
313
314 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700315 public boolean commit(Transaction transaction) {
316 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
317 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800318 }
319
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700320 @Override
321 public boolean rollback(Transaction transaction) {
322 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
323 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800324 }
325
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700326 private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
327 return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
328 }
329
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700330 private Map<String, Update> getLockMap(String tableName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700331 return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
332 }
333
Madan Jampanib5d72d52015-04-03 16:53:50 -0700334 private AtomicLong getCounter(String counterName) {
335 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
336 }
337
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700338 private boolean isUpdatePossible(DatabaseUpdate update) {
339 Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800340 switch (update.type()) {
341 case PUT:
342 case REMOVE:
343 return true;
344 case PUT_IF_ABSENT:
345 return existingEntry == null;
346 case PUT_IF_VERSION_MATCH:
347 return existingEntry != null && existingEntry.version() == update.currentVersion();
348 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700349 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800350 case REMOVE_IF_VERSION_MATCH:
351 return existingEntry == null || existingEntry.version() == update.currentVersion();
352 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700353 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800354 default:
355 throw new IllegalStateException("Unsupported type: " + update.type());
356 }
357 }
358
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700359 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700360 Map<String, Update> lockMap = getLockMap(update.tableName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700361 switch (update.type()) {
362 case PUT:
363 case PUT_IF_ABSENT:
364 case PUT_IF_VERSION_MATCH:
365 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700366 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700367 break;
368 case REMOVE:
369 case REMOVE_IF_VERSION_MATCH:
370 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700371 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700372 break;
373 default:
374 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800375 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700376 }
377
378 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
379 String tableName = update.tableName();
380 String key = update.key();
381 Type type = update.type();
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700382 Update provisionalUpdate = getLockMap(tableName).get(key);
383 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700384 getLockMap(tableName).remove(key);
385 } else {
386 return;
387 }
388
389 switch (type) {
390 case PUT:
391 case PUT_IF_ABSENT:
392 case PUT_IF_VERSION_MATCH:
393 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700394 put(tableName, key, provisionalUpdate.value());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700395 break;
396 case REMOVE:
397 case REMOVE_IF_VERSION_MATCH:
398 case REMOVE_IF_VALUE_MATCH:
399 remove(tableName, key);
400 break;
401 default:
402 break;
403 }
404 }
405
406 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
407 String tableName = update.tableName();
408 String key = update.key();
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700409 Update provisionalUpdate = getLockMap(tableName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700410 if (provisionalUpdate == null) {
411 return;
412 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700413 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700414 getLockMap(tableName).remove(key);
415 }
416 }
417
418 private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700419 Update update = getLockMap(tableName).get(key);
420 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700421 }
422
423 private boolean isLockedForUpdates(String tableName, String key) {
424 return getLockMap(tableName).containsKey(key);
425 }
426
427 private boolean areTransactionsInProgress(String tableName) {
428 return !getLockMap(tableName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800429 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700430
431 private class Update {
432 private final long transactionId;
433 private final byte[] value;
434
435 public Update(long txId, byte[] value) {
436 this.transactionId = txId;
437 this.value = value;
438 }
439
440 public long transactionId() {
441 return this.transactionId;
442 }
443
444 public byte[] value() {
445 return this.value;
446 }
447 }
Madan Jampani94c23532015-02-05 17:40:01 -0800448}