blob: bf462c9071dcbae346c2ab0c0484db1803b7d62d [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.collect.Maps;
20import io.atomix.copycat.client.CopycatClient;
21import io.atomix.resource.AbstractResource;
22import io.atomix.resource.ResourceTypeInfo;
23import org.onlab.util.Match;
Jordan Halterman948d6592017-04-20 17:18:24 -070024import org.onosproject.store.primitives.MapUpdate;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070025import org.onosproject.store.primitives.TransactionId;
26import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
27import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
28import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
29import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
30import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
31import org.onosproject.store.service.AsyncConsistentTreeMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
Jordan Halterman948d6592017-04-20 17:18:24 -070034import org.onosproject.store.service.TransactionLog;
35import org.onosproject.store.service.Version;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070036import org.onosproject.store.service.Versioned;
37
38import java.util.Collection;
39import java.util.ConcurrentModificationException;
40import java.util.List;
41import java.util.Map;
42import java.util.NavigableMap;
43import java.util.NavigableSet;
44import java.util.Properties;
45import java.util.Set;
46import java.util.concurrent.CompletableFuture;
47import java.util.concurrent.Executor;
48import java.util.concurrent.atomic.AtomicReference;
49import java.util.function.BiFunction;
50import java.util.function.Predicate;
51
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
54import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
55import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
Jordan Haltermanf6272442017-04-20 02:18:08 -070061import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.GetOrDefault;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070062import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
74
75/**
76 * Implementation of {@link AsyncConsistentTreeMap}.
77 */
78@ResourceTypeInfo(id = -155, factory = AtomixConsistentTreeMapFactory.class)
79public class AtomixConsistentTreeMap extends AbstractResource<AtomixConsistentTreeMap>
Aaron Kruglikov6a164352016-07-25 11:46:17 -070080 implements AsyncConsistentTreeMap<byte[]> {
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070081
82 private final Map<MapEventListener<String, byte[]>, Executor>
83 mapEventListeners = Maps.newConcurrentMap();
84
85 public static final String CHANGE_SUBJECT = "changeEvents";
86
87 public AtomixConsistentTreeMap(CopycatClient client, Properties options) {
88 super(client, options);
89 }
90
91 @Override
92 public String name() {
93 return null;
94 }
95
96 @Override
97 public CompletableFuture<AtomixConsistentTreeMap> open() {
98 return super.open().thenApply(result -> {
99 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
100 return result;
101 });
102 }
103
104 private void handleEvent(List<MapEvent<String, byte[]>> events) {
105 events.forEach(event -> mapEventListeners.
106 forEach((listener, executor) ->
107 executor.execute(() ->
108 listener.event(event))));
109 }
110
111 @Override
112 public CompletableFuture<Boolean> isEmpty() {
113 return client.submit(new IsEmpty());
114 }
115
116 @Override
117 public CompletableFuture<Integer> size() {
118 return client.submit(new Size());
119 }
120
121 @Override
122 public CompletableFuture<Boolean> containsKey(String key) {
123 return client.submit(new ContainsKey(key));
124 }
125
126 @Override
127 public CompletableFuture<Boolean> containsValue(byte[] value) {
128 return client.submit(new ContainsValue(value));
129 }
130
131 @Override
132 public CompletableFuture<Versioned<byte[]>> get(String key) {
133 return client.submit(new Get(key));
134 }
135
136 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700137 public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
138 return client.submit(new GetOrDefault(key, defaultValue));
139 }
140
141 @Override
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700142 public CompletableFuture<Set<String>> keySet() {
143 return client.submit(new KeySet());
144 }
145
146 @Override
147 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
148 return client.submit(new Values());
149 }
150
151 @Override
152 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet() {
153 return client.submit(new EntrySet());
154 }
155
156 @Override
157 @SuppressWarnings("unchecked")
158 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
159 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
160 .whenComplete((r, e) -> throwIfLocked(r.status()))
161 .thenApply(v -> v.oldValue());
162 }
163
164 @Override
165 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
167 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
168 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.newValue());
170 }
171
172 @Override
173 @SuppressWarnings("unchecked")
174 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
175 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
176 .whenComplete((r, e) -> throwIfLocked(r.status()))
177 .thenApply(v -> v.oldValue());
178 }
179
180 @Override
181 @SuppressWarnings("unchecked")
182 public CompletableFuture<Versioned<byte[]>> remove(String key) {
183 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
184 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.oldValue());
186 }
187
188 @Override
189 @SuppressWarnings("unchecked")
190 public CompletableFuture<Boolean> remove(String key, byte[] value) {
191 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
192 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.updated());
194 }
195
196 @Override
197 @SuppressWarnings("unchecked")
198 public CompletableFuture<Boolean> remove(String key, long version) {
199 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
200 .whenComplete((r, e) -> throwIfLocked(r.status()))
201 .thenApply(v -> v.updated());
202 }
203
204 @Override
205 @SuppressWarnings("unchecked")
206 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
207 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
208 .whenComplete((r, e) -> throwIfLocked(r.status()))
209 .thenApply(v -> v.oldValue());
210 }
211
212 @Override
213 @SuppressWarnings("unchecked")
214 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
215 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
216 .whenComplete((r, e) -> throwIfLocked(r.status()))
217 .thenApply(v -> v.updated());
218 }
219
220 @Override
221 @SuppressWarnings("unchecked")
222 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
223 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
224 .whenComplete((r, e) -> throwIfLocked(r.status()))
225 .thenApply(v -> v.updated());
226 }
227
228 @Override
229 public CompletableFuture<Void> clear() {
230 return client.submit(new Clear())
231 .whenComplete((r, e) -> throwIfLocked(r))
232 .thenApply(v -> null);
233 }
234
235 @Override
236 @SuppressWarnings("unchecked")
237 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
238 Predicate<? super byte[]> condition,
239 BiFunction<? super String,
240 ? super byte[],
241 ? extends byte[]> remappingFunction) {
242 return get(key).thenCompose(r1 -> {
243 byte[] existingValue = r1 == null ? null : r1.value();
244
245 if (!condition.test(existingValue)) {
246 return CompletableFuture.completedFuture(r1);
247 }
248
249 AtomicReference<byte[]> computedValue = new AtomicReference<byte[]>();
250 try {
251 computedValue.set(remappingFunction.apply(key, existingValue));
252 } catch (Exception e) {
253 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
254 future.completeExceptionally(e);
255 return future;
256 }
257 if (computedValue.get() == null && r1 == null) {
258 return CompletableFuture.completedFuture(null);
259 }
260 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
261 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
262 return client.submit(new UpdateAndGet(key, computedValue.get(),
263 valueMatch, versionMatch))
264 .whenComplete((r, e) -> throwIfLocked(r.status()))
265 .thenApply(v -> v.newValue());
266 });
267 }
268
269 @Override
270 public CompletableFuture<Void> addListener(
271 MapEventListener<String, byte[]> listener, Executor executor) {
272 if (mapEventListeners.isEmpty()) {
273 return client.submit(new Listen()).thenRun(() ->
274 mapEventListeners.put(listener,
275 executor));
276 } else {
277 mapEventListeners.put(listener, executor);
278 return CompletableFuture.completedFuture(null);
279 }
280 }
281
282 @Override
283 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
284 if (mapEventListeners.remove(listener) != null &&
285 mapEventListeners.isEmpty()) {
286 return client.submit(new Unlisten())
287 .thenApply(v -> null);
288 }
289 return CompletableFuture.completedFuture(null);
290 }
291
292
293 private void throwIfLocked(MapEntryUpdateResult.Status status) {
294 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
295 throw new ConcurrentModificationException("Cannot update TreeMap: another update is in progress.");
296 }
297 }
298
299 @Override
300 public CompletableFuture<String> firstKey() {
301 return client.submit(new FirstKey<String>());
302 }
303
304 @Override
305 public CompletableFuture<String> lastKey() {
306 return client.submit(new LastKey<String>());
307 }
308
309 @Override
310 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> ceilingEntry(String key) {
311 return client.submit(new CeilingEntry(key));
312 }
313
314 @Override
315 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> floorEntry(String key) {
316 return client.submit(new FloorEntry<String, Versioned<byte[]>>(key));
317 }
318
319 @Override
320 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> higherEntry(
321 String key) {
322 return client.submit(new HigherEntry<String, Versioned<byte[]>>(key));
323 }
324
325 @Override
326 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lowerEntry(
327 String key) {
328 return client.submit(new LowerEntry<>(key));
329 }
330
331 @Override
332 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> firstEntry() {
333 return client.submit(new FirstEntry());
334 }
335
336 @Override
337 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lastEntry() {
338 return client.submit(new LastEntry<String, Versioned<byte[]>>());
339 }
340
341 @Override
342 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollFirstEntry() {
343 return client.submit(new PollFirstEntry());
344 }
345
346 @Override
347 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollLastEntry() {
348 return client.submit(new PollLastEntry());
349 }
350
351 @Override
352 public CompletableFuture<String> lowerKey(String key) {
353 return client.submit(new LowerKey(key));
354 }
355
356 @Override
357 public CompletableFuture<String> floorKey(String key) {
358 return client.submit(new FloorKey(key));
359 }
360
361 @Override
362 public CompletableFuture<String> ceilingKey(String key) {
363 return client.submit(new CeilingKey(key));
364 }
365
366 @Override
367 public CompletableFuture<String> higherKey(String key) {
368 return client.submit(new HigherKey(key));
369 }
370
371 @Override
372 public CompletableFuture<NavigableSet<String>> navigableKeySet() {
Jordan Halterman948d6592017-04-20 17:18:24 -0700373 throw new UnsupportedOperationException("This operation is not yet supported.");
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700374 }
375
376 @Override
377 public CompletableFuture<NavigableMap<String, byte[]>> subMap(
378 String upperKey, String lowerKey, boolean inclusiveUpper,
379 boolean inclusiveLower) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700380 throw new UnsupportedOperationException("This operation is not yet supported.");
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700381 }
382
383 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700384 public CompletableFuture<Version> begin(TransactionId transactionId) {
385 throw new UnsupportedOperationException("This operation is not yet supported.");
386 }
387
388 @Override
389 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
390 throw new UnsupportedOperationException("This operation is not yet supported.");
391 }
392
393 @Override
394 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
395 throw new UnsupportedOperationException("This operation is not yet supported.");
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700396 }
397
398 @Override
399 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700400 throw new UnsupportedOperationException("This operation is not yet supported.");
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700401 }
402
403 @Override
404 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700405 throw new UnsupportedOperationException("This operation is not yet supported.");
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700406 }
407}