blob: 5342d741918f2ff304fdce7169f35d88805d4cb9 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
20import io.atomix.resource.Consistency;
21import io.atomix.resource.Resource;
22import io.atomix.resource.ResourceTypeInfo;
23
24import java.util.Collection;
25import java.util.ConcurrentModificationException;
26import java.util.Map.Entry;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
35import org.onosproject.store.primitives.impl.Transaction;
36import org.onosproject.store.primitives.impl.TransactionParticipant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080037import org.onosproject.store.service.AsyncConsistentMap;
38import org.onosproject.store.service.MapEvent;
39import org.onosproject.store.service.MapEventListener;
40import org.onosproject.store.service.Versioned;
41
42import com.google.common.collect.Sets;
43
44/**
45 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
46 */
47@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
48public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
Madan Jampanicadd70b2016-02-08 13:45:43 -080049 implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080050
51 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
52
53 private static final String CHANGE_SUBJECT = "change";
54
55 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
56 super(client, options);
57 }
58
59 @Override
60 public String name() {
61 return null;
62 }
63
64 @Override
65 public CompletableFuture<AtomixConsistentMap> open() {
66 return super.open().thenApply(result -> {
67 client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
68 return result;
69 });
70 }
71
72 private void handleEvent(MapEvent<String, byte[]> event) {
73 mapEventListeners.forEach(listener -> listener.event(event));
74 }
75
76 @Override
77 public AtomixConsistentMap with(Consistency consistency) {
78 super.with(consistency);
79 return this;
80 }
81
82 @Override
83 public CompletableFuture<Boolean> isEmpty() {
84 return submit(new AtomixConsistentMapCommands.IsEmpty());
85 }
86
87 @Override
88 public CompletableFuture<Integer> size() {
89 return submit(new AtomixConsistentMapCommands.Size());
90 }
91
92 @Override
93 public CompletableFuture<Boolean> containsKey(String key) {
94 return submit(new AtomixConsistentMapCommands.ContainsKey(key));
95 }
96
97 @Override
98 public CompletableFuture<Boolean> containsValue(byte[] value) {
99 return submit(new AtomixConsistentMapCommands.ContainsValue(value));
100 }
101
102 @Override
103 public CompletableFuture<Versioned<byte[]>> get(String key) {
104 return submit(new AtomixConsistentMapCommands.Get(key));
105 }
106
107 @Override
108 public CompletableFuture<Set<String>> keySet() {
109 return submit(new AtomixConsistentMapCommands.KeySet());
110 }
111
112 @Override
113 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
114 return submit(new AtomixConsistentMapCommands.Values());
115 }
116
117 @Override
118 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
119 return submit(new AtomixConsistentMapCommands.EntrySet());
120 }
121
122 @Override
123 @SuppressWarnings("unchecked")
124 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
125 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
126 .whenComplete((r, e) -> throwIfLocked(r.status()))
127 .thenApply(v -> v.oldValue());
128 }
129
130 @Override
131 @SuppressWarnings("unchecked")
132 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
133 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
134 .whenComplete((r, e) -> throwIfLocked(r.status()))
135 .thenApply(v -> v.newValue());
136 }
137
138 @Override
139 @SuppressWarnings("unchecked")
140 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
141 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
142 .whenComplete((r, e) -> throwIfLocked(r.status()))
143 .thenApply(v -> v.oldValue());
144 }
145 @Override
146 @SuppressWarnings("unchecked")
147 public CompletableFuture<Versioned<byte[]>> remove(String key) {
148 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
149 .whenComplete((r, e) -> throwIfLocked(r.status()))
150 .thenApply(v -> v.oldValue());
151 }
152
153 @Override
154 @SuppressWarnings("unchecked")
155 public CompletableFuture<Boolean> remove(String key, byte[] value) {
156 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
157 .whenComplete((r, e) -> throwIfLocked(r.status()))
158 .thenApply(v -> v.updated());
159 }
160
161 @Override
162 @SuppressWarnings("unchecked")
163 public CompletableFuture<Boolean> remove(String key, long version) {
164 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
165 .whenComplete((r, e) -> throwIfLocked(r.status()))
166 .thenApply(v -> v.updated());
167 }
168
169 @Override
170 @SuppressWarnings("unchecked")
171 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
172 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
173 .whenComplete((r, e) -> throwIfLocked(r.status()))
174 .thenApply(v -> v.oldValue());
175 }
176
177 @Override
178 @SuppressWarnings("unchecked")
179 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
180 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
181 newValue,
182 Match.ifValue(oldValue),
183 Match.ANY))
184 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.updated());
186 }
187
188 @Override
189 @SuppressWarnings("unchecked")
190 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
191 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
192 newValue,
193 Match.ANY,
194 Match.ifValue(oldVersion)))
195 .whenComplete((r, e) -> throwIfLocked(r.status()))
196 .thenApply(v -> v.updated());
197 }
198
199 @Override
200 public CompletableFuture<Void> clear() {
201 return submit(new AtomixConsistentMapCommands.Clear())
202 .whenComplete((r, e) -> throwIfLocked(r))
203 .thenApply(v -> null);
204 }
205
206 @Override
207 @SuppressWarnings("unchecked")
208 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
209 Predicate<? super byte[]> condition,
210 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
211 return get(key).thenCompose(r1 -> {
212 byte[] existingValue = r1 == null ? null : r1.value();
213 // if the condition evaluates to false, return existing value.
214 if (!condition.test(existingValue)) {
215 return CompletableFuture.completedFuture(r1);
216 }
217
218 AtomicReference<byte[]> computedValue = new AtomicReference<>();
219 // if remappingFunction throws an exception, return the exception.
220 try {
221 computedValue.set(remappingFunction.apply(key, existingValue));
222 } catch (Exception e) {
223 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
224 future.completeExceptionally(e);
225 return future;
226 }
227 if (computedValue.get() == null && r1 == null) {
228 return CompletableFuture.completedFuture(null);
229 }
230 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
231 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
232 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
233 computedValue.get(),
234 valueMatch,
235 versionMatch))
236 .whenComplete((r, e) -> throwIfLocked(r.status()))
237 .thenApply(v -> v.newValue());
238 });
239 }
240
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800241 @Override
242 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
243 if (!mapEventListeners.isEmpty()) {
244 if (mapEventListeners.add(listener)) {
245 return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
246 } else {
247 return CompletableFuture.completedFuture(null);
248 }
249 }
250 mapEventListeners.add(listener);
251 return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
252 }
253
254 @Override
255 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
256 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
257 return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
258 }
259 return CompletableFuture.completedFuture(null);
260 }
261
262 private void throwIfLocked(MapEntryUpdateResult.Status status) {
263 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
264 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
265 }
266 }
267
Madan Jampanicadd70b2016-02-08 13:45:43 -0800268 @Override
269 public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
270 return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
271 }
272
273 @Override
274 public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
275 return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
276 }
277
278 @Override
279 public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
280 return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
281 }
282
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800283 /**
284 * Change listener context.
285 */
286 private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
287 private final MapEventListener<String, byte[]> listener;
288
289 private ChangeListener(MapEventListener<String, byte[]> listener) {
290 this.listener = listener;
291 }
292
293 @Override
294 public void accept(MapEvent<String, byte[]> event) {
295 listener.event(event);
296 }
297
298 @Override
299 public void close() {
300 synchronized (AtomixConsistentMap.this) {
301 mapEventListeners.remove(listener);
302 if (mapEventListeners.isEmpty()) {
303 submit(new AtomixConsistentMapCommands.Unlisten());
304 }
305 }
306 }
307 }
308}