blob: e444eb50b4ca8f14c7cf0ccb4190484e0ab20caa [file] [log] [blame]
Jordan Halterman00e92da2018-05-22 23:05:52 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Jordan Halterman00e92da2018-05-22 23:05:52 -070017
Jordan Haltermanbc982392018-08-07 15:02:37 -070018import java.time.Duration;
Jordan Halterman00e92da2018-05-22 23:05:52 -070019import java.util.Collection;
20import java.util.Map;
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.Executor;
24import java.util.function.Function;
25
26import com.google.common.collect.Maps;
27import com.google.common.collect.Multiset;
28import io.atomix.core.map.impl.TranscodingAsyncDistributedMap;
29import org.onosproject.store.service.AsyncConsistentMultimap;
30import org.onosproject.store.service.AsyncIterator;
31import org.onosproject.store.service.MultimapEvent;
32import org.onosproject.store.service.MultimapEventListener;
33import org.onosproject.store.service.Versioned;
34
Jordan Halterman6cf60c32018-08-15 01:22:51 -070035import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
36
Jordan Halterman00e92da2018-05-22 23:05:52 -070037/**
38 * Atomix consistent map.
39 */
40public class AtomixConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
41 private final io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap;
42 private final Map<MultimapEventListener<K, V>, io.atomix.core.multimap.AtomicMultimapEventListener<K, V>>
43 listenerMap = Maps.newIdentityHashMap();
44
45 public AtomixConsistentMultimap(io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap) {
46 this.atomixMultimap = atomixMultimap;
47 }
48
49 @Override
50 public String name() {
51 return atomixMultimap.name();
52 }
53
54 @Override
55 public CompletableFuture<Integer> size() {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070056 return adaptMapFuture(atomixMultimap.size());
Jordan Halterman00e92da2018-05-22 23:05:52 -070057 }
58
59 @Override
60 public CompletableFuture<Boolean> containsKey(K key) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070061 return adaptMapFuture(atomixMultimap.containsKey(key));
Jordan Halterman00e92da2018-05-22 23:05:52 -070062 }
63
64 @Override
65 public CompletableFuture<Boolean> containsValue(V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070066 return adaptMapFuture(atomixMultimap.containsValue(value));
Jordan Halterman00e92da2018-05-22 23:05:52 -070067 }
68
69 @Override
70 public CompletableFuture<Boolean> isEmpty() {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070071 return adaptMapFuture(atomixMultimap.isEmpty());
Jordan Halterman00e92da2018-05-22 23:05:52 -070072 }
73
74 @Override
75 public CompletableFuture<Boolean> containsEntry(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070076 return adaptMapFuture(atomixMultimap.containsEntry(key, value));
Jordan Halterman00e92da2018-05-22 23:05:52 -070077 }
78
79 @Override
80 public CompletableFuture<Boolean> put(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070081 return adaptMapFuture(atomixMultimap.put(key, value));
Jordan Halterman00e92da2018-05-22 23:05:52 -070082 }
83
84 @Override
85 public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070086 return adaptMapFuture(atomixMultimap.put(key, value).thenCompose(v -> atomixMultimap.get(key))
87 .thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -070088 }
89
90 @Override
91 public CompletableFuture<Boolean> remove(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070092 return adaptMapFuture(atomixMultimap.remove(key, value));
Jordan Halterman00e92da2018-05-22 23:05:52 -070093 }
94
95 @Override
96 public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -070097 return adaptMapFuture(atomixMultimap.remove(key, value).thenCompose(v -> atomixMultimap.get(key))
98 .thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -070099 }
100
101 @Override
102 public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700103 return adaptMapFuture(atomixMultimap.removeAll(key, values));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700104 }
105
106 @Override
107 public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700108 return adaptMapFuture(atomixMultimap.removeAll(key).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700109 }
110
111 @Override
pier4fcb4b22019-10-11 18:19:59 +0200112 public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
113 return adaptMapFuture(atomixMultimap.removeAll(mapping));
114 }
115
116 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700117 public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700118 return adaptMapFuture(atomixMultimap.putAll(key, values));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700119 }
120
121 @Override
pier4fcb4b22019-10-11 18:19:59 +0200122 public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
123 return adaptMapFuture(atomixMultimap.putAll(mapping));
124 }
125
126 @Override
Jordan Halterman00e92da2018-05-22 23:05:52 -0700127 @SuppressWarnings("unchecked")
128 public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700129 return adaptMapFuture(atomixMultimap.replaceValues(key, values).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700130 }
131
132 @Override
133 public CompletableFuture<Void> clear() {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700134 return adaptMapFuture(atomixMultimap.clear());
Jordan Halterman00e92da2018-05-22 23:05:52 -0700135 }
136
137 @Override
138 @SuppressWarnings("unchecked")
139 public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700140 return adaptMapFuture(atomixMultimap.get(key).thenApply(this::toVersioned));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700141 }
142
143 @Override
144 public CompletableFuture<Set<K>> keySet() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700145 return CompletableFuture.completedFuture(atomixMultimap.keySet()
146 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700147 }
148
149 @Override
150 public CompletableFuture<Multiset<K>> keys() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700151 return CompletableFuture.completedFuture(atomixMultimap.keys()
152 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700153 }
154
155 @Override
156 public CompletableFuture<Multiset<V>> values() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700157 return CompletableFuture.completedFuture(atomixMultimap.values()
158 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700159 }
160
161 @Override
162 public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
Jordan Haltermanbc982392018-08-07 15:02:37 -0700163 return CompletableFuture.completedFuture(atomixMultimap.entries()
164 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700165 }
166
167 @Override
168 public CompletableFuture<AsyncIterator<Map.Entry<K, V>>> iterator() {
169 return CompletableFuture.completedFuture(new AtomixIterator<>(atomixMultimap.entries().iterator()));
170 }
171
172 @Override
173 public CompletableFuture<Map<K, Collection<V>>> asMap() {
174 return CompletableFuture.completedFuture(
175 new TranscodingAsyncDistributedMap<K, Collection<V>, K, io.atomix.utils.time.Versioned<Collection<V>>>(
176 atomixMultimap.asMap(),
177 Function.identity(),
178 Function.identity(),
179 v -> new io.atomix.utils.time.Versioned<>(v, 0),
Jordan Haltermanbc982392018-08-07 15:02:37 -0700180 v -> v.value())
181 .sync(Duration.ofMillis(DEFAULT_OPERATION_TIMEOUT_MILLIS)));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700182 }
183
184 @Override
185 public synchronized CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
186 io.atomix.core.multimap.AtomicMultimapEventListener<K, V> atomixListener = event ->
187 listener.event(new MultimapEvent<K, V>(
188 name(),
189 event.key(),
190 event.newValue(),
191 event.oldValue()));
192 listenerMap.put(listener, atomixListener);
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700193 return adaptMapFuture(atomixMultimap.addListener(atomixListener, executor));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700194 }
195
196 @Override
197 public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
198 io.atomix.core.multimap.AtomicMultimapEventListener<K, V> atomixListener = listenerMap.remove(listener);
199 if (atomixListener != null) {
Jordan Halterman6cf60c32018-08-15 01:22:51 -0700200 return adaptMapFuture(atomixMultimap.removeListener(atomixListener));
Jordan Halterman00e92da2018-05-22 23:05:52 -0700201 }
202 return CompletableFuture.completedFuture(null);
203 }
204
205 private Versioned<Collection<? extends V>> toVersioned(
206 io.atomix.utils.time.Versioned<Collection<V>> versioned) {
207 return versioned != null
208 ? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
209 : null;
210 }
211}