blob: 38ba0f383c34040228839e5bca113274e99dc771 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.Collection;
Jordan Haltermandae11602018-07-03 00:00:47 -070021import java.util.Iterator;
22import java.util.List;
Madan Jampani10073672016-01-21 19:13:59 -080023import java.util.Map;
24import java.util.Map.Entry;
Jordan Haltermandae11602018-07-03 00:00:47 -070025import java.util.NoSuchElementException;
Madan Jampani10073672016-01-21 19:13:59 -080026import java.util.Set;
27import java.util.TreeMap;
28import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070029import java.util.concurrent.Executor;
Madan Jampani10073672016-01-21 19:13:59 -080030import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070031import java.util.function.Consumer;
Madan Jampani10073672016-01-21 19:13:59 -080032import java.util.function.Predicate;
Madan Jampani74da78b2016-02-09 21:18:36 -080033import java.util.stream.Collectors;
Madan Jampani10073672016-01-21 19:13:59 -080034
Madan Jampani307a21e2016-09-01 15:49:47 -070035import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080036import org.onlab.util.Tools;
Madan Jampani10073672016-01-21 19:13:59 -080037import org.onosproject.cluster.PartitionId;
Madan Jampani74da78b2016-02-09 21:18:36 -080038import org.onosproject.store.primitives.MapUpdate;
39import org.onosproject.store.primitives.TransactionId;
Madan Jampani10073672016-01-21 19:13:59 -080040import org.onosproject.store.service.AsyncConsistentMap;
Jordan Haltermandae11602018-07-03 00:00:47 -070041import org.onosproject.store.service.AsyncIterator;
Madan Jampani10073672016-01-21 19:13:59 -080042import org.onosproject.store.service.MapEventListener;
Jordan Halterman948d6592017-04-20 17:18:24 -070043import org.onosproject.store.service.TransactionLog;
44import org.onosproject.store.service.Version;
Madan Jampani10073672016-01-21 19:13:59 -080045import org.onosproject.store.service.Versioned;
Madan Jampani0463cf92016-05-04 14:46:08 -070046
Madan Jampani307a21e2016-09-01 15:49:47 -070047import com.google.common.collect.ImmutableList;
48import com.google.common.collect.ImmutableSet;
Madan Jampani10073672016-01-21 19:13:59 -080049import com.google.common.collect.Maps;
Madan Jampani10073672016-01-21 19:13:59 -080050
51/**
52 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
53 * several {@link AsyncConsistentMap maps}.
54 *
55 * @param <K> key type
56 * @param <V> value type
57 */
58public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
59
60 private final String name;
61 private final TreeMap<PartitionId, AsyncConsistentMap<K, V>> partitions = Maps.newTreeMap();
62 private final Hasher<K> keyHasher;
63
64 public PartitionedAsyncConsistentMap(String name,
65 Map<PartitionId, AsyncConsistentMap<K, V>> partitions,
66 Hasher<K> keyHasher) {
67 this.name = name;
68 this.partitions.putAll(checkNotNull(partitions));
69 this.keyHasher = checkNotNull(keyHasher);
70 }
71
72 @Override
73 public String name() {
74 return name;
75 }
76
77 @Override
78 public CompletableFuture<Integer> size() {
Madan Jampani307a21e2016-09-01 15:49:47 -070079 return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
80 Math::addExact,
81 0);
Madan Jampani10073672016-01-21 19:13:59 -080082 }
83
84 @Override
85 public CompletableFuture<Boolean> isEmpty() {
86 return size().thenApply(size -> size == 0);
87 }
88
89 @Override
90 public CompletableFuture<Boolean> containsKey(K key) {
91 return getMap(key).containsKey(key);
92 }
93
94 @Override
95 public CompletableFuture<Boolean> containsValue(V value) {
Madan Jampani307a21e2016-09-01 15:49:47 -070096 return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
97 Match.ifValue(true),
98 false);
Madan Jampani10073672016-01-21 19:13:59 -080099 }
Jordan Haltermanf6272442017-04-20 02:18:08 -0700100
Madan Jampani10073672016-01-21 19:13:59 -0800101 @Override
102 public CompletableFuture<Versioned<V>> get(K key) {
103 return getMap(key).get(key);
104 }
105
106 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700107 public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
108 return getMap(key).getOrDefault(key, defaultValue);
109 }
110
111 @Override
Madan Jampani10073672016-01-21 19:13:59 -0800112 public CompletableFuture<Versioned<V>> computeIf(K key,
113 Predicate<? super V> condition,
114 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
115 return getMap(key).computeIf(key, condition, remappingFunction);
116 }
117
118 @Override
119 public CompletableFuture<Versioned<V>> put(K key, V value) {
120 return getMap(key).put(key, value);
121 }
122
123 @Override
124 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
125 return getMap(key).putAndGet(key, value);
126 }
127
128 @Override
129 public CompletableFuture<Versioned<V>> remove(K key) {
130 return getMap(key).remove(key);
131 }
132
133 @Override
134 public CompletableFuture<Void> clear() {
135 return CompletableFuture.allOf(getMaps().stream()
136 .map(map -> map.clear())
137 .toArray(CompletableFuture[]::new));
138 }
139
140 @Override
141 public CompletableFuture<Set<K>> keySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700142 return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
143 (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
144 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800145 }
146
147 @Override
148 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700149 return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
150 (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
151 ImmutableList.of());
Madan Jampani10073672016-01-21 19:13:59 -0800152 }
153
154 @Override
155 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700156 return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
157 (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
158 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800159 }
160
161 @Override
162 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
163 return getMap(key).putIfAbsent(key, value);
164 }
165
166 @Override
167 public CompletableFuture<Boolean> remove(K key, V value) {
168 return getMap(key).remove(key, value);
169 }
170
171 @Override
172 public CompletableFuture<Boolean> remove(K key, long version) {
173 return getMap(key).remove(key, version);
174 }
175
176 @Override
177 public CompletableFuture<Versioned<V>> replace(K key, V value) {
178 return getMap(key).replace(key, value);
179 }
180
181 @Override
182 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
183 return getMap(key).replace(key, oldValue, newValue);
184 }
185
186 @Override
187 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
188 return getMap(key).replace(key, oldVersion, newValue);
189 }
190
191 @Override
Jordan Haltermandae11602018-07-03 00:00:47 -0700192 public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
193 return Tools.allOf(getMaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
194 .thenApply(PartitionedMultimapIterator::new);
195 }
196
197 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700198 public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
Madan Jampani10073672016-01-21 19:13:59 -0800199 return CompletableFuture.allOf(getMaps().stream()
Madan Jampani0463cf92016-05-04 14:46:08 -0700200 .map(map -> map.addListener(listener, executor))
Madan Jampani10073672016-01-21 19:13:59 -0800201 .toArray(CompletableFuture[]::new));
202 }
203
204 @Override
205 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
206 return CompletableFuture.allOf(getMaps().stream()
207 .map(map -> map.removeListener(listener))
208 .toArray(CompletableFuture[]::new));
209 }
210
Madan Jampani74da78b2016-02-09 21:18:36 -0800211 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700212 public CompletableFuture<Version> begin(TransactionId transactionId) {
213 throw new UnsupportedOperationException();
214 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800215
Jordan Halterman948d6592017-04-20 17:18:24 -0700216 @Override
217 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
218 throw new UnsupportedOperationException();
219 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800220
Jordan Halterman948d6592017-04-20 17:18:24 -0700221 @Override
222 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
223 throw new UnsupportedOperationException();
Madan Jampani74da78b2016-02-09 21:18:36 -0800224 }
225
226 @Override
227 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700228 throw new UnsupportedOperationException();
Madan Jampani74da78b2016-02-09 21:18:36 -0800229 }
230
231 @Override
232 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700233 throw new UnsupportedOperationException();
Madan Jampani542d9e22016-04-05 15:39:55 -0700234 }
235
Madan Jampani1d3b6172016-04-28 13:22:57 -0700236 @Override
237 public void addStatusChangeListener(Consumer<Status> listener) {
238 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
239 }
240
241 @Override
242 public void removeStatusChangeListener(Consumer<Status> listener) {
243 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
244 }
245
246 @Override
247 public Collection<Consumer<Status>> statusChangeListeners() {
248 throw new UnsupportedOperationException();
249 }
250
Madan Jampani10073672016-01-21 19:13:59 -0800251 /**
252 * Returns the map (partition) to which the specified key maps.
253 * @param key key
254 * @return AsyncConsistentMap to which key maps
255 */
256 private AsyncConsistentMap<K, V> getMap(K key) {
257 return partitions.get(keyHasher.hash(key));
258 }
259
260 /**
261 * Returns all the constituent maps.
262 * @return collection of maps.
263 */
264 private Collection<AsyncConsistentMap<K, V>> getMaps() {
265 return partitions.values();
266 }
Jordan Haltermandae11602018-07-03 00:00:47 -0700267
268 private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
269 private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
270 private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
271
272 public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
273 this.iterators = iterators.iterator();
274 }
275
276 @Override
277 public CompletableFuture<Boolean> hasNext() {
278 if (iterator == null && iterators.hasNext()) {
279 iterator = iterators.next();
280 }
281 if (iterator == null) {
282 return CompletableFuture.completedFuture(false);
283 }
284 return iterator.hasNext()
285 .thenCompose(hasNext -> {
286 if (!hasNext) {
287 iterator = null;
288 return hasNext();
289 }
290 return CompletableFuture.completedFuture(true);
291 });
292 }
293
294 @Override
295 public CompletableFuture<Entry<K, Versioned<V>>> next() {
296 if (iterator == null && iterators.hasNext()) {
297 iterator = iterators.next();
298 }
299 if (iterator == null) {
300 return Tools.exceptionalFuture(new NoSuchElementException());
301 }
302 return iterator.next();
303 }
304 }
Madan Jampani10073672016-01-21 19:13:59 -0800305}