blob: c190a28acc9714ccbbb4f3b107bfdf6cee1360f0 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani94c23532015-02-05 17:40:01 -080022import java.util.Map;
23import java.util.Map.Entry;
Madan Jampani393e0f02015-02-12 07:35:39 +053024import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080025import java.util.Set;
26
Madan Jampani393e0f02015-02-12 07:35:39 +053027import org.apache.commons.lang3.tuple.Pair;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070028import org.onosproject.store.service.DatabaseUpdate;
29import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053030import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampani393e0f02015-02-12 07:35:39 +053032
Madan Jampanibff6d8f2015-03-31 16:53:47 -070033import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import com.google.common.collect.ImmutableList;
35import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070036import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053037
Madan Jampani94c23532015-02-05 17:40:01 -080038import net.kuujo.copycat.state.Initializer;
39import net.kuujo.copycat.state.StateContext;
40
41/**
42 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080043 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070044public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080045 private Long nextVersion;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070046 private Map<String, Map<String, Versioned<byte[]>>> tables;
47
48 /**
49 * This locks map has a structure similar to the "tables" map above and
50 * holds all the provisional updates made during a transaction's prepare phase.
51 * The entry value is represented as the tuple: (transactionId, newValue)
52 * If newValue == null that signifies this update is attempting to
53 * delete the existing value.
54 * This map also serves as a lock on the entries that are being updated.
55 * The presence of a entry in this map indicates that element is
56 * participating in a transaction and is currently locked for updates.
57 */
58 private Map<String, Map<String, Pair<Long, byte[]>>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080059
60 @Initializer
61 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070062 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampani94c23532015-02-05 17:40:01 -080063 tables = context.get("tables");
64 if (tables == null) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070065 tables = Maps.newConcurrentMap();
Madan Jampani94c23532015-02-05 17:40:01 -080066 context.put("tables", tables);
67 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070068 locks = context.get("locks");
69 if (locks == null) {
70 locks = Maps.newConcurrentMap();
71 context.put("locks", locks);
72 }
Madan Jampani94c23532015-02-05 17:40:01 -080073 nextVersion = context.get("nextVersion");
74 if (nextVersion == null) {
75 nextVersion = new Long(0);
76 context.put("nextVersion", nextVersion);
77 }
78 }
79
Madan Jampani94c23532015-02-05 17:40:01 -080080 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070081 public Set<String> tableNames() {
82 return new HashSet<>(tables.keySet());
83 }
84
85 @Override
Madan Jampani94c23532015-02-05 17:40:01 -080086 public int size(String tableName) {
87 return getTableMap(tableName).size();
88 }
89
90 @Override
91 public boolean isEmpty(String tableName) {
92 return getTableMap(tableName).isEmpty();
93 }
94
95 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070096 public boolean containsKey(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -080097 return getTableMap(tableName).containsKey(key);
98 }
99
100 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700101 public boolean containsValue(String tableName, byte[] value) {
102 return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800103 }
104
105 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700106 public Versioned<byte[]> get(String tableName, String key) {
Madan Jampani94c23532015-02-05 17:40:01 -0800107 return getTableMap(tableName).get(key);
108 }
109
110 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700111 public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
112 return isLockedForUpdates(tableName, key)
113 ? Result.locked()
114 : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
Madan Jampani94c23532015-02-05 17:40:01 -0800115 }
116
117 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700118 public Result<Versioned<byte[]>> remove(String tableName, String key) {
119 return isLockedForUpdates(tableName, key)
120 ? Result.locked()
121 : Result.ok(getTableMap(tableName).remove(key));
Madan Jampani94c23532015-02-05 17:40:01 -0800122 }
123
124 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700125 public Result<Void> clear(String tableName) {
126 if (areTransactionsInProgress(tableName)) {
127 return Result.locked();
128 }
Madan Jampani94c23532015-02-05 17:40:01 -0800129 getTableMap(tableName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700130 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800131 }
132
133 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700134 public Set<String> keySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530135 return ImmutableSet.copyOf(getTableMap(tableName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800136 }
137
138 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700139 public Collection<Versioned<byte[]>> values(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530140 return ImmutableList.copyOf(getTableMap(tableName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800141 }
142
143 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700144 public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530145 return ImmutableSet.copyOf(getTableMap(tableName)
146 .entrySet()
147 .stream()
148 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
149 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800150 }
151
152 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700153 public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
154 if (isLockedForUpdates(tableName, key)) {
155 return Result.locked();
Madan Jampani94c23532015-02-05 17:40:01 -0800156 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700157 Versioned<byte[]> existingValue = get(tableName, key);
158 Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
159 return Result.ok(currentValue);
Madan Jampani94c23532015-02-05 17:40:01 -0800160 }
161
162 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700163 public Result<Boolean> remove(String tableName, String key, byte[] value) {
164 if (isLockedForUpdates(tableName, key)) {
165 return Result.locked();
166 }
167 Versioned<byte[]> existing = get(tableName, key);
168 if (existing != null && Arrays.equals(existing.value(), value)) {
169 getTableMap(tableName).remove(key);
170 return Result.ok(true);
171 }
172 return Result.ok(false);
173 }
174
175 @Override
176 public Result<Boolean> remove(String tableName, String key, long version) {
177 if (isLockedForUpdates(tableName, key)) {
178 return Result.locked();
179 }
180 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800181 if (existing != null && existing.version() == version) {
182 remove(tableName, key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700183 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800184 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700185 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800186 }
187
188 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700189 public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
190 if (isLockedForUpdates(tableName, key)) {
191 return Result.locked();
192 }
193 Versioned<byte[]> existing = get(tableName, key);
194 if (existing != null && Arrays.equals(existing.value(), oldValue)) {
Madan Jampani94c23532015-02-05 17:40:01 -0800195 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700196 return Result.ok(true);
Madan Jampani94c23532015-02-05 17:40:01 -0800197 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700198 return Result.ok(false);
Madan Jampani94c23532015-02-05 17:40:01 -0800199 }
200
201 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700202 public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
203 if (isLockedForUpdates(tableName, key)) {
204 return Result.locked();
205 }
206 Versioned<byte[]> existing = get(tableName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800207 if (existing != null && existing.version() == oldVersion) {
208 put(tableName, key, newValue);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700209 return Result.ok(true);
210 }
211 return Result.ok(false);
212 }
213
214 @Override
215 public boolean prepareAndCommit(Transaction transaction) {
216 if (prepare(transaction)) {
217 return commit(transaction);
218 }
219 return false;
220 }
221
222 @Override
223 public boolean prepare(Transaction transaction) {
224 if (transaction.updates().stream().anyMatch(update ->
225 isLockedByAnotherTransaction(update.tableName(),
226 update.key(),
227 transaction.id()))) {
228 return false;
229 }
230
231 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
232 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800233 return true;
234 }
235 return false;
236 }
237
238 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700239 public boolean commit(Transaction transaction) {
240 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
241 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800242 }
243
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700244 @Override
245 public boolean rollback(Transaction transaction) {
246 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
247 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800248 }
249
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700250 private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
251 return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
252 }
253
254 private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
255 return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
256 }
257
258 private boolean isUpdatePossible(DatabaseUpdate update) {
259 Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800260 switch (update.type()) {
261 case PUT:
262 case REMOVE:
263 return true;
264 case PUT_IF_ABSENT:
265 return existingEntry == null;
266 case PUT_IF_VERSION_MATCH:
267 return existingEntry != null && existingEntry.version() == update.currentVersion();
268 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700269 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800270 case REMOVE_IF_VERSION_MATCH:
271 return existingEntry == null || existingEntry.version() == update.currentVersion();
272 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700273 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800274 default:
275 throw new IllegalStateException("Unsupported type: " + update.type());
276 }
277 }
278
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700279 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
280 Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
281 switch (update.type()) {
282 case PUT:
283 case PUT_IF_ABSENT:
284 case PUT_IF_VERSION_MATCH:
285 case PUT_IF_VALUE_MATCH:
286 lockMap.put(update.key(), Pair.of(transactionId, update.value()));
287 break;
288 case REMOVE:
289 case REMOVE_IF_VERSION_MATCH:
290 case REMOVE_IF_VALUE_MATCH:
291 lockMap.put(update.key(), null);
292 break;
293 default:
294 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800295 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700296 }
297
298 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
299 String tableName = update.tableName();
300 String key = update.key();
301 Type type = update.type();
302 Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
303 if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
304 getLockMap(tableName).remove(key);
305 } else {
306 return;
307 }
308
309 switch (type) {
310 case PUT:
311 case PUT_IF_ABSENT:
312 case PUT_IF_VERSION_MATCH:
313 case PUT_IF_VALUE_MATCH:
314 put(tableName, key, provisionalUpdate.getRight());
315 break;
316 case REMOVE:
317 case REMOVE_IF_VERSION_MATCH:
318 case REMOVE_IF_VALUE_MATCH:
319 remove(tableName, key);
320 break;
321 default:
322 break;
323 }
324 }
325
326 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
327 String tableName = update.tableName();
328 String key = update.key();
329 Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
330 if (provisionalUpdate == null) {
331 return;
332 }
333 if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
334 getLockMap(tableName).remove(key);
335 }
336 }
337
338 private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
339 Pair<Long, byte[]> update = getLockMap(tableName).get(key);
340 return update != null && !Objects.equal(transactionId, update.getLeft());
341 }
342
343 private boolean isLockedForUpdates(String tableName, String key) {
344 return getLockMap(tableName).containsKey(key);
345 }
346
347 private boolean areTransactionsInProgress(String tableName) {
348 return !getLockMap(tableName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800349 }
350}