blob: 2cb4a50f902056e6c431d874b5b8a527421082d2 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
22import java.util.Collection;
23import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080024import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080025import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070026import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070031import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.function.Predicate;
33
34import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080035import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080036import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070047import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080048import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080052import org.onosproject.store.service.AsyncConsistentMap;
53import org.onosproject.store.service.MapEvent;
54import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080055import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056import org.onosproject.store.service.Versioned;
Madan Jampani1d3b6172016-04-28 13:22:57 -070057import com.google.common.collect.ImmutableSet;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058import com.google.common.collect.Sets;
59
60/**
61 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
62 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070063@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
64public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080065 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080066
Madan Jampani1d3b6172016-04-28 13:22:57 -070067 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
69
Madan Jampanifc981772016-02-16 09:46:42 -080070 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071
Madan Jampani65f24bb2016-03-15 15:16:18 -070072 public AtomixConsistentMap(CopycatClient client, Properties properties) {
73 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 }
75
76 @Override
77 public String name() {
78 return null;
79 }
80
81 @Override
82 public CompletableFuture<AtomixConsistentMap> open() {
83 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080084 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 return result;
86 });
87 }
88
Madan Jampanifc981772016-02-16 09:46:42 -080089 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani3780d4b2016-04-04 18:18:24 -070090 events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080091 }
92
93 @Override
94 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080095 return submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080096 }
97
98 @Override
99 public CompletableFuture<Integer> size() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800100 return submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800101 }
102
103 @Override
104 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800105 return submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800106 }
107
108 @Override
109 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800110 return submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 }
112
113 @Override
114 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800115 return submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116 }
117
118 @Override
119 public CompletableFuture<Set<String>> keySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800120 return submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800121 }
122
123 @Override
124 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800125 return submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800126 }
127
128 @Override
129 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800130 return submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 }
132
133 @Override
134 @SuppressWarnings("unchecked")
135 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800136 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800137 .whenComplete((r, e) -> throwIfLocked(r.status()))
138 .thenApply(v -> v.oldValue());
139 }
140
141 @Override
142 @SuppressWarnings("unchecked")
143 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800144 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800145 .whenComplete((r, e) -> throwIfLocked(r.status()))
146 .thenApply(v -> v.newValue());
147 }
148
149 @Override
150 @SuppressWarnings("unchecked")
151 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800152 return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 .whenComplete((r, e) -> throwIfLocked(r.status()))
154 .thenApply(v -> v.oldValue());
155 }
156 @Override
157 @SuppressWarnings("unchecked")
158 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800159 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800160 .whenComplete((r, e) -> throwIfLocked(r.status()))
161 .thenApply(v -> v.oldValue());
162 }
163
164 @Override
165 @SuppressWarnings("unchecked")
166 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800167 return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800168 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.updated());
170 }
171
172 @Override
173 @SuppressWarnings("unchecked")
174 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800175 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800176 .whenComplete((r, e) -> throwIfLocked(r.status()))
177 .thenApply(v -> v.updated());
178 }
179
180 @Override
181 @SuppressWarnings("unchecked")
182 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800183 return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800184 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.oldValue());
186 }
187
188 @Override
189 @SuppressWarnings("unchecked")
190 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800191 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800192 newValue,
193 Match.ifValue(oldValue),
194 Match.ANY))
195 .whenComplete((r, e) -> throwIfLocked(r.status()))
196 .thenApply(v -> v.updated());
197 }
198
199 @Override
200 @SuppressWarnings("unchecked")
201 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800202 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800203 newValue,
204 Match.ANY,
205 Match.ifValue(oldVersion)))
206 .whenComplete((r, e) -> throwIfLocked(r.status()))
207 .thenApply(v -> v.updated());
208 }
209
210 @Override
211 public CompletableFuture<Void> clear() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800212 return submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 .whenComplete((r, e) -> throwIfLocked(r))
214 .thenApply(v -> null);
215 }
216
217 @Override
218 @SuppressWarnings("unchecked")
219 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
220 Predicate<? super byte[]> condition,
221 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
222 return get(key).thenCompose(r1 -> {
223 byte[] existingValue = r1 == null ? null : r1.value();
224 // if the condition evaluates to false, return existing value.
225 if (!condition.test(existingValue)) {
226 return CompletableFuture.completedFuture(r1);
227 }
228
229 AtomicReference<byte[]> computedValue = new AtomicReference<>();
230 // if remappingFunction throws an exception, return the exception.
231 try {
232 computedValue.set(remappingFunction.apply(key, existingValue));
233 } catch (Exception e) {
234 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
235 future.completeExceptionally(e);
236 return future;
237 }
238 if (computedValue.get() == null && r1 == null) {
239 return CompletableFuture.completedFuture(null);
240 }
241 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
242 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800243 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800244 computedValue.get(),
245 valueMatch,
246 versionMatch))
247 .whenComplete((r, e) -> throwIfLocked(r.status()))
248 .thenApply(v -> v.newValue());
249 });
250 }
251
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800252 @Override
253 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800254 if (mapEventListeners.isEmpty()) {
255 return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
256 } else {
Madan Jampani40f022e2016-03-02 21:35:14 -0800257 mapEventListeners.add(listener);
258 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800259 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800260 }
261
262 @Override
263 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
264 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800265 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800266 }
267 return CompletableFuture.completedFuture(null);
268 }
269
270 private void throwIfLocked(MapEntryUpdateResult.Status status) {
271 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
272 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
273 }
274 }
275
Madan Jampanicadd70b2016-02-08 13:45:43 -0800276 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800277 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800278 return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800279 }
280
281 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800282 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800283 return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800284 }
285
286 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800287 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800288 return submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800289 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800290 }
Madan Jampani542d9e22016-04-05 15:39:55 -0700291
292 @Override
293 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
294 return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
295 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700296
297 @Override
298 public void addStatusChangeListener(Consumer<Status> listener) {
299 statusChangeListeners.add(listener);
300 }
301
302 @Override
303 public void removeStatusChangeListener(Consumer<Status> listener) {
304 statusChangeListeners.remove(listener);
305 }
306
307 @Override
308 public Collection<Consumer<Status>> statusChangeListeners() {
309 return ImmutableSet.copyOf(statusChangeListeners);
310 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800311}