blob: 0f0968cce8768a1787e19ea3387ad2bd126ba9f0 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import io.atomix.resource.Resource;
20import io.atomix.resource.ResourceTypeInfo;
21
22import java.util.Collection;
23import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080024import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080025import java.util.Map.Entry;
26import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.atomic.AtomicReference;
29import java.util.function.BiFunction;
30import java.util.function.Predicate;
31
32import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080033import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080034import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
35import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
36import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049import org.onosproject.store.service.AsyncConsistentMap;
50import org.onosproject.store.service.MapEvent;
51import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080052import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053import org.onosproject.store.service.Versioned;
54
55import com.google.common.collect.Sets;
56
57/**
58 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
59 */
Madan Jampani86cb2432016-02-17 11:07:56 -080060@ResourceTypeInfo(id = -151,
61 stateMachine = AtomixConsistentMapState.class,
62 typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
63public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080064 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065
66 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
67
Madan Jampanifc981772016-02-16 09:46:42 -080068 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069
70 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
71 super(client, options);
72 }
73
74 @Override
75 public String name() {
76 return null;
77 }
78
79 @Override
80 public CompletableFuture<AtomixConsistentMap> open() {
81 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080082 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080083 return result;
84 });
85 }
86
Madan Jampanifc981772016-02-16 09:46:42 -080087 private void handleEvent(List<MapEvent<String, byte[]>> events) {
88 events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 }
90
91 @Override
92 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080093 return submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080094 }
95
96 @Override
97 public CompletableFuture<Integer> size() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080098 return submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 }
100
101 @Override
102 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800103 return submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800104 }
105
106 @Override
107 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800108 return submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 }
110
111 @Override
112 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800113 return submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public CompletableFuture<Set<String>> keySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800118 return submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 @Override
122 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800123 return submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800124 }
125
126 @Override
127 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800128 return submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 }
130
131 @Override
132 @SuppressWarnings("unchecked")
133 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800134 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800135 .whenComplete((r, e) -> throwIfLocked(r.status()))
136 .thenApply(v -> v.oldValue());
137 }
138
139 @Override
140 @SuppressWarnings("unchecked")
141 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800142 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800143 .whenComplete((r, e) -> throwIfLocked(r.status()))
144 .thenApply(v -> v.newValue());
145 }
146
147 @Override
148 @SuppressWarnings("unchecked")
149 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800150 return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 .whenComplete((r, e) -> throwIfLocked(r.status()))
152 .thenApply(v -> v.oldValue());
153 }
154 @Override
155 @SuppressWarnings("unchecked")
156 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800157 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800158 .whenComplete((r, e) -> throwIfLocked(r.status()))
159 .thenApply(v -> v.oldValue());
160 }
161
162 @Override
163 @SuppressWarnings("unchecked")
164 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800165 return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 .whenComplete((r, e) -> throwIfLocked(r.status()))
167 .thenApply(v -> v.updated());
168 }
169
170 @Override
171 @SuppressWarnings("unchecked")
172 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800173 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800174 .whenComplete((r, e) -> throwIfLocked(r.status()))
175 .thenApply(v -> v.updated());
176 }
177
178 @Override
179 @SuppressWarnings("unchecked")
180 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800181 return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182 .whenComplete((r, e) -> throwIfLocked(r.status()))
183 .thenApply(v -> v.oldValue());
184 }
185
186 @Override
187 @SuppressWarnings("unchecked")
188 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800189 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800190 newValue,
191 Match.ifValue(oldValue),
192 Match.ANY))
193 .whenComplete((r, e) -> throwIfLocked(r.status()))
194 .thenApply(v -> v.updated());
195 }
196
197 @Override
198 @SuppressWarnings("unchecked")
199 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800200 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 newValue,
202 Match.ANY,
203 Match.ifValue(oldVersion)))
204 .whenComplete((r, e) -> throwIfLocked(r.status()))
205 .thenApply(v -> v.updated());
206 }
207
208 @Override
209 public CompletableFuture<Void> clear() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800210 return submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800211 .whenComplete((r, e) -> throwIfLocked(r))
212 .thenApply(v -> null);
213 }
214
215 @Override
216 @SuppressWarnings("unchecked")
217 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
218 Predicate<? super byte[]> condition,
219 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
220 return get(key).thenCompose(r1 -> {
221 byte[] existingValue = r1 == null ? null : r1.value();
222 // if the condition evaluates to false, return existing value.
223 if (!condition.test(existingValue)) {
224 return CompletableFuture.completedFuture(r1);
225 }
226
227 AtomicReference<byte[]> computedValue = new AtomicReference<>();
228 // if remappingFunction throws an exception, return the exception.
229 try {
230 computedValue.set(remappingFunction.apply(key, existingValue));
231 } catch (Exception e) {
232 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
233 future.completeExceptionally(e);
234 return future;
235 }
236 if (computedValue.get() == null && r1 == null) {
237 return CompletableFuture.completedFuture(null);
238 }
239 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
240 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800241 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800242 computedValue.get(),
243 valueMatch,
244 versionMatch))
245 .whenComplete((r, e) -> throwIfLocked(r.status()))
246 .thenApply(v -> v.newValue());
247 });
248 }
249
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800250 @Override
251 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800252 if (mapEventListeners.isEmpty()) {
253 return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
254 } else {
Madan Jampani40f022e2016-03-02 21:35:14 -0800255 mapEventListeners.add(listener);
256 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800257 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800258 }
259
260 @Override
261 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
262 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800263 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800264 }
265 return CompletableFuture.completedFuture(null);
266 }
267
268 private void throwIfLocked(MapEntryUpdateResult.Status status) {
269 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
270 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
271 }
272 }
273
Madan Jampanicadd70b2016-02-08 13:45:43 -0800274 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800275 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800276 return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800277 }
278
279 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800280 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800281 return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800282 }
283
284 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800285 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800286 return submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800287 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800288 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800289}