blob: 73bc8028419bfd4859006eff6fbd8543117bb29b [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani35708a92016-07-06 10:48:19 -070018import java.util.Collection;
19import java.util.List;
Madan Jampani35708a92016-07-06 10:48:19 -070020import java.util.Timer;
21import java.util.concurrent.CompletableFuture;
22import java.util.concurrent.Executor;
23import java.util.concurrent.ExecutorService;
Madan Jampani35708a92016-07-06 10:48:19 -070024import java.util.concurrent.atomic.AtomicBoolean;
25import java.util.concurrent.atomic.AtomicInteger;
26import java.util.concurrent.atomic.AtomicReference;
27import java.util.function.Consumer;
28
Jordan Halterman2bf177c2017-06-29 01:49:08 -070029import com.google.common.collect.ImmutableList;
30import io.atomix.protocols.raft.proxy.RaftProxy;
Madan Jampani35708a92016-07-06 10:48:19 -070031import org.onlab.util.AbstractAccumulator;
32import org.onlab.util.Accumulator;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070033import org.onlab.util.KryoNamespace;
34import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add;
35import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete;
36import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take;
37import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.Serializer;
Madan Jampani35708a92016-07-06 10:48:19 -070039import org.onosproject.store.service.Task;
Madan Jampani819d61d2016-07-25 20:29:43 -070040import org.onosproject.store.service.WorkQueue;
Madan Jampani35708a92016-07-06 10:48:19 -070041import org.onosproject.store.service.WorkQueueStats;
42import org.slf4j.Logger;
43
Jordan Halterman2bf177c2017-06-29 01:49:08 -070044import static java.util.concurrent.Executors.newSingleThreadExecutor;
45import static org.onlab.util.Tools.groupedThreads;
46import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE;
47import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
48import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR;
49import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE;
50import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER;
51import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS;
52import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
53import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER;
54import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani35708a92016-07-06 10:48:19 -070055
Madan Jampani35708a92016-07-06 10:48:19 -070056/**
57 * Distributed resource providing the {@link WorkQueue} primitive.
58 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059public class AtomixWorkQueue extends AbstractRaftPrimitive implements WorkQueue<byte[]> {
60 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
61 .register(KryoNamespaces.BASIC)
62 .register(AtomixWorkQueueOperations.NAMESPACE)
63 .register(AtomixWorkQueueEvents.NAMESPACE)
64 .build());
Madan Jampani35708a92016-07-06 10:48:19 -070065
66 private final Logger log = getLogger(getClass());
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070067 private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log));
Madan Jampani35708a92016-07-06 10:48:19 -070068 private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
69 private final Timer timer = new Timer("atomix-work-queue-completer");
70 private final AtomicBoolean isRegistered = new AtomicBoolean(false);
71
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072 public AtomixWorkQueue(RaftProxy proxy) {
73 super(proxy);
74 proxy.addStateChangeListener(state -> {
75 if (state == RaftProxy.State.CONNECTED && isRegistered.get()) {
76 proxy.invoke(REGISTER);
77 }
78 });
79 proxy.addEventListener(TASK_AVAILABLE, this::resumeWork);
Madan Jampani819d61d2016-07-25 20:29:43 -070080 }
81
82 @Override
83 public CompletableFuture<Void> destroy() {
84 executor.shutdown();
85 timer.cancel();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070086 return proxy.invoke(CLEAR);
Madan Jampani35708a92016-07-06 10:48:19 -070087 }
88
89 @Override
90 public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
91 if (items.isEmpty()) {
92 return CompletableFuture.completedFuture(null);
93 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -070094 return proxy.invoke(ADD, SERIALIZER::encode, new Add(items));
Madan Jampani35708a92016-07-06 10:48:19 -070095 }
96
97 @Override
98 public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
99 if (maxTasks <= 0) {
100 return CompletableFuture.completedFuture(ImmutableList.of());
101 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102 return proxy.invoke(TAKE, SERIALIZER::encode, new Take(maxTasks), SERIALIZER::decode);
Madan Jampani35708a92016-07-06 10:48:19 -0700103 }
104
105 @Override
106 public CompletableFuture<Void> complete(Collection<String> taskIds) {
107 if (taskIds.isEmpty()) {
108 return CompletableFuture.completedFuture(null);
109 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700110 return proxy.invoke(COMPLETE, SERIALIZER::encode, new Complete(taskIds));
Madan Jampani35708a92016-07-06 10:48:19 -0700111 }
112
113 @Override
114 public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700115 int parallelism,
116 Executor executor) {
Madan Jampani35708a92016-07-06 10:48:19 -0700117 Accumulator<String> completedTaskAccumulator =
118 new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
119 taskProcessor.set(new TaskProcessor(callback,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700120 parallelism,
121 executor,
122 completedTaskAccumulator));
Madan Jampani35708a92016-07-06 10:48:19 -0700123 return register().thenCompose(v -> take(parallelism))
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700124 .thenAccept(taskProcessor.get());
Madan Jampani35708a92016-07-06 10:48:19 -0700125 }
126
127 @Override
128 public CompletableFuture<Void> stopProcessing() {
129 return unregister();
130 }
131
132 @Override
133 public CompletableFuture<WorkQueueStats> stats() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700134 return proxy.invoke(STATS, SERIALIZER::decode);
Madan Jampani35708a92016-07-06 10:48:19 -0700135 }
136
137 private void resumeWork() {
138 TaskProcessor activeProcessor = taskProcessor.get();
139 if (activeProcessor == null) {
140 return;
141 }
142 this.take(activeProcessor.headRoom())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700143 .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
Madan Jampani35708a92016-07-06 10:48:19 -0700144 }
145
146 private CompletableFuture<Void> register() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 return proxy.invoke(REGISTER).thenRun(() -> isRegistered.set(true));
Madan Jampani35708a92016-07-06 10:48:19 -0700148 }
149
150 private CompletableFuture<Void> unregister() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 return proxy.invoke(UNREGISTER).thenRun(() -> isRegistered.set(false));
Madan Jampani35708a92016-07-06 10:48:19 -0700152 }
153
154 // TaskId accumulator for paced triggering of task completion calls.
155 private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
156 CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
157 super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
158 }
159
160 @Override
161 public void processItems(List<String> items) {
162 complete(items);
163 }
164 }
165
166 private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
167
168 private final AtomicInteger headRoom;
169 private final Consumer<byte[]> backingConsumer;
170 private final Executor executor;
171 private final Accumulator<String> taskCompleter;
172
173 public TaskProcessor(Consumer<byte[]> backingConsumer,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700174 int parallelism,
175 Executor executor,
176 Accumulator<String> taskCompleter) {
Madan Jampani35708a92016-07-06 10:48:19 -0700177 this.backingConsumer = backingConsumer;
178 this.headRoom = new AtomicInteger(parallelism);
179 this.executor = executor;
180 this.taskCompleter = taskCompleter;
181 }
182
183 public int headRoom() {
184 return headRoom.get();
185 }
186
187 @Override
188 public void accept(Collection<Task<byte[]>> tasks) {
189 if (tasks == null) {
190 return;
191 }
192 headRoom.addAndGet(-1 * tasks.size());
193 tasks.forEach(task ->
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700194 executor.execute(() -> {
195 try {
196 backingConsumer.accept(task.payload());
197 taskCompleter.add(task.taskId());
198 } catch (Exception e) {
199 log.debug("Task execution failed", e);
200 } finally {
201 headRoom.incrementAndGet();
202 resumeWork();
203 }
204 }));
Madan Jampani35708a92016-07-06 10:48:19 -0700205 }
206 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700207}