blob: 879cbb35d78478bad3dd6fe1088e06dc84b21597 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070018import static java.util.concurrent.Executors.newSingleThreadExecutor;
19import static org.onlab.util.Tools.groupedThreads;
Madan Jampani35708a92016-07-06 10:48:19 -070020import static org.slf4j.LoggerFactory.getLogger;
21
22import java.util.Collection;
23import java.util.List;
24import java.util.Properties;
25import java.util.Timer;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.Executor;
28import java.util.concurrent.ExecutorService;
Madan Jampani35708a92016-07-06 10:48:19 -070029import java.util.concurrent.atomic.AtomicBoolean;
30import java.util.concurrent.atomic.AtomicInteger;
31import java.util.concurrent.atomic.AtomicReference;
32import java.util.function.Consumer;
33
34import org.onlab.util.AbstractAccumulator;
35import org.onlab.util.Accumulator;
36import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
37import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
38import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
39import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
40import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
41import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
42import org.onosproject.store.service.WorkQueue;
43import org.onosproject.store.service.Task;
44import org.onosproject.store.service.WorkQueueStats;
45import org.slf4j.Logger;
46
47import com.google.common.collect.ImmutableList;
48
49import io.atomix.copycat.client.CopycatClient;
50import io.atomix.resource.AbstractResource;
51import io.atomix.resource.ResourceTypeInfo;
52
53/**
54 * Distributed resource providing the {@link WorkQueue} primitive.
55 */
56@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
57public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
58 implements WorkQueue<byte[]> {
59
60 private final Logger log = getLogger(getClass());
61 public static final String TASK_AVAILABLE = "task-available";
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070062 private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log));
Madan Jampani35708a92016-07-06 10:48:19 -070063 private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
64 private final Timer timer = new Timer("atomix-work-queue-completer");
65 private final AtomicBoolean isRegistered = new AtomicBoolean(false);
66
67 protected AtomixWorkQueue(CopycatClient client, Properties options) {
68 super(client, options);
69 }
70
71 @Override
72 public CompletableFuture<AtomixWorkQueue> open() {
73 return super.open().thenApply(result -> {
74 client.onStateChange(state -> {
75 if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
76 client.submit(new Register());
77 }
78 });
79 client.onEvent(TASK_AVAILABLE, this::resumeWork);
80 return result;
81 });
82 }
83
84 @Override
85 public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
86 if (items.isEmpty()) {
87 return CompletableFuture.completedFuture(null);
88 }
89 return client.submit(new Add(items));
90 }
91
92 @Override
93 public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
94 if (maxTasks <= 0) {
95 return CompletableFuture.completedFuture(ImmutableList.of());
96 }
97 return client.submit(new Take(maxTasks));
98 }
99
100 @Override
101 public CompletableFuture<Void> complete(Collection<String> taskIds) {
102 if (taskIds.isEmpty()) {
103 return CompletableFuture.completedFuture(null);
104 }
105 return client.submit(new Complete(taskIds));
106 }
107
108 @Override
109 public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
110 int parallelism,
111 Executor executor) {
112 Accumulator<String> completedTaskAccumulator =
113 new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
114 taskProcessor.set(new TaskProcessor(callback,
115 parallelism,
116 executor,
117 completedTaskAccumulator));
118 return register().thenCompose(v -> take(parallelism))
119 .thenAccept(taskProcessor.get()::accept);
120 }
121
122 @Override
123 public CompletableFuture<Void> stopProcessing() {
124 return unregister();
125 }
126
127 @Override
128 public CompletableFuture<WorkQueueStats> stats() {
129 return client.submit(new Stats());
130 }
131
132 private void resumeWork() {
133 TaskProcessor activeProcessor = taskProcessor.get();
134 if (activeProcessor == null) {
135 return;
136 }
137 this.take(activeProcessor.headRoom())
138 .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
139 }
140
141 private CompletableFuture<Void> register() {
142 return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
143 }
144
145 private CompletableFuture<Void> unregister() {
146 return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
147 }
148
149 // TaskId accumulator for paced triggering of task completion calls.
150 private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
151 CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
152 super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
153 }
154
155 @Override
156 public void processItems(List<String> items) {
157 complete(items);
158 }
159 }
160
161 private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
162
163 private final AtomicInteger headRoom;
164 private final Consumer<byte[]> backingConsumer;
165 private final Executor executor;
166 private final Accumulator<String> taskCompleter;
167
168 public TaskProcessor(Consumer<byte[]> backingConsumer,
169 int parallelism,
170 Executor executor,
171 Accumulator<String> taskCompleter) {
172 this.backingConsumer = backingConsumer;
173 this.headRoom = new AtomicInteger(parallelism);
174 this.executor = executor;
175 this.taskCompleter = taskCompleter;
176 }
177
178 public int headRoom() {
179 return headRoom.get();
180 }
181
182 @Override
183 public void accept(Collection<Task<byte[]>> tasks) {
184 if (tasks == null) {
185 return;
186 }
187 headRoom.addAndGet(-1 * tasks.size());
188 tasks.forEach(task ->
189 executor.execute(() -> {
190 try {
191 backingConsumer.accept(task.payload());
192 taskCompleter.add(task.taskId());
193 } catch (Exception e) {
194 log.debug("Task execution failed", e);
195 } finally {
196 headRoom.incrementAndGet();
197 resumeWork();
198 }
199 }));
200 }
201 }
202}