blob: 2a37e5300ef56d21de8dfd13258995d35a962597 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani63c659f2015-06-11 00:52:58 -070022import java.util.LinkedList;
Madan Jampani94c23532015-02-05 17:40:01 -080023import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani63c659f2015-06-11 00:52:58 -070025import java.util.Queue;
Madan Jampanib5d72d52015-04-03 16:53:50 -070026import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053027import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080028import java.util.Set;
29
Madan Jampani393e0f02015-02-12 07:35:39 +053030import org.apache.commons.lang3.tuple.Pair;
Madan Jampani63c659f2015-06-11 00:52:58 -070031import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate;
33import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070035import org.onosproject.store.service.DatabaseUpdate.Type;
Madan Jampani346d4f52015-05-04 11:09:39 -070036
Madan Jampanibff6d8f2015-03-31 16:53:47 -070037import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053038import com.google.common.collect.ImmutableList;
39import com.google.common.collect.ImmutableSet;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070040import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053041
Madan Jampani94c23532015-02-05 17:40:01 -080042import net.kuujo.copycat.state.Initializer;
43import net.kuujo.copycat.state.StateContext;
44
45/**
46 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080047 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070048public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080049 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070050 private Map<String, AtomicLong> counters;
Madan Jampani7804c992015-07-20 13:20:19 -070051 private Map<String, Map<String, Versioned<byte[]>>> maps;
Madan Jampani63c659f2015-06-11 00:52:58 -070052 private Map<String, Queue<byte[]>> queues;
53 private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070054
55 /**
56 * This locks map has a structure similar to the "tables" map above and
57 * holds all the provisional updates made during a transaction's prepare phase.
58 * The entry value is represented as the tuple: (transactionId, newValue)
59 * If newValue == null that signifies this update is attempting to
60 * delete the existing value.
61 * This map also serves as a lock on the entries that are being updated.
62 * The presence of a entry in this map indicates that element is
63 * participating in a transaction and is currently locked for updates.
64 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070065 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080066
67 @Initializer
68 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070069 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070070 counters = context.get("counters");
71 if (counters == null) {
72 counters = Maps.newConcurrentMap();
73 context.put("counters", counters);
74 }
Madan Jampani7804c992015-07-20 13:20:19 -070075 maps = context.get("maps");
76 if (maps == null) {
77 maps = Maps.newConcurrentMap();
78 context.put("maps", maps);
Madan Jampani94c23532015-02-05 17:40:01 -080079 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070080 locks = context.get("locks");
81 if (locks == null) {
82 locks = Maps.newConcurrentMap();
83 context.put("locks", locks);
84 }
Madan Jampani63c659f2015-06-11 00:52:58 -070085 queues = context.get("queues");
86 if (queues == null) {
87 queues = Maps.newConcurrentMap();
88 context.put("queues", queues);
89 }
90 queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
91 if (queueUpdateNotificationTargets == null) {
92 queueUpdateNotificationTargets = Maps.newConcurrentMap();
93 context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
94 }
Madan Jampani94c23532015-02-05 17:40:01 -080095 nextVersion = context.get("nextVersion");
96 if (nextVersion == null) {
97 nextVersion = new Long(0);
98 context.put("nextVersion", nextVersion);
99 }
100 }
101
Madan Jampani94c23532015-02-05 17:40:01 -0800102 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700103 public Set<String> maps() {
104 return ImmutableSet.copyOf(maps.keySet());
Madan Jampania89f8f92015-04-01 14:39:54 -0700105 }
106
107 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700108 public Map<String, Long> counters() {
109 Map<String, Long> counterMap = Maps.newHashMap();
110 counters.forEach((k, v) -> counterMap.put(k, v.get()));
111 return counterMap;
112 }
113
114 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700115 public int size(String mapName) {
116 return getMap(mapName).size();
Madan Jampani94c23532015-02-05 17:40:01 -0800117 }
118
119 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700120 public boolean mapIsEmpty(String mapName) {
121 return getMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800122 }
123
124 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700125 public boolean mapContainsKey(String mapName, String key) {
126 return getMap(mapName).containsKey(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800127 }
128
129 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700130 public boolean mapContainsValue(String mapName, byte[] value) {
131 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800132 }
133
134 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700135 public Versioned<byte[]> mapGet(String mapName, String key) {
136 return getMap(mapName).get(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800137 }
138
Madan Jampani94c23532015-02-05 17:40:01 -0800139
140 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700141 public Result<UpdateResult<String, byte[]>> mapUpdate(
142 String mapName,
Madan Jampani346d4f52015-05-04 11:09:39 -0700143 String key,
Madan Jampani7804c992015-07-20 13:20:19 -0700144 Match<byte[]> valueMatch,
145 Match<Long> versionMatch,
Madan Jampani346d4f52015-05-04 11:09:39 -0700146 byte[] value) {
Madan Jampani7804c992015-07-20 13:20:19 -0700147 if (isLockedForUpdates(mapName, key)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700148 return Result.locked();
Madan Jampani7804c992015-07-20 13:20:19 -0700149 }
150 Versioned<byte[]> currentValue = getMap(mapName).get(key);
151 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
152 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
153 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700154 } else {
Madan Jampani7804c992015-07-20 13:20:19 -0700155 if (value == null && currentValue != null) {
156 getMap(mapName).remove(key);
157 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
158 }
Madan Jampani346d4f52015-05-04 11:09:39 -0700159 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Madan Jampani7804c992015-07-20 13:20:19 -0700160 getMap(mapName).put(key, newValue);
161 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700162 }
163 }
164
165 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700166 public Result<Void> mapClear(String mapName) {
167 if (areTransactionsInProgress(mapName)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700168 return Result.locked();
169 }
Madan Jampani7804c992015-07-20 13:20:19 -0700170 getMap(mapName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700171 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800172 }
173
174 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700175 public Set<String> mapKeySet(String mapName) {
176 return ImmutableSet.copyOf(getMap(mapName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800177 }
178
179 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700180 public Collection<Versioned<byte[]>> mapValues(String mapName) {
181 return ImmutableList.copyOf(getMap(mapName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800182 }
183
184 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700185 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
186 return ImmutableSet.copyOf(getMap(mapName)
Madan Jampani393e0f02015-02-12 07:35:39 +0530187 .entrySet()
188 .stream()
189 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
190 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800191 }
192
193 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700194 public Long counterAddAndGet(String counterName, long delta) {
195 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700196 }
197
198 @Override
199 public Long counterGetAndAdd(String counterName, long delta) {
200 return getCounter(counterName).getAndAdd(delta);
201 }
202
203 @Override
204 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700205 return getCounter(counterName).get();
206 }
207
208 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700209 public Long queueSize(String queueName) {
210 return Long.valueOf(getQueue(queueName).size());
211 }
212
213 @Override
214 public byte[] queuePeek(String queueName) {
215 Queue<byte[]> queue = getQueue(queueName);
216 return queue.peek();
217 }
218
219 @Override
220 public byte[] queuePop(String queueName, NodeId requestor) {
221 Queue<byte[]> queue = getQueue(queueName);
222 if (queue.size() == 0 && requestor != null) {
223 getQueueUpdateNotificationTargets(queueName).add(requestor);
224 return null;
225 } else {
226 return queue.remove();
227 }
228 }
229
230 @Override
231 public Set<NodeId> queuePush(String queueName, byte[] entry) {
232 getQueue(queueName).add(entry);
233 Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
234 getQueueUpdateNotificationTargets(queueName).clear();
235 return notifyList;
236 }
237
238 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700239 public boolean prepareAndCommit(Transaction transaction) {
240 if (prepare(transaction)) {
241 return commit(transaction);
242 }
243 return false;
244 }
245
246 @Override
247 public boolean prepare(Transaction transaction) {
248 if (transaction.updates().stream().anyMatch(update ->
Madan Jampani7804c992015-07-20 13:20:19 -0700249 isLockedByAnotherTransaction(update.mapName(),
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700250 update.key(),
251 transaction.id()))) {
252 return false;
253 }
254
255 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
256 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800257 return true;
258 }
259 return false;
260 }
261
262 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700263 public boolean commit(Transaction transaction) {
264 transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
265 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800266 }
267
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700268 @Override
269 public boolean rollback(Transaction transaction) {
270 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
271 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800272 }
273
Madan Jampani7804c992015-07-20 13:20:19 -0700274 private Map<String, Versioned<byte[]>> getMap(String mapName) {
275 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700276 }
277
Madan Jampani7804c992015-07-20 13:20:19 -0700278 private Map<String, Update> getLockMap(String mapName) {
279 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700280 }
281
Madan Jampanib5d72d52015-04-03 16:53:50 -0700282 private AtomicLong getCounter(String counterName) {
283 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
284 }
285
Madan Jampani63c659f2015-06-11 00:52:58 -0700286 private Queue<byte[]> getQueue(String queueName) {
287 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
288 }
289
290 private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
291 return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
292 }
293
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700294 private boolean isUpdatePossible(DatabaseUpdate update) {
Madan Jampani7804c992015-07-20 13:20:19 -0700295 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800296 switch (update.type()) {
297 case PUT:
298 case REMOVE:
299 return true;
300 case PUT_IF_ABSENT:
301 return existingEntry == null;
302 case PUT_IF_VERSION_MATCH:
303 return existingEntry != null && existingEntry.version() == update.currentVersion();
304 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700305 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800306 case REMOVE_IF_VERSION_MATCH:
307 return existingEntry == null || existingEntry.version() == update.currentVersion();
308 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700309 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800310 default:
311 throw new IllegalStateException("Unsupported type: " + update.type());
312 }
313 }
314
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700315 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700316 Map<String, Update> lockMap = getLockMap(update.mapName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700317 switch (update.type()) {
318 case PUT:
319 case PUT_IF_ABSENT:
320 case PUT_IF_VERSION_MATCH:
321 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700322 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700323 break;
324 case REMOVE:
325 case REMOVE_IF_VERSION_MATCH:
326 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700327 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700328 break;
329 default:
330 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800331 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700332 }
333
334 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700335 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700336 String key = update.key();
337 Type type = update.type();
Madan Jampani7804c992015-07-20 13:20:19 -0700338 Update provisionalUpdate = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700339 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700340 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700341 } else {
342 return;
343 }
344
345 switch (type) {
346 case PUT:
347 case PUT_IF_ABSENT:
348 case PUT_IF_VERSION_MATCH:
349 case PUT_IF_VALUE_MATCH:
Madan Jampani7804c992015-07-20 13:20:19 -0700350 mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700351 break;
352 case REMOVE:
353 case REMOVE_IF_VERSION_MATCH:
354 case REMOVE_IF_VALUE_MATCH:
Madan Jampani7804c992015-07-20 13:20:19 -0700355 mapUpdate(mapName, key, Match.any(), Match.any(), null);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700356 break;
357 default:
358 break;
359 }
360 }
361
362 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700363 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700364 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700365 Update provisionalUpdate = getLockMap(mapName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700366 if (provisionalUpdate == null) {
367 return;
368 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700369 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700370 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700371 }
372 }
373
Madan Jampani7804c992015-07-20 13:20:19 -0700374 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
375 Update update = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700376 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700377 }
378
Madan Jampani7804c992015-07-20 13:20:19 -0700379 private boolean isLockedForUpdates(String mapName, String key) {
380 return getLockMap(mapName).containsKey(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700381 }
382
Madan Jampani7804c992015-07-20 13:20:19 -0700383 private boolean areTransactionsInProgress(String mapName) {
384 return !getLockMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800385 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700386
387 private class Update {
388 private final long transactionId;
389 private final byte[] value;
390
391 public Update(long txId, byte[] value) {
392 this.transactionId = txId;
393 this.value = value;
394 }
395
396 public long transactionId() {
397 return this.transactionId;
398 }
399
400 public byte[] value() {
401 return this.value;
402 }
403 }
Madan Jampani94c23532015-02-05 17:40:01 -0800404}