blob: 0cd3fe5c5c983a17afad157455f2cef730b3557c [file] [log] [blame]
Jordan Halterman142b9842018-05-21 23:12:45 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
Jordan Halterman21ef9e42018-05-21 22:11:07 -070019import java.util.Iterator;
20import java.util.List;
Jordan Halterman142b9842018-05-21 23:12:45 -070021import java.util.Map;
22import java.util.Map.Entry;
Jordan Halterman21ef9e42018-05-21 22:11:07 -070023import java.util.NoSuchElementException;
Jordan Halterman142b9842018-05-21 23:12:45 -070024import java.util.Set;
25import java.util.TreeMap;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.Executor;
28import java.util.function.Consumer;
29import java.util.stream.Collectors;
30
31import com.google.common.collect.HashMultiset;
32import com.google.common.collect.ImmutableList;
33import com.google.common.collect.ImmutableSet;
34import com.google.common.collect.Iterables;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Multiset;
37import com.google.common.collect.Multisets;
38import org.onlab.util.Match;
39import org.onlab.util.Tools;
40import org.onosproject.cluster.PartitionId;
41import org.onosproject.store.service.AsyncConsistentMultimap;
Jordan Halterman21ef9e42018-05-21 22:11:07 -070042import org.onosproject.store.service.AsyncIterator;
Jordan Halterman142b9842018-05-21 23:12:45 -070043import org.onosproject.store.service.MultimapEventListener;
44import org.onosproject.store.service.Versioned;
45
46import static com.google.common.base.Preconditions.checkNotNull;
47
48/**
49 * {@link AsyncConsistentMultimap} that has its entries partitioned horizontally across
50 * several {@link AsyncConsistentMultimap maps}.
51 *
52 * @param <K> key type
53 * @param <V> value type
54 */
55public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
56
57 private final String name;
58 private final TreeMap<PartitionId, AsyncConsistentMultimap<K, V>> partitions = Maps.newTreeMap();
59 private final Hasher<K> keyHasher;
60
61 public PartitionedAsyncConsistentMultimap(String name,
62 Map<PartitionId, AsyncConsistentMultimap<K, V>> partitions,
63 Hasher<K> keyHasher) {
64 this.name = name;
65 this.partitions.putAll(checkNotNull(partitions));
66 this.keyHasher = checkNotNull(keyHasher);
67 }
68
69 @Override
70 public String name() {
71 return name;
72 }
73
74 @Override
75 public CompletableFuture<Integer> size() {
76 return Tools.allOf(getMultimaps().stream().map(m -> m.size()).collect(Collectors.toList()),
77 Math::addExact,
78 0);
79 }
80
81 @Override
82 public CompletableFuture<Boolean> isEmpty() {
83 return size().thenApply(size -> size == 0);
84 }
85
86 @Override
87 public CompletableFuture<Boolean> containsKey(K key) {
88 return getMultimap(key).containsKey(key);
89 }
90
91 @Override
92 public CompletableFuture<Boolean> containsValue(V value) {
93 return Tools.firstOf(getMultimaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
94 Match.ifValue(true),
95 false);
96 }
97
98 @Override
99 public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
100 return getMultimap(key).get(key);
101 }
102
103 @Override
104 public CompletableFuture<Boolean> containsEntry(K key, V value) {
105 return getMultimap(key).containsEntry(key, value);
106 }
107
108 @Override
109 public CompletableFuture<Boolean> put(K key, V value) {
110 return getMultimap(key).put(key, value);
111 }
112
113 @Override
Jordan Halterman99c654d2018-06-04 14:53:06 -0700114 public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
115 return getMultimap(key).putAndGet(key, value);
116 }
117
118 @Override
Jordan Halterman142b9842018-05-21 23:12:45 -0700119 public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
120 return getMultimap(key).removeAll(key, values);
121 }
122
123 @Override
124 public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
125 return getMultimap(key).removeAll(key);
126 }
127
128 @Override
pierfa48c6e2019-10-11 18:19:59 +0200129 public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
130 Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
131 // Semantic is that any change in the partitions should return true
132 return Tools.allOf(subMappings.entrySet().stream()
133 .map(entry -> partitions.get(entry.getKey()).removeAll(entry.getValue()))
134 .collect(Collectors.toList()),
135 Boolean::logicalOr, false);
136 }
137
138 @Override
Jordan Halterman142b9842018-05-21 23:12:45 -0700139 public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
140 return getMultimap(key).putAll(key, values);
141 }
142
143 @Override
pierfa48c6e2019-10-11 18:19:59 +0200144 public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
145 Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
146 // Semantic is that any change in the partitions should return true
147 return Tools.allOf(subMappings.entrySet().stream()
148 .map(entry -> partitions.get(entry.getKey()).putAll(entry.getValue()))
149 .collect(Collectors.toList()),
150 Boolean::logicalOr, false);
151 }
152
153 @Override
Jordan Halterman142b9842018-05-21 23:12:45 -0700154 public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
155 return getMultimap(key).replaceValues(key, values);
156 }
157
158 @Override
159 public CompletableFuture<Map<K, Collection<V>>> asMap() {
160 throw new UnsupportedOperationException("Expensive operation.");
161 }
162
163 @Override
164 public CompletableFuture<Void> clear() {
165 return CompletableFuture.allOf(getMultimaps().stream()
166 .map(map -> map.clear())
167 .toArray(CompletableFuture[]::new));
168 }
169
170 @Override
171 public CompletableFuture<Set<K>> keySet() {
172 return Tools.allOf(getMultimaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
173 (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
174 ImmutableSet.of());
175 }
176
177 @Override
178 public CompletableFuture<Multiset<K>> keys() {
179 return Tools.allOf(getMultimaps().stream().map(m -> m.keys()).collect(Collectors.toList()))
180 .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
181 }
182
183 @Override
184 public CompletableFuture<Multiset<V>> values() {
185 return Tools.allOf(getMultimaps().stream().map(m -> m.values()).collect(Collectors.toList()))
186 .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
187 }
188
189 @Override
190 public CompletableFuture<Collection<Entry<K, V>>> entries() {
191 return Tools.allOf(getMultimaps().stream().map(m -> m.entries()).collect(Collectors.toList()))
192 .thenApply(results -> results.stream().reduce((s1, s2) -> ImmutableList.copyOf(Iterables.concat(s1, s2)))
193 .orElse(ImmutableList.of()));
194 }
195
196 @Override
197 public CompletableFuture<Boolean> remove(K key, V value) {
198 return getMultimap(key).remove(key, value);
199 }
200
201 @Override
Jordan Halterman99c654d2018-06-04 14:53:06 -0700202 public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
203 return getMultimap(key).removeAndGet(key, value);
204 }
205
206 @Override
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700207 public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
208 return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
209 .thenApply(PartitionedMultimapIterator::new);
210 }
211
212 @Override
Jordan Halterman142b9842018-05-21 23:12:45 -0700213 public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
214 return CompletableFuture.allOf(getMultimaps().stream()
215 .map(map -> map.addListener(listener, executor))
216 .toArray(CompletableFuture[]::new));
217 }
218
219 @Override
220 public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
221 return CompletableFuture.allOf(getMultimaps().stream()
222 .map(map -> map.removeListener(listener))
223 .toArray(CompletableFuture[]::new));
224 }
225
226 @Override
227 public void addStatusChangeListener(Consumer<Status> listener) {
228 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
229 }
230
231 @Override
232 public void removeStatusChangeListener(Consumer<Status> listener) {
233 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
234 }
235
236 @Override
237 public Collection<Consumer<Status>> statusChangeListeners() {
238 throw new UnsupportedOperationException();
239 }
240
241 /**
242 * Returns the map (partition) to which the specified key maps.
243 *
244 * @param key key
245 * @return AsyncConsistentMap to which key maps
246 */
247 private AsyncConsistentMultimap<K, V> getMultimap(K key) {
248 return partitions.get(keyHasher.hash(key));
249 }
250
251 /**
252 * Returns all the constituent maps.
253 *
254 * @return collection of maps.
255 */
256 private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
257 return partitions.values();
258 }
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700259
pierfa48c6e2019-10-11 18:19:59 +0200260 /**
261 * Build sub-mappings for each partition.
262 *
263 * @param mapping initial mapping key-value
264 * @return sub-mappings partition-values
265 */
266 private Map<PartitionId, Map<K, Collection<? extends V>>> buildSubMappings(
267 Map<K, Collection<? extends V>> mapping) {
268 Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = Maps.newHashMap();
269 // Build first a mapping with the partitions
270 mapping.forEach((key, values) -> subMappings.compute(keyHasher.hash(key), (k, v) -> {
271 if (v == null) {
272 v = Maps.newHashMap();
273 }
274 v.put(key, values);
275 return v;
276 }));
277 return subMappings;
278 }
279
Jordan Halterman21ef9e42018-05-21 22:11:07 -0700280 private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
281 private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
282 private volatile AsyncIterator<Entry<K, V>> iterator;
283
284 public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, V>>> iterators) {
285 this.iterators = iterators.iterator();
286 }
287
288 @Override
289 public CompletableFuture<Boolean> hasNext() {
290 if (iterator == null && iterators.hasNext()) {
291 iterator = iterators.next();
292 }
293 if (iterator == null) {
294 return CompletableFuture.completedFuture(false);
295 }
296 return iterator.hasNext()
297 .thenCompose(hasNext -> {
298 if (!hasNext) {
299 iterator = null;
300 return hasNext();
301 }
302 return CompletableFuture.completedFuture(true);
303 });
304 }
305
306 @Override
307 public CompletableFuture<Entry<K, V>> next() {
308 if (iterator == null && iterators.hasNext()) {
309 iterator = iterators.next();
310 }
311 if (iterator == null) {
312 return Tools.exceptionalFuture(new NoSuchElementException());
313 }
314 return iterator.next();
315 }
316 }
Jordan Halterman142b9842018-05-21 23:12:45 -0700317}