blob: bd9cf6983e2a9c9445e410e4bce4b465c6871a0a [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani63c659f2015-06-11 00:52:58 -070022import java.util.LinkedList;
Madan Jampani94c23532015-02-05 17:40:01 -080023import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani63c659f2015-06-11 00:52:58 -070025import java.util.Queue;
Madan Jampanib5d72d52015-04-03 16:53:50 -070026import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053027import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080028import java.util.Set;
29
Madan Jampani63c659f2015-06-11 00:52:58 -070030import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.DatabaseUpdate;
32import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053033import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070034import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampani346d4f52015-05-04 11:09:39 -070035
Madan Jampanibff6d8f2015-03-31 16:53:47 -070036import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053037import com.google.common.collect.ImmutableList;
38import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070039import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053040
Madan Jampani94c23532015-02-05 17:40:01 -080041import net.kuujo.copycat.state.Initializer;
42import net.kuujo.copycat.state.StateContext;
43
44/**
45 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080046 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070047public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080048 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070049 private Map<String, AtomicLong> counters;
Madan Jampani7804c992015-07-20 13:20:19 -070050 private Map<String, Map<String, Versioned<byte[]>>> maps;
Madan Jampani63c659f2015-06-11 00:52:58 -070051 private Map<String, Queue<byte[]>> queues;
52 private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070053
54 /**
55 * This locks map has a structure similar to the "tables" map above and
56 * holds all the provisional updates made during a transaction's prepare phase.
57 * The entry value is represented as the tuple: (transactionId, newValue)
58 * If newValue == null that signifies this update is attempting to
59 * delete the existing value.
60 * This map also serves as a lock on the entries that are being updated.
61 * The presence of a entry in this map indicates that element is
62 * participating in a transaction and is currently locked for updates.
63 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070064 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080065
66 @Initializer
67 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070068 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070069 counters = context.get("counters");
70 if (counters == null) {
71 counters = Maps.newConcurrentMap();
72 context.put("counters", counters);
73 }
Madan Jampani7804c992015-07-20 13:20:19 -070074 maps = context.get("maps");
75 if (maps == null) {
76 maps = Maps.newConcurrentMap();
77 context.put("maps", maps);
Madan Jampani94c23532015-02-05 17:40:01 -080078 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070079 locks = context.get("locks");
80 if (locks == null) {
81 locks = Maps.newConcurrentMap();
82 context.put("locks", locks);
83 }
Madan Jampani63c659f2015-06-11 00:52:58 -070084 queues = context.get("queues");
85 if (queues == null) {
86 queues = Maps.newConcurrentMap();
87 context.put("queues", queues);
88 }
89 queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
90 if (queueUpdateNotificationTargets == null) {
91 queueUpdateNotificationTargets = Maps.newConcurrentMap();
92 context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
93 }
Madan Jampani94c23532015-02-05 17:40:01 -080094 nextVersion = context.get("nextVersion");
95 if (nextVersion == null) {
96 nextVersion = new Long(0);
97 context.put("nextVersion", nextVersion);
98 }
99 }
100
Madan Jampani94c23532015-02-05 17:40:01 -0800101 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700102 public Set<String> maps() {
103 return ImmutableSet.copyOf(maps.keySet());
Madan Jampania89f8f92015-04-01 14:39:54 -0700104 }
105
106 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700107 public Map<String, Long> counters() {
108 Map<String, Long> counterMap = Maps.newHashMap();
109 counters.forEach((k, v) -> counterMap.put(k, v.get()));
110 return counterMap;
111 }
112
113 @Override
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700114 public int mapSize(String mapName) {
Madan Jampani7804c992015-07-20 13:20:19 -0700115 return getMap(mapName).size();
Madan Jampani94c23532015-02-05 17:40:01 -0800116 }
117
118 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700119 public boolean mapIsEmpty(String mapName) {
120 return getMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800121 }
122
123 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700124 public boolean mapContainsKey(String mapName, String key) {
125 return getMap(mapName).containsKey(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800126 }
127
128 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700129 public boolean mapContainsValue(String mapName, byte[] value) {
130 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800131 }
132
133 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700134 public Versioned<byte[]> mapGet(String mapName, String key) {
135 return getMap(mapName).get(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800136 }
137
Madan Jampani94c23532015-02-05 17:40:01 -0800138
139 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700140 public Result<UpdateResult<String, byte[]>> mapUpdate(
141 String mapName,
Madan Jampani346d4f52015-05-04 11:09:39 -0700142 String key,
Madan Jampani7804c992015-07-20 13:20:19 -0700143 Match<byte[]> valueMatch,
144 Match<Long> versionMatch,
Madan Jampani346d4f52015-05-04 11:09:39 -0700145 byte[] value) {
Madan Jampani7804c992015-07-20 13:20:19 -0700146 if (isLockedForUpdates(mapName, key)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700147 return Result.locked();
Madan Jampani7804c992015-07-20 13:20:19 -0700148 }
149 Versioned<byte[]> currentValue = getMap(mapName).get(key);
150 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
151 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
152 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700153 } else {
Madan Jampani7c4e09a2015-07-22 11:44:29 -0700154 if (value == null) {
155 if (currentValue == null) {
156 return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
157 } else {
158 getMap(mapName).remove(key);
159 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
160 }
Madan Jampani7804c992015-07-20 13:20:19 -0700161 }
Madan Jampani346d4f52015-05-04 11:09:39 -0700162 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Madan Jampani7804c992015-07-20 13:20:19 -0700163 getMap(mapName).put(key, newValue);
164 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700165 }
166 }
167
168 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700169 public Result<Void> mapClear(String mapName) {
170 if (areTransactionsInProgress(mapName)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700171 return Result.locked();
172 }
Madan Jampani7804c992015-07-20 13:20:19 -0700173 getMap(mapName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700174 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800175 }
176
177 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700178 public Set<String> mapKeySet(String mapName) {
179 return ImmutableSet.copyOf(getMap(mapName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800180 }
181
182 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700183 public Collection<Versioned<byte[]>> mapValues(String mapName) {
184 return ImmutableList.copyOf(getMap(mapName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800185 }
186
187 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700188 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
189 return ImmutableSet.copyOf(getMap(mapName)
Madan Jampani393e0f02015-02-12 07:35:39 +0530190 .entrySet()
191 .stream()
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700192 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
Madan Jampani393e0f02015-02-12 07:35:39 +0530193 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800194 }
195
196 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700197 public Long counterAddAndGet(String counterName, long delta) {
198 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700199 }
200
201 @Override
202 public Long counterGetAndAdd(String counterName, long delta) {
203 return getCounter(counterName).getAndAdd(delta);
204 }
205
206 @Override
207 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700208 return getCounter(counterName).get();
209 }
210
211 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700212 public Long queueSize(String queueName) {
213 return Long.valueOf(getQueue(queueName).size());
214 }
215
216 @Override
217 public byte[] queuePeek(String queueName) {
218 Queue<byte[]> queue = getQueue(queueName);
219 return queue.peek();
220 }
221
222 @Override
223 public byte[] queuePop(String queueName, NodeId requestor) {
224 Queue<byte[]> queue = getQueue(queueName);
225 if (queue.size() == 0 && requestor != null) {
226 getQueueUpdateNotificationTargets(queueName).add(requestor);
227 return null;
228 } else {
229 return queue.remove();
230 }
231 }
232
233 @Override
234 public Set<NodeId> queuePush(String queueName, byte[] entry) {
235 getQueue(queueName).add(entry);
236 Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
237 getQueueUpdateNotificationTargets(queueName).clear();
238 return notifyList;
239 }
240
241 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700242 public boolean prepareAndCommit(Transaction transaction) {
243 if (prepare(transaction)) {
244 return commit(transaction);
245 }
246 return false;
247 }
248
249 @Override
250 public boolean prepare(Transaction transaction) {
251 if (transaction.updates().stream().anyMatch(update ->
Madan Jampani7804c992015-07-20 13:20:19 -0700252 isLockedByAnotherTransaction(update.mapName(),
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700253 update.key(),
254 transaction.id()))) {
255 return false;
256 }
257
258 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
259 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800260 return true;
261 }
262 return false;
263 }
264
265 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700266 public boolean commit(Transaction transaction) {
267 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
268 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800269 }
270
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700271 @Override
272 public boolean rollback(Transaction transaction) {
273 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
274 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800275 }
276
Madan Jampani7804c992015-07-20 13:20:19 -0700277 private Map<String, Versioned<byte[]>> getMap(String mapName) {
278 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700279 }
280
Madan Jampani7804c992015-07-20 13:20:19 -0700281 private Map<String, Update> getLockMap(String mapName) {
282 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700283 }
284
Madan Jampanib5d72d52015-04-03 16:53:50 -0700285 private AtomicLong getCounter(String counterName) {
286 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
287 }
288
Madan Jampani63c659f2015-06-11 00:52:58 -0700289 private Queue<byte[]> getQueue(String queueName) {
290 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
291 }
292
293 private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
294 return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
295 }
296
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700297 private boolean isUpdatePossible(DatabaseUpdate update) {
Madan Jampani7804c992015-07-20 13:20:19 -0700298 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800299 switch (update.type()) {
300 case PUT:
301 case REMOVE:
302 return true;
303 case PUT_IF_ABSENT:
304 return existingEntry == null;
305 case PUT_IF_VERSION_MATCH:
306 return existingEntry != null && existingEntry.version() == update.currentVersion();
307 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700308 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800309 case REMOVE_IF_VERSION_MATCH:
310 return existingEntry == null || existingEntry.version() == update.currentVersion();
311 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700312 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800313 default:
314 throw new IllegalStateException("Unsupported type: " + update.type());
315 }
316 }
317
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700318 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700319 Map<String, Update> lockMap = getLockMap(update.mapName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700320 switch (update.type()) {
321 case PUT:
322 case PUT_IF_ABSENT:
323 case PUT_IF_VERSION_MATCH:
324 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700325 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700326 break;
327 case REMOVE:
328 case REMOVE_IF_VERSION_MATCH:
329 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700330 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700331 break;
332 default:
333 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800334 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700335 }
336
337 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700338 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700339 String key = update.key();
340 Type type = update.type();
Madan Jampani7804c992015-07-20 13:20:19 -0700341 Update provisionalUpdate = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700342 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700343 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700344 } else {
345 return;
346 }
347
348 switch (type) {
349 case PUT:
350 case PUT_IF_ABSENT:
351 case PUT_IF_VERSION_MATCH:
352 case PUT_IF_VALUE_MATCH:
Madan Jampani7804c992015-07-20 13:20:19 -0700353 mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700354 break;
355 case REMOVE:
356 case REMOVE_IF_VERSION_MATCH:
357 case REMOVE_IF_VALUE_MATCH:
Madan Jampani7804c992015-07-20 13:20:19 -0700358 mapUpdate(mapName, key, Match.any(), Match.any(), null);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700359 break;
360 default:
361 break;
362 }
363 }
364
365 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700366 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700367 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700368 Update provisionalUpdate = getLockMap(mapName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700369 if (provisionalUpdate == null) {
370 return;
371 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700372 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700373 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700374 }
375 }
376
Madan Jampani7804c992015-07-20 13:20:19 -0700377 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
378 Update update = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700379 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700380 }
381
Madan Jampani7804c992015-07-20 13:20:19 -0700382 private boolean isLockedForUpdates(String mapName, String key) {
383 return getLockMap(mapName).containsKey(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700384 }
385
Madan Jampani7804c992015-07-20 13:20:19 -0700386 private boolean areTransactionsInProgress(String mapName) {
387 return !getLockMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800388 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700389
390 private class Update {
391 private final long transactionId;
392 private final byte[] value;
393
394 public Update(long txId, byte[] value) {
395 this.transactionId = txId;
396 this.value = value;
397 }
398
399 public long transactionId() {
400 return this.transactionId;
401 }
402
403 public byte[] value() {
404 return this.value;
405 }
406 }
Madan Jampani94c23532015-02-05 17:40:01 -0800407}