blob: b226860ca44d1cb83a23464431455c084e80c669 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani819d61d2016-07-25 20:29:43 -070019import io.atomix.copycat.server.Commit;
20import io.atomix.copycat.server.Snapshottable;
21import io.atomix.copycat.server.StateMachineExecutor;
22import io.atomix.copycat.server.session.ServerSession;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
Madan Jampani35708a92016-07-06 10:48:19 -070027
28import java.util.ArrayList;
29import java.util.Collection;
30import java.util.Iterator;
31import java.util.Map;
32import java.util.Properties;
33import java.util.Queue;
34import java.util.concurrent.atomic.AtomicInteger;
35import java.util.concurrent.atomic.AtomicLong;
36import java.util.function.Consumer;
37import java.util.stream.Collectors;
38import java.util.stream.IntStream;
39
40import org.onlab.util.CountDownCompleter;
41import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
Madan Jampani819d61d2016-07-25 20:29:43 -070042import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
Madan Jampani35708a92016-07-06 10:48:19 -070043import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
46import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
47import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
48import org.onosproject.store.service.Task;
49import org.onosproject.store.service.WorkQueueStats;
50import org.slf4j.Logger;
51
52import com.google.common.base.MoreObjects;
53import com.google.common.base.Throwables;
54import com.google.common.collect.ImmutableList;
55import com.google.common.collect.Maps;
56import com.google.common.collect.Queues;
57import com.google.common.util.concurrent.AtomicLongMap;
58
Madan Jampani35708a92016-07-06 10:48:19 -070059/**
60 * State machine for {@link AtomixWorkQueue} resource.
61 */
62public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable {
63
64 private final Logger log = getLogger(getClass());
65
66 private final AtomicLong totalCompleted = new AtomicLong(0);
67
68 private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
69 private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
70 private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();
71 private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();
72
73 protected AtomixWorkQueueState(Properties config) {
74 super(config);
75 }
76
77 @Override
78 protected void configure(StateMachineExecutor executor) {
79 executor.register(Stats.class, this::stats);
80 executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
81 executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
82 executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
83 executor.register(Take.class, this::take);
84 executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
Madan Jampani819d61d2016-07-25 20:29:43 -070085 executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
Madan Jampani35708a92016-07-06 10:48:19 -070086 }
87
88 protected WorkQueueStats stats(Commit<? extends Stats> commit) {
89 try {
90 return WorkQueueStats.builder()
91 .withTotalCompleted(totalCompleted.get())
92 .withTotalPending(unassignedTasks.size())
93 .withTotalInProgress(assignments.size())
94 .build();
95 } finally {
96 commit.close();
97 }
98 }
99
Madan Jampani819d61d2016-07-25 20:29:43 -0700100 protected void clear(Commit<? extends Clear> commit) {
101 unassignedTasks.forEach(TaskHolder::complete);
102 unassignedTasks.clear();
103 assignments.values().forEach(TaskAssignment::markComplete);
104 assignments.clear();
105 registeredWorkers.values().forEach(Commit::close);
106 registeredWorkers.clear();
107 activeTasksPerSession.clear();
108 totalCompleted.set(0);
109 }
110
Madan Jampani35708a92016-07-06 10:48:19 -0700111 protected void register(Commit<? extends Register> commit) {
112 long sessionId = commit.session().id();
113 if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
114 commit.close();
115 }
116 }
117
118 protected void unregister(Commit<? extends Unregister> commit) {
119 try {
120 Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
121 if (registerCommit != null) {
122 registerCommit.close();
123 }
124 } finally {
125 commit.close();
126 }
127 }
128
129 protected void add(Commit<? extends Add> commit) {
130 Collection<byte[]> items = commit.operation().items();
131
132 // Create a CountDownCompleter that will close the commit when all tasks
133 // submitted as part of it are completed.
134 CountDownCompleter<Commit<? extends Add>> referenceTracker =
135 new CountDownCompleter<>(commit, items.size(), Commit::close);
136
137 AtomicInteger itemIndex = new AtomicInteger(0);
138 items.forEach(item -> {
139 String taskId = String.format("%d:%d:%d", commit.session().id(),
140 commit.index(),
141 itemIndex.getAndIncrement());
142 unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
143 });
144
145 // Send an event to all sessions that have expressed interest in task processing
146 // and are not actively processing a task.
147 registeredWorkers.values()
148 .stream()
149 .map(Commit::session)
150 .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
151 // FIXME: This generates a lot of event traffic.
152 }
153
154 protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
155 try {
156 if (unassignedTasks.isEmpty()) {
157 return ImmutableList.of();
158 }
159 long sessionId = commit.session().id();
160 int maxTasks = commit.operation().maxTasks();
161 return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
162 .mapToObj(i -> {
163 TaskHolder holder = unassignedTasks.poll();
164 String taskId = holder.task().taskId();
165 TaskAssignment assignment = new TaskAssignment(sessionId, holder);
166
167 // bookkeeping
168 assignments.put(taskId, assignment);
169 activeTasksPerSession.incrementAndGet(sessionId);
170
171 return holder.task();
172 })
173 .collect(Collectors.toCollection(ArrayList::new));
174 } catch (Exception e) {
175 log.warn("State machine update failed", e);
176 throw Throwables.propagate(e);
177 } finally {
178 commit.close();
179 }
180 }
181
182 protected void complete(Commit<? extends Complete> commit) {
183 long sessionId = commit.session().id();
184 try {
185 commit.operation().taskIds().forEach(taskId -> {
186 TaskAssignment assignment = assignments.get(taskId);
Madan Jampani819d61d2016-07-25 20:29:43 -0700187 if (assignment != null && assignment.sessionId() == sessionId) {
Madan Jampani35708a92016-07-06 10:48:19 -0700188 assignments.remove(taskId).markComplete();
189 // bookkeeping
190 totalCompleted.incrementAndGet();
191 activeTasksPerSession.decrementAndGet(sessionId);
192 }
193 });
194 } catch (Exception e) {
195 log.warn("State machine update failed", e);
196 throw Throwables.propagate(e);
197 } finally {
198 commit.close();
199 }
200 }
201
202 @Override
203 public void register(ServerSession session) {
204 }
205
206 @Override
207 public void unregister(ServerSession session) {
208 evictWorker(session.id());
209 }
210
211 @Override
212 public void expire(ServerSession session) {
213 evictWorker(session.id());
214 }
215
216 @Override
217 public void close(ServerSession session) {
218 evictWorker(session.id());
219 }
220
221 @Override
222 public void snapshot(SnapshotWriter writer) {
223 writer.writeLong(totalCompleted.get());
224 }
225
226 @Override
227 public void install(SnapshotReader reader) {
228 totalCompleted.set(reader.readLong());
229 }
230
231 private void evictWorker(long sessionId) {
232 Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
233 if (commit != null) {
234 commit.close();
235 }
236
237 // TODO: Maintain an index of tasks by session for efficient access.
238 Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
239 while (iter.hasNext()) {
240 Map.Entry<String, TaskAssignment> entry = iter.next();
241 TaskAssignment assignment = entry.getValue();
242 if (assignment.sessionId() == sessionId) {
243 unassignedTasks.add(assignment.taskHolder());
244 iter.remove();
245 }
246 }
247
248 // Bookkeeping
249 activeTasksPerSession.remove(sessionId);
250 activeTasksPerSession.removeAllZeros();
251 }
252
253 private class TaskHolder {
254
255 private final Task<byte[]> task;
256 private final CountDownCompleter<Commit<? extends Add>> referenceTracker;
257
258 public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
259 this.task = delegate;
260 this.referenceTracker = referenceTracker;
261 }
262
263 public Task<byte[]> task() {
264 return task;
265 }
266
267 public void complete() {
268 referenceTracker.countDown();
269 }
270 }
271
272 private class TaskAssignment {
273 private final long sessionId;
274 private final TaskHolder taskHolder;
275
276 public TaskAssignment(long sessionId, TaskHolder taskHolder) {
277 this.sessionId = sessionId;
278 this.taskHolder = taskHolder;
279 }
280
281 public long sessionId() {
282 return sessionId;
283 }
284
285 public TaskHolder taskHolder() {
286 return taskHolder;
287 }
288
289 public void markComplete() {
290 taskHolder.complete();
291 }
292
293 @Override
294 public String toString() {
295 return MoreObjects.toStringHelper(getClass())
296 .add("sessionId", sessionId)
297 .add("taskHolder", taskHolder)
298 .toString();
299 }
300 }
301}