blob: f2f449adb04f30594980444ef213f5634748098d [file] [log] [blame]
Jordan Halterman948d6592017-04-20 17:18:24 -07001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.impl;
18
Jordan Halterman948d6592017-04-20 17:18:24 -070019import java.util.List;
20import java.util.Map;
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.ExecutionException;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26import java.util.stream.Collectors;
27import java.util.stream.Stream;
28
29import com.google.common.base.MoreObjects;
30import com.google.common.base.Objects;
31import com.google.common.base.Throwables;
32import com.google.common.collect.Maps;
33import com.google.common.collect.Sets;
34import org.onosproject.store.primitives.MapUpdate;
35import org.onosproject.store.service.ConsistentMap;
36import org.onosproject.store.service.DistributedPrimitive;
37import org.onosproject.store.service.TransactionException;
38import org.onosproject.store.service.TransactionalMap;
39import org.onosproject.store.service.Version;
40
41import static com.google.common.base.Preconditions.checkNotNull;
42import static com.google.common.base.Preconditions.checkState;
43
44/**
45 * Base class for participants within a single {@link TransactionalMap}.
46 * <p>
47 * This class provides the basic functionality required by transactional map participants and provides methods
48 * for defining operations specific to individual isolation levels.
49 *
50 * @param <K> key type
51 * @param <V> value type.
52 */
53public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
54 private static final String TX_CLOSED_ERROR = "Transaction is closed";
55 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
56 private static final String ERROR_NULL_KEY = "Null key is not allowed";
57
58 protected final ConsistentMap<K, V> backingMap;
59 protected final Transaction<MapUpdate<K, V>> transaction;
60 protected final Map<K, V> writeCache = Maps.newConcurrentMap();
61 protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
Jordan Halterman948d6592017-04-20 17:18:24 -070062 protected volatile Version lock;
63
64 protected TransactionalMapParticipant(
65 ConsistentMap<K, V> backingMap,
66 Transaction<MapUpdate<K, V>> transaction) {
67 this.backingMap = backingMap;
68 this.transaction = transaction;
69 }
70
71 /**
72 * Starts the transaction for this partition when a read occurs.
73 * <p>
74 * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
75 * synchronized prior to a read.
76 */
77 private void beginTransaction() {
Jordan Halterman5f97a302017-04-26 23:41:31 -070078 if (lock == null) {
79 synchronized (this) {
80 if (lock == null) {
81 try {
82 lock = transaction.begin()
Jordan Halterman6440b092017-05-24 17:48:08 -070083 .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Jordan Halterman5f97a302017-04-26 23:41:31 -070084 } catch (InterruptedException e) {
85 Thread.currentThread().interrupt();
86 throw new TransactionException.Interrupted();
87 } catch (TimeoutException e) {
88 throw new TransactionException.Timeout();
89 } catch (ExecutionException e) {
90 Throwables.propagateIfPossible(e.getCause());
91 throw new TransactionException(e.getCause());
92 }
93 }
Jordan Halterman948d6592017-04-20 17:18:24 -070094 }
95 }
96 }
97
98 @Override
99 public V get(K key) {
100 // Start the transaction for this primitive/partition if necessary.
101 beginTransaction();
102
103 checkState(transaction.isOpen(), TX_CLOSED_ERROR);
104 checkNotNull(key, ERROR_NULL_KEY);
105
106 if (deleteSet.contains(key)) {
107 return null;
108 }
109
110 V latest = writeCache.get(key);
111 if (latest != null) {
112 return latest;
113 } else {
114 return read(key);
115 }
116 }
117
118 /**
119 * Executes a get operation based on the transaction isolation level.
120 *
121 * @param key the key to look up
122 * @return the value
123 */
124 protected abstract V read(K key);
125
126 @Override
127 public boolean containsKey(K key) {
128 return get(key) != null;
129 }
130
131 @Override
132 public V put(K key, V value) {
133 checkNotNull(value, ERROR_NULL_VALUE);
134
135 V latest = get(key);
136 writeCache.put(key, value);
137 deleteSet.remove(key);
138 return latest;
139 }
140
141 @Override
142 public V remove(K key) {
143 V latest = get(key);
144 if (latest != null) {
145 writeCache.remove(key);
146 deleteSet.add(key);
147 }
148 return latest;
149 }
150
151 @Override
152 public boolean remove(K key, V value) {
153 checkNotNull(value, ERROR_NULL_VALUE);
154
155 V latest = get(key);
156 if (Objects.equal(value, latest)) {
157 remove(key);
158 return true;
159 }
160 return false;
161 }
162
163 @Override
164 public boolean replace(K key, V oldValue, V newValue) {
165 checkNotNull(oldValue, ERROR_NULL_VALUE);
166 checkNotNull(newValue, ERROR_NULL_VALUE);
167
168 V latest = get(key);
169 if (Objects.equal(oldValue, latest)) {
170 put(key, newValue);
171 return true;
172 }
173 return false;
174 }
175
176 @Override
177 public V putIfAbsent(K key, V value) {
178 checkNotNull(value, ERROR_NULL_VALUE);
179
180 V latest = get(key);
181 if (latest == null) {
182 put(key, value);
183 }
184 return latest;
185 }
186
187 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700188 public CompletableFuture<Boolean> prepare() {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700189 return transaction.prepare(log(lock));
Jordan Halterman948d6592017-04-20 17:18:24 -0700190 }
191
192 @Override
193 public CompletableFuture<Void> commit() {
194 return transaction.commit();
195 }
196
197 @Override
198 public CompletableFuture<Boolean> prepareAndCommit() {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700199 return transaction.prepareAndCommit(log(lock));
Jordan Halterman948d6592017-04-20 17:18:24 -0700200 }
201
202 @Override
203 public CompletableFuture<Void> rollback() {
204 return transaction.rollback();
205 }
206
207 /**
208 * Returns a list of updates performed within this map partition.
209 *
Jordan Halterman5f97a302017-04-26 23:41:31 -0700210 * @param lockVersion the global transaction lock version
Jordan Halterman948d6592017-04-20 17:18:24 -0700211 * @return a list of map updates
212 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700213 protected List<MapUpdate<K, V>> log(Version lockVersion) {
214 return records(lockVersion).collect(Collectors.toList());
Jordan Halterman948d6592017-04-20 17:18:24 -0700215 }
216
217 /**
218 * Returns a stream of updates performed within this map partition.
219 *
Jordan Halterman5f97a302017-04-26 23:41:31 -0700220 * @param lockVersion the global transaction lock version
Jordan Halterman948d6592017-04-20 17:18:24 -0700221 * @return a stream of map updates
222 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700223 protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
Jordan Halterman948d6592017-04-20 17:18:24 -0700224
225 @Override
226 public String toString() {
227 return MoreObjects.toStringHelper(this)
228 .add("backingMap", backingMap)
Jordan Halterman948d6592017-04-20 17:18:24 -0700229 .toString();
230 }
231}