blob: 7220c3709279a0334604e2eba7bbb0407bf4225e [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampani8e18c0e2016-06-20 14:08:16 -070022import java.util.concurrent.ConcurrentHashMap;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070028import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080029import java.util.Set;
30import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070031import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.concurrent.atomic.AtomicReference;
33import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070034import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.function.Predicate;
36
37import org.onlab.util.Match;
Madan Jampani80d2e972016-07-12 17:11:52 -070038import org.onlab.util.Tools;
Jordan Halterman948d6592017-04-20 17:18:24 -070039import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080040import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080041import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
Jordan Haltermanf6272442017-04-20 02:18:08 -070046import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.GetOrDefault;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080047import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
Jordan Halterman948d6592017-04-20 17:18:24 -070051import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080052import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani80d2e972016-07-12 17:11:52 -070060import org.onosproject.store.service.ConsistentMapException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
Jordan Halterman948d6592017-04-20 17:18:24 -070063import org.onosproject.store.service.TransactionLog;
64import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070066
Madan Jampani1d3b6172016-04-28 13:22:57 -070067import com.google.common.collect.ImmutableSet;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068import com.google.common.collect.Sets;
69
70/**
71 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
72 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070073@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
74public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080075 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076
Madan Jampani1d3b6172016-04-28 13:22:57 -070077 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani8e18c0e2016-06-20 14:08:16 -070078 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080079
Madan Jampanifc981772016-02-16 09:46:42 -080080 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080081
Madan Jampani65f24bb2016-03-15 15:16:18 -070082 public AtomixConsistentMap(CopycatClient client, Properties properties) {
83 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 }
85
86 @Override
87 public String name() {
88 return null;
89 }
90
91 @Override
92 public CompletableFuture<AtomixConsistentMap> open() {
93 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -070094 client.onStateChange(state -> {
95 if (state == CopycatClient.State.CONNECTED && isListening()) {
96 client.submit(new Listen());
97 }
98 });
Madan Jampani3a9911c2016-02-21 11:25:45 -080099 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 return result;
101 });
102 }
103
Madan Jampanifc981772016-02-16 09:46:42 -0800104 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700105 events.forEach(event ->
106 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 }
108
109 @Override
110 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700111 return client.submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 }
113
114 @Override
115 public CompletableFuture<Integer> size() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700116 return client.submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800117 }
118
119 @Override
120 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700121 return client.submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800122 }
123
124 @Override
125 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700126 return client.submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 }
128
129 @Override
130 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700131 return client.submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800132 }
133
134 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700135 public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
136 return client.submit(new GetOrDefault(key, defaultValue));
137 }
138
139 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 public CompletableFuture<Set<String>> keySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700141 return client.submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800142 }
143
144 @Override
145 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700146 return client.submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800147 }
148
149 @Override
150 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700151 return client.submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 }
153
154 @Override
155 @SuppressWarnings("unchecked")
156 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700157 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800158 .whenComplete((r, e) -> throwIfLocked(r.status()))
159 .thenApply(v -> v.oldValue());
160 }
161
162 @Override
163 @SuppressWarnings("unchecked")
164 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700165 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 .whenComplete((r, e) -> throwIfLocked(r.status()))
167 .thenApply(v -> v.newValue());
168 }
169
170 @Override
171 @SuppressWarnings("unchecked")
172 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700173 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800174 .whenComplete((r, e) -> throwIfLocked(r.status()))
175 .thenApply(v -> v.oldValue());
176 }
177 @Override
178 @SuppressWarnings("unchecked")
179 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700180 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800181 .whenComplete((r, e) -> throwIfLocked(r.status()))
182 .thenApply(v -> v.oldValue());
183 }
184
185 @Override
186 @SuppressWarnings("unchecked")
187 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700188 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 .whenComplete((r, e) -> throwIfLocked(r.status()))
190 .thenApply(v -> v.updated());
191 }
192
193 @Override
194 @SuppressWarnings("unchecked")
195 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700196 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800197 .whenComplete((r, e) -> throwIfLocked(r.status()))
198 .thenApply(v -> v.updated());
199 }
200
201 @Override
202 @SuppressWarnings("unchecked")
203 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700204 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 .whenComplete((r, e) -> throwIfLocked(r.status()))
206 .thenApply(v -> v.oldValue());
207 }
208
209 @Override
210 @SuppressWarnings("unchecked")
211 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700212 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 .whenComplete((r, e) -> throwIfLocked(r.status()))
214 .thenApply(v -> v.updated());
215 }
216
217 @Override
218 @SuppressWarnings("unchecked")
219 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700220 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800221 .whenComplete((r, e) -> throwIfLocked(r.status()))
222 .thenApply(v -> v.updated());
223 }
224
225 @Override
226 public CompletableFuture<Void> clear() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700227 return client.submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800228 .whenComplete((r, e) -> throwIfLocked(r))
229 .thenApply(v -> null);
230 }
231
232 @Override
233 @SuppressWarnings("unchecked")
234 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
235 Predicate<? super byte[]> condition,
236 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
237 return get(key).thenCompose(r1 -> {
238 byte[] existingValue = r1 == null ? null : r1.value();
239 // if the condition evaluates to false, return existing value.
240 if (!condition.test(existingValue)) {
241 return CompletableFuture.completedFuture(r1);
242 }
243
244 AtomicReference<byte[]> computedValue = new AtomicReference<>();
245 // if remappingFunction throws an exception, return the exception.
246 try {
247 computedValue.set(remappingFunction.apply(key, existingValue));
248 } catch (Exception e) {
249 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
250 future.completeExceptionally(e);
251 return future;
252 }
253 if (computedValue.get() == null && r1 == null) {
254 return CompletableFuture.completedFuture(null);
255 }
256 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
257 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani825a8b12016-06-06 19:42:01 -0700258 return client.submit(new UpdateAndGet(key,
Madan Jampani80d2e972016-07-12 17:11:52 -0700259 computedValue.get(),
260 valueMatch,
261 versionMatch))
262 .whenComplete((r, e) -> throwIfLocked(r.status()))
263 .thenCompose(r -> {
264 if (r.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
265 r.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
266 return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
267 }
268 return CompletableFuture.completedFuture(r);
269 })
270 .thenApply(v -> v.newValue());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800271 });
272 }
273
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800274 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700275 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
276 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800277 if (mapEventListeners.isEmpty()) {
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700278 return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800279 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700280 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800281 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800282 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800283 }
284
285 @Override
286 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700287 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700288 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800289 }
290 return CompletableFuture.completedFuture(null);
291 }
292
293 private void throwIfLocked(MapEntryUpdateResult.Status status) {
294 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
295 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
296 }
297 }
298
Madan Jampanicadd70b2016-02-08 13:45:43 -0800299 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700300 public CompletableFuture<Version> begin(TransactionId transactionId) {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700301 return client.submit(new TransactionBegin(transactionId)).thenApply(Version::new);
Jordan Halterman948d6592017-04-20 17:18:24 -0700302 }
303
304 @Override
305 public CompletableFuture<Boolean> prepare(
306 TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
307 return client.submit(new TransactionPrepare(transactionLog))
308 .thenApply(v -> v == PrepareResult.OK);
309 }
310
311 @Override
312 public CompletableFuture<Boolean> prepareAndCommit(
313 TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
314 return client.submit(new TransactionPrepareAndCommit(transactionLog))
315 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800316 }
317
318 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800319 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700320 return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800321 }
322
323 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800324 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700325 return client.submit(new TransactionRollback(transactionId)).thenApply(v -> null);
Madan Jampani542d9e22016-04-05 15:39:55 -0700326 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700327
328 @Override
329 public void addStatusChangeListener(Consumer<Status> listener) {
330 statusChangeListeners.add(listener);
331 }
332
333 @Override
334 public void removeStatusChangeListener(Consumer<Status> listener) {
335 statusChangeListeners.remove(listener);
336 }
337
338 @Override
339 public Collection<Consumer<Status>> statusChangeListeners() {
340 return ImmutableSet.copyOf(statusChangeListeners);
341 }
Madan Jampanifb786382016-06-13 10:25:35 -0700342
343 private boolean isListening() {
344 return !mapEventListeners.isEmpty();
345 }
Madan Jampani8e18c0e2016-06-20 14:08:16 -0700346}