blob: bd9690cdcaecd71087acf8bc36ab46e6b1ec66db [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
20import io.atomix.resource.Consistency;
21import io.atomix.resource.Resource;
22import io.atomix.resource.ResourceTypeInfo;
23
24import java.util.Collection;
25import java.util.ConcurrentModificationException;
26import java.util.Map.Entry;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import org.onosproject.store.service.AsyncConsistentMap;
36import org.onosproject.store.service.MapEvent;
37import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080038import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080039import org.onosproject.store.service.Versioned;
40
41import com.google.common.collect.Sets;
42
43/**
44 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
45 */
46@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
47public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
Madan Jampani74da78b2016-02-09 21:18:36 -080048 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049
50 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
51
52 private static final String CHANGE_SUBJECT = "change";
53
54 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
55 super(client, options);
56 }
57
58 @Override
59 public String name() {
60 return null;
61 }
62
63 @Override
64 public CompletableFuture<AtomixConsistentMap> open() {
65 return super.open().thenApply(result -> {
66 client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
67 return result;
68 });
69 }
70
71 private void handleEvent(MapEvent<String, byte[]> event) {
72 mapEventListeners.forEach(listener -> listener.event(event));
73 }
74
75 @Override
76 public AtomixConsistentMap with(Consistency consistency) {
77 super.with(consistency);
78 return this;
79 }
80
81 @Override
82 public CompletableFuture<Boolean> isEmpty() {
83 return submit(new AtomixConsistentMapCommands.IsEmpty());
84 }
85
86 @Override
87 public CompletableFuture<Integer> size() {
88 return submit(new AtomixConsistentMapCommands.Size());
89 }
90
91 @Override
92 public CompletableFuture<Boolean> containsKey(String key) {
93 return submit(new AtomixConsistentMapCommands.ContainsKey(key));
94 }
95
96 @Override
97 public CompletableFuture<Boolean> containsValue(byte[] value) {
98 return submit(new AtomixConsistentMapCommands.ContainsValue(value));
99 }
100
101 @Override
102 public CompletableFuture<Versioned<byte[]>> get(String key) {
103 return submit(new AtomixConsistentMapCommands.Get(key));
104 }
105
106 @Override
107 public CompletableFuture<Set<String>> keySet() {
108 return submit(new AtomixConsistentMapCommands.KeySet());
109 }
110
111 @Override
112 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
113 return submit(new AtomixConsistentMapCommands.Values());
114 }
115
116 @Override
117 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
118 return submit(new AtomixConsistentMapCommands.EntrySet());
119 }
120
121 @Override
122 @SuppressWarnings("unchecked")
123 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
124 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
125 .whenComplete((r, e) -> throwIfLocked(r.status()))
126 .thenApply(v -> v.oldValue());
127 }
128
129 @Override
130 @SuppressWarnings("unchecked")
131 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
132 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
133 .whenComplete((r, e) -> throwIfLocked(r.status()))
134 .thenApply(v -> v.newValue());
135 }
136
137 @Override
138 @SuppressWarnings("unchecked")
139 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
140 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
141 .whenComplete((r, e) -> throwIfLocked(r.status()))
142 .thenApply(v -> v.oldValue());
143 }
144 @Override
145 @SuppressWarnings("unchecked")
146 public CompletableFuture<Versioned<byte[]>> remove(String key) {
147 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
148 .whenComplete((r, e) -> throwIfLocked(r.status()))
149 .thenApply(v -> v.oldValue());
150 }
151
152 @Override
153 @SuppressWarnings("unchecked")
154 public CompletableFuture<Boolean> remove(String key, byte[] value) {
155 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
156 .whenComplete((r, e) -> throwIfLocked(r.status()))
157 .thenApply(v -> v.updated());
158 }
159
160 @Override
161 @SuppressWarnings("unchecked")
162 public CompletableFuture<Boolean> remove(String key, long version) {
163 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
164 .whenComplete((r, e) -> throwIfLocked(r.status()))
165 .thenApply(v -> v.updated());
166 }
167
168 @Override
169 @SuppressWarnings("unchecked")
170 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
171 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
172 .whenComplete((r, e) -> throwIfLocked(r.status()))
173 .thenApply(v -> v.oldValue());
174 }
175
176 @Override
177 @SuppressWarnings("unchecked")
178 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
179 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
180 newValue,
181 Match.ifValue(oldValue),
182 Match.ANY))
183 .whenComplete((r, e) -> throwIfLocked(r.status()))
184 .thenApply(v -> v.updated());
185 }
186
187 @Override
188 @SuppressWarnings("unchecked")
189 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
190 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
191 newValue,
192 Match.ANY,
193 Match.ifValue(oldVersion)))
194 .whenComplete((r, e) -> throwIfLocked(r.status()))
195 .thenApply(v -> v.updated());
196 }
197
198 @Override
199 public CompletableFuture<Void> clear() {
200 return submit(new AtomixConsistentMapCommands.Clear())
201 .whenComplete((r, e) -> throwIfLocked(r))
202 .thenApply(v -> null);
203 }
204
205 @Override
206 @SuppressWarnings("unchecked")
207 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
208 Predicate<? super byte[]> condition,
209 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
210 return get(key).thenCompose(r1 -> {
211 byte[] existingValue = r1 == null ? null : r1.value();
212 // if the condition evaluates to false, return existing value.
213 if (!condition.test(existingValue)) {
214 return CompletableFuture.completedFuture(r1);
215 }
216
217 AtomicReference<byte[]> computedValue = new AtomicReference<>();
218 // if remappingFunction throws an exception, return the exception.
219 try {
220 computedValue.set(remappingFunction.apply(key, existingValue));
221 } catch (Exception e) {
222 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
223 future.completeExceptionally(e);
224 return future;
225 }
226 if (computedValue.get() == null && r1 == null) {
227 return CompletableFuture.completedFuture(null);
228 }
229 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
230 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
231 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
232 computedValue.get(),
233 valueMatch,
234 versionMatch))
235 .whenComplete((r, e) -> throwIfLocked(r.status()))
236 .thenApply(v -> v.newValue());
237 });
238 }
239
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800240 @Override
241 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
242 if (!mapEventListeners.isEmpty()) {
243 if (mapEventListeners.add(listener)) {
244 return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
245 } else {
246 return CompletableFuture.completedFuture(null);
247 }
248 }
249 mapEventListeners.add(listener);
250 return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
251 }
252
253 @Override
254 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
255 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
256 return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
257 }
258 return CompletableFuture.completedFuture(null);
259 }
260
261 private void throwIfLocked(MapEntryUpdateResult.Status status) {
262 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
263 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
264 }
265 }
266
Madan Jampanicadd70b2016-02-08 13:45:43 -0800267 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800268 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
269 return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
270 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800271 }
272
273 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800274 public CompletableFuture<Void> commit(TransactionId transactionId) {
275 return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
276 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800277 }
278
279 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800280 public CompletableFuture<Void> rollback(TransactionId transactionId) {
281 return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
282 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800283 }
284
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800285 /**
286 * Change listener context.
287 */
288 private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
289 private final MapEventListener<String, byte[]> listener;
290
291 private ChangeListener(MapEventListener<String, byte[]> listener) {
292 this.listener = listener;
293 }
294
295 @Override
296 public void accept(MapEvent<String, byte[]> event) {
297 listener.event(event);
298 }
299
300 @Override
301 public void close() {
302 synchronized (AtomixConsistentMap.this) {
303 mapEventListeners.remove(listener);
304 if (mapEventListeners.isEmpty()) {
305 submit(new AtomixConsistentMapCommands.Unlisten());
306 }
307 }
308 }
309 }
310}