blob: a4e0bf95a124860c3f0f5dc507add16eaa0cde33 [file] [log] [blame]
Jordan Halterman471be0c2018-05-21 23:12:45 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
Jordan Halterman5e884352018-05-21 22:11:07 -070019import java.util.Iterator;
20import java.util.List;
Jordan Halterman471be0c2018-05-21 23:12:45 -070021import java.util.Map;
22import java.util.Map.Entry;
Jordan Halterman5e884352018-05-21 22:11:07 -070023import java.util.NoSuchElementException;
Jordan Halterman471be0c2018-05-21 23:12:45 -070024import java.util.Set;
25import java.util.TreeMap;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.Executor;
28import java.util.function.Consumer;
29import java.util.stream.Collectors;
30
31import com.google.common.collect.HashMultiset;
32import com.google.common.collect.ImmutableList;
33import com.google.common.collect.ImmutableSet;
34import com.google.common.collect.Iterables;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Multiset;
37import com.google.common.collect.Multisets;
38import org.onlab.util.Match;
39import org.onlab.util.Tools;
40import org.onosproject.cluster.PartitionId;
41import org.onosproject.store.service.AsyncConsistentMultimap;
Jordan Halterman5e884352018-05-21 22:11:07 -070042import org.onosproject.store.service.AsyncIterator;
Jordan Halterman471be0c2018-05-21 23:12:45 -070043import org.onosproject.store.service.MultimapEventListener;
44import org.onosproject.store.service.Versioned;
45
46import static com.google.common.base.Preconditions.checkNotNull;
47
48/**
49 * {@link AsyncConsistentMultimap} that has its entries partitioned horizontally across
50 * several {@link AsyncConsistentMultimap maps}.
51 *
52 * @param <K> key type
53 * @param <V> value type
54 */
55public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
56
57 private final String name;
58 private final TreeMap<PartitionId, AsyncConsistentMultimap<K, V>> partitions = Maps.newTreeMap();
59 private final Hasher<K> keyHasher;
60
61 public PartitionedAsyncConsistentMultimap(String name,
62 Map<PartitionId, AsyncConsistentMultimap<K, V>> partitions,
63 Hasher<K> keyHasher) {
64 this.name = name;
65 this.partitions.putAll(checkNotNull(partitions));
66 this.keyHasher = checkNotNull(keyHasher);
67 }
68
69 @Override
70 public String name() {
71 return name;
72 }
73
74 @Override
75 public CompletableFuture<Integer> size() {
76 return Tools.allOf(getMultimaps().stream().map(m -> m.size()).collect(Collectors.toList()),
77 Math::addExact,
78 0);
79 }
80
81 @Override
82 public CompletableFuture<Boolean> isEmpty() {
83 return size().thenApply(size -> size == 0);
84 }
85
86 @Override
87 public CompletableFuture<Boolean> containsKey(K key) {
88 return getMultimap(key).containsKey(key);
89 }
90
91 @Override
92 public CompletableFuture<Boolean> containsValue(V value) {
93 return Tools.firstOf(getMultimaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
94 Match.ifValue(true),
95 false);
96 }
97
98 @Override
99 public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
100 return getMultimap(key).get(key);
101 }
102
103 @Override
104 public CompletableFuture<Boolean> containsEntry(K key, V value) {
105 return getMultimap(key).containsEntry(key, value);
106 }
107
108 @Override
109 public CompletableFuture<Boolean> put(K key, V value) {
110 return getMultimap(key).put(key, value);
111 }
112
113 @Override
Jordan Halterman8c57a092018-06-04 14:53:06 -0700114 public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
115 return getMultimap(key).putAndGet(key, value);
116 }
117
118 @Override
Jordan Halterman471be0c2018-05-21 23:12:45 -0700119 public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
120 return getMultimap(key).removeAll(key, values);
121 }
122
123 @Override
124 public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
125 return getMultimap(key).removeAll(key);
126 }
127
128 @Override
129 public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
130 return getMultimap(key).putAll(key, values);
131 }
132
133 @Override
134 public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
135 return getMultimap(key).replaceValues(key, values);
136 }
137
138 @Override
139 public CompletableFuture<Map<K, Collection<V>>> asMap() {
140 throw new UnsupportedOperationException("Expensive operation.");
141 }
142
143 @Override
144 public CompletableFuture<Void> clear() {
145 return CompletableFuture.allOf(getMultimaps().stream()
146 .map(map -> map.clear())
147 .toArray(CompletableFuture[]::new));
148 }
149
150 @Override
151 public CompletableFuture<Set<K>> keySet() {
152 return Tools.allOf(getMultimaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
153 (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
154 ImmutableSet.of());
155 }
156
157 @Override
158 public CompletableFuture<Multiset<K>> keys() {
159 return Tools.allOf(getMultimaps().stream().map(m -> m.keys()).collect(Collectors.toList()))
160 .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
161 }
162
163 @Override
164 public CompletableFuture<Multiset<V>> values() {
165 return Tools.allOf(getMultimaps().stream().map(m -> m.values()).collect(Collectors.toList()))
166 .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
167 }
168
169 @Override
170 public CompletableFuture<Collection<Entry<K, V>>> entries() {
171 return Tools.allOf(getMultimaps().stream().map(m -> m.entries()).collect(Collectors.toList()))
172 .thenApply(results -> results.stream().reduce((s1, s2) -> ImmutableList.copyOf(Iterables.concat(s1, s2)))
173 .orElse(ImmutableList.of()));
174 }
175
176 @Override
177 public CompletableFuture<Boolean> remove(K key, V value) {
178 return getMultimap(key).remove(key, value);
179 }
180
181 @Override
Jordan Halterman8c57a092018-06-04 14:53:06 -0700182 public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
183 return getMultimap(key).removeAndGet(key, value);
184 }
185
186 @Override
Jordan Halterman5e884352018-05-21 22:11:07 -0700187 public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
188 return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
189 .thenApply(PartitionedMultimapIterator::new);
190 }
191
192 @Override
Jordan Halterman471be0c2018-05-21 23:12:45 -0700193 public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
194 return CompletableFuture.allOf(getMultimaps().stream()
195 .map(map -> map.addListener(listener, executor))
196 .toArray(CompletableFuture[]::new));
197 }
198
199 @Override
200 public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
201 return CompletableFuture.allOf(getMultimaps().stream()
202 .map(map -> map.removeListener(listener))
203 .toArray(CompletableFuture[]::new));
204 }
205
206 @Override
207 public void addStatusChangeListener(Consumer<Status> listener) {
208 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
209 }
210
211 @Override
212 public void removeStatusChangeListener(Consumer<Status> listener) {
213 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
214 }
215
216 @Override
217 public Collection<Consumer<Status>> statusChangeListeners() {
218 throw new UnsupportedOperationException();
219 }
220
221 /**
222 * Returns the map (partition) to which the specified key maps.
223 *
224 * @param key key
225 * @return AsyncConsistentMap to which key maps
226 */
227 private AsyncConsistentMultimap<K, V> getMultimap(K key) {
228 return partitions.get(keyHasher.hash(key));
229 }
230
231 /**
232 * Returns all the constituent maps.
233 *
234 * @return collection of maps.
235 */
236 private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
237 return partitions.values();
238 }
Jordan Halterman5e884352018-05-21 22:11:07 -0700239
240 private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
241 private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
242 private volatile AsyncIterator<Entry<K, V>> iterator;
243
244 public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, V>>> iterators) {
245 this.iterators = iterators.iterator();
246 }
247
248 @Override
249 public CompletableFuture<Boolean> hasNext() {
250 if (iterator == null && iterators.hasNext()) {
251 iterator = iterators.next();
252 }
253 if (iterator == null) {
254 return CompletableFuture.completedFuture(false);
255 }
256 return iterator.hasNext()
257 .thenCompose(hasNext -> {
258 if (!hasNext) {
259 iterator = null;
260 return hasNext();
261 }
262 return CompletableFuture.completedFuture(true);
263 });
264 }
265
266 @Override
267 public CompletableFuture<Entry<K, V>> next() {
268 if (iterator == null && iterators.hasNext()) {
269 iterator = iterators.next();
270 }
271 if (iterator == null) {
272 return Tools.exceptionalFuture(new NoSuchElementException());
273 }
274 return iterator.next();
275 }
276 }
Jordan Halterman471be0c2018-05-21 23:12:45 -0700277}