blob: 758a36c9ec599d4d856f9c73c38501168bd74840 [file] [log] [blame]
Aaron Kruglikov3e29f662016-07-13 10:18:10 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.collect.Maps;
20import io.atomix.copycat.client.CopycatClient;
21import io.atomix.resource.AbstractResource;
22import io.atomix.resource.ResourceTypeInfo;
23import org.onlab.util.Match;
24import org.onosproject.store.primitives.TransactionId;
25import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
26import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
27import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
28import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
29import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
30import org.onosproject.store.service.AsyncConsistentTreeMap;
31import org.onosproject.store.service.MapEvent;
32import org.onosproject.store.service.MapEventListener;
33import org.onosproject.store.service.MapTransaction;
34import org.onosproject.store.service.Versioned;
35
36import java.util.Collection;
37import java.util.ConcurrentModificationException;
38import java.util.List;
39import java.util.Map;
40import java.util.NavigableMap;
41import java.util.NavigableSet;
42import java.util.Properties;
43import java.util.Set;
44import java.util.concurrent.CompletableFuture;
45import java.util.concurrent.Executor;
46import java.util.concurrent.atomic.AtomicReference;
47import java.util.function.BiFunction;
48import java.util.function.Predicate;
49
50import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
51import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
54import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
55import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
71
72/**
73 * Implementation of {@link AsyncConsistentTreeMap}.
74 */
75@ResourceTypeInfo(id = -155, factory = AtomixConsistentTreeMapFactory.class)
76public class AtomixConsistentTreeMap extends AbstractResource<AtomixConsistentTreeMap>
Aaron Kruglikov6a164352016-07-25 11:46:17 -070077 implements AsyncConsistentTreeMap<byte[]> {
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070078
79 private final Map<MapEventListener<String, byte[]>, Executor>
80 mapEventListeners = Maps.newConcurrentMap();
81
82 public static final String CHANGE_SUBJECT = "changeEvents";
83
84 public AtomixConsistentTreeMap(CopycatClient client, Properties options) {
85 super(client, options);
86 }
87
88 @Override
89 public String name() {
90 return null;
91 }
92
93 @Override
94 public CompletableFuture<AtomixConsistentTreeMap> open() {
95 return super.open().thenApply(result -> {
96 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
97 return result;
98 });
99 }
100
101 private void handleEvent(List<MapEvent<String, byte[]>> events) {
102 events.forEach(event -> mapEventListeners.
103 forEach((listener, executor) ->
104 executor.execute(() ->
105 listener.event(event))));
106 }
107
108 @Override
109 public CompletableFuture<Boolean> isEmpty() {
110 return client.submit(new IsEmpty());
111 }
112
113 @Override
114 public CompletableFuture<Integer> size() {
115 return client.submit(new Size());
116 }
117
118 @Override
119 public CompletableFuture<Boolean> containsKey(String key) {
120 return client.submit(new ContainsKey(key));
121 }
122
123 @Override
124 public CompletableFuture<Boolean> containsValue(byte[] value) {
125 return client.submit(new ContainsValue(value));
126 }
127
128 @Override
129 public CompletableFuture<Versioned<byte[]>> get(String key) {
130 return client.submit(new Get(key));
131 }
132
133 @Override
134 public CompletableFuture<Set<String>> keySet() {
135 return client.submit(new KeySet());
136 }
137
138 @Override
139 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
140 return client.submit(new Values());
141 }
142
143 @Override
144 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet() {
145 return client.submit(new EntrySet());
146 }
147
148 @Override
149 @SuppressWarnings("unchecked")
150 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
151 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
152 .whenComplete((r, e) -> throwIfLocked(r.status()))
153 .thenApply(v -> v.oldValue());
154 }
155
156 @Override
157 @SuppressWarnings("unchecked")
158 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
159 return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
160 .whenComplete((r, e) -> throwIfLocked(r.status()))
161 .thenApply(v -> v.newValue());
162 }
163
164 @Override
165 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
167 return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
168 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.oldValue());
170 }
171
172 @Override
173 @SuppressWarnings("unchecked")
174 public CompletableFuture<Versioned<byte[]>> remove(String key) {
175 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
176 .whenComplete((r, e) -> throwIfLocked(r.status()))
177 .thenApply(v -> v.oldValue());
178 }
179
180 @Override
181 @SuppressWarnings("unchecked")
182 public CompletableFuture<Boolean> remove(String key, byte[] value) {
183 return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
184 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.updated());
186 }
187
188 @Override
189 @SuppressWarnings("unchecked")
190 public CompletableFuture<Boolean> remove(String key, long version) {
191 return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
192 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.updated());
194 }
195
196 @Override
197 @SuppressWarnings("unchecked")
198 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
199 return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
200 .whenComplete((r, e) -> throwIfLocked(r.status()))
201 .thenApply(v -> v.oldValue());
202 }
203
204 @Override
205 @SuppressWarnings("unchecked")
206 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
207 return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
208 .whenComplete((r, e) -> throwIfLocked(r.status()))
209 .thenApply(v -> v.updated());
210 }
211
212 @Override
213 @SuppressWarnings("unchecked")
214 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
215 return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
216 .whenComplete((r, e) -> throwIfLocked(r.status()))
217 .thenApply(v -> v.updated());
218 }
219
220 @Override
221 public CompletableFuture<Void> clear() {
222 return client.submit(new Clear())
223 .whenComplete((r, e) -> throwIfLocked(r))
224 .thenApply(v -> null);
225 }
226
227 @Override
228 @SuppressWarnings("unchecked")
229 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
230 Predicate<? super byte[]> condition,
231 BiFunction<? super String,
232 ? super byte[],
233 ? extends byte[]> remappingFunction) {
234 return get(key).thenCompose(r1 -> {
235 byte[] existingValue = r1 == null ? null : r1.value();
236
237 if (!condition.test(existingValue)) {
238 return CompletableFuture.completedFuture(r1);
239 }
240
241 AtomicReference<byte[]> computedValue = new AtomicReference<byte[]>();
242 try {
243 computedValue.set(remappingFunction.apply(key, existingValue));
244 } catch (Exception e) {
245 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
246 future.completeExceptionally(e);
247 return future;
248 }
249 if (computedValue.get() == null && r1 == null) {
250 return CompletableFuture.completedFuture(null);
251 }
252 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
253 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
254 return client.submit(new UpdateAndGet(key, computedValue.get(),
255 valueMatch, versionMatch))
256 .whenComplete((r, e) -> throwIfLocked(r.status()))
257 .thenApply(v -> v.newValue());
258 });
259 }
260
261 @Override
262 public CompletableFuture<Void> addListener(
263 MapEventListener<String, byte[]> listener, Executor executor) {
264 if (mapEventListeners.isEmpty()) {
265 return client.submit(new Listen()).thenRun(() ->
266 mapEventListeners.put(listener,
267 executor));
268 } else {
269 mapEventListeners.put(listener, executor);
270 return CompletableFuture.completedFuture(null);
271 }
272 }
273
274 @Override
275 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
276 if (mapEventListeners.remove(listener) != null &&
277 mapEventListeners.isEmpty()) {
278 return client.submit(new Unlisten())
279 .thenApply(v -> null);
280 }
281 return CompletableFuture.completedFuture(null);
282 }
283
284
285 private void throwIfLocked(MapEntryUpdateResult.Status status) {
286 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
287 throw new ConcurrentModificationException("Cannot update TreeMap: another update is in progress.");
288 }
289 }
290
291 @Override
292 public CompletableFuture<String> firstKey() {
293 return client.submit(new FirstKey<String>());
294 }
295
296 @Override
297 public CompletableFuture<String> lastKey() {
298 return client.submit(new LastKey<String>());
299 }
300
301 @Override
302 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> ceilingEntry(String key) {
303 return client.submit(new CeilingEntry(key));
304 }
305
306 @Override
307 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> floorEntry(String key) {
308 return client.submit(new FloorEntry<String, Versioned<byte[]>>(key));
309 }
310
311 @Override
312 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> higherEntry(
313 String key) {
314 return client.submit(new HigherEntry<String, Versioned<byte[]>>(key));
315 }
316
317 @Override
318 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lowerEntry(
319 String key) {
320 return client.submit(new LowerEntry<>(key));
321 }
322
323 @Override
324 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> firstEntry() {
325 return client.submit(new FirstEntry());
326 }
327
328 @Override
329 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lastEntry() {
330 return client.submit(new LastEntry<String, Versioned<byte[]>>());
331 }
332
333 @Override
334 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollFirstEntry() {
335 return client.submit(new PollFirstEntry());
336 }
337
338 @Override
339 public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollLastEntry() {
340 return client.submit(new PollLastEntry());
341 }
342
343 @Override
344 public CompletableFuture<String> lowerKey(String key) {
345 return client.submit(new LowerKey(key));
346 }
347
348 @Override
349 public CompletableFuture<String> floorKey(String key) {
350 return client.submit(new FloorKey(key));
351 }
352
353 @Override
354 public CompletableFuture<String> ceilingKey(String key) {
355 return client.submit(new CeilingKey(key));
356 }
357
358 @Override
359 public CompletableFuture<String> higherKey(String key) {
360 return client.submit(new HigherKey(key));
361 }
362
363 @Override
364 public CompletableFuture<NavigableSet<String>> navigableKeySet() {
365 throw new UnsupportedOperationException("This operation is not yet " +
366 "supported.");
367 }
368
369 @Override
370 public CompletableFuture<NavigableMap<String, byte[]>> subMap(
371 String upperKey, String lowerKey, boolean inclusiveUpper,
372 boolean inclusiveLower) {
373 throw new UnsupportedOperationException("This operation is not yet " +
374 "supported."); }
375
376 @Override
377 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String,
378 byte[]> transaction) {
379 throw new UnsupportedOperationException("This operation is not yet " +
380 "supported.");
381 }
382
383 @Override
384 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]>
385 transaction) {
386 throw new UnsupportedOperationException("This operation is not yet " +
387 "supported.");
388 }
389
390 @Override
391 public CompletableFuture<Void> commit(TransactionId transactionId) {
392 throw new UnsupportedOperationException("This operation is not yet " +
393 "supported.");
394 }
395
396 @Override
397 public CompletableFuture<Void> rollback(TransactionId transactionId) {
398 throw new UnsupportedOperationException("This operation is not yet " +
399 "supported.");
400 }
401}