/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
Flavio Castro41b1f3a2015-07-31 13:51:32 -070018import com.google.common.collect.Sets;
19import com.google.common.util.concurrent.Futures;
Madan Jampania6d787b2015-08-11 11:02:02 -070020
21import org.onlab.util.SharedExecutors;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070022import org.onosproject.store.service.DistributedQueue;
23import org.onosproject.store.service.Serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070024
Madan Jampania6d787b2015-08-11 11:02:02 -070025import java.util.List;
Madan Jampani63c659f2015-06-11 00:52:58 -070026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070028import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampania6d787b2015-08-11 11:02:02 -070029import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
Madan Jampani63c659f2015-06-11 00:52:58 -070030
31/**
32 * DistributedQueue implementation that provides FIFO ordering semantics.
33 *
34 * @param <E> queue entry type
35 */
Flavio Castro41b1f3a2015-07-31 13:51:32 -070036public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
Madan Jampani63c659f2015-06-11 00:52:58 -070037
38 private final String name;
39 private final Database database;
40 private final Serializer serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070041 private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
Madan Jampani63c659f2015-06-11 00:52:58 -070042
Flavio Castro41b1f3a2015-07-31 13:51:32 -070043 private static final String PRIMITIVE_NAME = "distributedQueue";
44 private static final String SIZE = "size";
45 private static final String PUSH = "push";
46 private static final String POP = "pop";
47 private static final String PEEK = "peek";
48
Madan Jampani63c659f2015-06-11 00:52:58 -070049 private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
Flavio Castro41b1f3a2015-07-31 13:51:32 -070050 private final MeteringAgent monitor;
Madan Jampani63c659f2015-06-11 00:52:58 -070051
52 public DefaultDistributedQueue(String name,
Flavio Castro41b1f3a2015-07-31 13:51:32 -070053 Database database,
54 Serializer serializer,
Madan Jampania6d787b2015-08-11 11:02:02 -070055 boolean meteringEnabled) {
Madan Jampani63c659f2015-06-11 00:52:58 -070056 this.name = checkNotNull(name, "queue name cannot be null");
57 this.database = checkNotNull(database, "database cannot be null");
58 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Flavio Castro41b1f3a2015-07-31 13:51:32 -070059 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
Madan Jampania6d787b2015-08-11 11:02:02 -070060 this.database.registerConsumer(update -> {
61 SharedExecutors.getSingleThreadExecutor().execute(() -> {
62 if (update.target() == QUEUE_PUSH) {
63 List<Object> input = update.input();
64 String queueName = (String) input.get(0);
65 if (queueName.equals(name)) {
66 tryPoll();
67 }
68 }
69 });
70 });
Madan Jampani63c659f2015-06-11 00:52:58 -070071 }
72
73 @Override
74 public long size() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070075 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
Flavio Castro6e044612015-08-13 14:13:58 -070076 return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070077 }
78
79 @Override
80 public void push(E entry) {
Madan Jampania6d787b2015-08-11 11:02:02 -070081 checkNotNull(entry, ERROR_NULL_ENTRY);
Flavio Castro41b1f3a2015-07-31 13:51:32 -070082 final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Madan Jampania6d787b2015-08-11 11:02:02 -070083 Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
Flavio Castro6e044612015-08-13 14:13:58 -070084 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070085 }
86
87 @Override
88 public CompletableFuture<E> pop() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070089 final MeteringAgent.Context timer = monitor.startTimer(POP);
Madan Jampania6d787b2015-08-11 11:02:02 -070090 return database.queuePop(name)
Flavio Castro6e044612015-08-13 14:13:58 -070091 .whenComplete((r, e) -> timer.stop(e))
Madan Jampani63c659f2015-06-11 00:52:58 -070092 .thenCompose(v -> {
93 if (v != null) {
HIGUCHI Yuta1b9c8ac2015-08-28 15:35:43 -070094 return CompletableFuture.<E>completedFuture(serializer.decode(v));
Madan Jampani63c659f2015-06-11 00:52:58 -070095 }
Madan Jampania6d787b2015-08-11 11:02:02 -070096 CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
97 pendingFutures.add(newPendingFuture);
98 return newPendingFuture;
99 });
Flavio Castro6e044612015-08-13 14:13:58 -0700100
Madan Jampani63c659f2015-06-11 00:52:58 -0700101 }
102
103 @Override
104 public E peek() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700105 final MeteringAgent.Context timer = monitor.startTimer(PEEK);
Madan Jampania6d787b2015-08-11 11:02:02 -0700106 return Futures.getUnchecked(database.queuePeek(name)
107 .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
Flavio Castro6e044612015-08-13 14:13:58 -0700108 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -0700109 }
110
111 public String name() {
112 return name;
113 }
114
115 protected void tryPoll() {
116 Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
117 for (CompletableFuture<E> future : pendingFutures) {
Madan Jampania6d787b2015-08-11 11:02:02 -0700118 E entry = Futures.getUnchecked(database.queuePop(name)
Madan Jampani63c659f2015-06-11 00:52:58 -0700119 .thenApply(v -> v != null ? serializer.decode(v) : null));
120 if (entry != null) {
121 future.complete(entry);
122 completedFutures.add(future);
123 } else {
124 break;
125 }
126 }
127 pendingFutures.removeAll(completedFutures);
128 }
129}