blob: 219b8470b6b5a571e60f2e13d62cbb3b7f981e24 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Arrays;
20import java.util.Collection;
Madan Jampania89f8f92015-04-01 14:39:54 -070021import java.util.HashSet;
Madan Jampani63c659f2015-06-11 00:52:58 -070022import java.util.LinkedList;
Madan Jampani94c23532015-02-05 17:40:01 -080023import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani63c659f2015-06-11 00:52:58 -070025import java.util.Queue;
Madan Jampanib5d72d52015-04-03 16:53:50 -070026import java.util.concurrent.atomic.AtomicLong;
Madan Jampani393e0f02015-02-12 07:35:39 +053027import java.util.stream.Collectors;
Madan Jampani94c23532015-02-05 17:40:01 -080028import java.util.Set;
29
Madan Jampani63c659f2015-06-11 00:52:58 -070030import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.DatabaseUpdate;
32import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053033import org.onosproject.store.service.Versioned;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070034import com.google.common.base.Objects;
Madan Jampani393e0f02015-02-12 07:35:39 +053035import com.google.common.collect.ImmutableList;
36import com.google.common.collect.ImmutableSet;
Madan Jampanibab51a42015-08-10 13:53:35 -070037import com.google.common.collect.Lists;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070038import com.google.common.collect.Maps;
Madan Jampani393e0f02015-02-12 07:35:39 +053039
Madan Jampani94c23532015-02-05 17:40:01 -080040import net.kuujo.copycat.state.Initializer;
41import net.kuujo.copycat.state.StateContext;
42
43/**
44 * Default database state.
Madan Jampani94c23532015-02-05 17:40:01 -080045 */
Madan Jampanibff6d8f2015-03-31 16:53:47 -070046public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
Madan Jampani94c23532015-02-05 17:40:01 -080047 private Long nextVersion;
Madan Jampanib5d72d52015-04-03 16:53:50 -070048 private Map<String, AtomicLong> counters;
Madan Jampani7804c992015-07-20 13:20:19 -070049 private Map<String, Map<String, Versioned<byte[]>>> maps;
Madan Jampani63c659f2015-06-11 00:52:58 -070050 private Map<String, Queue<byte[]>> queues;
51 private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070052
53 /**
54 * This locks map has a structure similar to the "tables" map above and
55 * holds all the provisional updates made during a transaction's prepare phase.
56 * The entry value is represented as the tuple: (transactionId, newValue)
57 * If newValue == null that signifies this update is attempting to
58 * delete the existing value.
59 * This map also serves as a lock on the entries that are being updated.
60 * The presence of a entry in this map indicates that element is
61 * participating in a transaction and is currently locked for updates.
62 */
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -070063 private Map<String, Map<String, Update>> locks;
Madan Jampani94c23532015-02-05 17:40:01 -080064
65 @Initializer
66 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -070067 public void init(StateContext<DatabaseState<String, byte[]>> context) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070068 counters = context.get("counters");
69 if (counters == null) {
70 counters = Maps.newConcurrentMap();
71 context.put("counters", counters);
72 }
Madan Jampani7804c992015-07-20 13:20:19 -070073 maps = context.get("maps");
74 if (maps == null) {
75 maps = Maps.newConcurrentMap();
76 context.put("maps", maps);
Madan Jampani94c23532015-02-05 17:40:01 -080077 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -070078 locks = context.get("locks");
79 if (locks == null) {
80 locks = Maps.newConcurrentMap();
81 context.put("locks", locks);
82 }
Madan Jampani63c659f2015-06-11 00:52:58 -070083 queues = context.get("queues");
84 if (queues == null) {
85 queues = Maps.newConcurrentMap();
86 context.put("queues", queues);
87 }
88 queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
89 if (queueUpdateNotificationTargets == null) {
90 queueUpdateNotificationTargets = Maps.newConcurrentMap();
91 context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
92 }
Madan Jampani94c23532015-02-05 17:40:01 -080093 nextVersion = context.get("nextVersion");
94 if (nextVersion == null) {
95 nextVersion = new Long(0);
96 context.put("nextVersion", nextVersion);
97 }
98 }
99
Madan Jampani94c23532015-02-05 17:40:01 -0800100 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700101 public Set<String> maps() {
102 return ImmutableSet.copyOf(maps.keySet());
Madan Jampania89f8f92015-04-01 14:39:54 -0700103 }
104
105 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700106 public Map<String, Long> counters() {
107 Map<String, Long> counterMap = Maps.newHashMap();
108 counters.forEach((k, v) -> counterMap.put(k, v.get()));
109 return counterMap;
110 }
111
112 @Override
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700113 public int mapSize(String mapName) {
Madan Jampani7804c992015-07-20 13:20:19 -0700114 return getMap(mapName).size();
Madan Jampani94c23532015-02-05 17:40:01 -0800115 }
116
117 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700118 public boolean mapIsEmpty(String mapName) {
119 return getMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800120 }
121
122 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700123 public boolean mapContainsKey(String mapName, String key) {
124 return getMap(mapName).containsKey(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800125 }
126
127 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700128 public boolean mapContainsValue(String mapName, byte[] value) {
129 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
Madan Jampani94c23532015-02-05 17:40:01 -0800130 }
131
132 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700133 public Versioned<byte[]> mapGet(String mapName, String key) {
134 return getMap(mapName).get(key);
Madan Jampani94c23532015-02-05 17:40:01 -0800135 }
136
Madan Jampani94c23532015-02-05 17:40:01 -0800137
138 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700139 public Result<UpdateResult<String, byte[]>> mapUpdate(
140 String mapName,
Madan Jampani346d4f52015-05-04 11:09:39 -0700141 String key,
Madan Jampani7804c992015-07-20 13:20:19 -0700142 Match<byte[]> valueMatch,
143 Match<Long> versionMatch,
Madan Jampani346d4f52015-05-04 11:09:39 -0700144 byte[] value) {
Madan Jampani7804c992015-07-20 13:20:19 -0700145 if (isLockedForUpdates(mapName, key)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700146 return Result.locked();
Madan Jampani7804c992015-07-20 13:20:19 -0700147 }
148 Versioned<byte[]> currentValue = getMap(mapName).get(key);
149 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
150 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
151 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700152 } else {
Madan Jampani7c4e09a2015-07-22 11:44:29 -0700153 if (value == null) {
154 if (currentValue == null) {
155 return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
156 } else {
157 getMap(mapName).remove(key);
158 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
159 }
Madan Jampani7804c992015-07-20 13:20:19 -0700160 }
Madan Jampani346d4f52015-05-04 11:09:39 -0700161 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Madan Jampani7804c992015-07-20 13:20:19 -0700162 getMap(mapName).put(key, newValue);
163 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
Madan Jampani346d4f52015-05-04 11:09:39 -0700164 }
165 }
166
167 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700168 public Result<Void> mapClear(String mapName) {
169 if (areTransactionsInProgress(mapName)) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700170 return Result.locked();
171 }
Madan Jampani7804c992015-07-20 13:20:19 -0700172 getMap(mapName).clear();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700173 return Result.ok(null);
Madan Jampani94c23532015-02-05 17:40:01 -0800174 }
175
176 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700177 public Set<String> mapKeySet(String mapName) {
178 return ImmutableSet.copyOf(getMap(mapName).keySet());
Madan Jampani94c23532015-02-05 17:40:01 -0800179 }
180
181 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700182 public Collection<Versioned<byte[]>> mapValues(String mapName) {
183 return ImmutableList.copyOf(getMap(mapName).values());
Madan Jampani94c23532015-02-05 17:40:01 -0800184 }
185
186 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700187 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
188 return ImmutableSet.copyOf(getMap(mapName)
Madan Jampani393e0f02015-02-12 07:35:39 +0530189 .entrySet()
190 .stream()
Madan Jampani3ca9cb62015-07-21 11:35:44 -0700191 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
Madan Jampani393e0f02015-02-12 07:35:39 +0530192 .collect(Collectors.toSet()));
Madan Jampani94c23532015-02-05 17:40:01 -0800193 }
194
195 @Override
Madan Jampani55ac1342015-05-04 19:05:04 -0700196 public Long counterAddAndGet(String counterName, long delta) {
197 return getCounter(counterName).addAndGet(delta);
Madan Jampani04aeb452015-05-02 16:12:24 -0700198 }
199
200 @Override
201 public Long counterGetAndAdd(String counterName, long delta) {
202 return getCounter(counterName).getAndAdd(delta);
203 }
204
205 @Override
206 public Long counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700207 return getCounter(counterName).get();
208 }
209
210 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700211 public Long queueSize(String queueName) {
212 return Long.valueOf(getQueue(queueName).size());
213 }
214
215 @Override
216 public byte[] queuePeek(String queueName) {
217 Queue<byte[]> queue = getQueue(queueName);
218 return queue.peek();
219 }
220
221 @Override
222 public byte[] queuePop(String queueName, NodeId requestor) {
223 Queue<byte[]> queue = getQueue(queueName);
224 if (queue.size() == 0 && requestor != null) {
225 getQueueUpdateNotificationTargets(queueName).add(requestor);
226 return null;
227 } else {
228 return queue.remove();
229 }
230 }
231
232 @Override
233 public Set<NodeId> queuePush(String queueName, byte[] entry) {
234 getQueue(queueName).add(entry);
235 Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
236 getQueueUpdateNotificationTargets(queueName).clear();
237 return notifyList;
238 }
239
240 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700241 public CommitResponse prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700242 if (prepare(transaction)) {
243 return commit(transaction);
244 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700245 return CommitResponse.failure();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700246 }
247
248 @Override
249 public boolean prepare(Transaction transaction) {
250 if (transaction.updates().stream().anyMatch(update ->
Madan Jampani7804c992015-07-20 13:20:19 -0700251 isLockedByAnotherTransaction(update.mapName(),
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700252 update.key(),
253 transaction.id()))) {
254 return false;
255 }
256
257 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
258 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
Madan Jampani94c23532015-02-05 17:40:01 -0800259 return true;
260 }
261 return false;
262 }
263
264 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700265 public CommitResponse commit(Transaction transaction) {
266 return CommitResponse.success(Lists.transform(transaction.updates(),
267 update -> commitProvisionalUpdate(update, transaction.id())));
Madan Jampani94c23532015-02-05 17:40:01 -0800268 }
269
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700270 @Override
271 public boolean rollback(Transaction transaction) {
272 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
273 return true;
Madan Jampani94c23532015-02-05 17:40:01 -0800274 }
275
Madan Jampani7804c992015-07-20 13:20:19 -0700276 private Map<String, Versioned<byte[]>> getMap(String mapName) {
277 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700278 }
279
Madan Jampani7804c992015-07-20 13:20:19 -0700280 private Map<String, Update> getLockMap(String mapName) {
281 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700282 }
283
Madan Jampanib5d72d52015-04-03 16:53:50 -0700284 private AtomicLong getCounter(String counterName) {
285 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
286 }
287
Madan Jampani63c659f2015-06-11 00:52:58 -0700288 private Queue<byte[]> getQueue(String queueName) {
289 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
290 }
291
292 private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
293 return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
294 }
295
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700296 private boolean isUpdatePossible(DatabaseUpdate update) {
Madan Jampani7804c992015-07-20 13:20:19 -0700297 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
Madan Jampani94c23532015-02-05 17:40:01 -0800298 switch (update.type()) {
299 case PUT:
300 case REMOVE:
301 return true;
302 case PUT_IF_ABSENT:
303 return existingEntry == null;
304 case PUT_IF_VERSION_MATCH:
305 return existingEntry != null && existingEntry.version() == update.currentVersion();
306 case PUT_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700307 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800308 case REMOVE_IF_VERSION_MATCH:
309 return existingEntry == null || existingEntry.version() == update.currentVersion();
310 case REMOVE_IF_VALUE_MATCH:
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700311 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800312 default:
313 throw new IllegalStateException("Unsupported type: " + update.type());
314 }
315 }
316
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700317 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700318 Map<String, Update> lockMap = getLockMap(update.mapName());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700319 switch (update.type()) {
320 case PUT:
321 case PUT_IF_ABSENT:
322 case PUT_IF_VERSION_MATCH:
323 case PUT_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700324 lockMap.put(update.key(), new Update(transactionId, update.value()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700325 break;
326 case REMOVE:
327 case REMOVE_IF_VERSION_MATCH:
328 case REMOVE_IF_VALUE_MATCH:
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700329 lockMap.put(update.key(), new Update(transactionId, null));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700330 break;
331 default:
332 throw new IllegalStateException("Unsupported type: " + update.type());
Madan Jampani94c23532015-02-05 17:40:01 -0800333 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700334 }
335
Madan Jampanibab51a42015-08-10 13:53:35 -0700336 private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700337 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700338 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700339 Update provisionalUpdate = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700340 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700341 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700342 } else {
Madan Jampanibab51a42015-08-10 13:53:35 -0700343 throw new IllegalStateException("Invalid transaction Id");
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700344 }
Madan Jampanibab51a42015-08-10 13:53:35 -0700345 return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700346 }
347
348 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Madan Jampani7804c992015-07-20 13:20:19 -0700349 String mapName = update.mapName();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700350 String key = update.key();
Madan Jampani7804c992015-07-20 13:20:19 -0700351 Update provisionalUpdate = getLockMap(mapName).get(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700352 if (provisionalUpdate == null) {
353 return;
354 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700355 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
Madan Jampani7804c992015-07-20 13:20:19 -0700356 getLockMap(mapName).remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700357 }
358 }
359
Madan Jampani7804c992015-07-20 13:20:19 -0700360 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
361 Update update = getLockMap(mapName).get(key);
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700362 return update != null && !Objects.equal(transactionId, update.transactionId());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700363 }
364
Madan Jampani7804c992015-07-20 13:20:19 -0700365 private boolean isLockedForUpdates(String mapName, String key) {
366 return getLockMap(mapName).containsKey(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700367 }
368
Madan Jampani7804c992015-07-20 13:20:19 -0700369 private boolean areTransactionsInProgress(String mapName) {
370 return !getLockMap(mapName).isEmpty();
Madan Jampani94c23532015-02-05 17:40:01 -0800371 }
Ayaka Koshibe2c6b7ef2015-04-28 17:18:05 -0700372
373 private class Update {
374 private final long transactionId;
375 private final byte[] value;
376
377 public Update(long txId, byte[] value) {
378 this.transactionId = txId;
379 this.value = value;
380 }
381
382 public long transactionId() {
383 return this.transactionId;
384 }
385
386 public byte[] value() {
387 return this.value;
388 }
389 }
Madan Jampani94c23532015-02-05 17:40:01 -0800390}