blob: 837e5acc708706de713b82f6df6d5f344a19eb30 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampani8e18c0e2016-06-20 14:08:16 -070022import java.util.concurrent.ConcurrentHashMap;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070028import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080029import java.util.Set;
30import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070031import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080032import java.util.concurrent.atomic.AtomicReference;
33import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070034import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.function.Predicate;
36
37import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080038import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080039import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070050import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080051import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.service.AsyncConsistentMap;
56import org.onosproject.store.service.MapEvent;
57import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080058import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.Versioned;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070060
Madan Jampani1d3b6172016-04-28 13:22:57 -070061import com.google.common.collect.ImmutableSet;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080062import com.google.common.collect.Sets;
63
64/**
65 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
66 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070067@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
68public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080069 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080070
Madan Jampani1d3b6172016-04-28 13:22:57 -070071 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani8e18c0e2016-06-20 14:08:16 -070072 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073
Madan Jampanifc981772016-02-16 09:46:42 -080074 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080075
Madan Jampani65f24bb2016-03-15 15:16:18 -070076 public AtomixConsistentMap(CopycatClient client, Properties properties) {
77 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078 }
79
80 @Override
81 public String name() {
82 return null;
83 }
84
85 @Override
86 public CompletableFuture<AtomixConsistentMap> open() {
87 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -070088 client.onStateChange(state -> {
89 if (state == CopycatClient.State.CONNECTED && isListening()) {
90 client.submit(new Listen());
91 }
92 });
Madan Jampani3a9911c2016-02-21 11:25:45 -080093 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080094 return result;
95 });
96 }
97
Madan Jampanifc981772016-02-16 09:46:42 -080098 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -070099 events.forEach(event ->
100 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800101 }
102
103 @Override
104 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700105 return client.submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800106 }
107
108 @Override
109 public CompletableFuture<Integer> size() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700110 return client.submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 }
112
113 @Override
114 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700115 return client.submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116 }
117
118 @Override
119 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700120 return client.submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800121 }
122
123 @Override
124 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700125 return client.submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800126 }
127
128 @Override
129 public CompletableFuture<Set<String>> keySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700130 return client.submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 }
132
133 @Override
134 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700135 return client.submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 }
137
138 @Override
139 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700140 return client.submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800141 }
142
143 @Override
144 @SuppressWarnings("unchecked")
145 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700146 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800147 .whenComplete((r, e) -> throwIfLocked(r.status()))
148 .thenApply(v -> v.oldValue());
149 }
150
151 @Override
152 @SuppressWarnings("unchecked")
153 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700154 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800155 .whenComplete((r, e) -> throwIfLocked(r.status()))
156 .thenApply(v -> v.newValue());
157 }
158
159 @Override
160 @SuppressWarnings("unchecked")
161 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700162 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800163 .whenComplete((r, e) -> throwIfLocked(r.status()))
164 .thenApply(v -> v.oldValue());
165 }
166 @Override
167 @SuppressWarnings("unchecked")
168 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700169 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800170 .whenComplete((r, e) -> throwIfLocked(r.status()))
171 .thenApply(v -> v.oldValue());
172 }
173
174 @Override
175 @SuppressWarnings("unchecked")
176 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700177 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800178 .whenComplete((r, e) -> throwIfLocked(r.status()))
179 .thenApply(v -> v.updated());
180 }
181
182 @Override
183 @SuppressWarnings("unchecked")
184 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700185 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800186 .whenComplete((r, e) -> throwIfLocked(r.status()))
187 .thenApply(v -> v.updated());
188 }
189
190 @Override
191 @SuppressWarnings("unchecked")
192 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700193 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800194 .whenComplete((r, e) -> throwIfLocked(r.status()))
195 .thenApply(v -> v.oldValue());
196 }
197
198 @Override
199 @SuppressWarnings("unchecked")
200 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700201 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800202 .whenComplete((r, e) -> throwIfLocked(r.status()))
203 .thenApply(v -> v.updated());
204 }
205
206 @Override
207 @SuppressWarnings("unchecked")
208 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700209 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 .whenComplete((r, e) -> throwIfLocked(r.status()))
211 .thenApply(v -> v.updated());
212 }
213
214 @Override
215 public CompletableFuture<Void> clear() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700216 return client.submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800217 .whenComplete((r, e) -> throwIfLocked(r))
218 .thenApply(v -> null);
219 }
220
221 @Override
222 @SuppressWarnings("unchecked")
223 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
224 Predicate<? super byte[]> condition,
225 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
226 return get(key).thenCompose(r1 -> {
227 byte[] existingValue = r1 == null ? null : r1.value();
228 // if the condition evaluates to false, return existing value.
229 if (!condition.test(existingValue)) {
230 return CompletableFuture.completedFuture(r1);
231 }
232
233 AtomicReference<byte[]> computedValue = new AtomicReference<>();
234 // if remappingFunction throws an exception, return the exception.
235 try {
236 computedValue.set(remappingFunction.apply(key, existingValue));
237 } catch (Exception e) {
238 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
239 future.completeExceptionally(e);
240 return future;
241 }
242 if (computedValue.get() == null && r1 == null) {
243 return CompletableFuture.completedFuture(null);
244 }
245 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
246 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani825a8b12016-06-06 19:42:01 -0700247 return client.submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800248 computedValue.get(),
249 valueMatch,
250 versionMatch))
251 .whenComplete((r, e) -> throwIfLocked(r.status()))
252 .thenApply(v -> v.newValue());
253 });
254 }
255
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700257 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
258 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800259 if (mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700260 return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800261 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700262 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800263 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800264 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800265 }
266
267 @Override
268 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700269 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700270 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800271 }
272 return CompletableFuture.completedFuture(null);
273 }
274
275 private void throwIfLocked(MapEntryUpdateResult.Status status) {
276 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
277 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
278 }
279 }
280
Madan Jampanicadd70b2016-02-08 13:45:43 -0800281 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800282 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700283 return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800284 }
285
286 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800287 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700288 return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800289 }
290
291 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800292 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700293 return client.submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800294 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800295 }
Madan Jampani542d9e22016-04-05 15:39:55 -0700296
297 @Override
298 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700299 return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampani542d9e22016-04-05 15:39:55 -0700300 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700301
302 @Override
303 public void addStatusChangeListener(Consumer<Status> listener) {
304 statusChangeListeners.add(listener);
305 }
306
307 @Override
308 public void removeStatusChangeListener(Consumer<Status> listener) {
309 statusChangeListeners.remove(listener);
310 }
311
312 @Override
313 public Collection<Consumer<Status>> statusChangeListeners() {
314 return ImmutableSet.copyOf(statusChangeListeners);
315 }
Madan Jampanifb786382016-06-13 10:25:35 -0700316
317 private boolean isListening() {
318 return !mapEventListeners.isEmpty();
319 }
Madan Jampani8e18c0e2016-06-20 14:08:16 -0700320}