blob: 82f28e88622dfb2740175f21e7031a3a10572870 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani819d61d2016-07-25 20:29:43 -070019import io.atomix.copycat.server.Commit;
20import io.atomix.copycat.server.Snapshottable;
21import io.atomix.copycat.server.StateMachineExecutor;
22import io.atomix.copycat.server.session.ServerSession;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
Madan Jampani35708a92016-07-06 10:48:19 -070027
28import java.util.ArrayList;
29import java.util.Collection;
30import java.util.Iterator;
31import java.util.Map;
32import java.util.Properties;
33import java.util.Queue;
34import java.util.concurrent.atomic.AtomicInteger;
35import java.util.concurrent.atomic.AtomicLong;
36import java.util.function.Consumer;
37import java.util.stream.Collectors;
38import java.util.stream.IntStream;
39
40import org.onlab.util.CountDownCompleter;
41import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
Madan Jampani819d61d2016-07-25 20:29:43 -070042import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
Madan Jampani35708a92016-07-06 10:48:19 -070043import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
46import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
47import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
48import org.onosproject.store.service.Task;
49import org.onosproject.store.service.WorkQueueStats;
50import org.slf4j.Logger;
51
52import com.google.common.base.MoreObjects;
53import com.google.common.base.Throwables;
54import com.google.common.collect.ImmutableList;
55import com.google.common.collect.Maps;
56import com.google.common.collect.Queues;
57import com.google.common.util.concurrent.AtomicLongMap;
58
Madan Jampani35708a92016-07-06 10:48:19 -070059/**
60 * State machine for {@link AtomixWorkQueue} resource.
61 */
62public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable {
63
64 private final Logger log = getLogger(getClass());
65
66 private final AtomicLong totalCompleted = new AtomicLong(0);
67
68 private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
69 private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
70 private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();
71 private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();
72
73 protected AtomixWorkQueueState(Properties config) {
74 super(config);
75 }
76
77 @Override
78 protected void configure(StateMachineExecutor executor) {
79 executor.register(Stats.class, this::stats);
80 executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
81 executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
82 executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
83 executor.register(Take.class, this::take);
84 executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
Madan Jampani819d61d2016-07-25 20:29:43 -070085 executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
Madan Jampani35708a92016-07-06 10:48:19 -070086 }
87
88 protected WorkQueueStats stats(Commit<? extends Stats> commit) {
89 try {
90 return WorkQueueStats.builder()
91 .withTotalCompleted(totalCompleted.get())
92 .withTotalPending(unassignedTasks.size())
93 .withTotalInProgress(assignments.size())
94 .build();
95 } finally {
96 commit.close();
97 }
98 }
99
Madan Jampani819d61d2016-07-25 20:29:43 -0700100 protected void clear(Commit<? extends Clear> commit) {
Madan Jampanid4684b42016-09-02 22:26:31 -0700101 try {
102 unassignedTasks.forEach(TaskHolder::complete);
103 unassignedTasks.clear();
104 assignments.values().forEach(TaskAssignment::markComplete);
105 assignments.clear();
106 registeredWorkers.values().forEach(Commit::close);
107 registeredWorkers.clear();
108 activeTasksPerSession.clear();
109 totalCompleted.set(0);
110 } finally {
111 commit.close();
112 }
Madan Jampani819d61d2016-07-25 20:29:43 -0700113 }
114
Madan Jampani35708a92016-07-06 10:48:19 -0700115 protected void register(Commit<? extends Register> commit) {
116 long sessionId = commit.session().id();
117 if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
118 commit.close();
119 }
120 }
121
122 protected void unregister(Commit<? extends Unregister> commit) {
123 try {
124 Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
125 if (registerCommit != null) {
126 registerCommit.close();
127 }
128 } finally {
129 commit.close();
130 }
131 }
132
133 protected void add(Commit<? extends Add> commit) {
134 Collection<byte[]> items = commit.operation().items();
135
136 // Create a CountDownCompleter that will close the commit when all tasks
137 // submitted as part of it are completed.
138 CountDownCompleter<Commit<? extends Add>> referenceTracker =
139 new CountDownCompleter<>(commit, items.size(), Commit::close);
140
141 AtomicInteger itemIndex = new AtomicInteger(0);
142 items.forEach(item -> {
143 String taskId = String.format("%d:%d:%d", commit.session().id(),
144 commit.index(),
145 itemIndex.getAndIncrement());
146 unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
147 });
148
149 // Send an event to all sessions that have expressed interest in task processing
150 // and are not actively processing a task.
151 registeredWorkers.values()
152 .stream()
153 .map(Commit::session)
154 .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
155 // FIXME: This generates a lot of event traffic.
156 }
157
158 protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
159 try {
160 if (unassignedTasks.isEmpty()) {
161 return ImmutableList.of();
162 }
163 long sessionId = commit.session().id();
164 int maxTasks = commit.operation().maxTasks();
165 return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
166 .mapToObj(i -> {
167 TaskHolder holder = unassignedTasks.poll();
168 String taskId = holder.task().taskId();
169 TaskAssignment assignment = new TaskAssignment(sessionId, holder);
170
171 // bookkeeping
172 assignments.put(taskId, assignment);
173 activeTasksPerSession.incrementAndGet(sessionId);
174
175 return holder.task();
176 })
177 .collect(Collectors.toCollection(ArrayList::new));
178 } catch (Exception e) {
179 log.warn("State machine update failed", e);
180 throw Throwables.propagate(e);
181 } finally {
182 commit.close();
183 }
184 }
185
186 protected void complete(Commit<? extends Complete> commit) {
187 long sessionId = commit.session().id();
188 try {
189 commit.operation().taskIds().forEach(taskId -> {
190 TaskAssignment assignment = assignments.get(taskId);
Madan Jampani819d61d2016-07-25 20:29:43 -0700191 if (assignment != null && assignment.sessionId() == sessionId) {
Madan Jampani35708a92016-07-06 10:48:19 -0700192 assignments.remove(taskId).markComplete();
193 // bookkeeping
194 totalCompleted.incrementAndGet();
195 activeTasksPerSession.decrementAndGet(sessionId);
196 }
197 });
198 } catch (Exception e) {
199 log.warn("State machine update failed", e);
200 throw Throwables.propagate(e);
201 } finally {
202 commit.close();
203 }
204 }
205
206 @Override
207 public void register(ServerSession session) {
208 }
209
210 @Override
211 public void unregister(ServerSession session) {
212 evictWorker(session.id());
213 }
214
215 @Override
216 public void expire(ServerSession session) {
217 evictWorker(session.id());
218 }
219
220 @Override
221 public void close(ServerSession session) {
222 evictWorker(session.id());
223 }
224
225 @Override
226 public void snapshot(SnapshotWriter writer) {
227 writer.writeLong(totalCompleted.get());
228 }
229
230 @Override
231 public void install(SnapshotReader reader) {
232 totalCompleted.set(reader.readLong());
233 }
234
235 private void evictWorker(long sessionId) {
236 Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
237 if (commit != null) {
238 commit.close();
239 }
240
241 // TODO: Maintain an index of tasks by session for efficient access.
242 Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
243 while (iter.hasNext()) {
244 Map.Entry<String, TaskAssignment> entry = iter.next();
245 TaskAssignment assignment = entry.getValue();
246 if (assignment.sessionId() == sessionId) {
247 unassignedTasks.add(assignment.taskHolder());
248 iter.remove();
249 }
250 }
251
252 // Bookkeeping
253 activeTasksPerSession.remove(sessionId);
254 activeTasksPerSession.removeAllZeros();
255 }
256
257 private class TaskHolder {
258
259 private final Task<byte[]> task;
260 private final CountDownCompleter<Commit<? extends Add>> referenceTracker;
261
262 public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
263 this.task = delegate;
264 this.referenceTracker = referenceTracker;
265 }
266
267 public Task<byte[]> task() {
268 return task;
269 }
270
271 public void complete() {
272 referenceTracker.countDown();
273 }
274 }
275
276 private class TaskAssignment {
277 private final long sessionId;
278 private final TaskHolder taskHolder;
279
280 public TaskAssignment(long sessionId, TaskHolder taskHolder) {
281 this.sessionId = sessionId;
282 this.taskHolder = taskHolder;
283 }
284
285 public long sessionId() {
286 return sessionId;
287 }
288
289 public TaskHolder taskHolder() {
290 return taskHolder;
291 }
292
293 public void markComplete() {
294 taskHolder.complete();
295 }
296
297 @Override
298 public String toString() {
299 return MoreObjects.toStringHelper(getClass())
300 .add("sessionId", sessionId)
301 .add("taskHolder", taskHolder)
302 .toString();
303 }
304 }
305}