blob: 0bcbdc4f71ea6d3a3e6e3638e6b1f542cd96880f [file] [log] [blame]
Madan Jampani63c659f2015-06-11 00:52:58 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.Set;
21import java.util.concurrent.CompletableFuture;
22import java.util.function.Consumer;
23
24import org.onosproject.cluster.NodeId;
25import org.onosproject.store.service.DistributedQueue;
26import org.onosproject.store.service.Serializer;
27
28import com.google.common.collect.Sets;
29import com.google.common.util.concurrent.Futures;
30
31/**
32 * DistributedQueue implementation that provides FIFO ordering semantics.
33 *
34 * @param <E> queue entry type
35 */
36public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
37
38 private final String name;
39 private final Database database;
40 private final Serializer serializer;
41 private final NodeId localNodeId;
42 private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
43 private final Consumer<Set<NodeId>> notifyConsumers;
44
45 private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
46
47 public DefaultDistributedQueue(String name,
48 Database database,
49 Serializer serializer,
50 NodeId localNodeId,
51 Consumer<Set<NodeId>> notifyConsumers) {
52 this.name = checkNotNull(name, "queue name cannot be null");
53 this.database = checkNotNull(database, "database cannot be null");
54 this.serializer = checkNotNull(serializer, "serializer cannot be null");
55 this.localNodeId = localNodeId;
56 this.notifyConsumers = notifyConsumers;
57 }
58
59 @Override
60 public long size() {
61 return Futures.getUnchecked(database.queueSize(name));
62 }
63
64 @Override
65 public void push(E entry) {
66 checkNotNull(entry, ERROR_NULL_ENTRY);
67 Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
68 .thenAccept(notifyConsumers)
69 .thenApply(v -> null));
70 }
71
72 @Override
73 public CompletableFuture<E> pop() {
74 return database.queuePop(name, localNodeId)
75 .thenCompose(v -> {
76 if (v != null) {
77 return CompletableFuture.completedFuture(serializer.decode(v));
78 } else {
79 CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
80 pendingFutures.add(newPendingFuture);
81 return newPendingFuture;
82 }
83 });
84 }
85
86 @Override
87 public E peek() {
88 return Futures.getUnchecked(database.queuePeek(name)
89 .thenApply(v -> v != null ? serializer.decode(v) : null));
90 }
91
92 public String name() {
93 return name;
94 }
95
96 protected void tryPoll() {
97 Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
98 for (CompletableFuture<E> future : pendingFutures) {
99 E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
100 .thenApply(v -> v != null ? serializer.decode(v) : null));
101 if (entry != null) {
102 future.complete(entry);
103 completedFutures.add(future);
104 } else {
105 break;
106 }
107 }
108 pendingFutures.removeAll(completedFutures);
109 }
110}