blob: 3bb413cd38962c5ceef4a7a1f854aac04f446431 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableList;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
Madan Jampanif2f086c2016-01-13 16:15:39 -080024
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070025import net.kuujo.copycat.state.Initializer;
26import net.kuujo.copycat.state.StateContext;
Madan Jampanif2f086c2016-01-13 16:15:39 -080027
28import org.onlab.util.Match;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Transaction;
31import org.onosproject.store.service.Versioned;
32
Madan Jampani94c23532015-02-05 17:40:01 -080033import java.util.Arrays;
34import java.util.Collection;
Madan Jampani63c659f2015-06-11 00:52:58 -070035import java.util.LinkedList;
Madan Jampani94c23532015-02-05 17:40:01 -080036import java.util.Map;
37import java.util.Map.Entry;
Madan Jampani63c659f2015-06-11 00:52:58 -070038import java.util.Queue;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070039import java.util.Set;
Madan Jampanib5d72d52015-04-03 16:53:50 -070040import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053041import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080042
43/**
44 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080045 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070046public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080047 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070048 private Map<String, AtomicLong> counters;
Madan Jampani7804c992015-07-20 13:20:19 -070049 private Map<String, Map<String, Versioned<byte[]>>> maps;
Madan Jampani63c659f2015-06-11 00:52:58 -070050 private Map<String, Queue<byte[]>> queues;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070051
52 /**
53 * This locks map has a structure similar to the "tables" map above and
54 * holds all the provisional updates made during a transaction's prepare phase.
55 * The entry value is represented as the tuple: (transactionId, newValue)
56 * If newValue == null that signifies this update is attempting to
57 * delete the existing value.
58 * This map also serves as a lock on the entries that are being updated.
59 * The presence of a entry in this map indicates that element is
60 * participating in a transaction and is currently locked for updates.
61 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070062 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080063
64 @Initializer
65 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070066 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070067 counters = context.get("counters");
68 if (counters == null) {
69 counters = Maps.newConcurrentMap();
70 context.put("counters", counters);
71 }
Madan Jampani7804c992015-07-20 13:20:19 -070072 maps = context.get("maps");
73 if (maps == null) {
74 maps = Maps.newConcurrentMap();
75 context.put("maps", maps);
Madan Jampani94c23532015-02-05 17:40:01 -080076 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070077 locks = context.get("locks");
78 if (locks == null) {
79 locks = Maps.newConcurrentMap();
80 context.put("locks", locks);
81 }
Madan Jampani63c659f2015-06-11 00:52:58 -070082 queues = context.get("queues");
83 if (queues == null) {
84 queues = Maps.newConcurrentMap();
85 context.put("queues", queues);
86 }
Madan Jampani94c23532015-02-05 17:40:01 -080087 nextVersion = context.get("nextVersion");
88 if (nextVersion == null) {
Sho SHIMIZUc355c932015-09-10 10:51:03 -070089 nextVersion = 0L;
Madan Jampani94c23532015-02-05 17:40:01 -080090 context.put("nextVersion", nextVersion);
91 }
92 }
93
Madan Jampani94c23532015-02-05 17:40:01 -080094 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070095 public Set<String> maps() {
96 return ImmutableSet.copyOf(maps.keySet());
Madan Jampania89f8f92015-04-01 14:39:54 -070097 }
98
99 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700100 public Map<String, Long> counters() {
101 Map<String, Long> counterMap = Maps.newHashMap();
102 counters.forEach((k, v) -> counterMap.put(k, v.get()));
103 return counterMap;
104 }
105
106 @Override
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700107 public int mapSize(String mapName) {
Madan Jampani7804c992015-07-20 13:20:19 -0700108 return getMap(mapName).size();
Madan Jampani94c23532015-02-05 17:40:01 -0800109 }
110
111 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700112 public boolean mapIsEmpty(String mapName) {
113 return getMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800114 }
115
116 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700117 public boolean mapContainsKey(String mapName, String key) {
118 return getMap(mapName).containsKey(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800119 }
120
121 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700122 public boolean mapContainsValue(String mapName, byte[] value) {
123 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800124 }
125
126 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700127 public Versioned<byte[]> mapGet(String mapName, String key) {
128 return getMap(mapName).get(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800129 }
130
Madan Jampani94c23532015-02-05 17:40:01 -0800131
132 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700133 public Result<UpdateResult<String, byte[]>> mapUpdate(
134 String mapName,
Madan Jampani346d4f52015-05-04 11:09:39 -0700135 String key,
Madan Jampani7804c992015-07-20 13:20:19 -0700136 Match<byte[]> valueMatch,
137 Match<Long> versionMatch,
Madan Jampani346d4f52015-05-04 11:09:39 -0700138 byte[] value) {
Madan Jampani7804c992015-07-20 13:20:19 -0700139 if (isLockedForUpdates(mapName, key)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700140 return Result.locked();
Madan Jampani7804c992015-07-20 13:20:19 -0700141 }
142 Versioned<byte[]> currentValue = getMap(mapName).get(key);
143 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
144 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
145 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700146 } else {
Madan Jampani7c4e09a2015-07-22 11:44:29 -0700147 if (value == null) {
148 if (currentValue == null) {
149 return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
150 } else {
151 getMap(mapName).remove(key);
152 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
153 }
Madan Jampani7804c992015-07-20 13:20:19 -0700154 }
Madan Jampani346d4f52015-05-04 11:09:39 -0700155 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Madan Jampani7804c992015-07-20 13:20:19 -0700156 getMap(mapName).put(key, newValue);
157 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700158 }
159 }
160
161 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700162 public Result<Void> mapClear(String mapName) {
163 if (areTransactionsInProgress(mapName)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700164 return Result.locked();
165 }
Madan Jampani7804c992015-07-20 13:20:19 -0700166 getMap(mapName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700167 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800168 }
169
170 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700171 public Set<String> mapKeySet(String mapName) {
172 return ImmutableSet.copyOf(getMap(mapName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800173 }
174
175 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700176 public Collection<Versioned<byte[]>> mapValues(String mapName) {
177 return ImmutableList.copyOf(getMap(mapName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800178 }
179
180 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700181 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
182 return ImmutableSet.copyOf(getMap(mapName)
Madan Jampani393e0f02015-02-12 07:35:39 +0530183 .entrySet()
184 .stream()
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700185 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
Madan Jampani393e0f02015-02-12 07:35:39 +0530186 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800187 }
188
189 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700190 public Long counterAddAndGet(String counterName, long delta) {
191 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700192 }
193
194 @Override
195 public Long counterGetAndAdd(String counterName, long delta) {
196 return getCounter(counterName).getAndAdd(delta);
197 }
198
199 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700200 public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
201 return getCounter(counterName).compareAndSet(expectedValue, updateValue);
202 }
203
204 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700205 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700206 return getCounter(counterName).get();
207 }
208
209 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700210 public Long queueSize(String queueName) {
211 return Long.valueOf(getQueue(queueName).size());
212 }
213
214 @Override
215 public byte[] queuePeek(String queueName) {
Madan Jampania6d787b2015-08-11 11:02:02 -0700216 return getQueue(queueName).peek();
Madan Jampani63c659f2015-06-11 00:52:58 -0700217 }
218
219 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700220 public byte[] queuePop(String queueName) {
221 return getQueue(queueName).poll();
Madan Jampani63c659f2015-06-11 00:52:58 -0700222 }
223
224 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700225 public void queuePush(String queueName, byte[] entry) {
226 getQueue(queueName).offer(entry);
Madan Jampani63c659f2015-06-11 00:52:58 -0700227 }
228
229 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700230 public CommitResponse prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700231 if (prepare(transaction)) {
232 return commit(transaction);
233 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700234 return CommitResponse.failure();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700235 }
236
237 @Override
238 public boolean prepare(Transaction transaction) {
239 if (transaction.updates().stream().anyMatch(update ->
Madan Jampani7804c992015-07-20 13:20:19 -0700240 isLockedByAnotherTransaction(update.mapName(),
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700241 update.key(),
242 transaction.id()))) {
243 return false;
244 }
245
246 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
247 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800248 return true;
249 }
250 return false;
251 }
252
253 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700254 public CommitResponse commit(Transaction transaction) {
255 return CommitResponse.success(Lists.transform(transaction.updates(),
256 update -> commitProvisionalUpdate(update, transaction.id())));
Madan Jampani94c23532015-02-05 17:40:01 -0800257 }
258
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700259 @Override
260 public boolean rollback(Transaction transaction) {
261 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
262 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800263 }
264
Madan Jampani7804c992015-07-20 13:20:19 -0700265 private Map<String, Versioned<byte[]>> getMap(String mapName) {
266 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700267 }
268
Madan Jampani7804c992015-07-20 13:20:19 -0700269 private Map<String, Update> getLockMap(String mapName) {
270 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700271 }
272
Madan Jampanib5d72d52015-04-03 16:53:50 -0700273 private AtomicLong getCounter(String counterName) {
274 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
275 }
276
Madan Jampani63c659f2015-06-11 00:52:58 -0700277 private Queue<byte[]> getQueue(String queueName) {
278 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
279 }
280
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700281 private boolean isUpdatePossible(DatabaseUpdate update) {
Madan Jampani7804c992015-07-20 13:20:19 -0700282 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800283 switch (update.type()) {
284 case PUT:
285 case REMOVE:
286 return true;
287 case PUT_IF_ABSENT:
288 return existingEntry == null;
289 case PUT_IF_VERSION_MATCH:
290 return existingEntry != null && existingEntry.version() == update.currentVersion();
291 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700292 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800293 case REMOVE_IF_VERSION_MATCH:
294 return existingEntry == null || existingEntry.version() == update.currentVersion();
295 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700296 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800297 default:
298 throw new IllegalStateException("Unsupported type: " + update.type());
299 }
300 }
301
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700302 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700303 Map<String, Update> lockMap = getLockMap(update.mapName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700304 switch (update.type()) {
305 case PUT:
306 case PUT_IF_ABSENT:
307 case PUT_IF_VERSION_MATCH:
308 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700309 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700310 break;
311 case REMOVE:
312 case REMOVE_IF_VERSION_MATCH:
313 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700314 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700315 break;
316 default:
317 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800318 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700319 }
320
Madan Jampanibab51a42015-08-10 13:53:35 -0700321 private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700322 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700323 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700324 Update provisionalUpdate = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700325 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700326 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700327 } else {
Madan Jampanibab51a42015-08-10 13:53:35 -0700328 throw new IllegalStateException("Invalid transaction Id");
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700329 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700330 return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700331 }
332
333 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700334 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700335 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700336 Update provisionalUpdate = getLockMap(mapName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700337 if (provisionalUpdate == null) {
338 return;
339 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700340 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700341 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700342 }
343 }
344
Madan Jampani7804c992015-07-20 13:20:19 -0700345 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
346 Update update = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700347 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700348 }
349
Madan Jampani7804c992015-07-20 13:20:19 -0700350 private boolean isLockedForUpdates(String mapName, String key) {
351 return getLockMap(mapName).containsKey(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700352 }
353
Madan Jampani7804c992015-07-20 13:20:19 -0700354 private boolean areTransactionsInProgress(String mapName) {
355 return !getLockMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800356 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700357
358 private class Update {
359 private final long transactionId;
360 private final byte[] value;
361
362 public Update(long txId, byte[] value) {
363 this.transactionId = txId;
364 this.value = value;
365 }
366
367 public long transactionId() {
368 return this.transactionId;
369 }
370
371 public byte[] value() {
372 return this.value;
373 }
374 }
Madan Jampani94c23532015-02-05 17:40:01 -0800375}