blob: 73bc8b71e2eda7d44758e59bcc84f983dc837e5e [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.Resource;
21import io.atomix.resource.ResourceTypeInfo;
22
23import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Map.Entry;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import org.onosproject.store.service.AsyncConsistentMap;
36import org.onosproject.store.service.MapEvent;
37import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080038import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080039import org.onosproject.store.service.Versioned;
40
41import com.google.common.collect.Sets;
42
43/**
44 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
45 */
46@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
47public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
Madan Jampani74da78b2016-02-09 21:18:36 -080048 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049
50 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
51
Madan Jampanifc981772016-02-16 09:46:42 -080052 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053
54 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
55 super(client, options);
56 }
57
58 @Override
59 public String name() {
60 return null;
61 }
62
63 @Override
64 public CompletableFuture<AtomixConsistentMap> open() {
65 return super.open().thenApply(result -> {
66 client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
67 return result;
68 });
69 }
70
Madan Jampanifc981772016-02-16 09:46:42 -080071 private void handleEvent(List<MapEvent<String, byte[]>> events) {
72 events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073 }
74
75 @Override
76 public CompletableFuture<Boolean> isEmpty() {
77 return submit(new AtomixConsistentMapCommands.IsEmpty());
78 }
79
80 @Override
81 public CompletableFuture<Integer> size() {
82 return submit(new AtomixConsistentMapCommands.Size());
83 }
84
85 @Override
86 public CompletableFuture<Boolean> containsKey(String key) {
87 return submit(new AtomixConsistentMapCommands.ContainsKey(key));
88 }
89
90 @Override
91 public CompletableFuture<Boolean> containsValue(byte[] value) {
92 return submit(new AtomixConsistentMapCommands.ContainsValue(value));
93 }
94
95 @Override
96 public CompletableFuture<Versioned<byte[]>> get(String key) {
97 return submit(new AtomixConsistentMapCommands.Get(key));
98 }
99
100 @Override
101 public CompletableFuture<Set<String>> keySet() {
102 return submit(new AtomixConsistentMapCommands.KeySet());
103 }
104
105 @Override
106 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
107 return submit(new AtomixConsistentMapCommands.Values());
108 }
109
110 @Override
111 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
112 return submit(new AtomixConsistentMapCommands.EntrySet());
113 }
114
115 @Override
116 @SuppressWarnings("unchecked")
117 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
118 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
119 .whenComplete((r, e) -> throwIfLocked(r.status()))
120 .thenApply(v -> v.oldValue());
121 }
122
123 @Override
124 @SuppressWarnings("unchecked")
125 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
126 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
127 .whenComplete((r, e) -> throwIfLocked(r.status()))
128 .thenApply(v -> v.newValue());
129 }
130
131 @Override
132 @SuppressWarnings("unchecked")
133 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
134 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
135 .whenComplete((r, e) -> throwIfLocked(r.status()))
136 .thenApply(v -> v.oldValue());
137 }
138 @Override
139 @SuppressWarnings("unchecked")
140 public CompletableFuture<Versioned<byte[]>> remove(String key) {
141 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
142 .whenComplete((r, e) -> throwIfLocked(r.status()))
143 .thenApply(v -> v.oldValue());
144 }
145
146 @Override
147 @SuppressWarnings("unchecked")
148 public CompletableFuture<Boolean> remove(String key, byte[] value) {
149 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
150 .whenComplete((r, e) -> throwIfLocked(r.status()))
151 .thenApply(v -> v.updated());
152 }
153
154 @Override
155 @SuppressWarnings("unchecked")
156 public CompletableFuture<Boolean> remove(String key, long version) {
157 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
158 .whenComplete((r, e) -> throwIfLocked(r.status()))
159 .thenApply(v -> v.updated());
160 }
161
162 @Override
163 @SuppressWarnings("unchecked")
164 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
165 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
166 .whenComplete((r, e) -> throwIfLocked(r.status()))
167 .thenApply(v -> v.oldValue());
168 }
169
170 @Override
171 @SuppressWarnings("unchecked")
172 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
173 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
174 newValue,
175 Match.ifValue(oldValue),
176 Match.ANY))
177 .whenComplete((r, e) -> throwIfLocked(r.status()))
178 .thenApply(v -> v.updated());
179 }
180
181 @Override
182 @SuppressWarnings("unchecked")
183 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
184 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
185 newValue,
186 Match.ANY,
187 Match.ifValue(oldVersion)))
188 .whenComplete((r, e) -> throwIfLocked(r.status()))
189 .thenApply(v -> v.updated());
190 }
191
192 @Override
193 public CompletableFuture<Void> clear() {
194 return submit(new AtomixConsistentMapCommands.Clear())
195 .whenComplete((r, e) -> throwIfLocked(r))
196 .thenApply(v -> null);
197 }
198
199 @Override
200 @SuppressWarnings("unchecked")
201 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
202 Predicate<? super byte[]> condition,
203 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
204 return get(key).thenCompose(r1 -> {
205 byte[] existingValue = r1 == null ? null : r1.value();
206 // if the condition evaluates to false, return existing value.
207 if (!condition.test(existingValue)) {
208 return CompletableFuture.completedFuture(r1);
209 }
210
211 AtomicReference<byte[]> computedValue = new AtomicReference<>();
212 // if remappingFunction throws an exception, return the exception.
213 try {
214 computedValue.set(remappingFunction.apply(key, existingValue));
215 } catch (Exception e) {
216 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
217 future.completeExceptionally(e);
218 return future;
219 }
220 if (computedValue.get() == null && r1 == null) {
221 return CompletableFuture.completedFuture(null);
222 }
223 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
224 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
225 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
226 computedValue.get(),
227 valueMatch,
228 versionMatch))
229 .whenComplete((r, e) -> throwIfLocked(r.status()))
230 .thenApply(v -> v.newValue());
231 });
232 }
233
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800234 @Override
235 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
236 if (!mapEventListeners.isEmpty()) {
237 if (mapEventListeners.add(listener)) {
238 return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
239 } else {
240 return CompletableFuture.completedFuture(null);
241 }
242 }
243 mapEventListeners.add(listener);
244 return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
245 }
246
247 @Override
248 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
249 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
250 return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
251 }
252 return CompletableFuture.completedFuture(null);
253 }
254
255 private void throwIfLocked(MapEntryUpdateResult.Status status) {
256 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
257 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
258 }
259 }
260
Madan Jampanicadd70b2016-02-08 13:45:43 -0800261 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800262 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
263 return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
264 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800265 }
266
267 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800268 public CompletableFuture<Void> commit(TransactionId transactionId) {
269 return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
270 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800271 }
272
273 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800274 public CompletableFuture<Void> rollback(TransactionId transactionId) {
275 return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
276 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800277 }
278
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800279 /**
280 * Change listener context.
281 */
282 private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
283 private final MapEventListener<String, byte[]> listener;
284
285 private ChangeListener(MapEventListener<String, byte[]> listener) {
286 this.listener = listener;
287 }
288
289 @Override
290 public void accept(MapEvent<String, byte[]> event) {
291 listener.event(event);
292 }
293
294 @Override
295 public void close() {
296 synchronized (AtomixConsistentMap.this) {
297 mapEventListeners.remove(listener);
298 if (mapEventListeners.isEmpty()) {
299 submit(new AtomixConsistentMapCommands.Unlisten());
300 }
301 }
302 }
303 }
304}