blob: 0085932bdce090c79bc77903a906dab00d46c9fa [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070018import static java.util.concurrent.Executors.newSingleThreadExecutor;
19import static org.onlab.util.Tools.groupedThreads;
Madan Jampani35708a92016-07-06 10:48:19 -070020import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani819d61d2016-07-25 20:29:43 -070021import io.atomix.copycat.client.CopycatClient;
22import io.atomix.resource.AbstractResource;
23import io.atomix.resource.ResourceTypeInfo;
Madan Jampani35708a92016-07-06 10:48:19 -070024
25import java.util.Collection;
26import java.util.List;
27import java.util.Properties;
28import java.util.Timer;
29import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
Madan Jampani35708a92016-07-06 10:48:19 -070032import java.util.concurrent.atomic.AtomicBoolean;
33import java.util.concurrent.atomic.AtomicInteger;
34import java.util.concurrent.atomic.AtomicReference;
35import java.util.function.Consumer;
36
37import org.onlab.util.AbstractAccumulator;
38import org.onlab.util.Accumulator;
39import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
Madan Jampani819d61d2016-07-25 20:29:43 -070040import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
Madan Jampani35708a92016-07-06 10:48:19 -070041import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
42import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
43import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
Madan Jampani35708a92016-07-06 10:48:19 -070046import org.onosproject.store.service.Task;
Madan Jampani819d61d2016-07-25 20:29:43 -070047import org.onosproject.store.service.WorkQueue;
Madan Jampani35708a92016-07-06 10:48:19 -070048import org.onosproject.store.service.WorkQueueStats;
49import org.slf4j.Logger;
50
51import com.google.common.collect.ImmutableList;
52
Madan Jampani35708a92016-07-06 10:48:19 -070053/**
54 * Distributed resource providing the {@link WorkQueue} primitive.
55 */
56@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
57public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
58 implements WorkQueue<byte[]> {
59
60 private final Logger log = getLogger(getClass());
61 public static final String TASK_AVAILABLE = "task-available";
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070062 private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log));
Madan Jampani35708a92016-07-06 10:48:19 -070063 private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
64 private final Timer timer = new Timer("atomix-work-queue-completer");
65 private final AtomicBoolean isRegistered = new AtomicBoolean(false);
66
67 protected AtomixWorkQueue(CopycatClient client, Properties options) {
68 super(client, options);
69 }
70
71 @Override
Madan Jampani819d61d2016-07-25 20:29:43 -070072 public String name() {
73 return null;
74 }
75
76 @Override
77 public CompletableFuture<Void> destroy() {
78 executor.shutdown();
79 timer.cancel();
80 return client.submit(new Clear());
81 }
82
83 @Override
Madan Jampani35708a92016-07-06 10:48:19 -070084 public CompletableFuture<AtomixWorkQueue> open() {
85 return super.open().thenApply(result -> {
86 client.onStateChange(state -> {
87 if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
88 client.submit(new Register());
89 }
90 });
91 client.onEvent(TASK_AVAILABLE, this::resumeWork);
92 return result;
93 });
94 }
95
96 @Override
97 public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
98 if (items.isEmpty()) {
99 return CompletableFuture.completedFuture(null);
100 }
101 return client.submit(new Add(items));
102 }
103
104 @Override
105 public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
106 if (maxTasks <= 0) {
107 return CompletableFuture.completedFuture(ImmutableList.of());
108 }
109 return client.submit(new Take(maxTasks));
110 }
111
112 @Override
113 public CompletableFuture<Void> complete(Collection<String> taskIds) {
114 if (taskIds.isEmpty()) {
115 return CompletableFuture.completedFuture(null);
116 }
117 return client.submit(new Complete(taskIds));
118 }
119
120 @Override
121 public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
122 int parallelism,
123 Executor executor) {
124 Accumulator<String> completedTaskAccumulator =
125 new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
126 taskProcessor.set(new TaskProcessor(callback,
127 parallelism,
128 executor,
129 completedTaskAccumulator));
130 return register().thenCompose(v -> take(parallelism))
131 .thenAccept(taskProcessor.get()::accept);
132 }
133
134 @Override
135 public CompletableFuture<Void> stopProcessing() {
136 return unregister();
137 }
138
139 @Override
140 public CompletableFuture<WorkQueueStats> stats() {
141 return client.submit(new Stats());
142 }
143
144 private void resumeWork() {
145 TaskProcessor activeProcessor = taskProcessor.get();
146 if (activeProcessor == null) {
147 return;
148 }
149 this.take(activeProcessor.headRoom())
150 .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
151 }
152
153 private CompletableFuture<Void> register() {
154 return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
155 }
156
157 private CompletableFuture<Void> unregister() {
158 return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
159 }
160
161 // TaskId accumulator for paced triggering of task completion calls.
162 private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
163 CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
164 super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
165 }
166
167 @Override
168 public void processItems(List<String> items) {
169 complete(items);
170 }
171 }
172
173 private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
174
175 private final AtomicInteger headRoom;
176 private final Consumer<byte[]> backingConsumer;
177 private final Executor executor;
178 private final Accumulator<String> taskCompleter;
179
180 public TaskProcessor(Consumer<byte[]> backingConsumer,
181 int parallelism,
182 Executor executor,
183 Accumulator<String> taskCompleter) {
184 this.backingConsumer = backingConsumer;
185 this.headRoom = new AtomicInteger(parallelism);
186 this.executor = executor;
187 this.taskCompleter = taskCompleter;
188 }
189
190 public int headRoom() {
191 return headRoom.get();
192 }
193
194 @Override
195 public void accept(Collection<Task<byte[]>> tasks) {
196 if (tasks == null) {
197 return;
198 }
199 headRoom.addAndGet(-1 * tasks.size());
200 tasks.forEach(task ->
201 executor.execute(() -> {
202 try {
203 backingConsumer.accept(task.payload());
204 taskCompleter.add(task.taskId());
205 } catch (Exception e) {
206 log.debug("Task execution failed", e);
207 } finally {
208 headRoom.incrementAndGet();
209 resumeWork();
210 }
211 }));
212 }
213 }
214}