blob: 4540aa517bb3c50c1624c104eaaa939557fcb8e0 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.ArrayDeque;
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.Iterator;
23import java.util.Map;
24import java.util.Queue;
25import java.util.Set;
26import java.util.concurrent.atomic.AtomicInteger;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.stream.Collectors;
29import java.util.stream.IntStream;
30
31import com.google.common.base.MoreObjects;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070032import com.google.common.collect.ImmutableList;
33import com.google.common.collect.Maps;
34import com.google.common.collect.Queues;
35import com.google.common.collect.Sets;
36import io.atomix.protocols.raft.service.AbstractRaftService;
37import io.atomix.protocols.raft.service.Commit;
38import io.atomix.protocols.raft.service.RaftServiceExecutor;
39import io.atomix.protocols.raft.session.RaftSession;
40import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
42import org.onlab.util.KryoNamespace;
43import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.Serializer;
48import org.onosproject.store.service.Task;
49import org.onosproject.store.service.WorkQueueStats;
50
51import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE;
52import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
53import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR;
54import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE;
55import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER;
56import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS;
57import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
58import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER;
59
60/**
61 * State machine for {@link AtomixWorkQueue} resource.
62 */
63public class AtomixWorkQueueService extends AbstractRaftService {
64
65 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
66 .register(KryoNamespaces.BASIC)
67 .register(AtomixWorkQueueOperations.NAMESPACE)
68 .register(AtomixWorkQueueEvents.NAMESPACE)
69 .register(TaskAssignment.class)
70 .register(new HashMap().keySet().getClass())
71 .register(ArrayDeque.class)
72 .build());
73
74 private final AtomicLong totalCompleted = new AtomicLong(0);
75
76 private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
77 private Map<String, TaskAssignment> assignments = Maps.newHashMap();
78 private Map<Long, RaftSession> registeredWorkers = Maps.newHashMap();
79
80 @Override
81 public void snapshot(SnapshotWriter writer) {
82 writer.writeObject(Sets.newHashSet(registeredWorkers.keySet()), SERIALIZER::encode);
83 writer.writeObject(assignments, SERIALIZER::encode);
84 writer.writeObject(unassignedTasks, SERIALIZER::encode);
85 writer.writeLong(totalCompleted.get());
86 }
87
88 @Override
89 public void install(SnapshotReader reader) {
90 registeredWorkers = Maps.newHashMap();
91 for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -070092 registeredWorkers.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -070093 }
94 assignments = reader.readObject(SERIALIZER::decode);
95 unassignedTasks = reader.readObject(SERIALIZER::decode);
96 totalCompleted.set(reader.readLong());
97 }
98
99 @Override
100 protected void configure(RaftServiceExecutor executor) {
101 executor.register(STATS, this::stats, SERIALIZER::encode);
102 executor.register(REGISTER, this::register);
103 executor.register(UNREGISTER, this::unregister);
104 executor.register(ADD, SERIALIZER::decode, this::add);
105 executor.register(TAKE, SERIALIZER::decode, this::take, SERIALIZER::encode);
106 executor.register(COMPLETE, SERIALIZER::decode, this::complete);
107 executor.register(CLEAR, this::clear);
108 }
109
110 protected WorkQueueStats stats(Commit<Void> commit) {
111 return WorkQueueStats.builder()
112 .withTotalCompleted(totalCompleted.get())
113 .withTotalPending(unassignedTasks.size())
114 .withTotalInProgress(assignments.size())
115 .build();
116 }
117
118 protected void clear(Commit<Void> commit) {
119 unassignedTasks.clear();
120 assignments.clear();
121 registeredWorkers.clear();
122 totalCompleted.set(0);
123 }
124
125 protected void register(Commit<Void> commit) {
126 registeredWorkers.put(commit.session().sessionId().id(), commit.session());
127 }
128
129 protected void unregister(Commit<Void> commit) {
130 registeredWorkers.remove(commit.session().sessionId().id());
131 }
132
133 protected void add(Commit<? extends Add> commit) {
134 Collection<byte[]> items = commit.value().items();
135
136 AtomicInteger itemIndex = new AtomicInteger(0);
137 items.forEach(item -> {
138 String taskId = String.format("%d:%d:%d", commit.session().sessionId().id(),
139 commit.index(),
140 itemIndex.getAndIncrement());
141 unassignedTasks.add(new Task<>(taskId, item));
142 });
143
144 // Send an event to all sessions that have expressed interest in task processing
145 // and are not actively processing a task.
146 registeredWorkers.values().forEach(session -> session.publish(TASK_AVAILABLE));
147 // FIXME: This generates a lot of event traffic.
148 }
149
150 protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
151 try {
152 if (unassignedTasks.isEmpty()) {
153 return ImmutableList.of();
154 }
155 long sessionId = commit.session().sessionId().id();
156 int maxTasks = commit.value().maxTasks();
157 return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
158 .mapToObj(i -> {
159 Task<byte[]> task = unassignedTasks.poll();
160 String taskId = task.taskId();
161 TaskAssignment assignment = new TaskAssignment(sessionId, task);
162
163 // bookkeeping
164 assignments.put(taskId, assignment);
165
166 return task;
167 })
168 .collect(Collectors.toCollection(ArrayList::new));
169 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700170 logger().warn("State machine update failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800171 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700172 }
173 }
174
175 protected void complete(Commit<? extends Complete> commit) {
176 long sessionId = commit.session().sessionId().id();
177 try {
178 commit.value().taskIds().forEach(taskId -> {
179 TaskAssignment assignment = assignments.get(taskId);
180 if (assignment != null && assignment.sessionId() == sessionId) {
181 assignments.remove(taskId);
182 // bookkeeping
183 totalCompleted.incrementAndGet();
184 }
185 });
186 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700187 logger().warn("State machine update failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800188 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 }
190 }
191
192 @Override
193 public void onExpire(RaftSession session) {
194 evictWorker(session.sessionId().id());
195 }
196
197 @Override
198 public void onClose(RaftSession session) {
199 evictWorker(session.sessionId().id());
200 }
201
202 private void evictWorker(long sessionId) {
203 registeredWorkers.remove(sessionId);
204
205 // TODO: Maintain an index of tasks by session for efficient access.
206 Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
207 while (iter.hasNext()) {
208 Map.Entry<String, TaskAssignment> entry = iter.next();
209 TaskAssignment assignment = entry.getValue();
210 if (assignment.sessionId() == sessionId) {
211 unassignedTasks.add(assignment.task());
212 iter.remove();
213 }
214 }
215 }
216
217 private static class TaskAssignment {
218 private final long sessionId;
219 private final Task<byte[]> task;
220
221 public TaskAssignment(long sessionId, Task<byte[]> task) {
222 this.sessionId = sessionId;
223 this.task = task;
224 }
225
226 public long sessionId() {
227 return sessionId;
228 }
229
230 public Task<byte[]> task() {
231 return task;
232 }
233
234 @Override
235 public String toString() {
236 return MoreObjects.toStringHelper(getClass())
237 .add("sessionId", sessionId)
238 .add("task", task)
239 .toString();
240 }
241 }
242}