blob: 208f0e0604870e5e66db37cf4f7081146c77a71e [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani94c23532015-02-05 17:40:01 -080022import java.util.Map;
23import java.util.Map.Entry;
Madan Jampanib5d72d52015-04-03 16:53:50 -070024import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053025import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080026import java.util.Set;
27
Madan Jampani393e0f02015-02-12 07:35:39 +053028import org.apache.commons.lang3.tuple.Pair;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053031import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070033import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import com.google.common.collect.ImmutableList;
35import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070036import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053037
Madan Jampani94c23532015-02-05 17:40:01 -080038import net.kuujo.copycat.state.Initializer;
39import net.kuujo.copycat.state.StateContext;
40
41/**
42 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080043 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070044public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080045 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070046 private Map<String, AtomicLong> counters;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070047 private Map<String, Map<String, Versioned<byte[]>>> tables;
48
49 /**
50 * This locks map has a structure similar to the "tables" map above and
51 * holds all the provisional updates made during a transaction's prepare phase.
52 * The entry value is represented as the tuple: (transactionId, newValue)
53 * If newValue == null that signifies this update is attempting to
54 * delete the existing value.
55 * This map also serves as a lock on the entries that are being updated.
56 * The presence of a entry in this map indicates that element is
57 * participating in a transaction and is currently locked for updates.
58 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070059 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080060
61 @Initializer
62 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070063 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070064 counters = context.get("counters");
65 if (counters == null) {
66 counters = Maps.newConcurrentMap();
67 context.put("counters", counters);
68 }
Madan Jampani94c23532015-02-05 17:40:01 -080069 tables = context.get("tables");
70 if (tables == null) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070071 tables = Maps.newConcurrentMap();
Madan Jampani94c23532015-02-05 17:40:01 -080072 context.put("tables", tables);
73 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070074 locks = context.get("locks");
75 if (locks == null) {
76 locks = Maps.newConcurrentMap();
77 context.put("locks", locks);
78 }
Madan Jampani94c23532015-02-05 17:40:01 -080079 nextVersion = context.get("nextVersion");
80 if (nextVersion == null) {
81 nextVersion = new Long(0);
82 context.put("nextVersion", nextVersion);
83 }
84 }
85
Madan Jampani94c23532015-02-05 17:40:01 -080086 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070087 public Set<String> tableNames() {
88 return new HashSet<>(tables.keySet());
89 }
90
91 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070092 public Map<String, Long> counters() {
93 Map<String, Long> counterMap = Maps.newHashMap();
94 counters.forEach((k, v) -> counterMap.put(k, v.get()));
95 return counterMap;
96 }
97
98 @Override
Madan Jampani94c23532015-02-05 17:40:01 -080099 public int size(String tableName) {
100 return getTableMap(tableName).size();
101 }
102
103 @Override
104 public boolean isEmpty(String tableName) {
105 return getTableMap(tableName).isEmpty();
106 }
107
108 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700109 public boolean containsKey(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800110 return getTableMap(tableName).containsKey(key);
111 }
112
113 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700114 public boolean containsValue(String tableName, byte[] value) {
115 return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800116 }
117
118 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700119 public Versioned<byte[]> get(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800120 return getTableMap(tableName).get(key);
121 }
122
123 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700124 public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
125 return isLockedForUpdates(tableName, key)
126 ? Result.locked()
127 : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
Madan Jampani94c23532015-02-05 17:40:01 -0800128 }
129
130 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700131 public Result<Versioned<byte[]>> remove(String tableName, String key) {
132 return isLockedForUpdates(tableName, key)
133 ? Result.locked()
134 : Result.ok(getTableMap(tableName).remove(key));
Madan Jampani94c23532015-02-05 17:40:01 -0800135 }
136
137 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700138 public Result<Void> clear(String tableName) {
139 if (areTransactionsInProgress(tableName)) {
140 return Result.locked();
141 }
Madan Jampani94c23532015-02-05 17:40:01 -0800142 getTableMap(tableName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700143 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800144 }
145
146 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700147 public Set<String> keySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530148 return ImmutableSet.copyOf(getTableMap(tableName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800149 }
150
151 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700152 public Collection<Versioned<byte[]>> values(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530153 return ImmutableList.copyOf(getTableMap(tableName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800154 }
155
156 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700157 public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530158 return ImmutableSet.copyOf(getTableMap(tableName)
159 .entrySet()
160 .stream()
161 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
162 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800163 }
164
165 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700166 public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
167 if (isLockedForUpdates(tableName, key)) {
168 return Result.locked();
Madan Jampani94c23532015-02-05 17:40:01 -0800169 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700170 Versioned<byte[]> existingValue = get(tableName, key);
171 Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
172 return Result.ok(currentValue);
Madan Jampani94c23532015-02-05 17:40:01 -0800173 }
174
175 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700176 public Result<Boolean> remove(String tableName, String key, byte[] value) {
177 if (isLockedForUpdates(tableName, key)) {
178 return Result.locked();
179 }
180 Versioned<byte[]> existing = get(tableName, key);
181 if (existing != null && Arrays.equals(existing.value(), value)) {
182 getTableMap(tableName).remove(key);
183 return Result.ok(true);
184 }
185 return Result.ok(false);
186 }
187
188 @Override
189 public Result<Boolean> remove(String tableName, String key, long version) {
190 if (isLockedForUpdates(tableName, key)) {
191 return Result.locked();
192 }
193 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800194 if (existing != null && existing.version() == version) {
195 remove(tableName, key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700196 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800197 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700198 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800199 }
200
201 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700202 public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
203 if (isLockedForUpdates(tableName, key)) {
204 return Result.locked();
205 }
206 Versioned<byte[]> existing = get(tableName, key);
207 if (existing != null && Arrays.equals(existing.value(), oldValue)) {
Madan Jampani94c23532015-02-05 17:40:01 -0800208 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700209 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800210 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700211 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800212 }
213
214 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700215 public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
216 if (isLockedForUpdates(tableName, key)) {
217 return Result.locked();
218 }
219 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800220 if (existing != null && existing.version() == oldVersion) {
221 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700222 return Result.ok(true);
223 }
224 return Result.ok(false);
225 }
226
227 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700228 public Long counterIncrementAndGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700229 return getCounter(counterName).incrementAndGet();
230 }
231
232 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700233 public Long counterGetAndIncrement(String counterName) {
234 return getCounter(counterName).getAndIncrement();
235 }
236
237 @Override
238 public Long counterGetAndAdd(String counterName, long delta) {
239 return getCounter(counterName).getAndAdd(delta);
240 }
241
242 @Override
243 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700244 return getCounter(counterName).get();
245 }
246
247 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700248 public boolean prepareAndCommit(Transaction transaction) {
249 if (prepare(transaction)) {
250 return commit(transaction);
251 }
252 return false;
253 }
254
255 @Override
256 public boolean prepare(Transaction transaction) {
257 if (transaction.updates().stream().anyMatch(update ->
258 isLockedByAnotherTransaction(update.tableName(),
259 update.key(),
260 transaction.id()))) {
261 return false;
262 }
263
264 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
265 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800266 return true;
267 }
268 return false;
269 }
270
271 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700272 public boolean commit(Transaction transaction) {
273 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
274 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800275 }
276
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700277 @Override
278 public boolean rollback(Transaction transaction) {
279 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
280 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800281 }
282
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700283 private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
284 return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
285 }
286
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700287 private Map<String, Update> getLockMap(String tableName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700288 return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
289 }
290
Madan Jampanib5d72d52015-04-03 16:53:50 -0700291 private AtomicLong getCounter(String counterName) {
292 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
293 }
294
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700295 private boolean isUpdatePossible(DatabaseUpdate update) {
296 Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800297 switch (update.type()) {
298 case PUT:
299 case REMOVE:
300 return true;
301 case PUT_IF_ABSENT:
302 return existingEntry == null;
303 case PUT_IF_VERSION_MATCH:
304 return existingEntry != null && existingEntry.version() == update.currentVersion();
305 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700306 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800307 case REMOVE_IF_VERSION_MATCH:
308 return existingEntry == null || existingEntry.version() == update.currentVersion();
309 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700310 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800311 default:
312 throw new IllegalStateException("Unsupported type: " + update.type());
313 }
314 }
315
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700316 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700317 Map<String, Update> lockMap = getLockMap(update.tableName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700318 switch (update.type()) {
319 case PUT:
320 case PUT_IF_ABSENT:
321 case PUT_IF_VERSION_MATCH:
322 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700323 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700324 break;
325 case REMOVE:
326 case REMOVE_IF_VERSION_MATCH:
327 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700328 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700329 break;
330 default:
331 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800332 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700333 }
334
335 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
336 String tableName = update.tableName();
337 String key = update.key();
338 Type type = update.type();
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700339 Update provisionalUpdate = getLockMap(tableName).get(key);
340 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700341 getLockMap(tableName).remove(key);
342 } else {
343 return;
344 }
345
346 switch (type) {
347 case PUT:
348 case PUT_IF_ABSENT:
349 case PUT_IF_VERSION_MATCH:
350 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700351 put(tableName, key, provisionalUpdate.value());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700352 break;
353 case REMOVE:
354 case REMOVE_IF_VERSION_MATCH:
355 case REMOVE_IF_VALUE_MATCH:
356 remove(tableName, key);
357 break;
358 default:
359 break;
360 }
361 }
362
363 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
364 String tableName = update.tableName();
365 String key = update.key();
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700366 Update provisionalUpdate = getLockMap(tableName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700367 if (provisionalUpdate == null) {
368 return;
369 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700370 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700371 getLockMap(tableName).remove(key);
372 }
373 }
374
375 private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700376 Update update = getLockMap(tableName).get(key);
377 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700378 }
379
380 private boolean isLockedForUpdates(String tableName, String key) {
381 return getLockMap(tableName).containsKey(key);
382 }
383
384 private boolean areTransactionsInProgress(String tableName) {
385 return !getLockMap(tableName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800386 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700387
388 private class Update {
389 private final long transactionId;
390 private final byte[] value;
391
392 public Update(long txId, byte[] value) {
393 this.transactionId = txId;
394 this.value = value;
395 }
396
397 public long transactionId() {
398 return this.transactionId;
399 }
400
401 public byte[] value() {
402 return this.value;
403 }
404 }
Madan Jampani94c23532015-02-05 17:40:01 -0800405}