blob: 3cb87a4e800daab60bd82c08d87242f3700d4034 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.impl;
18
Jordan Halterman948d6592017-04-20 17:18:24 -070019import com.google.common.base.MoreObjects;
20import com.google.common.base.Objects;
Jordan Halterman948d6592017-04-20 17:18:24 -070021import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.onosproject.store.primitives.MapUpdate;
24import org.onosproject.store.service.ConsistentMap;
25import org.onosproject.store.service.DistributedPrimitive;
26import org.onosproject.store.service.TransactionException;
27import org.onosproject.store.service.TransactionalMap;
28import org.onosproject.store.service.Version;
29
Ray Milkey6a51cb92018-03-06 09:03:03 -080030import java.util.List;
31import java.util.Map;
32import java.util.Set;
33import java.util.concurrent.CompletableFuture;
34import java.util.concurrent.ExecutionException;
35import java.util.concurrent.TimeUnit;
36import java.util.concurrent.TimeoutException;
37import java.util.stream.Collectors;
38import java.util.stream.Stream;
39
Jordan Halterman948d6592017-04-20 17:18:24 -070040import static com.google.common.base.Preconditions.checkNotNull;
41import static com.google.common.base.Preconditions.checkState;
42
43/**
44 * Base class for participants within a single {@link TransactionalMap}.
45 * <p>
46 * This class provides the basic functionality required by transactional map participants and provides methods
47 * for defining operations specific to individual isolation levels.
48 *
49 * @param <K> key type
50 * @param <V> value type.
51 */
52public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
53 private static final String TX_CLOSED_ERROR = "Transaction is closed";
54 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
55 private static final String ERROR_NULL_KEY = "Null key is not allowed";
56
57 protected final ConsistentMap<K, V> backingMap;
58 protected final Transaction<MapUpdate<K, V>> transaction;
59 protected final Map<K, V> writeCache = Maps.newConcurrentMap();
60 protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
Jordan Halterman948d6592017-04-20 17:18:24 -070061 protected volatile Version lock;
62
63 protected TransactionalMapParticipant(
64 ConsistentMap<K, V> backingMap,
65 Transaction<MapUpdate<K, V>> transaction) {
66 this.backingMap = backingMap;
67 this.transaction = transaction;
68 }
69
70 /**
71 * Starts the transaction for this partition when a read occurs.
72 * <p>
73 * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
74 * synchronized prior to a read.
75 */
76 private void beginTransaction() {
Jordan Halterman5f97a302017-04-26 23:41:31 -070077 if (lock == null) {
78 synchronized (this) {
79 if (lock == null) {
80 try {
81 lock = transaction.begin()
Jordan Halterman6440b092017-05-24 17:48:08 -070082 .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Jordan Halterman5f97a302017-04-26 23:41:31 -070083 } catch (InterruptedException e) {
84 Thread.currentThread().interrupt();
85 throw new TransactionException.Interrupted();
86 } catch (TimeoutException e) {
87 throw new TransactionException.Timeout();
88 } catch (ExecutionException e) {
Jordan Halterman5f97a302017-04-26 23:41:31 -070089 throw new TransactionException(e.getCause());
90 }
91 }
Jordan Halterman948d6592017-04-20 17:18:24 -070092 }
93 }
94 }
95
96 @Override
97 public V get(K key) {
98 // Start the transaction for this primitive/partition if necessary.
99 beginTransaction();
100
101 checkState(transaction.isOpen(), TX_CLOSED_ERROR);
102 checkNotNull(key, ERROR_NULL_KEY);
103
104 if (deleteSet.contains(key)) {
105 return null;
106 }
107
108 V latest = writeCache.get(key);
109 if (latest != null) {
110 return latest;
111 } else {
112 return read(key);
113 }
114 }
115
116 /**
117 * Executes a get operation based on the transaction isolation level.
118 *
119 * @param key the key to look up
120 * @return the value
121 */
122 protected abstract V read(K key);
123
124 @Override
125 public boolean containsKey(K key) {
126 return get(key) != null;
127 }
128
129 @Override
130 public V put(K key, V value) {
131 checkNotNull(value, ERROR_NULL_VALUE);
132
133 V latest = get(key);
134 writeCache.put(key, value);
135 deleteSet.remove(key);
136 return latest;
137 }
138
139 @Override
140 public V remove(K key) {
141 V latest = get(key);
142 if (latest != null) {
143 writeCache.remove(key);
144 deleteSet.add(key);
145 }
146 return latest;
147 }
148
149 @Override
150 public boolean remove(K key, V value) {
151 checkNotNull(value, ERROR_NULL_VALUE);
152
153 V latest = get(key);
154 if (Objects.equal(value, latest)) {
155 remove(key);
156 return true;
157 }
158 return false;
159 }
160
161 @Override
162 public boolean replace(K key, V oldValue, V newValue) {
163 checkNotNull(oldValue, ERROR_NULL_VALUE);
164 checkNotNull(newValue, ERROR_NULL_VALUE);
165
166 V latest = get(key);
167 if (Objects.equal(oldValue, latest)) {
168 put(key, newValue);
169 return true;
170 }
171 return false;
172 }
173
174 @Override
175 public V putIfAbsent(K key, V value) {
176 checkNotNull(value, ERROR_NULL_VALUE);
177
178 V latest = get(key);
179 if (latest == null) {
180 put(key, value);
181 }
182 return latest;
183 }
184
185 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700186 public CompletableFuture<Boolean> prepare() {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700187 return transaction.prepare(log(lock));
Jordan Halterman948d6592017-04-20 17:18:24 -0700188 }
189
190 @Override
191 public CompletableFuture<Void> commit() {
192 return transaction.commit();
193 }
194
195 @Override
196 public CompletableFuture<Boolean> prepareAndCommit() {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700197 return transaction.prepareAndCommit(log(lock));
Jordan Halterman948d6592017-04-20 17:18:24 -0700198 }
199
200 @Override
201 public CompletableFuture<Void> rollback() {
202 return transaction.rollback();
203 }
204
205 /**
206 * Returns a list of updates performed within this map partition.
207 *
Jordan Halterman5f97a302017-04-26 23:41:31 -0700208 * @param lockVersion the global transaction lock version
Jordan Halterman948d6592017-04-20 17:18:24 -0700209 * @return a list of map updates
210 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700211 protected List<MapUpdate<K, V>> log(Version lockVersion) {
212 return records(lockVersion).collect(Collectors.toList());
Jordan Halterman948d6592017-04-20 17:18:24 -0700213 }
214
215 /**
216 * Returns a stream of updates performed within this map partition.
217 *
Jordan Halterman5f97a302017-04-26 23:41:31 -0700218 * @param lockVersion the global transaction lock version
Jordan Halterman948d6592017-04-20 17:18:24 -0700219 * @return a stream of map updates
220 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700221 protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
Jordan Halterman948d6592017-04-20 17:18:24 -0700222
223 @Override
224 public String toString() {
225 return MoreObjects.toStringHelper(this)
226 .add("backingMap", backingMap)
Jordan Halterman948d6592017-04-20 17:18:24 -0700227 .toString();
228 }
229}