blob: 8943fc877df4d84cf45083b6e0bfb709cdb66d40 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableList;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
24import net.kuujo.copycat.state.Initializer;
25import net.kuujo.copycat.state.StateContext;
26import org.onosproject.store.service.DatabaseUpdate;
27import org.onosproject.store.service.Transaction;
28import org.onosproject.store.service.Versioned;
29
Madan Jampani94c23532015-02-05 17:40:01 -080030import java.util.Arrays;
31import java.util.Collection;
Madan Jampani63c659f2015-06-11 00:52:58 -070032import java.util.LinkedList;
Madan Jampani94c23532015-02-05 17:40:01 -080033import java.util.Map;
34import java.util.Map.Entry;
Madan Jampani63c659f2015-06-11 00:52:58 -070035import java.util.Queue;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070036import java.util.Set;
Madan Jampanib5d72d52015-04-03 16:53:50 -070037import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053038import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080039
40/**
41 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080042 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070043public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080044 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070045 private Map<String, AtomicLong> counters;
Madan Jampani7804c992015-07-20 13:20:19 -070046 private Map<String, Map<String, Versioned<byte[]>>> maps;
Madan Jampani63c659f2015-06-11 00:52:58 -070047 private Map<String, Queue<byte[]>> queues;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070048
49 /**
50 * This locks map has a structure similar to the "tables" map above and
51 * holds all the provisional updates made during a transaction's prepare phase.
52 * The entry value is represented as the tuple: (transactionId, newValue)
53 * If newValue == null that signifies this update is attempting to
54 * delete the existing value.
55 * This map also serves as a lock on the entries that are being updated.
56 * The presence of a entry in this map indicates that element is
57 * participating in a transaction and is currently locked for updates.
58 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070059 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080060
61 @Initializer
62 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070063 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070064 counters = context.get("counters");
65 if (counters == null) {
66 counters = Maps.newConcurrentMap();
67 context.put("counters", counters);
68 }
Madan Jampani7804c992015-07-20 13:20:19 -070069 maps = context.get("maps");
70 if (maps == null) {
71 maps = Maps.newConcurrentMap();
72 context.put("maps", maps);
Madan Jampani94c23532015-02-05 17:40:01 -080073 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070074 locks = context.get("locks");
75 if (locks == null) {
76 locks = Maps.newConcurrentMap();
77 context.put("locks", locks);
78 }
Madan Jampani63c659f2015-06-11 00:52:58 -070079 queues = context.get("queues");
80 if (queues == null) {
81 queues = Maps.newConcurrentMap();
82 context.put("queues", queues);
83 }
Madan Jampani94c23532015-02-05 17:40:01 -080084 nextVersion = context.get("nextVersion");
85 if (nextVersion == null) {
Sho SHIMIZUc355c932015-09-10 10:51:03 -070086 nextVersion = 0L;
Madan Jampani94c23532015-02-05 17:40:01 -080087 context.put("nextVersion", nextVersion);
88 }
89 }
90
Madan Jampani94c23532015-02-05 17:40:01 -080091 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070092 public Set<String> maps() {
93 return ImmutableSet.copyOf(maps.keySet());
Madan Jampania89f8f92015-04-01 14:39:54 -070094 }
95
96 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070097 public Map<String, Long> counters() {
98 Map<String, Long> counterMap = Maps.newHashMap();
99 counters.forEach((k, v) -> counterMap.put(k, v.get()));
100 return counterMap;
101 }
102
103 @Override
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700104 public int mapSize(String mapName) {
Madan Jampani7804c992015-07-20 13:20:19 -0700105 return getMap(mapName).size();
Madan Jampani94c23532015-02-05 17:40:01 -0800106 }
107
108 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700109 public boolean mapIsEmpty(String mapName) {
110 return getMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800111 }
112
113 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700114 public boolean mapContainsKey(String mapName, String key) {
115 return getMap(mapName).containsKey(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800116 }
117
118 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700119 public boolean mapContainsValue(String mapName, byte[] value) {
120 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800121 }
122
123 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700124 public Versioned<byte[]> mapGet(String mapName, String key) {
125 return getMap(mapName).get(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800126 }
127
Madan Jampani94c23532015-02-05 17:40:01 -0800128
129 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700130 public Result<UpdateResult<String, byte[]>> mapUpdate(
131 String mapName,
Madan Jampani346d4f52015-05-04 11:09:39 -0700132 String key,
Madan Jampani7804c992015-07-20 13:20:19 -0700133 Match<byte[]> valueMatch,
134 Match<Long> versionMatch,
Madan Jampani346d4f52015-05-04 11:09:39 -0700135 byte[] value) {
Madan Jampani7804c992015-07-20 13:20:19 -0700136 if (isLockedForUpdates(mapName, key)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700137 return Result.locked();
Madan Jampani7804c992015-07-20 13:20:19 -0700138 }
139 Versioned<byte[]> currentValue = getMap(mapName).get(key);
140 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
141 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
142 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700143 } else {
Madan Jampani7c4e09a2015-07-22 11:44:29 -0700144 if (value == null) {
145 if (currentValue == null) {
146 return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
147 } else {
148 getMap(mapName).remove(key);
149 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
150 }
Madan Jampani7804c992015-07-20 13:20:19 -0700151 }
Madan Jampani346d4f52015-05-04 11:09:39 -0700152 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Madan Jampani7804c992015-07-20 13:20:19 -0700153 getMap(mapName).put(key, newValue);
154 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700155 }
156 }
157
158 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700159 public Result<Void> mapClear(String mapName) {
160 if (areTransactionsInProgress(mapName)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700161 return Result.locked();
162 }
Madan Jampani7804c992015-07-20 13:20:19 -0700163 getMap(mapName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700164 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800165 }
166
167 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700168 public Set<String> mapKeySet(String mapName) {
169 return ImmutableSet.copyOf(getMap(mapName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800170 }
171
172 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700173 public Collection<Versioned<byte[]>> mapValues(String mapName) {
174 return ImmutableList.copyOf(getMap(mapName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800175 }
176
177 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700178 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
179 return ImmutableSet.copyOf(getMap(mapName)
Madan Jampani393e0f02015-02-12 07:35:39 +0530180 .entrySet()
181 .stream()
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700182 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
Madan Jampani393e0f02015-02-12 07:35:39 +0530183 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800184 }
185
186 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700187 public Long counterAddAndGet(String counterName, long delta) {
188 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700189 }
190
191 @Override
192 public Long counterGetAndAdd(String counterName, long delta) {
193 return getCounter(counterName).getAndAdd(delta);
194 }
195
196 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700197 public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
198 return getCounter(counterName).compareAndSet(expectedValue, updateValue);
199 }
200
201 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700202 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700203 return getCounter(counterName).get();
204 }
205
206 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700207 public Long queueSize(String queueName) {
208 return Long.valueOf(getQueue(queueName).size());
209 }
210
211 @Override
212 public byte[] queuePeek(String queueName) {
Madan Jampania6d787b2015-08-11 11:02:02 -0700213 return getQueue(queueName).peek();
Madan Jampani63c659f2015-06-11 00:52:58 -0700214 }
215
216 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700217 public byte[] queuePop(String queueName) {
218 return getQueue(queueName).poll();
Madan Jampani63c659f2015-06-11 00:52:58 -0700219 }
220
221 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700222 public void queuePush(String queueName, byte[] entry) {
223 getQueue(queueName).offer(entry);
Madan Jampani63c659f2015-06-11 00:52:58 -0700224 }
225
226 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700227 public CommitResponse prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700228 if (prepare(transaction)) {
229 return commit(transaction);
230 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700231 return CommitResponse.failure();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700232 }
233
234 @Override
235 public boolean prepare(Transaction transaction) {
236 if (transaction.updates().stream().anyMatch(update ->
Madan Jampani7804c992015-07-20 13:20:19 -0700237 isLockedByAnotherTransaction(update.mapName(),
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700238 update.key(),
239 transaction.id()))) {
240 return false;
241 }
242
243 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
244 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800245 return true;
246 }
247 return false;
248 }
249
250 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700251 public CommitResponse commit(Transaction transaction) {
252 return CommitResponse.success(Lists.transform(transaction.updates(),
253 update -> commitProvisionalUpdate(update, transaction.id())));
Madan Jampani94c23532015-02-05 17:40:01 -0800254 }
255
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700256 @Override
257 public boolean rollback(Transaction transaction) {
258 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
259 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800260 }
261
Madan Jampani7804c992015-07-20 13:20:19 -0700262 private Map<String, Versioned<byte[]>> getMap(String mapName) {
263 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700264 }
265
Madan Jampani7804c992015-07-20 13:20:19 -0700266 private Map<String, Update> getLockMap(String mapName) {
267 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700268 }
269
Madan Jampanib5d72d52015-04-03 16:53:50 -0700270 private AtomicLong getCounter(String counterName) {
271 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
272 }
273
Madan Jampani63c659f2015-06-11 00:52:58 -0700274 private Queue<byte[]> getQueue(String queueName) {
275 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
276 }
277
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700278 private boolean isUpdatePossible(DatabaseUpdate update) {
Madan Jampani7804c992015-07-20 13:20:19 -0700279 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800280 switch (update.type()) {
281 case PUT:
282 case REMOVE:
283 return true;
284 case PUT_IF_ABSENT:
285 return existingEntry == null;
286 case PUT_IF_VERSION_MATCH:
287 return existingEntry != null && existingEntry.version() == update.currentVersion();
288 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700289 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800290 case REMOVE_IF_VERSION_MATCH:
291 return existingEntry == null || existingEntry.version() == update.currentVersion();
292 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700293 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800294 default:
295 throw new IllegalStateException("Unsupported type: " + update.type());
296 }
297 }
298
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700299 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700300 Map<String, Update> lockMap = getLockMap(update.mapName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700301 switch (update.type()) {
302 case PUT:
303 case PUT_IF_ABSENT:
304 case PUT_IF_VERSION_MATCH:
305 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700306 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700307 break;
308 case REMOVE:
309 case REMOVE_IF_VERSION_MATCH:
310 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700311 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700312 break;
313 default:
314 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800315 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700316 }
317
Madan Jampanibab51a42015-08-10 13:53:35 -0700318 private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700319 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700320 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700321 Update provisionalUpdate = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700322 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700323 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700324 } else {
Madan Jampanibab51a42015-08-10 13:53:35 -0700325 throw new IllegalStateException("Invalid transaction Id");
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700326 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700327 return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700328 }
329
330 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700331 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700332 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700333 Update provisionalUpdate = getLockMap(mapName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700334 if (provisionalUpdate == null) {
335 return;
336 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700337 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700338 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700339 }
340 }
341
Madan Jampani7804c992015-07-20 13:20:19 -0700342 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
343 Update update = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700344 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700345 }
346
Madan Jampani7804c992015-07-20 13:20:19 -0700347 private boolean isLockedForUpdates(String mapName, String key) {
348 return getLockMap(mapName).containsKey(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700349 }
350
Madan Jampani7804c992015-07-20 13:20:19 -0700351 private boolean areTransactionsInProgress(String mapName) {
352 return !getLockMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800353 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700354
355 private class Update {
356 private final long transactionId;
357 private final byte[] value;
358
359 public Update(long txId, byte[] value) {
360 this.transactionId = txId;
361 this.value = value;
362 }
363
364 public long transactionId() {
365 return this.transactionId;
366 }
367
368 public byte[] value() {
369 return this.value;
370 }
371 }
Madan Jampani94c23532015-02-05 17:40:01 -0800372}