blob: 8ea1d72ae3bc9eaef8c7ed36c44db51c8be59544 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampani63c659f2015-06-11 00:52:58 -070017
Flavio Castro41b1f3a2015-07-31 13:51:32 -070018import com.google.common.collect.Sets;
19import com.google.common.util.concurrent.Futures;
Madan Jampania6d787b2015-08-11 11:02:02 -070020
21import org.onlab.util.SharedExecutors;
Madan Jampania090a112016-01-18 16:38:17 -080022import org.onosproject.store.service.DistributedPrimitive;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070023import org.onosproject.store.service.DistributedQueue;
24import org.onosproject.store.service.Serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070025
Madan Jampania6d787b2015-08-11 11:02:02 -070026import java.util.List;
Madan Jampani63c659f2015-06-11 00:52:58 -070027import java.util.Set;
28import java.util.concurrent.CompletableFuture;
Madan Jampania090a112016-01-18 16:38:17 -080029
Flavio Castro41b1f3a2015-07-31 13:51:32 -070030import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampanif4c88502016-01-21 12:35:36 -080031import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.QUEUE_PUSH;
Madan Jampani63c659f2015-06-11 00:52:58 -070032
33/**
34 * DistributedQueue implementation that provides FIFO ordering semantics.
35 *
36 * @param <E> queue entry type
37 */
Flavio Castro41b1f3a2015-07-31 13:51:32 -070038public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
Madan Jampani63c659f2015-06-11 00:52:58 -070039
40 private final String name;
41 private final Database database;
42 private final Serializer serializer;
Madan Jampani63c659f2015-06-11 00:52:58 -070043 private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
Madan Jampani63c659f2015-06-11 00:52:58 -070044
Flavio Castro41b1f3a2015-07-31 13:51:32 -070045 private static final String PRIMITIVE_NAME = "distributedQueue";
46 private static final String SIZE = "size";
47 private static final String PUSH = "push";
48 private static final String POP = "pop";
49 private static final String PEEK = "peek";
50
Madan Jampani63c659f2015-06-11 00:52:58 -070051 private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
Flavio Castro41b1f3a2015-07-31 13:51:32 -070052 private final MeteringAgent monitor;
Madan Jampani63c659f2015-06-11 00:52:58 -070053
54 public DefaultDistributedQueue(String name,
Flavio Castro41b1f3a2015-07-31 13:51:32 -070055 Database database,
56 Serializer serializer,
Madan Jampania6d787b2015-08-11 11:02:02 -070057 boolean meteringEnabled) {
Madan Jampani63c659f2015-06-11 00:52:58 -070058 this.name = checkNotNull(name, "queue name cannot be null");
59 this.database = checkNotNull(database, "database cannot be null");
60 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Flavio Castro41b1f3a2015-07-31 13:51:32 -070061 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
Madan Jampania6d787b2015-08-11 11:02:02 -070062 this.database.registerConsumer(update -> {
63 SharedExecutors.getSingleThreadExecutor().execute(() -> {
64 if (update.target() == QUEUE_PUSH) {
65 List<Object> input = update.input();
66 String queueName = (String) input.get(0);
67 if (queueName.equals(name)) {
68 tryPoll();
69 }
70 }
71 });
72 });
Madan Jampani63c659f2015-06-11 00:52:58 -070073 }
74
75 @Override
76 public long size() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070077 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
Flavio Castro6e044612015-08-13 14:13:58 -070078 return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070079 }
80
81 @Override
82 public void push(E entry) {
Madan Jampania6d787b2015-08-11 11:02:02 -070083 checkNotNull(entry, ERROR_NULL_ENTRY);
Flavio Castro41b1f3a2015-07-31 13:51:32 -070084 final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Madan Jampania6d787b2015-08-11 11:02:02 -070085 Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
Flavio Castro6e044612015-08-13 14:13:58 -070086 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -070087 }
88
89 @Override
90 public CompletableFuture<E> pop() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070091 final MeteringAgent.Context timer = monitor.startTimer(POP);
Madan Jampania6d787b2015-08-11 11:02:02 -070092 return database.queuePop(name)
Flavio Castro6e044612015-08-13 14:13:58 -070093 .whenComplete((r, e) -> timer.stop(e))
Madan Jampani63c659f2015-06-11 00:52:58 -070094 .thenCompose(v -> {
95 if (v != null) {
HIGUCHI Yuta1b9c8ac2015-08-28 15:35:43 -070096 return CompletableFuture.<E>completedFuture(serializer.decode(v));
Madan Jampani63c659f2015-06-11 00:52:58 -070097 }
Madan Jampania6d787b2015-08-11 11:02:02 -070098 CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
99 pendingFutures.add(newPendingFuture);
100 return newPendingFuture;
101 });
Flavio Castro6e044612015-08-13 14:13:58 -0700102
Madan Jampani63c659f2015-06-11 00:52:58 -0700103 }
104
105 @Override
106 public E peek() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700107 final MeteringAgent.Context timer = monitor.startTimer(PEEK);
Madan Jampania6d787b2015-08-11 11:02:02 -0700108 return Futures.getUnchecked(database.queuePeek(name)
109 .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
Flavio Castro6e044612015-08-13 14:13:58 -0700110 .whenComplete((r, e) -> timer.stop(e)));
Madan Jampani63c659f2015-06-11 00:52:58 -0700111 }
112
Madan Jampania090a112016-01-18 16:38:17 -0800113 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700114 public String name() {
115 return name;
116 }
117
Madan Jampania090a112016-01-18 16:38:17 -0800118 @Override
119 public DistributedPrimitive.Type type() {
120 return DistributedPrimitive.Type.QUEUE;
121 }
122
Madan Jampani63c659f2015-06-11 00:52:58 -0700123 protected void tryPoll() {
124 Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
125 for (CompletableFuture<E> future : pendingFutures) {
Madan Jampania6d787b2015-08-11 11:02:02 -0700126 E entry = Futures.getUnchecked(database.queuePop(name)
Madan Jampani63c659f2015-06-11 00:52:58 -0700127 .thenApply(v -> v != null ? serializer.decode(v) : null));
128 if (entry != null) {
129 future.complete(entry);
130 completedFutures.add(future);
131 } else {
132 break;
133 }
134 }
135 pendingFutures.removeAll(completedFutures);
136 }
137}