blob: 4429a1b0babd1046be9cf86a423e9d8a51cd829b [file] [log] [blame]
Jordan Halterman948d6592017-04-20 17:18:24 -07001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.impl;
18
19import java.util.ArrayList;
20import java.util.List;
21import java.util.Map;
22import java.util.Set;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.ExecutionException;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.TimeoutException;
27import java.util.stream.Collectors;
28import java.util.stream.Stream;
29
30import com.google.common.base.MoreObjects;
31import com.google.common.base.Objects;
32import com.google.common.base.Throwables;
33import com.google.common.collect.Maps;
34import com.google.common.collect.Sets;
35import org.onosproject.store.primitives.MapUpdate;
36import org.onosproject.store.service.ConsistentMap;
37import org.onosproject.store.service.DistributedPrimitive;
38import org.onosproject.store.service.TransactionException;
39import org.onosproject.store.service.TransactionalMap;
40import org.onosproject.store.service.Version;
41
42import static com.google.common.base.Preconditions.checkNotNull;
43import static com.google.common.base.Preconditions.checkState;
44
45/**
46 * Base class for participants within a single {@link TransactionalMap}.
47 * <p>
48 * This class provides the basic functionality required by transactional map participants and provides methods
49 * for defining operations specific to individual isolation levels.
50 *
51 * @param <K> key type
52 * @param <V> value type.
53 */
54public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
55 private static final String TX_CLOSED_ERROR = "Transaction is closed";
56 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
57 private static final String ERROR_NULL_KEY = "Null key is not allowed";
58
59 protected final ConsistentMap<K, V> backingMap;
60 protected final Transaction<MapUpdate<K, V>> transaction;
61 protected final Map<K, V> writeCache = Maps.newConcurrentMap();
62 protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
63 protected final List<MapUpdate<K, V>> log = new ArrayList<>();
64 protected volatile Version lock;
65
66 protected TransactionalMapParticipant(
67 ConsistentMap<K, V> backingMap,
68 Transaction<MapUpdate<K, V>> transaction) {
69 this.backingMap = backingMap;
70 this.transaction = transaction;
71 }
72
73 /**
74 * Starts the transaction for this partition when a read occurs.
75 * <p>
76 * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
77 * synchronized prior to a read.
78 */
79 private void beginTransaction() {
80 if (!transaction.isOpen()) {
81 try {
82 lock = transaction.begin()
83 .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
84 } catch (InterruptedException e) {
85 Thread.currentThread().interrupt();
86 throw new TransactionException.Interrupted();
87 } catch (TimeoutException e) {
88 throw new TransactionException.Timeout();
89 } catch (ExecutionException e) {
90 Throwables.propagateIfPossible(e.getCause());
91 throw new TransactionException(e.getCause());
92 }
93 }
94 }
95
96 @Override
97 public V get(K key) {
98 // Start the transaction for this primitive/partition if necessary.
99 beginTransaction();
100
101 checkState(transaction.isOpen(), TX_CLOSED_ERROR);
102 checkNotNull(key, ERROR_NULL_KEY);
103
104 if (deleteSet.contains(key)) {
105 return null;
106 }
107
108 V latest = writeCache.get(key);
109 if (latest != null) {
110 return latest;
111 } else {
112 return read(key);
113 }
114 }
115
116 /**
117 * Executes a get operation based on the transaction isolation level.
118 *
119 * @param key the key to look up
120 * @return the value
121 */
122 protected abstract V read(K key);
123
124 @Override
125 public boolean containsKey(K key) {
126 return get(key) != null;
127 }
128
129 @Override
130 public V put(K key, V value) {
131 checkNotNull(value, ERROR_NULL_VALUE);
132
133 V latest = get(key);
134 writeCache.put(key, value);
135 deleteSet.remove(key);
136 return latest;
137 }
138
139 @Override
140 public V remove(K key) {
141 V latest = get(key);
142 if (latest != null) {
143 writeCache.remove(key);
144 deleteSet.add(key);
145 }
146 return latest;
147 }
148
149 @Override
150 public boolean remove(K key, V value) {
151 checkNotNull(value, ERROR_NULL_VALUE);
152
153 V latest = get(key);
154 if (Objects.equal(value, latest)) {
155 remove(key);
156 return true;
157 }
158 return false;
159 }
160
161 @Override
162 public boolean replace(K key, V oldValue, V newValue) {
163 checkNotNull(oldValue, ERROR_NULL_VALUE);
164 checkNotNull(newValue, ERROR_NULL_VALUE);
165
166 V latest = get(key);
167 if (Objects.equal(oldValue, latest)) {
168 put(key, newValue);
169 return true;
170 }
171 return false;
172 }
173
174 @Override
175 public V putIfAbsent(K key, V value) {
176 checkNotNull(value, ERROR_NULL_VALUE);
177
178 V latest = get(key);
179 if (latest == null) {
180 put(key, value);
181 }
182 return latest;
183 }
184
185 @Override
186 public boolean hasPendingUpdates() {
187 return records().findAny().isPresent();
188 }
189
190 @Override
191 public CompletableFuture<Boolean> prepare() {
192 return transaction.prepare(log());
193 }
194
195 @Override
196 public CompletableFuture<Void> commit() {
197 return transaction.commit();
198 }
199
200 @Override
201 public CompletableFuture<Boolean> prepareAndCommit() {
202 return transaction.prepareAndCommit(log());
203 }
204
205 @Override
206 public CompletableFuture<Void> rollback() {
207 return transaction.rollback();
208 }
209
210 /**
211 * Returns a list of updates performed within this map partition.
212 *
213 * @return a list of map updates
214 */
215 protected List<MapUpdate<K, V>> log() {
216 return records().collect(Collectors.toList());
217 }
218
219 /**
220 * Returns a stream of updates performed within this map partition.
221 *
222 * @return a stream of map updates
223 */
224 protected abstract Stream<MapUpdate<K, V>> records();
225
226 @Override
227 public String toString() {
228 return MoreObjects.toStringHelper(this)
229 .add("backingMap", backingMap)
230 .add("updates", log())
231 .toString();
232 }
233}