blob: 23d2f86a92a8711f76101c9076487cb88ebdb783 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampanid5b200f2016-06-06 17:15:25 -070018import io.atomix.copycat.Operation;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070020import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080021import io.atomix.resource.ResourceTypeInfo;
22
23import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070028import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080029import java.util.Set;
30import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070031import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.concurrent.atomic.AtomicReference;
33import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070034import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.function.Predicate;
36
37import org.onlab.util.Match;
Madan Jampanid5b200f2016-06-06 17:15:25 -070038import org.onlab.util.Tools;
Madan Jampanicadd70b2016-02-08 13:45:43 -080039import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080040import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070051import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080052import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056import org.onosproject.store.service.AsyncConsistentMap;
57import org.onosproject.store.service.MapEvent;
58import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080059import org.onosproject.store.service.MapTransaction;
Madan Jampanid5b200f2016-06-06 17:15:25 -070060import org.onosproject.store.service.StorageException;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.Versioned;
Madan Jampani0463cf92016-05-04 14:46:08 -070062
Madan Jampani1d3b6172016-04-28 13:22:57 -070063import com.google.common.collect.ImmutableSet;
Madan Jampani0463cf92016-05-04 14:46:08 -070064import com.google.common.collect.Maps;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065import com.google.common.collect.Sets;
66
67/**
68 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
69 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070070@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
71public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080072 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073
Madan Jampani1d3b6172016-04-28 13:22:57 -070074 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani0463cf92016-05-04 14:46:08 -070075 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newIdentityHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076
Madan Jampanifc981772016-02-16 09:46:42 -080077 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078
Madan Jampani65f24bb2016-03-15 15:16:18 -070079 public AtomixConsistentMap(CopycatClient client, Properties properties) {
80 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080081 }
82
83 @Override
84 public String name() {
85 return null;
86 }
87
88 @Override
89 public CompletableFuture<AtomixConsistentMap> open() {
90 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080091 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080092 return result;
93 });
94 }
95
Madan Jampanifc981772016-02-16 09:46:42 -080096 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -070097 events.forEach(event ->
98 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 }
100
101 @Override
102 public CompletableFuture<Boolean> isEmpty() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700103 return submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800104 }
105
106 @Override
107 public CompletableFuture<Integer> size() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700108 return submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 }
110
111 @Override
112 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700113 return submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700118 return submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 @Override
122 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700123 return submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800124 }
125
126 @Override
127 public CompletableFuture<Set<String>> keySet() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700128 return submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 }
130
131 @Override
132 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700133 return submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 }
135
136 @Override
137 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700138 return submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 }
140
141 @Override
142 @SuppressWarnings("unchecked")
143 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700144 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800145 .whenComplete((r, e) -> throwIfLocked(r.status()))
146 .thenApply(v -> v.oldValue());
147 }
148
149 @Override
150 @SuppressWarnings("unchecked")
151 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700152 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 .whenComplete((r, e) -> throwIfLocked(r.status()))
154 .thenApply(v -> v.newValue());
155 }
156
157 @Override
158 @SuppressWarnings("unchecked")
159 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700160 return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800161 .whenComplete((r, e) -> throwIfLocked(r.status()))
162 .thenApply(v -> v.oldValue());
163 }
164 @Override
165 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700167 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800168 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.oldValue());
170 }
171
172 @Override
173 @SuppressWarnings("unchecked")
174 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700175 return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800176 .whenComplete((r, e) -> throwIfLocked(r.status()))
177 .thenApply(v -> v.updated());
178 }
179
180 @Override
181 @SuppressWarnings("unchecked")
182 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700183 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800184 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.updated());
186 }
187
188 @Override
189 @SuppressWarnings("unchecked")
190 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700191 return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800192 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.oldValue());
194 }
195
196 @Override
197 @SuppressWarnings("unchecked")
198 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700199 return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800200 .whenComplete((r, e) -> throwIfLocked(r.status()))
201 .thenApply(v -> v.updated());
202 }
203
204 @Override
205 @SuppressWarnings("unchecked")
206 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700207 return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 .whenComplete((r, e) -> throwIfLocked(r.status()))
209 .thenApply(v -> v.updated());
210 }
211
212 @Override
213 public CompletableFuture<Void> clear() {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700214 return submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800215 .whenComplete((r, e) -> throwIfLocked(r))
216 .thenApply(v -> null);
217 }
218
219 @Override
220 @SuppressWarnings("unchecked")
221 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
222 Predicate<? super byte[]> condition,
223 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
224 return get(key).thenCompose(r1 -> {
225 byte[] existingValue = r1 == null ? null : r1.value();
226 // if the condition evaluates to false, return existing value.
227 if (!condition.test(existingValue)) {
228 return CompletableFuture.completedFuture(r1);
229 }
230
231 AtomicReference<byte[]> computedValue = new AtomicReference<>();
232 // if remappingFunction throws an exception, return the exception.
233 try {
234 computedValue.set(remappingFunction.apply(key, existingValue));
235 } catch (Exception e) {
236 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
237 future.completeExceptionally(e);
238 return future;
239 }
240 if (computedValue.get() == null && r1 == null) {
241 return CompletableFuture.completedFuture(null);
242 }
243 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
244 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampanid5b200f2016-06-06 17:15:25 -0700245 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800246 computedValue.get(),
247 valueMatch,
248 versionMatch))
249 .whenComplete((r, e) -> throwIfLocked(r.status()))
250 .thenApply(v -> v.newValue());
251 });
252 }
253
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800254 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700255 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
256 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800257 if (mapEventListeners.isEmpty()) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700258 return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800259 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700260 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800261 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800262 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800263 }
264
265 @Override
266 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700267 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700268 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800269 }
270 return CompletableFuture.completedFuture(null);
271 }
272
273 private void throwIfLocked(MapEntryUpdateResult.Status status) {
274 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
275 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
276 }
277 }
278
Madan Jampanicadd70b2016-02-08 13:45:43 -0800279 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800280 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700281 return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800282 }
283
284 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800285 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700286 return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800287 }
288
289 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800290 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700291 return submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800292 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800293 }
Madan Jampani542d9e22016-04-05 15:39:55 -0700294
295 @Override
296 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
Madan Jampanid5b200f2016-06-06 17:15:25 -0700297 return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampani542d9e22016-04-05 15:39:55 -0700298 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700299
300 @Override
301 public void addStatusChangeListener(Consumer<Status> listener) {
302 statusChangeListeners.add(listener);
303 }
304
305 @Override
306 public void removeStatusChangeListener(Consumer<Status> listener) {
307 statusChangeListeners.remove(listener);
308 }
309
310 @Override
311 public Collection<Consumer<Status>> statusChangeListeners() {
312 return ImmutableSet.copyOf(statusChangeListeners);
313 }
Madan Jampanid5b200f2016-06-06 17:15:25 -0700314
315 <T> CompletableFuture<T> submit(Operation<T> command) {
316 if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
317 return Tools.exceptionalFuture(new StorageException.Unavailable());
318 }
319 return client.submit(command);
320 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800321}