/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | package org.onosproject.store.primitives.resources.impl; |
| 17 | |
Yuta HIGUCHI | 1624df1 | 2016-07-21 16:54:33 -0700 | [diff] [blame] | 18 | import static java.util.concurrent.Executors.newSingleThreadExecutor; |
| 19 | import static org.onlab.util.Tools.groupedThreads; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 20 | import static org.slf4j.LoggerFactory.getLogger; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 21 | import io.atomix.copycat.client.CopycatClient; |
| 22 | import io.atomix.resource.AbstractResource; |
| 23 | import io.atomix.resource.ResourceTypeInfo; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 24 | |
| 25 | import java.util.Collection; |
| 26 | import java.util.List; |
| 27 | import java.util.Properties; |
| 28 | import java.util.Timer; |
| 29 | import java.util.concurrent.CompletableFuture; |
| 30 | import java.util.concurrent.Executor; |
| 31 | import java.util.concurrent.ExecutorService; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 32 | import java.util.concurrent.atomic.AtomicBoolean; |
| 33 | import java.util.concurrent.atomic.AtomicInteger; |
| 34 | import java.util.concurrent.atomic.AtomicReference; |
| 35 | import java.util.function.Consumer; |
| 36 | |
| 37 | import org.onlab.util.AbstractAccumulator; |
| 38 | import org.onlab.util.Accumulator; |
| 39 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 40 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 41 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete; |
| 42 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register; |
| 43 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats; |
| 44 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take; |
| 45 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 46 | import org.onosproject.store.service.Task; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 47 | import org.onosproject.store.service.WorkQueue; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 48 | import org.onosproject.store.service.WorkQueueStats; |
| 49 | import org.slf4j.Logger; |
| 50 | |
| 51 | import com.google.common.collect.ImmutableList; |
| 52 | |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 53 | /** |
| 54 | * Distributed resource providing the {@link WorkQueue} primitive. |
| 55 | */ |
| 56 | @ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class) |
| 57 | public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue> |
| 58 | implements WorkQueue<byte[]> { |
| 59 | |
| 60 | private final Logger log = getLogger(getClass()); |
| 61 | public static final String TASK_AVAILABLE = "task-available"; |
Yuta HIGUCHI | 1624df1 | 2016-07-21 16:54:33 -0700 | [diff] [blame] | 62 | private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log)); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 63 | private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>(); |
| 64 | private final Timer timer = new Timer("atomix-work-queue-completer"); |
| 65 | private final AtomicBoolean isRegistered = new AtomicBoolean(false); |
| 66 | |
| 67 | protected AtomixWorkQueue(CopycatClient client, Properties options) { |
| 68 | super(client, options); |
| 69 | } |
| 70 | |
| 71 | @Override |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame] | 72 | public String name() { |
| 73 | return null; |
| 74 | } |
| 75 | |
| 76 | @Override |
| 77 | public CompletableFuture<Void> destroy() { |
| 78 | executor.shutdown(); |
| 79 | timer.cancel(); |
| 80 | return client.submit(new Clear()); |
| 81 | } |
| 82 | |
| 83 | @Override |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 84 | public CompletableFuture<AtomixWorkQueue> open() { |
| 85 | return super.open().thenApply(result -> { |
| 86 | client.onStateChange(state -> { |
| 87 | if (state == CopycatClient.State.CONNECTED && isRegistered.get()) { |
| 88 | client.submit(new Register()); |
| 89 | } |
| 90 | }); |
| 91 | client.onEvent(TASK_AVAILABLE, this::resumeWork); |
| 92 | return result; |
| 93 | }); |
| 94 | } |
| 95 | |
| 96 | @Override |
| 97 | public CompletableFuture<Void> addMultiple(Collection<byte[]> items) { |
| 98 | if (items.isEmpty()) { |
| 99 | return CompletableFuture.completedFuture(null); |
| 100 | } |
| 101 | return client.submit(new Add(items)); |
| 102 | } |
| 103 | |
| 104 | @Override |
| 105 | public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) { |
| 106 | if (maxTasks <= 0) { |
| 107 | return CompletableFuture.completedFuture(ImmutableList.of()); |
| 108 | } |
| 109 | return client.submit(new Take(maxTasks)); |
| 110 | } |
| 111 | |
| 112 | @Override |
| 113 | public CompletableFuture<Void> complete(Collection<String> taskIds) { |
| 114 | if (taskIds.isEmpty()) { |
| 115 | return CompletableFuture.completedFuture(null); |
| 116 | } |
| 117 | return client.submit(new Complete(taskIds)); |
| 118 | } |
| 119 | |
| 120 | @Override |
| 121 | public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback, |
| 122 | int parallelism, |
| 123 | Executor executor) { |
| 124 | Accumulator<String> completedTaskAccumulator = |
| 125 | new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable |
| 126 | taskProcessor.set(new TaskProcessor(callback, |
| 127 | parallelism, |
| 128 | executor, |
| 129 | completedTaskAccumulator)); |
| 130 | return register().thenCompose(v -> take(parallelism)) |
Sho SHIMIZU | fad6dbc | 2016-08-12 15:43:56 -0700 | [diff] [blame] | 131 | .thenAccept(taskProcessor.get()); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 132 | } |
| 133 | |
| 134 | @Override |
| 135 | public CompletableFuture<Void> stopProcessing() { |
| 136 | return unregister(); |
| 137 | } |
| 138 | |
| 139 | @Override |
| 140 | public CompletableFuture<WorkQueueStats> stats() { |
| 141 | return client.submit(new Stats()); |
| 142 | } |
| 143 | |
| 144 | private void resumeWork() { |
| 145 | TaskProcessor activeProcessor = taskProcessor.get(); |
| 146 | if (activeProcessor == null) { |
| 147 | return; |
| 148 | } |
| 149 | this.take(activeProcessor.headRoom()) |
| 150 | .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor); |
| 151 | } |
| 152 | |
| 153 | private CompletableFuture<Void> register() { |
| 154 | return client.submit(new Register()).thenRun(() -> isRegistered.set(true)); |
| 155 | } |
| 156 | |
| 157 | private CompletableFuture<Void> unregister() { |
| 158 | return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false)); |
| 159 | } |
| 160 | |
| 161 | // TaskId accumulator for paced triggering of task completion calls. |
| 162 | private class CompletedTaskAccumulator extends AbstractAccumulator<String> { |
| 163 | CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) { |
| 164 | super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE); |
| 165 | } |
| 166 | |
| 167 | @Override |
| 168 | public void processItems(List<String> items) { |
| 169 | complete(items); |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> { |
| 174 | |
| 175 | private final AtomicInteger headRoom; |
| 176 | private final Consumer<byte[]> backingConsumer; |
| 177 | private final Executor executor; |
| 178 | private final Accumulator<String> taskCompleter; |
| 179 | |
| 180 | public TaskProcessor(Consumer<byte[]> backingConsumer, |
| 181 | int parallelism, |
| 182 | Executor executor, |
| 183 | Accumulator<String> taskCompleter) { |
| 184 | this.backingConsumer = backingConsumer; |
| 185 | this.headRoom = new AtomicInteger(parallelism); |
| 186 | this.executor = executor; |
| 187 | this.taskCompleter = taskCompleter; |
| 188 | } |
| 189 | |
| 190 | public int headRoom() { |
| 191 | return headRoom.get(); |
| 192 | } |
| 193 | |
| 194 | @Override |
| 195 | public void accept(Collection<Task<byte[]>> tasks) { |
| 196 | if (tasks == null) { |
| 197 | return; |
| 198 | } |
| 199 | headRoom.addAndGet(-1 * tasks.size()); |
| 200 | tasks.forEach(task -> |
| 201 | executor.execute(() -> { |
| 202 | try { |
| 203 | backingConsumer.accept(task.payload()); |
| 204 | taskCompleter.add(task.taskId()); |
| 205 | } catch (Exception e) { |
| 206 | log.debug("Task execution failed", e); |
| 207 | } finally { |
| 208 | headRoom.incrementAndGet(); |
| 209 | resumeWork(); |
| 210 | } |
| 211 | })); |
| 212 | } |
| 213 | } |
| 214 | } |