blob: 46487f26acfcf638123a846a4df4e26120b479a1 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
20import io.atomix.resource.Consistency;
21import io.atomix.resource.Resource;
22import io.atomix.resource.ResourceTypeInfo;
23
24import java.util.Collection;
25import java.util.ConcurrentModificationException;
26import java.util.Map.Entry;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
34import org.onosproject.store.service.AsyncConsistentMap;
35import org.onosproject.store.service.MapEvent;
36import org.onosproject.store.service.MapEventListener;
37import org.onosproject.store.service.Versioned;
38
39import com.google.common.collect.Sets;
40
41/**
42 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
43 */
44@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
45public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
46 implements AsyncConsistentMap<String, byte[]> {
47
48 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
49
50 private static final String CHANGE_SUBJECT = "change";
51
52 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
53 super(client, options);
54 }
55
56 @Override
57 public String name() {
58 return null;
59 }
60
61 @Override
62 public CompletableFuture<AtomixConsistentMap> open() {
63 return super.open().thenApply(result -> {
64 client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
65 return result;
66 });
67 }
68
69 private void handleEvent(MapEvent<String, byte[]> event) {
70 mapEventListeners.forEach(listener -> listener.event(event));
71 }
72
73 @Override
74 public AtomixConsistentMap with(Consistency consistency) {
75 super.with(consistency);
76 return this;
77 }
78
79 @Override
80 public CompletableFuture<Boolean> isEmpty() {
81 return submit(new AtomixConsistentMapCommands.IsEmpty());
82 }
83
84 @Override
85 public CompletableFuture<Integer> size() {
86 return submit(new AtomixConsistentMapCommands.Size());
87 }
88
89 @Override
90 public CompletableFuture<Boolean> containsKey(String key) {
91 return submit(new AtomixConsistentMapCommands.ContainsKey(key));
92 }
93
94 @Override
95 public CompletableFuture<Boolean> containsValue(byte[] value) {
96 return submit(new AtomixConsistentMapCommands.ContainsValue(value));
97 }
98
99 @Override
100 public CompletableFuture<Versioned<byte[]>> get(String key) {
101 return submit(new AtomixConsistentMapCommands.Get(key));
102 }
103
104 @Override
105 public CompletableFuture<Set<String>> keySet() {
106 return submit(new AtomixConsistentMapCommands.KeySet());
107 }
108
109 @Override
110 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
111 return submit(new AtomixConsistentMapCommands.Values());
112 }
113
114 @Override
115 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
116 return submit(new AtomixConsistentMapCommands.EntrySet());
117 }
118
119 @Override
120 @SuppressWarnings("unchecked")
121 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
122 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
123 .whenComplete((r, e) -> throwIfLocked(r.status()))
124 .thenApply(v -> v.oldValue());
125 }
126
127 @Override
128 @SuppressWarnings("unchecked")
129 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
130 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
131 .whenComplete((r, e) -> throwIfLocked(r.status()))
132 .thenApply(v -> v.newValue());
133 }
134
135 @Override
136 @SuppressWarnings("unchecked")
137 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
138 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
139 .whenComplete((r, e) -> throwIfLocked(r.status()))
140 .thenApply(v -> v.oldValue());
141 }
142 @Override
143 @SuppressWarnings("unchecked")
144 public CompletableFuture<Versioned<byte[]>> remove(String key) {
145 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
146 .whenComplete((r, e) -> throwIfLocked(r.status()))
147 .thenApply(v -> v.oldValue());
148 }
149
150 @Override
151 @SuppressWarnings("unchecked")
152 public CompletableFuture<Boolean> remove(String key, byte[] value) {
153 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
154 .whenComplete((r, e) -> throwIfLocked(r.status()))
155 .thenApply(v -> v.updated());
156 }
157
158 @Override
159 @SuppressWarnings("unchecked")
160 public CompletableFuture<Boolean> remove(String key, long version) {
161 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
162 .whenComplete((r, e) -> throwIfLocked(r.status()))
163 .thenApply(v -> v.updated());
164 }
165
166 @Override
167 @SuppressWarnings("unchecked")
168 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
169 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
170 .whenComplete((r, e) -> throwIfLocked(r.status()))
171 .thenApply(v -> v.oldValue());
172 }
173
174 @Override
175 @SuppressWarnings("unchecked")
176 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
177 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
178 newValue,
179 Match.ifValue(oldValue),
180 Match.ANY))
181 .whenComplete((r, e) -> throwIfLocked(r.status()))
182 .thenApply(v -> v.updated());
183 }
184
185 @Override
186 @SuppressWarnings("unchecked")
187 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
188 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
189 newValue,
190 Match.ANY,
191 Match.ifValue(oldVersion)))
192 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.updated());
194 }
195
196 @Override
197 public CompletableFuture<Void> clear() {
198 return submit(new AtomixConsistentMapCommands.Clear())
199 .whenComplete((r, e) -> throwIfLocked(r))
200 .thenApply(v -> null);
201 }
202
203 @Override
204 @SuppressWarnings("unchecked")
205 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
206 Predicate<? super byte[]> condition,
207 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
208 return get(key).thenCompose(r1 -> {
209 byte[] existingValue = r1 == null ? null : r1.value();
210 // if the condition evaluates to false, return existing value.
211 if (!condition.test(existingValue)) {
212 return CompletableFuture.completedFuture(r1);
213 }
214
215 AtomicReference<byte[]> computedValue = new AtomicReference<>();
216 // if remappingFunction throws an exception, return the exception.
217 try {
218 computedValue.set(remappingFunction.apply(key, existingValue));
219 } catch (Exception e) {
220 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
221 future.completeExceptionally(e);
222 return future;
223 }
224 if (computedValue.get() == null && r1 == null) {
225 return CompletableFuture.completedFuture(null);
226 }
227 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
228 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
229 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
230 computedValue.get(),
231 valueMatch,
232 versionMatch))
233 .whenComplete((r, e) -> throwIfLocked(r.status()))
234 .thenApply(v -> v.newValue());
235 });
236 }
237
238 public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
239 return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
240 }
241
242 public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
243 return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
244 }
245
246 public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
247 return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
248 }
249
250 @Override
251 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
252 if (!mapEventListeners.isEmpty()) {
253 if (mapEventListeners.add(listener)) {
254 return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
255 } else {
256 return CompletableFuture.completedFuture(null);
257 }
258 }
259 mapEventListeners.add(listener);
260 return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
261 }
262
263 @Override
264 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
265 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
266 return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
267 }
268 return CompletableFuture.completedFuture(null);
269 }
270
271 private void throwIfLocked(MapEntryUpdateResult.Status status) {
272 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
273 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
274 }
275 }
276
277 /**
278 * Change listener context.
279 */
280 private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
281 private final MapEventListener<String, byte[]> listener;
282
283 private ChangeListener(MapEventListener<String, byte[]> listener) {
284 this.listener = listener;
285 }
286
287 @Override
288 public void accept(MapEvent<String, byte[]> event) {
289 listener.event(event);
290 }
291
292 @Override
293 public void close() {
294 synchronized (AtomixConsistentMap.this) {
295 mapEventListeners.remove(listener);
296 if (mapEventListeners.isEmpty()) {
297 submit(new AtomixConsistentMapCommands.Unlisten());
298 }
299 }
300 }
301 }
302}