blob: df567d28e03411931ae59d6e4a6f7d723b382071 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampani8e18c0e2016-06-20 14:08:16 -070022import java.util.concurrent.ConcurrentHashMap;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070028import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080029import java.util.Set;
30import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070031import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.concurrent.atomic.AtomicReference;
33import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070034import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.function.Predicate;
36
37import org.onlab.util.Match;
Madan Jampani80d2e972016-07-12 17:11:52 -070038import org.onlab.util.Tools;
Madan Jampanicadd70b2016-02-08 13:45:43 -080039import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080040import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070051import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080052import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani80d2e972016-07-12 17:11:52 -070057import org.onosproject.store.service.ConsistentMapException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058import org.onosproject.store.service.MapEvent;
59import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080060import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070062
Madan Jampani1d3b6172016-04-28 13:22:57 -070063import com.google.common.collect.ImmutableSet;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import com.google.common.collect.Sets;
65
66/**
67 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
68 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070069@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
70public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080071 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072
Madan Jampani1d3b6172016-04-28 13:22:57 -070073 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani8e18c0e2016-06-20 14:08:16 -070074 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080075
Madan Jampanifc981772016-02-16 09:46:42 -080076 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080077
Madan Jampani65f24bb2016-03-15 15:16:18 -070078 public AtomixConsistentMap(CopycatClient client, Properties properties) {
79 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080080 }
81
82 @Override
83 public String name() {
84 return null;
85 }
86
87 @Override
88 public CompletableFuture<AtomixConsistentMap> open() {
89 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -070090 client.onStateChange(state -> {
91 if (state == CopycatClient.State.CONNECTED && isListening()) {
92 client.submit(new Listen());
93 }
94 });
Madan Jampani3a9911c2016-02-21 11:25:45 -080095 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080096 return result;
97 });
98 }
99
Madan Jampanifc981772016-02-16 09:46:42 -0800100 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700101 events.forEach(event ->
102 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 }
104
105 @Override
106 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700107 return client.submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
110 @Override
111 public CompletableFuture<Integer> size() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700112 return client.submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 @Override
116 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700117 return client.submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800118 }
119
120 @Override
121 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700122 return client.submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800123 }
124
125 @Override
126 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700127 return client.submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800128 }
129
130 @Override
131 public CompletableFuture<Set<String>> keySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700132 return client.submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800133 }
134
135 @Override
136 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700137 return client.submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 }
139
140 @Override
141 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700142 return client.submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800143 }
144
145 @Override
146 @SuppressWarnings("unchecked")
147 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700148 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800149 .whenComplete((r, e) -> throwIfLocked(r.status()))
150 .thenApply(v -> v.oldValue());
151 }
152
153 @Override
154 @SuppressWarnings("unchecked")
155 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700156 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 .whenComplete((r, e) -> throwIfLocked(r.status()))
158 .thenApply(v -> v.newValue());
159 }
160
161 @Override
162 @SuppressWarnings("unchecked")
163 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700164 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800165 .whenComplete((r, e) -> throwIfLocked(r.status()))
166 .thenApply(v -> v.oldValue());
167 }
168 @Override
169 @SuppressWarnings("unchecked")
170 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700171 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800172 .whenComplete((r, e) -> throwIfLocked(r.status()))
173 .thenApply(v -> v.oldValue());
174 }
175
176 @Override
177 @SuppressWarnings("unchecked")
178 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700179 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 .whenComplete((r, e) -> throwIfLocked(r.status()))
181 .thenApply(v -> v.updated());
182 }
183
184 @Override
185 @SuppressWarnings("unchecked")
186 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700187 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800188 .whenComplete((r, e) -> throwIfLocked(r.status()))
189 .thenApply(v -> v.updated());
190 }
191
192 @Override
193 @SuppressWarnings("unchecked")
194 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700195 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800196 .whenComplete((r, e) -> throwIfLocked(r.status()))
197 .thenApply(v -> v.oldValue());
198 }
199
200 @Override
201 @SuppressWarnings("unchecked")
202 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700203 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800204 .whenComplete((r, e) -> throwIfLocked(r.status()))
205 .thenApply(v -> v.updated());
206 }
207
208 @Override
209 @SuppressWarnings("unchecked")
210 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700211 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800212 .whenComplete((r, e) -> throwIfLocked(r.status()))
213 .thenApply(v -> v.updated());
214 }
215
216 @Override
217 public CompletableFuture<Void> clear() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700218 return client.submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800219 .whenComplete((r, e) -> throwIfLocked(r))
220 .thenApply(v -> null);
221 }
222
223 @Override
224 @SuppressWarnings("unchecked")
225 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
226 Predicate<? super byte[]> condition,
227 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
228 return get(key).thenCompose(r1 -> {
229 byte[] existingValue = r1 == null ? null : r1.value();
230 // if the condition evaluates to false, return existing value.
231 if (!condition.test(existingValue)) {
232 return CompletableFuture.completedFuture(r1);
233 }
234
235 AtomicReference<byte[]> computedValue = new AtomicReference<>();
236 // if remappingFunction throws an exception, return the exception.
237 try {
238 computedValue.set(remappingFunction.apply(key, existingValue));
239 } catch (Exception e) {
240 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
241 future.completeExceptionally(e);
242 return future;
243 }
244 if (computedValue.get() == null && r1 == null) {
245 return CompletableFuture.completedFuture(null);
246 }
247 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
248 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani825a8b12016-06-06 19:42:01 -0700249 return client.submit(new UpdateAndGet(key,
Madan Jampani80d2e972016-07-12 17:11:52 -0700250 computedValue.get(),
251 valueMatch,
252 versionMatch))
253 .whenComplete((r, e) -> throwIfLocked(r.status()))
254 .thenCompose(r -> {
255 if (r.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
256 r.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
257 return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
258 }
259 return CompletableFuture.completedFuture(r);
260 })
261 .thenApply(v -> v.newValue());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800262 });
263 }
264
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800265 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700266 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
267 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800268 if (mapEventListeners.isEmpty()) {
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700269 return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800270 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700271 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800272 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800274 }
275
276 @Override
277 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700278 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700279 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800280 }
281 return CompletableFuture.completedFuture(null);
282 }
283
284 private void throwIfLocked(MapEntryUpdateResult.Status status) {
285 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
286 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
287 }
288 }
289
Madan Jampanicadd70b2016-02-08 13:45:43 -0800290 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800291 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700292 return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800293 }
294
295 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800296 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700297 return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800298 }
299
300 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800301 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700302 return client.submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800303 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800304 }
Madan Jampani542d9e22016-04-05 15:39:55 -0700305
306 @Override
307 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700308 return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampani542d9e22016-04-05 15:39:55 -0700309 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700310
311 @Override
312 public void addStatusChangeListener(Consumer<Status> listener) {
313 statusChangeListeners.add(listener);
314 }
315
316 @Override
317 public void removeStatusChangeListener(Consumer<Status> listener) {
318 statusChangeListeners.remove(listener);
319 }
320
321 @Override
322 public Collection<Consumer<Status>> statusChangeListeners() {
323 return ImmutableSet.copyOf(statusChangeListeners);
324 }
Madan Jampanifb786382016-06-13 10:25:35 -0700325
326 private boolean isListening() {
327 return !mapEventListeners.isEmpty();
328 }
Madan Jampani8e18c0e2016-06-20 14:08:16 -0700329}