blob: 2a6b2c7cb4cd5b594a30f21c764c43751e6a4752 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.Collection;
21import java.util.List;
22import java.util.Map;
23import java.util.Map.Entry;
24import java.util.Set;
25import java.util.TreeMap;
26import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070027import java.util.concurrent.Executor;
Madan Jampani10073672016-01-21 19:13:59 -080028import java.util.concurrent.atomic.AtomicBoolean;
29import java.util.concurrent.atomic.AtomicInteger;
30import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070031import java.util.function.Consumer;
Madan Jampani10073672016-01-21 19:13:59 -080032import java.util.function.Predicate;
Madan Jampani74da78b2016-02-09 21:18:36 -080033import java.util.stream.Collectors;
Madan Jampani10073672016-01-21 19:13:59 -080034
Madan Jampani74da78b2016-02-09 21:18:36 -080035import org.onlab.util.Tools;
Madan Jampani10073672016-01-21 19:13:59 -080036import org.onosproject.cluster.PartitionId;
Madan Jampani74da78b2016-02-09 21:18:36 -080037import org.onosproject.store.primitives.MapUpdate;
38import org.onosproject.store.primitives.TransactionId;
Madan Jampani10073672016-01-21 19:13:59 -080039import org.onosproject.store.service.AsyncConsistentMap;
40import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080041import org.onosproject.store.service.MapTransaction;
Madan Jampani10073672016-01-21 19:13:59 -080042import org.onosproject.store.service.Versioned;
Madan Jampani0463cf92016-05-04 14:46:08 -070043
Madan Jampani10073672016-01-21 19:13:59 -080044import com.google.common.collect.Lists;
45import com.google.common.collect.Maps;
46import com.google.common.collect.Sets;
47
48/**
49 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
50 * several {@link AsyncConsistentMap maps}.
51 *
52 * @param <K> key type
53 * @param <V> value type
54 */
55public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
56
57 private final String name;
58 private final TreeMap<PartitionId, AsyncConsistentMap<K, V>> partitions = Maps.newTreeMap();
59 private final Hasher<K> keyHasher;
60
61 public PartitionedAsyncConsistentMap(String name,
62 Map<PartitionId, AsyncConsistentMap<K, V>> partitions,
63 Hasher<K> keyHasher) {
64 this.name = name;
65 this.partitions.putAll(checkNotNull(partitions));
66 this.keyHasher = checkNotNull(keyHasher);
67 }
68
69 @Override
70 public String name() {
71 return name;
72 }
73
74 @Override
75 public CompletableFuture<Integer> size() {
76 AtomicInteger totalSize = new AtomicInteger(0);
77 return CompletableFuture.allOf(getMaps()
78 .stream()
79 .map(map -> map.size().thenAccept(totalSize::addAndGet))
80 .toArray(CompletableFuture[]::new))
81 .thenApply(v -> totalSize.get());
82 }
83
84 @Override
85 public CompletableFuture<Boolean> isEmpty() {
86 return size().thenApply(size -> size == 0);
87 }
88
89 @Override
90 public CompletableFuture<Boolean> containsKey(K key) {
91 return getMap(key).containsKey(key);
92 }
93
94 @Override
95 public CompletableFuture<Boolean> containsValue(V value) {
96 AtomicBoolean contains = new AtomicBoolean(false);
97 return CompletableFuture.allOf(getMaps().stream()
98 .map(map -> map.containsValue(value)
99 .thenAccept(v -> contains.set(contains.get() || v)))
100 .toArray(CompletableFuture[]::new))
101 .thenApply(v -> contains.get());
102 }
103 @Override
104 public CompletableFuture<Versioned<V>> get(K key) {
105 return getMap(key).get(key);
106 }
107
108 @Override
109 public CompletableFuture<Versioned<V>> computeIf(K key,
110 Predicate<? super V> condition,
111 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
112 return getMap(key).computeIf(key, condition, remappingFunction);
113 }
114
115 @Override
116 public CompletableFuture<Versioned<V>> put(K key, V value) {
117 return getMap(key).put(key, value);
118 }
119
120 @Override
121 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
122 return getMap(key).putAndGet(key, value);
123 }
124
125 @Override
126 public CompletableFuture<Versioned<V>> remove(K key) {
127 return getMap(key).remove(key);
128 }
129
130 @Override
131 public CompletableFuture<Void> clear() {
132 return CompletableFuture.allOf(getMaps().stream()
133 .map(map -> map.clear())
134 .toArray(CompletableFuture[]::new));
135 }
136
137 @Override
138 public CompletableFuture<Set<K>> keySet() {
139 Set<K> allKeys = Sets.newConcurrentHashSet();
140 return CompletableFuture.allOf(getMaps().stream()
141 .map(map -> map.keySet().thenAccept(allKeys::addAll))
142 .toArray(CompletableFuture[]::new))
143 .thenApply(v -> allKeys);
144 }
145
146 @Override
147 public CompletableFuture<Collection<Versioned<V>>> values() {
148 List<Versioned<V>> allValues = Lists.newCopyOnWriteArrayList();
149 return CompletableFuture.allOf(getMaps().stream()
150 .map(map -> map.values().thenAccept(allValues::addAll))
151 .toArray(CompletableFuture[]::new))
152 .thenApply(v -> allValues);
153 }
154
155 @Override
156 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
157 Set<Entry<K, Versioned<V>>> allEntries = Sets.newConcurrentHashSet();
158 return CompletableFuture.allOf(getMaps().stream()
159 .map(map -> map.entrySet().thenAccept(allEntries::addAll))
160 .toArray(CompletableFuture[]::new))
161 .thenApply(v -> allEntries);
162 }
163
164 @Override
165 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
166 return getMap(key).putIfAbsent(key, value);
167 }
168
169 @Override
170 public CompletableFuture<Boolean> remove(K key, V value) {
171 return getMap(key).remove(key, value);
172 }
173
174 @Override
175 public CompletableFuture<Boolean> remove(K key, long version) {
176 return getMap(key).remove(key, version);
177 }
178
179 @Override
180 public CompletableFuture<Versioned<V>> replace(K key, V value) {
181 return getMap(key).replace(key, value);
182 }
183
184 @Override
185 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
186 return getMap(key).replace(key, oldValue, newValue);
187 }
188
189 @Override
190 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
191 return getMap(key).replace(key, oldVersion, newValue);
192 }
193
194 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700195 public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
Madan Jampani10073672016-01-21 19:13:59 -0800196 return CompletableFuture.allOf(getMaps().stream()
Madan Jampani0463cf92016-05-04 14:46:08 -0700197 .map(map -> map.addListener(listener, executor))
Madan Jampani10073672016-01-21 19:13:59 -0800198 .toArray(CompletableFuture[]::new));
199 }
200
201 @Override
202 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
203 return CompletableFuture.allOf(getMaps().stream()
204 .map(map -> map.removeListener(listener))
205 .toArray(CompletableFuture[]::new));
206 }
207
Madan Jampani74da78b2016-02-09 21:18:36 -0800208 @Override
209 public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
210
211 Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
212 transaction.updates().forEach(update -> {
213 AsyncConsistentMap<K, V> map = getMap(update.key());
214 updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
215 });
216 Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
217 Maps.transformValues(updatesGroupedByMap,
218 list -> new MapTransaction<>(transaction.transactionId(), list));
219
220 return Tools.allOf(transactionsByMap.entrySet()
221 .stream()
222 .map(e -> e.getKey().prepare(e.getValue()))
223 .collect(Collectors.toList()))
224 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
225 }
226
227 @Override
228 public CompletableFuture<Void> commit(TransactionId transactionId) {
229 return CompletableFuture.allOf(getMaps().stream()
230 .map(e -> e.commit(transactionId))
231 .toArray(CompletableFuture[]::new));
232 }
233
234 @Override
235 public CompletableFuture<Void> rollback(TransactionId transactionId) {
236 return CompletableFuture.allOf(getMaps().stream()
237 .map(e -> e.rollback(transactionId))
238 .toArray(CompletableFuture[]::new));
239 }
240
Madan Jampani542d9e22016-04-05 15:39:55 -0700241 @Override
242 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
243 Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
244 transaction.updates().forEach(update -> {
245 AsyncConsistentMap<K, V> map = getMap(update.key());
246 updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
247 });
248 Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
249 Maps.transformValues(updatesGroupedByMap,
250 list -> new MapTransaction<>(transaction.transactionId(), list));
251
252 return Tools.allOf(transactionsByMap.entrySet()
253 .stream()
254 .map(e -> e.getKey().prepareAndCommit(e.getValue()))
255 .collect(Collectors.toList()))
256 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
257 }
258
Madan Jampani1d3b6172016-04-28 13:22:57 -0700259 @Override
260 public void addStatusChangeListener(Consumer<Status> listener) {
261 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
262 }
263
264 @Override
265 public void removeStatusChangeListener(Consumer<Status> listener) {
266 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
267 }
268
269 @Override
270 public Collection<Consumer<Status>> statusChangeListeners() {
271 throw new UnsupportedOperationException();
272 }
273
Madan Jampani10073672016-01-21 19:13:59 -0800274 /**
275 * Returns the map (partition) to which the specified key maps.
276 * @param key key
277 * @return AsyncConsistentMap to which key maps
278 */
279 private AsyncConsistentMap<K, V> getMap(K key) {
280 return partitions.get(keyHasher.hash(key));
281 }
282
283 /**
284 * Returns all the constituent maps.
285 * @return collection of maps.
286 */
287 private Collection<AsyncConsistentMap<K, V>> getMaps() {
288 return partitions.values();
289 }
290}