blob: 4c065a5314cabaecaffe56393068dbe5989522aa [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.Resource;
21import io.atomix.resource.ResourceTypeInfo;
22
23import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Map.Entry;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import org.onosproject.store.service.AsyncConsistentMap;
36import org.onosproject.store.service.MapEvent;
37import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080038import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080039import org.onosproject.store.service.Versioned;
40
41import com.google.common.collect.Sets;
42
43/**
44 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
45 */
Madan Jampani86cb2432016-02-17 11:07:56 -080046@ResourceTypeInfo(id = -151,
47 stateMachine = AtomixConsistentMapState.class,
48 typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
49public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080050 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080051
52 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
53
Madan Jampanifc981772016-02-16 09:46:42 -080054 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055
56 public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
57 super(client, options);
58 }
59
60 @Override
61 public String name() {
62 return null;
63 }
64
65 @Override
66 public CompletableFuture<AtomixConsistentMap> open() {
67 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080068 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069 return result;
70 });
71 }
72
Madan Jampanifc981772016-02-16 09:46:42 -080073 private void handleEvent(List<MapEvent<String, byte[]>> events) {
74 events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080075 }
76
77 @Override
78 public CompletableFuture<Boolean> isEmpty() {
79 return submit(new AtomixConsistentMapCommands.IsEmpty());
80 }
81
82 @Override
83 public CompletableFuture<Integer> size() {
84 return submit(new AtomixConsistentMapCommands.Size());
85 }
86
87 @Override
88 public CompletableFuture<Boolean> containsKey(String key) {
89 return submit(new AtomixConsistentMapCommands.ContainsKey(key));
90 }
91
92 @Override
93 public CompletableFuture<Boolean> containsValue(byte[] value) {
94 return submit(new AtomixConsistentMapCommands.ContainsValue(value));
95 }
96
97 @Override
98 public CompletableFuture<Versioned<byte[]>> get(String key) {
99 return submit(new AtomixConsistentMapCommands.Get(key));
100 }
101
102 @Override
103 public CompletableFuture<Set<String>> keySet() {
104 return submit(new AtomixConsistentMapCommands.KeySet());
105 }
106
107 @Override
108 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
109 return submit(new AtomixConsistentMapCommands.Values());
110 }
111
112 @Override
113 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
114 return submit(new AtomixConsistentMapCommands.EntrySet());
115 }
116
117 @Override
118 @SuppressWarnings("unchecked")
119 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
120 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
121 .whenComplete((r, e) -> throwIfLocked(r.status()))
122 .thenApply(v -> v.oldValue());
123 }
124
125 @Override
126 @SuppressWarnings("unchecked")
127 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
128 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
129 .whenComplete((r, e) -> throwIfLocked(r.status()))
130 .thenApply(v -> v.newValue());
131 }
132
133 @Override
134 @SuppressWarnings("unchecked")
135 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
136 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
137 .whenComplete((r, e) -> throwIfLocked(r.status()))
138 .thenApply(v -> v.oldValue());
139 }
140 @Override
141 @SuppressWarnings("unchecked")
142 public CompletableFuture<Versioned<byte[]>> remove(String key) {
143 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
144 .whenComplete((r, e) -> throwIfLocked(r.status()))
145 .thenApply(v -> v.oldValue());
146 }
147
148 @Override
149 @SuppressWarnings("unchecked")
150 public CompletableFuture<Boolean> remove(String key, byte[] value) {
151 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
152 .whenComplete((r, e) -> throwIfLocked(r.status()))
153 .thenApply(v -> v.updated());
154 }
155
156 @Override
157 @SuppressWarnings("unchecked")
158 public CompletableFuture<Boolean> remove(String key, long version) {
159 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
160 .whenComplete((r, e) -> throwIfLocked(r.status()))
161 .thenApply(v -> v.updated());
162 }
163
164 @Override
165 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
167 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
168 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.oldValue());
170 }
171
172 @Override
173 @SuppressWarnings("unchecked")
174 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
175 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
176 newValue,
177 Match.ifValue(oldValue),
178 Match.ANY))
179 .whenComplete((r, e) -> throwIfLocked(r.status()))
180 .thenApply(v -> v.updated());
181 }
182
183 @Override
184 @SuppressWarnings("unchecked")
185 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
186 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
187 newValue,
188 Match.ANY,
189 Match.ifValue(oldVersion)))
190 .whenComplete((r, e) -> throwIfLocked(r.status()))
191 .thenApply(v -> v.updated());
192 }
193
194 @Override
195 public CompletableFuture<Void> clear() {
196 return submit(new AtomixConsistentMapCommands.Clear())
197 .whenComplete((r, e) -> throwIfLocked(r))
198 .thenApply(v -> null);
199 }
200
201 @Override
202 @SuppressWarnings("unchecked")
203 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
204 Predicate<? super byte[]> condition,
205 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
206 return get(key).thenCompose(r1 -> {
207 byte[] existingValue = r1 == null ? null : r1.value();
208 // if the condition evaluates to false, return existing value.
209 if (!condition.test(existingValue)) {
210 return CompletableFuture.completedFuture(r1);
211 }
212
213 AtomicReference<byte[]> computedValue = new AtomicReference<>();
214 // if remappingFunction throws an exception, return the exception.
215 try {
216 computedValue.set(remappingFunction.apply(key, existingValue));
217 } catch (Exception e) {
218 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
219 future.completeExceptionally(e);
220 return future;
221 }
222 if (computedValue.get() == null && r1 == null) {
223 return CompletableFuture.completedFuture(null);
224 }
225 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
226 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
227 return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
228 computedValue.get(),
229 valueMatch,
230 versionMatch))
231 .whenComplete((r, e) -> throwIfLocked(r.status()))
232 .thenApply(v -> v.newValue());
233 });
234 }
235
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800236 @Override
237 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
238 if (!mapEventListeners.isEmpty()) {
239 if (mapEventListeners.add(listener)) {
240 return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
241 } else {
242 return CompletableFuture.completedFuture(null);
243 }
244 }
245 mapEventListeners.add(listener);
246 return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
247 }
248
249 @Override
250 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
251 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
252 return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
253 }
254 return CompletableFuture.completedFuture(null);
255 }
256
257 private void throwIfLocked(MapEntryUpdateResult.Status status) {
258 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
259 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
260 }
261 }
262
Madan Jampanicadd70b2016-02-08 13:45:43 -0800263 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800264 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
265 return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
266 .thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800267 }
268
269 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800270 public CompletableFuture<Void> commit(TransactionId transactionId) {
271 return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
272 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800273 }
274
275 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800276 public CompletableFuture<Void> rollback(TransactionId transactionId) {
277 return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
278 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800279 }
280
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800281 /**
282 * Change listener context.
283 */
284 private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
285 private final MapEventListener<String, byte[]> listener;
286
287 private ChangeListener(MapEventListener<String, byte[]> listener) {
288 this.listener = listener;
289 }
290
291 @Override
292 public void accept(MapEvent<String, byte[]> event) {
293 listener.event(event);
294 }
295
296 @Override
297 public void close() {
298 synchronized (AtomixConsistentMap.this) {
299 mapEventListeners.remove(listener);
300 if (mapEventListeners.isEmpty()) {
301 submit(new AtomixConsistentMapCommands.Unlisten());
302 }
303 }
304 }
305 }
306}