blob: bad3782be78229ef6c49d90a4c81002885ee89d7 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani94c23532015-02-05 17:40:01 -080022import java.util.Map;
23import java.util.Map.Entry;
Madan Jampanib5d72d52015-04-03 16:53:50 -070024import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053025import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080026import java.util.Set;
27
Madan Jampani393e0f02015-02-12 07:35:39 +053028import org.apache.commons.lang3.tuple.Pair;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053031import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampani393e0f02015-02-12 07:35:39 +053033
Madan Jampanibff6d8f2015-03-31 16:53:47 -070034import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053035import com.google.common.collect.ImmutableList;
36import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070037import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053038
Madan Jampani94c23532015-02-05 17:40:01 -080039import net.kuujo.copycat.state.Initializer;
40import net.kuujo.copycat.state.StateContext;
41
42/**
43 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080044 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070045public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080046 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070047 private Map<String, AtomicLong> counters;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070048 private Map<String, Map<String, Versioned<byte[]>>> tables;
49
50 /**
51 * This locks map has a structure similar to the "tables" map above and
52 * holds all the provisional updates made during a transaction's prepare phase.
53 * The entry value is represented as the tuple: (transactionId, newValue)
54 * If newValue == null that signifies this update is attempting to
55 * delete the existing value.
56 * This map also serves as a lock on the entries that are being updated.
57 * The presence of a entry in this map indicates that element is
58 * participating in a transaction and is currently locked for updates.
59 */
60 private Map<String, Map<String, Pair<Long, byte[]>>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080061
62 @Initializer
63 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070064 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070065 counters = context.get("counters");
66 if (counters == null) {
67 counters = Maps.newConcurrentMap();
68 context.put("counters", counters);
69 }
Madan Jampani94c23532015-02-05 17:40:01 -080070 tables = context.get("tables");
71 if (tables == null) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070072 tables = Maps.newConcurrentMap();
Madan Jampani94c23532015-02-05 17:40:01 -080073 context.put("tables", tables);
74 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070075 locks = context.get("locks");
76 if (locks == null) {
77 locks = Maps.newConcurrentMap();
78 context.put("locks", locks);
79 }
Madan Jampani94c23532015-02-05 17:40:01 -080080 nextVersion = context.get("nextVersion");
81 if (nextVersion == null) {
82 nextVersion = new Long(0);
83 context.put("nextVersion", nextVersion);
84 }
85 }
86
Madan Jampani94c23532015-02-05 17:40:01 -080087 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070088 public Set<String> tableNames() {
89 return new HashSet<>(tables.keySet());
90 }
91
92 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070093 public Map<String, Long> counters() {
94 Map<String, Long> counterMap = Maps.newHashMap();
95 counters.forEach((k, v) -> counterMap.put(k, v.get()));
96 return counterMap;
97 }
98
99 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800100 public int size(String tableName) {
101 return getTableMap(tableName).size();
102 }
103
104 @Override
105 public boolean isEmpty(String tableName) {
106 return getTableMap(tableName).isEmpty();
107 }
108
109 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700110 public boolean containsKey(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800111 return getTableMap(tableName).containsKey(key);
112 }
113
114 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700115 public boolean containsValue(String tableName, byte[] value) {
116 return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800117 }
118
119 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700120 public Versioned<byte[]> get(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800121 return getTableMap(tableName).get(key);
122 }
123
124 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700125 public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
126 return isLockedForUpdates(tableName, key)
127 ? Result.locked()
128 : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
Madan Jampani94c23532015-02-05 17:40:01 -0800129 }
130
131 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700132 public Result<Versioned<byte[]>> remove(String tableName, String key) {
133 return isLockedForUpdates(tableName, key)
134 ? Result.locked()
135 : Result.ok(getTableMap(tableName).remove(key));
Madan Jampani94c23532015-02-05 17:40:01 -0800136 }
137
138 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700139 public Result<Void> clear(String tableName) {
140 if (areTransactionsInProgress(tableName)) {
141 return Result.locked();
142 }
Madan Jampani94c23532015-02-05 17:40:01 -0800143 getTableMap(tableName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700144 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800145 }
146
147 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700148 public Set<String> keySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530149 return ImmutableSet.copyOf(getTableMap(tableName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800150 }
151
152 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700153 public Collection<Versioned<byte[]>> values(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530154 return ImmutableList.copyOf(getTableMap(tableName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800155 }
156
157 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700158 public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530159 return ImmutableSet.copyOf(getTableMap(tableName)
160 .entrySet()
161 .stream()
162 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
163 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800164 }
165
166 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700167 public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
168 if (isLockedForUpdates(tableName, key)) {
169 return Result.locked();
Madan Jampani94c23532015-02-05 17:40:01 -0800170 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700171 Versioned<byte[]> existingValue = get(tableName, key);
172 Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
173 return Result.ok(currentValue);
Madan Jampani94c23532015-02-05 17:40:01 -0800174 }
175
176 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700177 public Result<Boolean> remove(String tableName, String key, byte[] value) {
178 if (isLockedForUpdates(tableName, key)) {
179 return Result.locked();
180 }
181 Versioned<byte[]> existing = get(tableName, key);
182 if (existing != null && Arrays.equals(existing.value(), value)) {
183 getTableMap(tableName).remove(key);
184 return Result.ok(true);
185 }
186 return Result.ok(false);
187 }
188
189 @Override
190 public Result<Boolean> remove(String tableName, String key, long version) {
191 if (isLockedForUpdates(tableName, key)) {
192 return Result.locked();
193 }
194 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800195 if (existing != null && existing.version() == version) {
196 remove(tableName, key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700197 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800198 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700199 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800200 }
201
202 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700203 public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
204 if (isLockedForUpdates(tableName, key)) {
205 return Result.locked();
206 }
207 Versioned<byte[]> existing = get(tableName, key);
208 if (existing != null && Arrays.equals(existing.value(), oldValue)) {
Madan Jampani94c23532015-02-05 17:40:01 -0800209 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700210 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800211 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700212 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800213 }
214
215 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700216 public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
217 if (isLockedForUpdates(tableName, key)) {
218 return Result.locked();
219 }
220 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800221 if (existing != null && existing.version() == oldVersion) {
222 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700223 return Result.ok(true);
224 }
225 return Result.ok(false);
226 }
227
228 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700229 public Long nextValue(String counterName) {
230 return getCounter(counterName).incrementAndGet();
231 }
232
233 @Override
234 public Long currentValue(String counterName) {
235 return getCounter(counterName).get();
236 }
237
238 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700239 public boolean prepareAndCommit(Transaction transaction) {
240 if (prepare(transaction)) {
241 return commit(transaction);
242 }
243 return false;
244 }
245
246 @Override
247 public boolean prepare(Transaction transaction) {
248 if (transaction.updates().stream().anyMatch(update ->
249 isLockedByAnotherTransaction(update.tableName(),
250 update.key(),
251 transaction.id()))) {
252 return false;
253 }
254
255 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
256 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800257 return true;
258 }
259 return false;
260 }
261
262 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700263 public boolean commit(Transaction transaction) {
264 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
265 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800266 }
267
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700268 @Override
269 public boolean rollback(Transaction transaction) {
270 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
271 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800272 }
273
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700274 private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
275 return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
276 }
277
278 private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
279 return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
280 }
281
Madan Jampanib5d72d52015-04-03 16:53:50 -0700282 private AtomicLong getCounter(String counterName) {
283 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
284 }
285
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700286 private boolean isUpdatePossible(DatabaseUpdate update) {
287 Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800288 switch (update.type()) {
289 case PUT:
290 case REMOVE:
291 return true;
292 case PUT_IF_ABSENT:
293 return existingEntry == null;
294 case PUT_IF_VERSION_MATCH:
295 return existingEntry != null && existingEntry.version() == update.currentVersion();
296 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700297 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800298 case REMOVE_IF_VERSION_MATCH:
299 return existingEntry == null || existingEntry.version() == update.currentVersion();
300 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700301 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800302 default:
303 throw new IllegalStateException("Unsupported type: " + update.type());
304 }
305 }
306
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700307 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
308 Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
309 switch (update.type()) {
310 case PUT:
311 case PUT_IF_ABSENT:
312 case PUT_IF_VERSION_MATCH:
313 case PUT_IF_VALUE_MATCH:
314 lockMap.put(update.key(), Pair.of(transactionId, update.value()));
315 break;
316 case REMOVE:
317 case REMOVE_IF_VERSION_MATCH:
318 case REMOVE_IF_VALUE_MATCH:
319 lockMap.put(update.key(), null);
320 break;
321 default:
322 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800323 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700324 }
325
326 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
327 String tableName = update.tableName();
328 String key = update.key();
329 Type type = update.type();
330 Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
331 if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
332 getLockMap(tableName).remove(key);
333 } else {
334 return;
335 }
336
337 switch (type) {
338 case PUT:
339 case PUT_IF_ABSENT:
340 case PUT_IF_VERSION_MATCH:
341 case PUT_IF_VALUE_MATCH:
342 put(tableName, key, provisionalUpdate.getRight());
343 break;
344 case REMOVE:
345 case REMOVE_IF_VERSION_MATCH:
346 case REMOVE_IF_VALUE_MATCH:
347 remove(tableName, key);
348 break;
349 default:
350 break;
351 }
352 }
353
354 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
355 String tableName = update.tableName();
356 String key = update.key();
357 Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
358 if (provisionalUpdate == null) {
359 return;
360 }
361 if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
362 getLockMap(tableName).remove(key);
363 }
364 }
365
366 private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
367 Pair<Long, byte[]> update = getLockMap(tableName).get(key);
368 return update != null && !Objects.equal(transactionId, update.getLeft());
369 }
370
371 private boolean isLockedForUpdates(String tableName, String key) {
372 return getLockMap(tableName).containsKey(key);
373 }
374
375 private boolean areTransactionsInProgress(String tableName) {
376 return !getLockMap(tableName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800377 }
378}