blob: b04c9dd42ab51167f6ebc6e177284c421fbd8f0e [file] [log] [blame]
Aaron Kruglikova26f6542016-04-19 13:37:42 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Aaron Kruglikova26f6542016-04-19 13:37:42 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Aaron Kruglikova26f6542016-04-19 13:37:42 -070019import java.util.Collection;
Jordan Halterman15f33712018-06-21 00:00:15 -070020import java.util.Collections;
Aaron Kruglikova26f6542016-04-19 13:37:42 -070021import java.util.ConcurrentModificationException;
Jordan Halterman15f33712018-06-21 00:00:15 -070022import java.util.Iterator;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080023import java.util.List;
Aaron Kruglikova26f6542016-04-19 13:37:42 -070024import java.util.Map;
Jordan Halterman5e884352018-05-21 22:11:07 -070025import java.util.NoSuchElementException;
Aaron Kruglikova26f6542016-04-19 13:37:42 -070026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
Jonathan Hart46bf89b2017-02-27 15:56:42 -080028import java.util.concurrent.ConcurrentHashMap;
29import java.util.concurrent.Executor;
Jordan Halterman15f33712018-06-21 00:00:15 -070030import java.util.function.Function;
Aaron Kruglikova26f6542016-04-19 13:37:42 -070031
Jordan Halterman2bf177c2017-06-29 01:49:08 -070032import com.google.common.collect.Lists;
33import com.google.common.collect.Multiset;
34import io.atomix.protocols.raft.proxy.RaftProxy;
35import org.onlab.util.KryoNamespace;
Jordan Halterman5e884352018-05-21 22:11:07 -070036import org.onlab.util.Tools;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070037import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.AsyncConsistentMultimap;
Jordan Halterman5e884352018-05-21 22:11:07 -070039import org.onosproject.store.service.AsyncIterator;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070040import org.onosproject.store.service.MultimapEvent;
41import org.onosproject.store.service.MultimapEventListener;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.Versioned;
44
45import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
46import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
47import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
Jordan Halterman15f33712018-06-21 00:00:15 -070048import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070049import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
50import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
51import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsEntry;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsKey;
54import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ContainsValue;
55import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ENTRIES;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
Jordan Halterman15f33712018-06-21 00:00:15 -070059import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
Jordan Halterman15f33712018-06-21 00:00:15 -070064import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
Jordan Halterman8c57a092018-06-04 14:53:06 -070067import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070068import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
Jordan Halterman8c57a092018-06-04 14:53:06 -070071import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
Aaron Kruglikova26f6542016-04-19 13:37:42 -070078
Jonathan Hart46bf89b2017-02-27 15:56:42 -080079
Aaron Kruglikova26f6542016-04-19 13:37:42 -070080/**
81 * Set based implementation of the {@link AsyncConsistentMultimap}.
82 * <p>
83 * Note: this implementation does not allow null entries or duplicate entries.
84 */
Aaron Kruglikova1801aa2016-07-12 15:17:30 -070085public class AtomixConsistentSetMultimap
Jordan Halterman5e884352018-05-21 22:11:07 -070086 extends AbstractRaftPrimitive
87 implements AsyncConsistentMultimap<String, byte[]> {
Aaron Kruglikova26f6542016-04-19 13:37:42 -070088
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
Jordan Halterman5e884352018-05-21 22:11:07 -070090 .register(KryoNamespaces.BASIC)
91 .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
92 .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
93 .build());
Jordan Halterman2bf177c2017-06-29 01:49:08 -070094
Jonathan Hart46bf89b2017-02-27 15:56:42 -080095 private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
96
Jordan Halterman2bf177c2017-06-29 01:49:08 -070097 public AtomixConsistentSetMultimap(RaftProxy proxy) {
98 super(proxy);
Jordan Halterman5e884352018-05-21 22:11:07 -070099 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700100 proxy.addStateChangeListener(state -> {
101 if (state == RaftProxy.State.CONNECTED && isListening()) {
102 proxy.invoke(ADD_LISTENER);
103 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800104 });
105 }
106
Jordan Halterman5e884352018-05-21 22:11:07 -0700107 private void handleChange(List<MultimapEvent<String, byte[]>> events) {
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800108 events.forEach(event ->
Jordan Halterman5e884352018-05-21 22:11:07 -0700109 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
110 }
111
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700112 @Override
113 public CompletableFuture<Integer> size() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700114 return proxy.invoke(SIZE, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700115 }
116
117 @Override
118 public CompletableFuture<Boolean> isEmpty() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700119 return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700120 }
121
122 @Override
123 public CompletableFuture<Boolean> containsKey(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700124 return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700125 }
126
127 @Override
128 public CompletableFuture<Boolean> containsValue(byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700129 return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700130 }
131
132 @Override
133 public CompletableFuture<Boolean> containsEntry(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700134 return proxy.invoke(CONTAINS_ENTRY, SERIALIZER::encode, new ContainsEntry(key, value), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700135 }
136
137 @Override
138 public CompletableFuture<Boolean> put(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700139 return proxy.invoke(
Jordan Halterman5e884352018-05-21 22:11:07 -0700140 PUT,
141 SERIALIZER::encode,
142 new Put(key, Lists.newArrayList(value), null),
143 SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700144 }
145
146 @Override
Jordan Halterman8c57a092018-06-04 14:53:06 -0700147 public CompletableFuture<Versioned<Collection<? extends byte[]>>> putAndGet(String key, byte[] value) {
148 return proxy.invoke(
149 PUT_AND_GET,
150 SERIALIZER::encode,
151 new Put(key, Lists.newArrayList(value), null),
152 SERIALIZER::decode);
153 }
154
155 @Override
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700156 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700157 return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
Jordan Halterman5e884352018-05-21 22:11:07 -0700158 Lists.newArrayList(value),
159 null), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700160 }
161
162 @Override
Jordan Halterman8c57a092018-06-04 14:53:06 -0700163 public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAndGet(String key, byte[] value) {
164 return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key,
165 Lists.newArrayList(value),
166 null), SERIALIZER::decode);
167 }
168
169 @Override
Madan Jampani630e7ac2016-05-31 11:34:05 -0700170 public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700171 return proxy.invoke(
Jordan Halterman5e884352018-05-21 22:11:07 -0700172 REMOVE,
173 SERIALIZER::encode,
174 new MultiRemove(key, (Collection<byte[]>) values, null),
175 SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700176 }
177
178 @Override
Madan Jampani630e7ac2016-05-31 11:34:05 -0700179 public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAll(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700180 return proxy.invoke(REMOVE_ALL, SERIALIZER::encode, new RemoveAll(key, null), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700181 }
182
183 @Override
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700184 public CompletableFuture<Boolean> putAll(
Jordan Halterman5e884352018-05-21 22:11:07 -0700185 String key, Collection<? extends byte[]> values) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 return proxy.invoke(PUT, SERIALIZER::encode, new Put(key, values, null), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700187 }
188
189 @Override
Madan Jampani630e7ac2016-05-31 11:34:05 -0700190 public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
Jordan Halterman5e884352018-05-21 22:11:07 -0700191 String key, Collection<byte[]> values) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700192 return proxy.invoke(
Jordan Halterman5e884352018-05-21 22:11:07 -0700193 REPLACE,
194 SERIALIZER::encode,
195 new Replace(key, values, null),
196 SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700197 }
198
199 @Override
200 public CompletableFuture<Void> clear() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700201 return proxy.invoke(CLEAR);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700202 }
203
204 @Override
Madan Jampani630e7ac2016-05-31 11:34:05 -0700205 public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700206 return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700207 }
208
209 @Override
210 public CompletableFuture<Set<String>> keySet() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700211 return proxy.invoke(KEY_SET, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700212 }
213
214 @Override
215 public CompletableFuture<Multiset<String>> keys() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 return proxy.invoke(KEYS, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700217 }
218
Jonathan Hartad0c3022017-02-22 14:06:01 -0800219 @Override
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700220 public CompletableFuture<Multiset<byte[]>> values() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700221 return proxy.invoke(VALUES, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700222 }
223
224 @Override
225 public CompletableFuture<Collection<Map.Entry<String, byte[]>>> entries() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700226 return proxy.invoke(ENTRIES, SERIALIZER::decode);
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700227 }
228
229 @Override
Jordan Halterman5e884352018-05-21 22:11:07 -0700230 public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
Jordan Halterman15f33712018-06-21 00:00:15 -0700231 return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode).thenApply(ConsistentMultimapIterator::new);
Jordan Halterman5e884352018-05-21 22:11:07 -0700232 }
233
Jordan Halterman15f33712018-06-21 00:00:15 -0700234 /**
235 * Consistent multimap iterator.
236 */
237 private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
238 private final long id;
239 private volatile CompletableFuture<IteratorBatch> batch;
240 private volatile CompletableFuture<Void> closeFuture;
Jordan Halterman5e884352018-05-21 22:11:07 -0700241
Jordan Halterman15f33712018-06-21 00:00:15 -0700242 ConsistentMultimapIterator(long id) {
243 this.id = id;
244 this.batch = CompletableFuture.completedFuture(
245 new IteratorBatch(0, Collections.emptyList()));
Jordan Halterman5e884352018-05-21 22:11:07 -0700246 }
247
Jordan Halterman15f33712018-06-21 00:00:15 -0700248 /**
249 * Returns the current batch iterator or lazily fetches the next batch from the cluster.
250 *
251 * @return the next batch iterator
252 */
253 private CompletableFuture<Iterator<Map.Entry<String, byte[]>>> batch() {
254 return batch.thenCompose(iterator -> {
255 if (iterator != null && !iterator.hasNext()) {
256 batch = fetch(iterator.position());
257 return batch.thenApply(Function.identity());
258 }
259 return CompletableFuture.completedFuture(iterator);
260 });
261 }
262
263 /**
264 * Fetches the next batch of entries from the cluster.
265 *
266 * @param position the position from which to fetch the next batch
267 * @return the next batch of entries from the cluster
268 */
269 private CompletableFuture<IteratorBatch> fetch(int position) {
270 return proxy.<IteratorPosition, IteratorBatch>invoke(
271 NEXT,
272 SERIALIZER::encode,
273 new IteratorPosition(id, position),
274 SERIALIZER::decode)
275 .thenCompose(batch -> {
276 if (batch == null) {
277 return close().thenApply(v -> null);
278 }
279 return CompletableFuture.completedFuture(batch);
280 });
281 }
282
283 /**
284 * Closes the iterator.
285 *
286 * @return future to be completed once the iterator has been closed
287 */
288 private CompletableFuture<Void> close() {
289 if (closeFuture == null) {
290 synchronized (this) {
291 if (closeFuture == null) {
292 closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
293 }
294 }
Jordan Halterman5e884352018-05-21 22:11:07 -0700295 }
Jordan Halterman15f33712018-06-21 00:00:15 -0700296 return closeFuture;
Jordan Halterman5e884352018-05-21 22:11:07 -0700297 }
298
299 @Override
Jordan Halterman15f33712018-06-21 00:00:15 -0700300 public CompletableFuture<Boolean> hasNext() {
301 return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
Jordan Halterman5e884352018-05-21 22:11:07 -0700302 }
303
304 @Override
Jordan Halterman15f33712018-06-21 00:00:15 -0700305 public CompletableFuture<Map.Entry<String, byte[]>> next() {
306 return batch().thenCompose(iterator -> {
307 if (iterator == null) {
308 return Tools.exceptionalFuture(new NoSuchElementException());
309 }
310 return CompletableFuture.completedFuture(iterator.next());
311 });
Jordan Halterman5e884352018-05-21 22:11:07 -0700312 }
313 }
314
315 @Override
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800316 public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
317 if (mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700318 return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800319 } else {
320 mapEventListeners.put(listener, executor);
321 return CompletableFuture.completedFuture(null);
322 }
323 }
324
325 @Override
326 public CompletableFuture<Void> removeListener(MultimapEventListener<String, byte[]> listener) {
327 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700328 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800329 }
330 return CompletableFuture.completedFuture(null);
331 }
332
333 @Override
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700334 public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700335 throw new UnsupportedOperationException("Expensive operation.");
336 }
337
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700338 /**
339 * Helper to check if there was a lock based issue.
Jordan Halterman5e884352018-05-21 22:11:07 -0700340 *
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700341 * @param status the status of an update result
342 */
343 private void throwIfLocked(MapEntryUpdateResult.Status status) {
344 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
Aaron Kruglikov44a1fef2016-04-27 11:29:23 -0700345 throw new ConcurrentModificationException("Cannot update map: " +
Jordan Halterman5e884352018-05-21 22:11:07 -0700346 "Another transaction " +
347 "in progress");
Aaron Kruglikova26f6542016-04-19 13:37:42 -0700348 }
349 }
Jonathan Hart46bf89b2017-02-27 15:56:42 -0800350
351 private boolean isListening() {
352 return !mapEventListeners.isEmpty();
353 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700354}