blob: 6378024f7e901af8c483e39aac6b5d220f88842e [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.Collection;
21import java.util.List;
22import java.util.Map;
23import java.util.Map.Entry;
24import java.util.Set;
25import java.util.TreeMap;
26import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070027import java.util.concurrent.Executor;
Madan Jampani10073672016-01-21 19:13:59 -080028import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070029import java.util.function.Consumer;
Madan Jampani10073672016-01-21 19:13:59 -080030import java.util.function.Predicate;
Madan Jampani74da78b2016-02-09 21:18:36 -080031import java.util.stream.Collectors;
Madan Jampani10073672016-01-21 19:13:59 -080032
Madan Jampani307a21e2016-09-01 15:49:47 -070033import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080034import org.onlab.util.Tools;
Madan Jampani10073672016-01-21 19:13:59 -080035import org.onosproject.cluster.PartitionId;
Madan Jampani74da78b2016-02-09 21:18:36 -080036import org.onosproject.store.primitives.MapUpdate;
37import org.onosproject.store.primitives.TransactionId;
Madan Jampani10073672016-01-21 19:13:59 -080038import org.onosproject.store.service.AsyncConsistentMap;
39import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080040import org.onosproject.store.service.MapTransaction;
Madan Jampani10073672016-01-21 19:13:59 -080041import org.onosproject.store.service.Versioned;
Madan Jampani0463cf92016-05-04 14:46:08 -070042
Madan Jampani307a21e2016-09-01 15:49:47 -070043import com.google.common.collect.ImmutableList;
44import com.google.common.collect.ImmutableSet;
Madan Jampani10073672016-01-21 19:13:59 -080045import com.google.common.collect.Lists;
46import com.google.common.collect.Maps;
Madan Jampani10073672016-01-21 19:13:59 -080047
48/**
49 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
50 * several {@link AsyncConsistentMap maps}.
51 *
52 * @param <K> key type
53 * @param <V> value type
54 */
55public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
56
57 private final String name;
58 private final TreeMap<PartitionId, AsyncConsistentMap<K, V>> partitions = Maps.newTreeMap();
59 private final Hasher<K> keyHasher;
60
61 public PartitionedAsyncConsistentMap(String name,
62 Map<PartitionId, AsyncConsistentMap<K, V>> partitions,
63 Hasher<K> keyHasher) {
64 this.name = name;
65 this.partitions.putAll(checkNotNull(partitions));
66 this.keyHasher = checkNotNull(keyHasher);
67 }
68
69 @Override
70 public String name() {
71 return name;
72 }
73
74 @Override
75 public CompletableFuture<Integer> size() {
Madan Jampani307a21e2016-09-01 15:49:47 -070076 return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
77 Math::addExact,
78 0);
Madan Jampani10073672016-01-21 19:13:59 -080079 }
80
81 @Override
82 public CompletableFuture<Boolean> isEmpty() {
83 return size().thenApply(size -> size == 0);
84 }
85
86 @Override
87 public CompletableFuture<Boolean> containsKey(K key) {
88 return getMap(key).containsKey(key);
89 }
90
91 @Override
92 public CompletableFuture<Boolean> containsValue(V value) {
Madan Jampani307a21e2016-09-01 15:49:47 -070093 return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
94 Match.ifValue(true),
95 false);
Madan Jampani10073672016-01-21 19:13:59 -080096 }
97 @Override
98 public CompletableFuture<Versioned<V>> get(K key) {
99 return getMap(key).get(key);
100 }
101
102 @Override
103 public CompletableFuture<Versioned<V>> computeIf(K key,
104 Predicate<? super V> condition,
105 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
106 return getMap(key).computeIf(key, condition, remappingFunction);
107 }
108
109 @Override
110 public CompletableFuture<Versioned<V>> put(K key, V value) {
111 return getMap(key).put(key, value);
112 }
113
114 @Override
115 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
116 return getMap(key).putAndGet(key, value);
117 }
118
119 @Override
120 public CompletableFuture<Versioned<V>> remove(K key) {
121 return getMap(key).remove(key);
122 }
123
124 @Override
125 public CompletableFuture<Void> clear() {
126 return CompletableFuture.allOf(getMaps().stream()
127 .map(map -> map.clear())
128 .toArray(CompletableFuture[]::new));
129 }
130
131 @Override
132 public CompletableFuture<Set<K>> keySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700133 return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
134 (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
135 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800136 }
137
138 @Override
139 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700140 return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
141 (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
142 ImmutableList.of());
Madan Jampani10073672016-01-21 19:13:59 -0800143 }
144
145 @Override
146 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700147 return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
148 (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
149 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800150 }
151
152 @Override
153 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
154 return getMap(key).putIfAbsent(key, value);
155 }
156
157 @Override
158 public CompletableFuture<Boolean> remove(K key, V value) {
159 return getMap(key).remove(key, value);
160 }
161
162 @Override
163 public CompletableFuture<Boolean> remove(K key, long version) {
164 return getMap(key).remove(key, version);
165 }
166
167 @Override
168 public CompletableFuture<Versioned<V>> replace(K key, V value) {
169 return getMap(key).replace(key, value);
170 }
171
172 @Override
173 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
174 return getMap(key).replace(key, oldValue, newValue);
175 }
176
177 @Override
178 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
179 return getMap(key).replace(key, oldVersion, newValue);
180 }
181
182 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700183 public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
Madan Jampani10073672016-01-21 19:13:59 -0800184 return CompletableFuture.allOf(getMaps().stream()
Madan Jampani0463cf92016-05-04 14:46:08 -0700185 .map(map -> map.addListener(listener, executor))
Madan Jampani10073672016-01-21 19:13:59 -0800186 .toArray(CompletableFuture[]::new));
187 }
188
189 @Override
190 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
191 return CompletableFuture.allOf(getMaps().stream()
192 .map(map -> map.removeListener(listener))
193 .toArray(CompletableFuture[]::new));
194 }
195
Madan Jampani74da78b2016-02-09 21:18:36 -0800196 @Override
197 public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
198
199 Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
200 transaction.updates().forEach(update -> {
201 AsyncConsistentMap<K, V> map = getMap(update.key());
202 updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
203 });
204 Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
205 Maps.transformValues(updatesGroupedByMap,
206 list -> new MapTransaction<>(transaction.transactionId(), list));
207
208 return Tools.allOf(transactionsByMap.entrySet()
209 .stream()
210 .map(e -> e.getKey().prepare(e.getValue()))
211 .collect(Collectors.toList()))
212 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
213 }
214
215 @Override
216 public CompletableFuture<Void> commit(TransactionId transactionId) {
217 return CompletableFuture.allOf(getMaps().stream()
218 .map(e -> e.commit(transactionId))
219 .toArray(CompletableFuture[]::new));
220 }
221
222 @Override
223 public CompletableFuture<Void> rollback(TransactionId transactionId) {
224 return CompletableFuture.allOf(getMaps().stream()
225 .map(e -> e.rollback(transactionId))
226 .toArray(CompletableFuture[]::new));
227 }
228
Madan Jampani542d9e22016-04-05 15:39:55 -0700229 @Override
230 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
231 Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
232 transaction.updates().forEach(update -> {
233 AsyncConsistentMap<K, V> map = getMap(update.key());
234 updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
235 });
236 Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
237 Maps.transformValues(updatesGroupedByMap,
238 list -> new MapTransaction<>(transaction.transactionId(), list));
239
240 return Tools.allOf(transactionsByMap.entrySet()
241 .stream()
242 .map(e -> e.getKey().prepareAndCommit(e.getValue()))
243 .collect(Collectors.toList()))
244 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
245 }
246
Madan Jampani1d3b6172016-04-28 13:22:57 -0700247 @Override
248 public void addStatusChangeListener(Consumer<Status> listener) {
249 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
250 }
251
252 @Override
253 public void removeStatusChangeListener(Consumer<Status> listener) {
254 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
255 }
256
257 @Override
258 public Collection<Consumer<Status>> statusChangeListeners() {
259 throw new UnsupportedOperationException();
260 }
261
Madan Jampani10073672016-01-21 19:13:59 -0800262 /**
263 * Returns the map (partition) to which the specified key maps.
264 * @param key key
265 * @return AsyncConsistentMap to which key maps
266 */
267 private AsyncConsistentMap<K, V> getMap(K key) {
268 return partitions.get(keyHasher.hash(key));
269 }
270
271 /**
272 * Returns all the constituent maps.
273 * @return collection of maps.
274 */
275 private Collection<AsyncConsistentMap<K, V>> getMaps() {
276 return partitions.values();
277 }
278}