blob: d287e19ed2085637a4fb665f76468cfc76559a4c [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.ArrayList;
21import java.util.Collection;
22import java.util.Iterator;
23import java.util.Map;
24import java.util.Properties;
25import java.util.Queue;
26import java.util.concurrent.atomic.AtomicInteger;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.function.Consumer;
29import java.util.stream.Collectors;
30import java.util.stream.IntStream;
31
32import org.onlab.util.CountDownCompleter;
33import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
34import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
35import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
36import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
37import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
38import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
39import org.onosproject.store.service.Task;
40import org.onosproject.store.service.WorkQueueStats;
41import org.slf4j.Logger;
42
43import com.google.common.base.MoreObjects;
44import com.google.common.base.Throwables;
45import com.google.common.collect.ImmutableList;
46import com.google.common.collect.Maps;
47import com.google.common.collect.Queues;
48import com.google.common.util.concurrent.AtomicLongMap;
49
50import io.atomix.copycat.server.Commit;
51import io.atomix.copycat.server.Snapshottable;
52import io.atomix.copycat.server.StateMachineExecutor;
53import io.atomix.copycat.server.session.ServerSession;
54import io.atomix.copycat.server.session.SessionListener;
55import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
56import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
57import io.atomix.resource.ResourceStateMachine;
58
59/**
60 * State machine for {@link AtomixWorkQueue} resource.
61 */
62public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable {
63
64 private final Logger log = getLogger(getClass());
65
66 private final AtomicLong totalCompleted = new AtomicLong(0);
67
68 private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
69 private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
70 private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();
71 private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();
72
73 protected AtomixWorkQueueState(Properties config) {
74 super(config);
75 }
76
77 @Override
78 protected void configure(StateMachineExecutor executor) {
79 executor.register(Stats.class, this::stats);
80 executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
81 executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
82 executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
83 executor.register(Take.class, this::take);
84 executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
85 }
86
87 protected WorkQueueStats stats(Commit<? extends Stats> commit) {
88 try {
89 return WorkQueueStats.builder()
90 .withTotalCompleted(totalCompleted.get())
91 .withTotalPending(unassignedTasks.size())
92 .withTotalInProgress(assignments.size())
93 .build();
94 } finally {
95 commit.close();
96 }
97 }
98
99 protected void register(Commit<? extends Register> commit) {
100 long sessionId = commit.session().id();
101 if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
102 commit.close();
103 }
104 }
105
106 protected void unregister(Commit<? extends Unregister> commit) {
107 try {
108 Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
109 if (registerCommit != null) {
110 registerCommit.close();
111 }
112 } finally {
113 commit.close();
114 }
115 }
116
117 protected void add(Commit<? extends Add> commit) {
118 Collection<byte[]> items = commit.operation().items();
119
120 // Create a CountDownCompleter that will close the commit when all tasks
121 // submitted as part of it are completed.
122 CountDownCompleter<Commit<? extends Add>> referenceTracker =
123 new CountDownCompleter<>(commit, items.size(), Commit::close);
124
125 AtomicInteger itemIndex = new AtomicInteger(0);
126 items.forEach(item -> {
127 String taskId = String.format("%d:%d:%d", commit.session().id(),
128 commit.index(),
129 itemIndex.getAndIncrement());
130 unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
131 });
132
133 // Send an event to all sessions that have expressed interest in task processing
134 // and are not actively processing a task.
135 registeredWorkers.values()
136 .stream()
137 .map(Commit::session)
138 .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
139 // FIXME: This generates a lot of event traffic.
140 }
141
142 protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
143 try {
144 if (unassignedTasks.isEmpty()) {
145 return ImmutableList.of();
146 }
147 long sessionId = commit.session().id();
148 int maxTasks = commit.operation().maxTasks();
149 return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
150 .mapToObj(i -> {
151 TaskHolder holder = unassignedTasks.poll();
152 String taskId = holder.task().taskId();
153 TaskAssignment assignment = new TaskAssignment(sessionId, holder);
154
155 // bookkeeping
156 assignments.put(taskId, assignment);
157 activeTasksPerSession.incrementAndGet(sessionId);
158
159 return holder.task();
160 })
161 .collect(Collectors.toCollection(ArrayList::new));
162 } catch (Exception e) {
163 log.warn("State machine update failed", e);
164 throw Throwables.propagate(e);
165 } finally {
166 commit.close();
167 }
168 }
169
170 protected void complete(Commit<? extends Complete> commit) {
171 long sessionId = commit.session().id();
172 try {
173 commit.operation().taskIds().forEach(taskId -> {
174 TaskAssignment assignment = assignments.get(taskId);
175 if (assignment != null) {
176 assignments.remove(taskId).markComplete();
177 // bookkeeping
178 totalCompleted.incrementAndGet();
179 activeTasksPerSession.decrementAndGet(sessionId);
180 }
181 });
182 } catch (Exception e) {
183 log.warn("State machine update failed", e);
184 throw Throwables.propagate(e);
185 } finally {
186 commit.close();
187 }
188 }
189
190 @Override
191 public void register(ServerSession session) {
192 }
193
194 @Override
195 public void unregister(ServerSession session) {
196 evictWorker(session.id());
197 }
198
199 @Override
200 public void expire(ServerSession session) {
201 evictWorker(session.id());
202 }
203
204 @Override
205 public void close(ServerSession session) {
206 evictWorker(session.id());
207 }
208
209 @Override
210 public void snapshot(SnapshotWriter writer) {
211 writer.writeLong(totalCompleted.get());
212 }
213
214 @Override
215 public void install(SnapshotReader reader) {
216 totalCompleted.set(reader.readLong());
217 }
218
219 private void evictWorker(long sessionId) {
220 Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
221 if (commit != null) {
222 commit.close();
223 }
224
225 // TODO: Maintain an index of tasks by session for efficient access.
226 Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
227 while (iter.hasNext()) {
228 Map.Entry<String, TaskAssignment> entry = iter.next();
229 TaskAssignment assignment = entry.getValue();
230 if (assignment.sessionId() == sessionId) {
231 unassignedTasks.add(assignment.taskHolder());
232 iter.remove();
233 }
234 }
235
236 // Bookkeeping
237 activeTasksPerSession.remove(sessionId);
238 activeTasksPerSession.removeAllZeros();
239 }
240
241 private class TaskHolder {
242
243 private final Task<byte[]> task;
244 private final CountDownCompleter<Commit<? extends Add>> referenceTracker;
245
246 public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
247 this.task = delegate;
248 this.referenceTracker = referenceTracker;
249 }
250
251 public Task<byte[]> task() {
252 return task;
253 }
254
255 public void complete() {
256 referenceTracker.countDown();
257 }
258 }
259
260 private class TaskAssignment {
261 private final long sessionId;
262 private final TaskHolder taskHolder;
263
264 public TaskAssignment(long sessionId, TaskHolder taskHolder) {
265 this.sessionId = sessionId;
266 this.taskHolder = taskHolder;
267 }
268
269 public long sessionId() {
270 return sessionId;
271 }
272
273 public TaskHolder taskHolder() {
274 return taskHolder;
275 }
276
277 public void markComplete() {
278 taskHolder.complete();
279 }
280
281 @Override
282 public String toString() {
283 return MoreObjects.toStringHelper(getClass())
284 .add("sessionId", sessionId)
285 .add("taskHolder", taskHolder)
286 .toString();
287 }
288 }
289}