blob: 2e6e6201579332f9b5430eef05906e96dbdf13aa [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
22import java.util.Collection;
23import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080024import java.util.List;
Madan Jampani0463cf92016-05-04 14:46:08 -070025import java.util.Map;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070027import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080028import java.util.Set;
29import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070030import java.util.concurrent.Executor;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import java.util.concurrent.atomic.AtomicReference;
32import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070033import java.util.function.Consumer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080034import java.util.function.Predicate;
35
36import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080037import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080038import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070049import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080050import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.service.AsyncConsistentMap;
55import org.onosproject.store.service.MapEvent;
56import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080057import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058import org.onosproject.store.service.Versioned;
Madan Jampani1d3b6172016-04-28 13:22:57 -070059import com.google.common.collect.ImmutableSet;
Madan Jampani0463cf92016-05-04 14:46:08 -070060import com.google.common.collect.Maps;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import com.google.common.collect.Sets;
62
63/**
64 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
65 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070066@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
67public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080068 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069
Madan Jampani1d3b6172016-04-28 13:22:57 -070070 private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani0463cf92016-05-04 14:46:08 -070071 private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newIdentityHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072
Madan Jampanifc981772016-02-16 09:46:42 -080073 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074
Madan Jampani65f24bb2016-03-15 15:16:18 -070075 public AtomixConsistentMap(CopycatClient client, Properties properties) {
76 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080077 }
78
79 @Override
80 public String name() {
81 return null;
82 }
83
84 @Override
85 public CompletableFuture<AtomixConsistentMap> open() {
86 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -070087 client.onStateChange(state -> {
88 if (state == CopycatClient.State.CONNECTED && isListening()) {
89 client.submit(new Listen());
90 }
91 });
Madan Jampani3a9911c2016-02-21 11:25:45 -080092 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 return result;
94 });
95 }
96
Madan Jampanifc981772016-02-16 09:46:42 -080097 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani0463cf92016-05-04 14:46:08 -070098 events.forEach(event ->
99 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 }
101
102 @Override
103 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700104 return client.submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800105 }
106
107 @Override
108 public CompletableFuture<Integer> size() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700109 return client.submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 @Override
113 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700114 return client.submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800115 }
116
117 @Override
118 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700119 return client.submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800120 }
121
122 @Override
123 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700124 return client.submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 }
126
127 @Override
128 public CompletableFuture<Set<String>> keySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700129 return client.submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800130 }
131
132 @Override
133 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700134 return client.submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800135 }
136
137 @Override
138 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700139 return client.submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 }
141
142 @Override
143 @SuppressWarnings("unchecked")
144 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700145 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800146 .whenComplete((r, e) -> throwIfLocked(r.status()))
147 .thenApply(v -> v.oldValue());
148 }
149
150 @Override
151 @SuppressWarnings("unchecked")
152 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700153 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 .whenComplete((r, e) -> throwIfLocked(r.status()))
155 .thenApply(v -> v.newValue());
156 }
157
158 @Override
159 @SuppressWarnings("unchecked")
160 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700161 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800162 .whenComplete((r, e) -> throwIfLocked(r.status()))
163 .thenApply(v -> v.oldValue());
164 }
165 @Override
166 @SuppressWarnings("unchecked")
167 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700168 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800169 .whenComplete((r, e) -> throwIfLocked(r.status()))
170 .thenApply(v -> v.oldValue());
171 }
172
173 @Override
174 @SuppressWarnings("unchecked")
175 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700176 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800177 .whenComplete((r, e) -> throwIfLocked(r.status()))
178 .thenApply(v -> v.updated());
179 }
180
181 @Override
182 @SuppressWarnings("unchecked")
183 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700184 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 .whenComplete((r, e) -> throwIfLocked(r.status()))
186 .thenApply(v -> v.updated());
187 }
188
189 @Override
190 @SuppressWarnings("unchecked")
191 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700192 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800193 .whenComplete((r, e) -> throwIfLocked(r.status()))
194 .thenApply(v -> v.oldValue());
195 }
196
197 @Override
198 @SuppressWarnings("unchecked")
199 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700200 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 .whenComplete((r, e) -> throwIfLocked(r.status()))
202 .thenApply(v -> v.updated());
203 }
204
205 @Override
206 @SuppressWarnings("unchecked")
207 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700208 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 .whenComplete((r, e) -> throwIfLocked(r.status()))
210 .thenApply(v -> v.updated());
211 }
212
213 @Override
214 public CompletableFuture<Void> clear() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700215 return client.submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 .whenComplete((r, e) -> throwIfLocked(r))
217 .thenApply(v -> null);
218 }
219
220 @Override
221 @SuppressWarnings("unchecked")
222 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
223 Predicate<? super byte[]> condition,
224 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
225 return get(key).thenCompose(r1 -> {
226 byte[] existingValue = r1 == null ? null : r1.value();
227 // if the condition evaluates to false, return existing value.
228 if (!condition.test(existingValue)) {
229 return CompletableFuture.completedFuture(r1);
230 }
231
232 AtomicReference<byte[]> computedValue = new AtomicReference<>();
233 // if remappingFunction throws an exception, return the exception.
234 try {
235 computedValue.set(remappingFunction.apply(key, existingValue));
236 } catch (Exception e) {
237 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
238 future.completeExceptionally(e);
239 return future;
240 }
241 if (computedValue.get() == null && r1 == null) {
242 return CompletableFuture.completedFuture(null);
243 }
244 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
245 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani825a8b12016-06-06 19:42:01 -0700246 return client.submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800247 computedValue.get(),
248 valueMatch,
249 versionMatch))
250 .whenComplete((r, e) -> throwIfLocked(r.status()))
251 .thenApply(v -> v.newValue());
252 });
253 }
254
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700256 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
257 Executor executor) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800258 if (mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700259 return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800260 } else {
Madan Jampani0463cf92016-05-04 14:46:08 -0700261 mapEventListeners.put(listener, executor);
Madan Jampani40f022e2016-03-02 21:35:14 -0800262 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800263 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800264 }
265
266 @Override
267 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
Madan Jampani0463cf92016-05-04 14:46:08 -0700268 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700269 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800270 }
271 return CompletableFuture.completedFuture(null);
272 }
273
274 private void throwIfLocked(MapEntryUpdateResult.Status status) {
275 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
276 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
277 }
278 }
279
Madan Jampanicadd70b2016-02-08 13:45:43 -0800280 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800281 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700282 return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800283 }
284
285 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800286 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700287 return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800288 }
289
290 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800291 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700292 return client.submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800293 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800294 }
Madan Jampani542d9e22016-04-05 15:39:55 -0700295
296 @Override
297 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700298 return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampani542d9e22016-04-05 15:39:55 -0700299 }
Madan Jampani1d3b6172016-04-28 13:22:57 -0700300
301 @Override
302 public void addStatusChangeListener(Consumer<Status> listener) {
303 statusChangeListeners.add(listener);
304 }
305
306 @Override
307 public void removeStatusChangeListener(Consumer<Status> listener) {
308 statusChangeListeners.remove(listener);
309 }
310
311 @Override
312 public Collection<Consumer<Status>> statusChangeListeners() {
313 return ImmutableSet.copyOf(statusChangeListeners);
314 }
Madan Jampanifb786382016-06-13 10:25:35 -0700315
316 private boolean isListening() {
317 return !mapEventListeners.isEmpty();
318 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800319}