Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-present Open Networking Laboratory |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.resources.impl; |
| 17 | |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 18 | import java.util.Collection; |
| 19 | import java.util.List; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 20 | import java.util.Timer; |
| 21 | import java.util.concurrent.CompletableFuture; |
| 22 | import java.util.concurrent.Executor; |
| 23 | import java.util.concurrent.ExecutorService; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 24 | import java.util.concurrent.atomic.AtomicBoolean; |
| 25 | import java.util.concurrent.atomic.AtomicInteger; |
| 26 | import java.util.concurrent.atomic.AtomicReference; |
| 27 | import java.util.function.Consumer; |
| 28 | |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 29 | import com.google.common.collect.ImmutableList; |
| 30 | import io.atomix.protocols.raft.proxy.RaftProxy; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 31 | import org.onlab.util.AbstractAccumulator; |
| 32 | import org.onlab.util.Accumulator; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 33 | import org.onlab.util.KryoNamespace; |
| 34 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add; |
| 35 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete; |
| 36 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take; |
| 37 | import org.onosproject.store.serializers.KryoNamespaces; |
| 38 | import org.onosproject.store.service.Serializer; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 39 | import org.onosproject.store.service.Task; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 40 | import org.onosproject.store.service.WorkQueue; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 41 | import org.onosproject.store.service.WorkQueueStats; |
| 42 | import org.slf4j.Logger; |
| 43 | |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 44 | import static java.util.concurrent.Executors.newSingleThreadExecutor; |
| 45 | import static org.onlab.util.Tools.groupedThreads; |
| 46 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE; |
| 47 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD; |
| 48 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR; |
| 49 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE; |
| 50 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER; |
| 51 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS; |
| 52 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE; |
| 53 | import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER; |
| 54 | import static org.slf4j.LoggerFactory.getLogger; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 55 | |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 56 | /** |
| 57 | * Distributed resource providing the {@link WorkQueue} primitive. |
| 58 | */ |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 59 | public class AtomixWorkQueue extends AbstractRaftPrimitive implements WorkQueue<byte[]> { |
| 60 | private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder() |
| 61 | .register(KryoNamespaces.BASIC) |
| 62 | .register(AtomixWorkQueueOperations.NAMESPACE) |
| 63 | .register(AtomixWorkQueueEvents.NAMESPACE) |
| 64 | .build()); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 65 | |
| 66 | private final Logger log = getLogger(getClass()); |
Yuta HIGUCHI | 1624df1 | 2016-07-21 16:54:33 -0700 | [diff] [blame] | 67 | private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 68 | private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>(); |
| 69 | private final Timer timer = new Timer("atomix-work-queue-completer"); |
| 70 | private final AtomicBoolean isRegistered = new AtomicBoolean(false); |
| 71 | |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 72 | public AtomixWorkQueue(RaftProxy proxy) { |
| 73 | super(proxy); |
| 74 | proxy.addStateChangeListener(state -> { |
| 75 | if (state == RaftProxy.State.CONNECTED && isRegistered.get()) { |
| 76 | proxy.invoke(REGISTER); |
| 77 | } |
| 78 | }); |
| 79 | proxy.addEventListener(TASK_AVAILABLE, this::resumeWork); |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 80 | } |
| 81 | |
| 82 | @Override |
| 83 | public CompletableFuture<Void> destroy() { |
| 84 | executor.shutdown(); |
| 85 | timer.cancel(); |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 86 | return proxy.invoke(CLEAR); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 87 | } |
| 88 | |
| 89 | @Override |
| 90 | public CompletableFuture<Void> addMultiple(Collection<byte[]> items) { |
| 91 | if (items.isEmpty()) { |
| 92 | return CompletableFuture.completedFuture(null); |
| 93 | } |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 94 | return proxy.invoke(ADD, SERIALIZER::encode, new Add(items)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 95 | } |
| 96 | |
| 97 | @Override |
| 98 | public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) { |
| 99 | if (maxTasks <= 0) { |
| 100 | return CompletableFuture.completedFuture(ImmutableList.of()); |
| 101 | } |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 102 | return proxy.invoke(TAKE, SERIALIZER::encode, new Take(maxTasks), SERIALIZER::decode); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 103 | } |
| 104 | |
| 105 | @Override |
| 106 | public CompletableFuture<Void> complete(Collection<String> taskIds) { |
| 107 | if (taskIds.isEmpty()) { |
| 108 | return CompletableFuture.completedFuture(null); |
| 109 | } |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 110 | return proxy.invoke(COMPLETE, SERIALIZER::encode, new Complete(taskIds)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 111 | } |
| 112 | |
| 113 | @Override |
| 114 | public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 115 | int parallelism, |
| 116 | Executor executor) { |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 117 | Accumulator<String> completedTaskAccumulator = |
| 118 | new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable |
| 119 | taskProcessor.set(new TaskProcessor(callback, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 120 | parallelism, |
| 121 | executor, |
| 122 | completedTaskAccumulator)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 123 | return register().thenCompose(v -> take(parallelism)) |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 124 | .thenAccept(taskProcessor.get()); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 125 | } |
| 126 | |
| 127 | @Override |
| 128 | public CompletableFuture<Void> stopProcessing() { |
| 129 | return unregister(); |
| 130 | } |
| 131 | |
| 132 | @Override |
| 133 | public CompletableFuture<WorkQueueStats> stats() { |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 134 | return proxy.invoke(STATS, SERIALIZER::decode); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 135 | } |
| 136 | |
| 137 | private void resumeWork() { |
| 138 | TaskProcessor activeProcessor = taskProcessor.get(); |
| 139 | if (activeProcessor == null) { |
| 140 | return; |
| 141 | } |
| 142 | this.take(activeProcessor.headRoom()) |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 143 | .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 144 | } |
| 145 | |
| 146 | private CompletableFuture<Void> register() { |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 147 | return proxy.invoke(REGISTER).thenRun(() -> isRegistered.set(true)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 148 | } |
| 149 | |
| 150 | private CompletableFuture<Void> unregister() { |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 151 | return proxy.invoke(UNREGISTER).thenRun(() -> isRegistered.set(false)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 152 | } |
| 153 | |
| 154 | // TaskId accumulator for paced triggering of task completion calls. |
| 155 | private class CompletedTaskAccumulator extends AbstractAccumulator<String> { |
| 156 | CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) { |
| 157 | super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE); |
| 158 | } |
| 159 | |
| 160 | @Override |
| 161 | public void processItems(List<String> items) { |
| 162 | complete(items); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> { |
| 167 | |
| 168 | private final AtomicInteger headRoom; |
| 169 | private final Consumer<byte[]> backingConsumer; |
| 170 | private final Executor executor; |
| 171 | private final Accumulator<String> taskCompleter; |
| 172 | |
| 173 | public TaskProcessor(Consumer<byte[]> backingConsumer, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 174 | int parallelism, |
| 175 | Executor executor, |
| 176 | Accumulator<String> taskCompleter) { |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 177 | this.backingConsumer = backingConsumer; |
| 178 | this.headRoom = new AtomicInteger(parallelism); |
| 179 | this.executor = executor; |
| 180 | this.taskCompleter = taskCompleter; |
| 181 | } |
| 182 | |
| 183 | public int headRoom() { |
| 184 | return headRoom.get(); |
| 185 | } |
| 186 | |
| 187 | @Override |
| 188 | public void accept(Collection<Task<byte[]>> tasks) { |
| 189 | if (tasks == null) { |
| 190 | return; |
| 191 | } |
| 192 | headRoom.addAndGet(-1 * tasks.size()); |
| 193 | tasks.forEach(task -> |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 194 | executor.execute(() -> { |
| 195 | try { |
| 196 | backingConsumer.accept(task.payload()); |
| 197 | taskCompleter.add(task.taskId()); |
| 198 | } catch (Exception e) { |
| 199 | log.debug("Task execution failed", e); |
| 200 | } finally { |
| 201 | headRoom.incrementAndGet(); |
| 202 | resumeWork(); |
| 203 | } |
| 204 | })); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 205 | } |
| 206 | } |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame^] | 207 | } |