blob: c27774a9eabafb4b96501de9e7017cf19349f55b [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
Flavio Castro41b1f3a2015-07-31 13:51:32 -070018import com.google.common.collect.Sets;
19import com.google.common.util.concurrent.Futures;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.store.service.DistributedQueue;
22import org.onosproject.store.service.Serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070023
24import java.util.Set;
25import java.util.concurrent.CompletableFuture;
26import java.util.function.Consumer;
27
Flavio Castro41b1f3a2015-07-31 13:51:32 -070028import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani63c659f2015-06-11 00:52:58 -070029
30/**
31 * DistributedQueue implementation that provides FIFO ordering semantics.
32 *
33 * @param <E> queue entry type
34 */
Flavio Castro41b1f3a2015-07-31 13:51:32 -070035public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
Madan Jampani63c659f2015-06-11 00:52:58 -070036
37 private final String name;
38 private final Database database;
39 private final Serializer serializer;
40 private final NodeId localNodeId;
41 private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
42 private final Consumer<Set<NodeId>> notifyConsumers;
43
Flavio Castro41b1f3a2015-07-31 13:51:32 -070044 private static final String PRIMITIVE_NAME = "distributedQueue";
45 private static final String SIZE = "size";
46 private static final String PUSH = "push";
47 private static final String POP = "pop";
48 private static final String PEEK = "peek";
49
Madan Jampani63c659f2015-06-11 00:52:58 -070050 private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
Flavio Castro41b1f3a2015-07-31 13:51:32 -070051 private final MeteringAgent monitor;
Madan Jampani63c659f2015-06-11 00:52:58 -070052
53 public DefaultDistributedQueue(String name,
Flavio Castro41b1f3a2015-07-31 13:51:32 -070054 Database database,
55 Serializer serializer,
56 NodeId localNodeId,
57 boolean meteringEnabled,
58 Consumer<Set<NodeId>> notifyConsumers) {
Madan Jampani63c659f2015-06-11 00:52:58 -070059 this.name = checkNotNull(name, "queue name cannot be null");
60 this.database = checkNotNull(database, "database cannot be null");
61 this.serializer = checkNotNull(serializer, "serializer cannot be null");
62 this.localNodeId = localNodeId;
63 this.notifyConsumers = notifyConsumers;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070064 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
65
Madan Jampani63c659f2015-06-11 00:52:58 -070066 }
67
68 @Override
69 public long size() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070070 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
71 try {
72 return Futures.getUnchecked(database.queueSize(name));
73 } finally {
74 timer.stop();
75 }
Madan Jampani63c659f2015-06-11 00:52:58 -070076 }
77
78 @Override
79 public void push(E entry) {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070080 final MeteringAgent.Context timer = monitor.startTimer(PUSH);
81 try {
82 checkNotNull(entry, ERROR_NULL_ENTRY);
83 Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
84 .thenAccept(notifyConsumers)
85 .thenApply(v -> null));
86 } finally {
87 timer.stop();
88 }
Madan Jampani63c659f2015-06-11 00:52:58 -070089 }
90
91 @Override
92 public CompletableFuture<E> pop() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070093 final MeteringAgent.Context timer = monitor.startTimer(POP);
Madan Jampani63c659f2015-06-11 00:52:58 -070094 return database.queuePop(name, localNodeId)
95 .thenCompose(v -> {
96 if (v != null) {
97 return CompletableFuture.completedFuture(serializer.decode(v));
98 } else {
99 CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
100 pendingFutures.add(newPendingFuture);
101 return newPendingFuture;
102 }
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700103 })
104 .whenComplete((r, e) -> timer.stop());
Madan Jampani63c659f2015-06-11 00:52:58 -0700105 }
106
107 @Override
108 public E peek() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700109 final MeteringAgent.Context timer = monitor.startTimer(PEEK);
110 try {
111 return Futures.getUnchecked(database.queuePeek(name)
112 .thenApply(v -> v != null ? serializer.decode(v) : null));
113 } finally {
114 timer.stop();
115 }
Madan Jampani63c659f2015-06-11 00:52:58 -0700116 }
117
118 public String name() {
119 return name;
120 }
121
122 protected void tryPoll() {
123 Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
124 for (CompletableFuture<E> future : pendingFutures) {
125 E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
126 .thenApply(v -> v != null ? serializer.decode(v) : null));
127 if (entry != null) {
128 future.complete(entry);
129 completedFutures.add(future);
130 } else {
131 break;
132 }
133 }
134 pendingFutures.removeAll(completedFutures);
135 }
136}