blob: e4cf65f4b440eaca5d8c395a7241c2fef62b2bc2 [file] [log] [blame]
Madan Jampani63c659f2015-06-11 00:52:58 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampani63c659f2015-06-11 00:52:58 -070017
Flavio Castro41b1f3a2015-07-31 13:51:32 -070018import com.google.common.collect.Sets;
19import com.google.common.util.concurrent.Futures;
Madan Jampania6d787b2015-08-11 11:02:02 -070020
21import org.onlab.util.SharedExecutors;
Madan Jampania090a112016-01-18 16:38:17 -080022import org.onosproject.store.service.DistributedPrimitive;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070023import org.onosproject.store.service.DistributedQueue;
24import org.onosproject.store.service.Serializer;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080025import org.onosproject.utils.MeteringAgent;
Madan Jampani63c659f2015-06-11 00:52:58 -070026
Madan Jampania6d787b2015-08-11 11:02:02 -070027import java.util.List;
Madan Jampani63c659f2015-06-11 00:52:58 -070028import java.util.Set;
29import java.util.concurrent.CompletableFuture;
Madan Jampania090a112016-01-18 16:38:17 -080030
Flavio Castro41b1f3a2015-07-31 13:51:32 -070031import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampanif4c88502016-01-21 12:35:36 -080032import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.QUEUE_PUSH;
Madan Jampani63c659f2015-06-11 00:52:58 -070033
34/**
35 * DistributedQueue implementation that provides FIFO ordering semantics.
36 *
37 * @param <E> queue entry type
38 */
Flavio Castro41b1f3a2015-07-31 13:51:32 -070039public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
Madan Jampani63c659f2015-06-11 00:52:58 -070040
41 private final String name;
42 private final Database database;
43 private final Serializer serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070044 private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
Madan Jampani63c659f2015-06-11 00:52:58 -070045
Flavio Castro41b1f3a2015-07-31 13:51:32 -070046 private static final String PRIMITIVE_NAME = "distributedQueue";
47 private static final String SIZE = "size";
48 private static final String PUSH = "push";
49 private static final String POP = "pop";
50 private static final String PEEK = "peek";
51
Madan Jampani63c659f2015-06-11 00:52:58 -070052 private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
Flavio Castro41b1f3a2015-07-31 13:51:32 -070053 private final MeteringAgent monitor;
Madan Jampani63c659f2015-06-11 00:52:58 -070054
55 public DefaultDistributedQueue(String name,
Flavio Castro41b1f3a2015-07-31 13:51:32 -070056 Database database,
57 Serializer serializer,
Madan Jampania6d787b2015-08-11 11:02:02 -070058 boolean meteringEnabled) {
Madan Jampani63c659f2015-06-11 00:52:58 -070059 this.name = checkNotNull(name, "queue name cannot be null");
60 this.database = checkNotNull(database, "database cannot be null");
61 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Flavio Castro41b1f3a2015-07-31 13:51:32 -070062 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
Madan Jampania6d787b2015-08-11 11:02:02 -070063 this.database.registerConsumer(update -> {
64 SharedExecutors.getSingleThreadExecutor().execute(() -> {
65 if (update.target() == QUEUE_PUSH) {
66 List<Object> input = update.input();
67 String queueName = (String) input.get(0);
68 if (queueName.equals(name)) {
69 tryPoll();
70 }
71 }
72 });
73 });
Madan Jampani63c659f2015-06-11 00:52:58 -070074 }
75
76 @Override
77 public long size() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070078 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
Flavio Castro6e044612015-08-13 14:13:58 -070079 return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070080 }
81
82 @Override
83 public void push(E entry) {
Madan Jampania6d787b2015-08-11 11:02:02 -070084 checkNotNull(entry, ERROR_NULL_ENTRY);
Flavio Castro41b1f3a2015-07-31 13:51:32 -070085 final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Madan Jampania6d787b2015-08-11 11:02:02 -070086 Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
Flavio Castro6e044612015-08-13 14:13:58 -070087 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070088 }
89
90 @Override
91 public CompletableFuture<E> pop() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070092 final MeteringAgent.Context timer = monitor.startTimer(POP);
Madan Jampania6d787b2015-08-11 11:02:02 -070093 return database.queuePop(name)
Flavio Castro6e044612015-08-13 14:13:58 -070094 .whenComplete((r, e) -> timer.stop(e))
Madan Jampani63c659f2015-06-11 00:52:58 -070095 .thenCompose(v -> {
96 if (v != null) {
HIGUCHI Yuta1b9c8ac2015-08-28 15:35:43 -070097 return CompletableFuture.<E>completedFuture(serializer.decode(v));
Madan Jampani63c659f2015-06-11 00:52:58 -070098 }
Madan Jampania6d787b2015-08-11 11:02:02 -070099 CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
100 pendingFutures.add(newPendingFuture);
101 return newPendingFuture;
102 });
Flavio Castro6e044612015-08-13 14:13:58 -0700103
Madan Jampani63c659f2015-06-11 00:52:58 -0700104 }
105
106 @Override
107 public E peek() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700108 final MeteringAgent.Context timer = monitor.startTimer(PEEK);
Madan Jampania6d787b2015-08-11 11:02:02 -0700109 return Futures.getUnchecked(database.queuePeek(name)
110 .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
Flavio Castro6e044612015-08-13 14:13:58 -0700111 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -0700112 }
113
Madan Jampania090a112016-01-18 16:38:17 -0800114 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700115 public String name() {
116 return name;
117 }
118
Madan Jampania090a112016-01-18 16:38:17 -0800119 @Override
120 public DistributedPrimitive.Type type() {
121 return DistributedPrimitive.Type.QUEUE;
122 }
123
Madan Jampani63c659f2015-06-11 00:52:58 -0700124 protected void tryPoll() {
125 Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
126 for (CompletableFuture<E> future : pendingFutures) {
Madan Jampania6d787b2015-08-11 11:02:02 -0700127 E entry = Futures.getUnchecked(database.queuePop(name)
Madan Jampani63c659f2015-06-11 00:52:58 -0700128 .thenApply(v -> v != null ? serializer.decode(v) : null));
129 if (entry != null) {
130 future.complete(entry);
131 completedFutures.add(future);
132 } else {
133 break;
134 }
135 }
136 pendingFutures.removeAll(completedFutures);
137 }
138}