blob: 6458ec8dadcfe5231fe382717bc0b063c5e0a4f8 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.ArrayDeque;
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.Iterator;
23import java.util.Map;
24import java.util.Queue;
25import java.util.Set;
26import java.util.concurrent.atomic.AtomicInteger;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.stream.Collectors;
29import java.util.stream.IntStream;
30
31import com.google.common.base.MoreObjects;
32import com.google.common.base.Throwables;
33import com.google.common.collect.ImmutableList;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Queues;
36import com.google.common.collect.Sets;
37import io.atomix.protocols.raft.service.AbstractRaftService;
38import io.atomix.protocols.raft.service.Commit;
39import io.atomix.protocols.raft.service.RaftServiceExecutor;
40import io.atomix.protocols.raft.session.RaftSession;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
42import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
43import org.onlab.util.KryoNamespace;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete;
46import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take;
47import org.onosproject.store.serializers.KryoNamespaces;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.Task;
50import org.onosproject.store.service.WorkQueueStats;
51
52import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE;
53import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
54import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR;
55import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE;
56import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER;
57import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS;
58import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
59import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER;
60
61/**
62 * State machine for {@link AtomixWorkQueue} resource.
63 */
64public class AtomixWorkQueueService extends AbstractRaftService {
65
66 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
67 .register(KryoNamespaces.BASIC)
68 .register(AtomixWorkQueueOperations.NAMESPACE)
69 .register(AtomixWorkQueueEvents.NAMESPACE)
70 .register(TaskAssignment.class)
71 .register(new HashMap().keySet().getClass())
72 .register(ArrayDeque.class)
73 .build());
74
75 private final AtomicLong totalCompleted = new AtomicLong(0);
76
77 private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
78 private Map<String, TaskAssignment> assignments = Maps.newHashMap();
79 private Map<Long, RaftSession> registeredWorkers = Maps.newHashMap();
80
81 @Override
82 public void snapshot(SnapshotWriter writer) {
83 writer.writeObject(Sets.newHashSet(registeredWorkers.keySet()), SERIALIZER::encode);
84 writer.writeObject(assignments, SERIALIZER::encode);
85 writer.writeObject(unassignedTasks, SERIALIZER::encode);
86 writer.writeLong(totalCompleted.get());
87 }
88
89 @Override
90 public void install(SnapshotReader reader) {
91 registeredWorkers = Maps.newHashMap();
92 for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
93 registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
94 }
95 assignments = reader.readObject(SERIALIZER::decode);
96 unassignedTasks = reader.readObject(SERIALIZER::decode);
97 totalCompleted.set(reader.readLong());
98 }
99
100 @Override
101 protected void configure(RaftServiceExecutor executor) {
102 executor.register(STATS, this::stats, SERIALIZER::encode);
103 executor.register(REGISTER, this::register);
104 executor.register(UNREGISTER, this::unregister);
105 executor.register(ADD, SERIALIZER::decode, this::add);
106 executor.register(TAKE, SERIALIZER::decode, this::take, SERIALIZER::encode);
107 executor.register(COMPLETE, SERIALIZER::decode, this::complete);
108 executor.register(CLEAR, this::clear);
109 }
110
111 protected WorkQueueStats stats(Commit<Void> commit) {
112 return WorkQueueStats.builder()
113 .withTotalCompleted(totalCompleted.get())
114 .withTotalPending(unassignedTasks.size())
115 .withTotalInProgress(assignments.size())
116 .build();
117 }
118
119 protected void clear(Commit<Void> commit) {
120 unassignedTasks.clear();
121 assignments.clear();
122 registeredWorkers.clear();
123 totalCompleted.set(0);
124 }
125
126 protected void register(Commit<Void> commit) {
127 registeredWorkers.put(commit.session().sessionId().id(), commit.session());
128 }
129
130 protected void unregister(Commit<Void> commit) {
131 registeredWorkers.remove(commit.session().sessionId().id());
132 }
133
134 protected void add(Commit<? extends Add> commit) {
135 Collection<byte[]> items = commit.value().items();
136
137 AtomicInteger itemIndex = new AtomicInteger(0);
138 items.forEach(item -> {
139 String taskId = String.format("%d:%d:%d", commit.session().sessionId().id(),
140 commit.index(),
141 itemIndex.getAndIncrement());
142 unassignedTasks.add(new Task<>(taskId, item));
143 });
144
145 // Send an event to all sessions that have expressed interest in task processing
146 // and are not actively processing a task.
147 registeredWorkers.values().forEach(session -> session.publish(TASK_AVAILABLE));
148 // FIXME: This generates a lot of event traffic.
149 }
150
151 protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
152 try {
153 if (unassignedTasks.isEmpty()) {
154 return ImmutableList.of();
155 }
156 long sessionId = commit.session().sessionId().id();
157 int maxTasks = commit.value().maxTasks();
158 return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
159 .mapToObj(i -> {
160 Task<byte[]> task = unassignedTasks.poll();
161 String taskId = task.taskId();
162 TaskAssignment assignment = new TaskAssignment(sessionId, task);
163
164 // bookkeeping
165 assignments.put(taskId, assignment);
166
167 return task;
168 })
169 .collect(Collectors.toCollection(ArrayList::new));
170 } catch (Exception e) {
171 getLogger().warn("State machine update failed", e);
172 throw Throwables.propagate(e);
173 }
174 }
175
176 protected void complete(Commit<? extends Complete> commit) {
177 long sessionId = commit.session().sessionId().id();
178 try {
179 commit.value().taskIds().forEach(taskId -> {
180 TaskAssignment assignment = assignments.get(taskId);
181 if (assignment != null && assignment.sessionId() == sessionId) {
182 assignments.remove(taskId);
183 // bookkeeping
184 totalCompleted.incrementAndGet();
185 }
186 });
187 } catch (Exception e) {
188 getLogger().warn("State machine update failed", e);
189 throw Throwables.propagate(e);
190 }
191 }
192
193 @Override
194 public void onExpire(RaftSession session) {
195 evictWorker(session.sessionId().id());
196 }
197
198 @Override
199 public void onClose(RaftSession session) {
200 evictWorker(session.sessionId().id());
201 }
202
203 private void evictWorker(long sessionId) {
204 registeredWorkers.remove(sessionId);
205
206 // TODO: Maintain an index of tasks by session for efficient access.
207 Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
208 while (iter.hasNext()) {
209 Map.Entry<String, TaskAssignment> entry = iter.next();
210 TaskAssignment assignment = entry.getValue();
211 if (assignment.sessionId() == sessionId) {
212 unassignedTasks.add(assignment.task());
213 iter.remove();
214 }
215 }
216 }
217
218 private static class TaskAssignment {
219 private final long sessionId;
220 private final Task<byte[]> task;
221
222 public TaskAssignment(long sessionId, Task<byte[]> task) {
223 this.sessionId = sessionId;
224 this.task = task;
225 }
226
227 public long sessionId() {
228 return sessionId;
229 }
230
231 public Task<byte[]> task() {
232 return task;
233 }
234
235 @Override
236 public String toString() {
237 return MoreObjects.toStringHelper(getClass())
238 .add("sessionId", sessionId)
239 .add("task", task)
240 .toString();
241 }
242 }
243}