blob: 7b4ad473d6494c0b98343fd212e4cd42913450a6 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.Collection;
21import java.util.List;
22import java.util.Properties;
23import java.util.Timer;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.Executor;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.atomic.AtomicBoolean;
29import java.util.concurrent.atomic.AtomicInteger;
30import java.util.concurrent.atomic.AtomicReference;
31import java.util.function.Consumer;
32
33import org.onlab.util.AbstractAccumulator;
34import org.onlab.util.Accumulator;
35import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
36import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
37import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
38import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
39import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
40import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
41import org.onosproject.store.service.WorkQueue;
42import org.onosproject.store.service.Task;
43import org.onosproject.store.service.WorkQueueStats;
44import org.slf4j.Logger;
45
46import com.google.common.collect.ImmutableList;
47
48import io.atomix.copycat.client.CopycatClient;
49import io.atomix.resource.AbstractResource;
50import io.atomix.resource.ResourceTypeInfo;
51
52/**
53 * Distributed resource providing the {@link WorkQueue} primitive.
54 */
55@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
56public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
57 implements WorkQueue<byte[]> {
58
59 private final Logger log = getLogger(getClass());
60 public static final String TASK_AVAILABLE = "task-available";
61 private final ExecutorService executor = Executors.newSingleThreadExecutor();
62 private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
63 private final Timer timer = new Timer("atomix-work-queue-completer");
64 private final AtomicBoolean isRegistered = new AtomicBoolean(false);
65
66 protected AtomixWorkQueue(CopycatClient client, Properties options) {
67 super(client, options);
68 }
69
70 @Override
71 public CompletableFuture<AtomixWorkQueue> open() {
72 return super.open().thenApply(result -> {
73 client.onStateChange(state -> {
74 if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
75 client.submit(new Register());
76 }
77 });
78 client.onEvent(TASK_AVAILABLE, this::resumeWork);
79 return result;
80 });
81 }
82
83 @Override
84 public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
85 if (items.isEmpty()) {
86 return CompletableFuture.completedFuture(null);
87 }
88 return client.submit(new Add(items));
89 }
90
91 @Override
92 public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
93 if (maxTasks <= 0) {
94 return CompletableFuture.completedFuture(ImmutableList.of());
95 }
96 return client.submit(new Take(maxTasks));
97 }
98
99 @Override
100 public CompletableFuture<Void> complete(Collection<String> taskIds) {
101 if (taskIds.isEmpty()) {
102 return CompletableFuture.completedFuture(null);
103 }
104 return client.submit(new Complete(taskIds));
105 }
106
107 @Override
108 public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
109 int parallelism,
110 Executor executor) {
111 Accumulator<String> completedTaskAccumulator =
112 new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
113 taskProcessor.set(new TaskProcessor(callback,
114 parallelism,
115 executor,
116 completedTaskAccumulator));
117 return register().thenCompose(v -> take(parallelism))
118 .thenAccept(taskProcessor.get()::accept);
119 }
120
121 @Override
122 public CompletableFuture<Void> stopProcessing() {
123 return unregister();
124 }
125
126 @Override
127 public CompletableFuture<WorkQueueStats> stats() {
128 return client.submit(new Stats());
129 }
130
131 private void resumeWork() {
132 TaskProcessor activeProcessor = taskProcessor.get();
133 if (activeProcessor == null) {
134 return;
135 }
136 this.take(activeProcessor.headRoom())
137 .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
138 }
139
140 private CompletableFuture<Void> register() {
141 return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
142 }
143
144 private CompletableFuture<Void> unregister() {
145 return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
146 }
147
148 // TaskId accumulator for paced triggering of task completion calls.
149 private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
150 CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
151 super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
152 }
153
154 @Override
155 public void processItems(List<String> items) {
156 complete(items);
157 }
158 }
159
160 private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
161
162 private final AtomicInteger headRoom;
163 private final Consumer<byte[]> backingConsumer;
164 private final Executor executor;
165 private final Accumulator<String> taskCompleter;
166
167 public TaskProcessor(Consumer<byte[]> backingConsumer,
168 int parallelism,
169 Executor executor,
170 Accumulator<String> taskCompleter) {
171 this.backingConsumer = backingConsumer;
172 this.headRoom = new AtomicInteger(parallelism);
173 this.executor = executor;
174 this.taskCompleter = taskCompleter;
175 }
176
177 public int headRoom() {
178 return headRoom.get();
179 }
180
181 @Override
182 public void accept(Collection<Task<byte[]>> tasks) {
183 if (tasks == null) {
184 return;
185 }
186 headRoom.addAndGet(-1 * tasks.size());
187 tasks.forEach(task ->
188 executor.execute(() -> {
189 try {
190 backingConsumer.accept(task.payload());
191 taskCompleter.add(task.taskId());
192 } catch (Exception e) {
193 log.debug("Task execution failed", e);
194 } finally {
195 headRoom.incrementAndGet();
196 resumeWork();
197 }
198 }));
199 }
200 }
201}