blob: bf77105a080e83e617d8eb0f47584d43fc606962 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.Collection;
Madan Jampani10073672016-01-21 19:13:59 -080021import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.TreeMap;
25import java.util.concurrent.CompletableFuture;
Madan Jampani0463cf92016-05-04 14:46:08 -070026import java.util.concurrent.Executor;
Madan Jampani10073672016-01-21 19:13:59 -080027import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070028import java.util.function.Consumer;
Madan Jampani10073672016-01-21 19:13:59 -080029import java.util.function.Predicate;
Madan Jampani74da78b2016-02-09 21:18:36 -080030import java.util.stream.Collectors;
Madan Jampani10073672016-01-21 19:13:59 -080031
Madan Jampani307a21e2016-09-01 15:49:47 -070032import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080033import org.onlab.util.Tools;
Madan Jampani10073672016-01-21 19:13:59 -080034import org.onosproject.cluster.PartitionId;
Madan Jampani74da78b2016-02-09 21:18:36 -080035import org.onosproject.store.primitives.MapUpdate;
36import org.onosproject.store.primitives.TransactionId;
Madan Jampani10073672016-01-21 19:13:59 -080037import org.onosproject.store.service.AsyncConsistentMap;
38import org.onosproject.store.service.MapEventListener;
Jordan Halterman948d6592017-04-20 17:18:24 -070039import org.onosproject.store.service.TransactionLog;
40import org.onosproject.store.service.Version;
Madan Jampani10073672016-01-21 19:13:59 -080041import org.onosproject.store.service.Versioned;
Madan Jampani0463cf92016-05-04 14:46:08 -070042
Madan Jampani307a21e2016-09-01 15:49:47 -070043import com.google.common.collect.ImmutableList;
44import com.google.common.collect.ImmutableSet;
Madan Jampani10073672016-01-21 19:13:59 -080045import com.google.common.collect.Maps;
Madan Jampani10073672016-01-21 19:13:59 -080046
47/**
48 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
49 * several {@link AsyncConsistentMap maps}.
50 *
51 * @param <K> key type
52 * @param <V> value type
53 */
54public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
55
56 private final String name;
57 private final TreeMap<PartitionId, AsyncConsistentMap<K, V>> partitions = Maps.newTreeMap();
58 private final Hasher<K> keyHasher;
59
60 public PartitionedAsyncConsistentMap(String name,
61 Map<PartitionId, AsyncConsistentMap<K, V>> partitions,
62 Hasher<K> keyHasher) {
63 this.name = name;
64 this.partitions.putAll(checkNotNull(partitions));
65 this.keyHasher = checkNotNull(keyHasher);
66 }
67
68 @Override
69 public String name() {
70 return name;
71 }
72
73 @Override
74 public CompletableFuture<Integer> size() {
Madan Jampani307a21e2016-09-01 15:49:47 -070075 return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
76 Math::addExact,
77 0);
Madan Jampani10073672016-01-21 19:13:59 -080078 }
79
80 @Override
81 public CompletableFuture<Boolean> isEmpty() {
82 return size().thenApply(size -> size == 0);
83 }
84
85 @Override
86 public CompletableFuture<Boolean> containsKey(K key) {
87 return getMap(key).containsKey(key);
88 }
89
90 @Override
91 public CompletableFuture<Boolean> containsValue(V value) {
Madan Jampani307a21e2016-09-01 15:49:47 -070092 return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
93 Match.ifValue(true),
94 false);
Madan Jampani10073672016-01-21 19:13:59 -080095 }
Jordan Haltermanf6272442017-04-20 02:18:08 -070096
Madan Jampani10073672016-01-21 19:13:59 -080097 @Override
98 public CompletableFuture<Versioned<V>> get(K key) {
99 return getMap(key).get(key);
100 }
101
102 @Override
Jordan Haltermanf6272442017-04-20 02:18:08 -0700103 public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
104 return getMap(key).getOrDefault(key, defaultValue);
105 }
106
107 @Override
Madan Jampani10073672016-01-21 19:13:59 -0800108 public CompletableFuture<Versioned<V>> computeIf(K key,
109 Predicate<? super V> condition,
110 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
111 return getMap(key).computeIf(key, condition, remappingFunction);
112 }
113
114 @Override
115 public CompletableFuture<Versioned<V>> put(K key, V value) {
116 return getMap(key).put(key, value);
117 }
118
119 @Override
120 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
121 return getMap(key).putAndGet(key, value);
122 }
123
124 @Override
125 public CompletableFuture<Versioned<V>> remove(K key) {
126 return getMap(key).remove(key);
127 }
128
129 @Override
130 public CompletableFuture<Void> clear() {
131 return CompletableFuture.allOf(getMaps().stream()
132 .map(map -> map.clear())
133 .toArray(CompletableFuture[]::new));
134 }
135
136 @Override
137 public CompletableFuture<Set<K>> keySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700138 return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
139 (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
140 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800141 }
142
143 @Override
144 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700145 return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
146 (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
147 ImmutableList.of());
Madan Jampani10073672016-01-21 19:13:59 -0800148 }
149
150 @Override
151 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampani307a21e2016-09-01 15:49:47 -0700152 return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
153 (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
154 ImmutableSet.of());
Madan Jampani10073672016-01-21 19:13:59 -0800155 }
156
157 @Override
158 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
159 return getMap(key).putIfAbsent(key, value);
160 }
161
162 @Override
163 public CompletableFuture<Boolean> remove(K key, V value) {
164 return getMap(key).remove(key, value);
165 }
166
167 @Override
168 public CompletableFuture<Boolean> remove(K key, long version) {
169 return getMap(key).remove(key, version);
170 }
171
172 @Override
173 public CompletableFuture<Versioned<V>> replace(K key, V value) {
174 return getMap(key).replace(key, value);
175 }
176
177 @Override
178 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
179 return getMap(key).replace(key, oldValue, newValue);
180 }
181
182 @Override
183 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
184 return getMap(key).replace(key, oldVersion, newValue);
185 }
186
187 @Override
Madan Jampani0463cf92016-05-04 14:46:08 -0700188 public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
Madan Jampani10073672016-01-21 19:13:59 -0800189 return CompletableFuture.allOf(getMaps().stream()
Madan Jampani0463cf92016-05-04 14:46:08 -0700190 .map(map -> map.addListener(listener, executor))
Madan Jampani10073672016-01-21 19:13:59 -0800191 .toArray(CompletableFuture[]::new));
192 }
193
194 @Override
195 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
196 return CompletableFuture.allOf(getMaps().stream()
197 .map(map -> map.removeListener(listener))
198 .toArray(CompletableFuture[]::new));
199 }
200
Madan Jampani74da78b2016-02-09 21:18:36 -0800201 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700202 public CompletableFuture<Version> begin(TransactionId transactionId) {
203 throw new UnsupportedOperationException();
204 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800205
Jordan Halterman948d6592017-04-20 17:18:24 -0700206 @Override
207 public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
208 throw new UnsupportedOperationException();
209 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800210
Jordan Halterman948d6592017-04-20 17:18:24 -0700211 @Override
212 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
213 throw new UnsupportedOperationException();
Madan Jampani74da78b2016-02-09 21:18:36 -0800214 }
215
216 @Override
217 public CompletableFuture<Void> commit(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700218 throw new UnsupportedOperationException();
Madan Jampani74da78b2016-02-09 21:18:36 -0800219 }
220
221 @Override
222 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Jordan Halterman948d6592017-04-20 17:18:24 -0700223 throw new UnsupportedOperationException();
Madan Jampani542d9e22016-04-05 15:39:55 -0700224 }
225
Madan Jampani1d3b6172016-04-28 13:22:57 -0700226 @Override
227 public void addStatusChangeListener(Consumer<Status> listener) {
228 partitions.values().forEach(map -> map.addStatusChangeListener(listener));
229 }
230
231 @Override
232 public void removeStatusChangeListener(Consumer<Status> listener) {
233 partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
234 }
235
236 @Override
237 public Collection<Consumer<Status>> statusChangeListeners() {
238 throw new UnsupportedOperationException();
239 }
240
Madan Jampani10073672016-01-21 19:13:59 -0800241 /**
242 * Returns the map (partition) to which the specified key maps.
243 * @param key key
244 * @return AsyncConsistentMap to which key maps
245 */
246 private AsyncConsistentMap<K, V> getMap(K key) {
247 return partitions.get(keyHasher.hash(key));
248 }
249
250 /**
251 * Returns all the constituent maps.
252 * @return collection of maps.
253 */
254 private Collection<AsyncConsistentMap<K, V>> getMaps() {
255 return partitions.values();
256 }
257}