Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-present Open Networking Laboratory |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.resources.impl; |
| 17 | |
| 18 | import static org.slf4j.LoggerFactory.getLogger; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame^] | 19 | import io.atomix.copycat.server.Commit; |
| 20 | import io.atomix.copycat.server.Snapshottable; |
| 21 | import io.atomix.copycat.server.StateMachineExecutor; |
| 22 | import io.atomix.copycat.server.session.ServerSession; |
| 23 | import io.atomix.copycat.server.session.SessionListener; |
| 24 | import io.atomix.copycat.server.storage.snapshot.SnapshotReader; |
| 25 | import io.atomix.copycat.server.storage.snapshot.SnapshotWriter; |
| 26 | import io.atomix.resource.ResourceStateMachine; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 27 | |
| 28 | import java.util.ArrayList; |
| 29 | import java.util.Collection; |
| 30 | import java.util.Iterator; |
| 31 | import java.util.Map; |
| 32 | import java.util.Properties; |
| 33 | import java.util.Queue; |
| 34 | import java.util.concurrent.atomic.AtomicInteger; |
| 35 | import java.util.concurrent.atomic.AtomicLong; |
| 36 | import java.util.function.Consumer; |
| 37 | import java.util.stream.Collectors; |
| 38 | import java.util.stream.IntStream; |
| 39 | |
| 40 | import org.onlab.util.CountDownCompleter; |
| 41 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add; |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame^] | 42 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear; |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 43 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete; |
| 44 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register; |
| 45 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats; |
| 46 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take; |
| 47 | import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister; |
| 48 | import org.onosproject.store.service.Task; |
| 49 | import org.onosproject.store.service.WorkQueueStats; |
| 50 | import org.slf4j.Logger; |
| 51 | |
| 52 | import com.google.common.base.MoreObjects; |
| 53 | import com.google.common.base.Throwables; |
| 54 | import com.google.common.collect.ImmutableList; |
| 55 | import com.google.common.collect.Maps; |
| 56 | import com.google.common.collect.Queues; |
| 57 | import com.google.common.util.concurrent.AtomicLongMap; |
| 58 | |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 59 | /** |
| 60 | * State machine for {@link AtomixWorkQueue} resource. |
| 61 | */ |
| 62 | public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable { |
| 63 | |
| 64 | private final Logger log = getLogger(getClass()); |
| 65 | |
| 66 | private final AtomicLong totalCompleted = new AtomicLong(0); |
| 67 | |
| 68 | private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque(); |
| 69 | private final Map<String, TaskAssignment> assignments = Maps.newHashMap(); |
| 70 | private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap(); |
| 71 | private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create(); |
| 72 | |
| 73 | protected AtomixWorkQueueState(Properties config) { |
| 74 | super(config); |
| 75 | } |
| 76 | |
| 77 | @Override |
| 78 | protected void configure(StateMachineExecutor executor) { |
| 79 | executor.register(Stats.class, this::stats); |
| 80 | executor.register(Register.class, (Consumer<Commit<Register>>) this::register); |
| 81 | executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister); |
| 82 | executor.register(Add.class, (Consumer<Commit<Add>>) this::add); |
| 83 | executor.register(Take.class, this::take); |
| 84 | executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete); |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame^] | 85 | executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear); |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 86 | } |
| 87 | |
| 88 | protected WorkQueueStats stats(Commit<? extends Stats> commit) { |
| 89 | try { |
| 90 | return WorkQueueStats.builder() |
| 91 | .withTotalCompleted(totalCompleted.get()) |
| 92 | .withTotalPending(unassignedTasks.size()) |
| 93 | .withTotalInProgress(assignments.size()) |
| 94 | .build(); |
| 95 | } finally { |
| 96 | commit.close(); |
| 97 | } |
| 98 | } |
| 99 | |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame^] | 100 | protected void clear(Commit<? extends Clear> commit) { |
| 101 | unassignedTasks.forEach(TaskHolder::complete); |
| 102 | unassignedTasks.clear(); |
| 103 | assignments.values().forEach(TaskAssignment::markComplete); |
| 104 | assignments.clear(); |
| 105 | registeredWorkers.values().forEach(Commit::close); |
| 106 | registeredWorkers.clear(); |
| 107 | activeTasksPerSession.clear(); |
| 108 | totalCompleted.set(0); |
| 109 | } |
| 110 | |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 111 | protected void register(Commit<? extends Register> commit) { |
| 112 | long sessionId = commit.session().id(); |
| 113 | if (registeredWorkers.putIfAbsent(sessionId, commit) != null) { |
| 114 | commit.close(); |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | protected void unregister(Commit<? extends Unregister> commit) { |
| 119 | try { |
| 120 | Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id()); |
| 121 | if (registerCommit != null) { |
| 122 | registerCommit.close(); |
| 123 | } |
| 124 | } finally { |
| 125 | commit.close(); |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | protected void add(Commit<? extends Add> commit) { |
| 130 | Collection<byte[]> items = commit.operation().items(); |
| 131 | |
| 132 | // Create a CountDownCompleter that will close the commit when all tasks |
| 133 | // submitted as part of it are completed. |
| 134 | CountDownCompleter<Commit<? extends Add>> referenceTracker = |
| 135 | new CountDownCompleter<>(commit, items.size(), Commit::close); |
| 136 | |
| 137 | AtomicInteger itemIndex = new AtomicInteger(0); |
| 138 | items.forEach(item -> { |
| 139 | String taskId = String.format("%d:%d:%d", commit.session().id(), |
| 140 | commit.index(), |
| 141 | itemIndex.getAndIncrement()); |
| 142 | unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker)); |
| 143 | }); |
| 144 | |
| 145 | // Send an event to all sessions that have expressed interest in task processing |
| 146 | // and are not actively processing a task. |
| 147 | registeredWorkers.values() |
| 148 | .stream() |
| 149 | .map(Commit::session) |
| 150 | .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE)); |
| 151 | // FIXME: This generates a lot of event traffic. |
| 152 | } |
| 153 | |
| 154 | protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) { |
| 155 | try { |
| 156 | if (unassignedTasks.isEmpty()) { |
| 157 | return ImmutableList.of(); |
| 158 | } |
| 159 | long sessionId = commit.session().id(); |
| 160 | int maxTasks = commit.operation().maxTasks(); |
| 161 | return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size())) |
| 162 | .mapToObj(i -> { |
| 163 | TaskHolder holder = unassignedTasks.poll(); |
| 164 | String taskId = holder.task().taskId(); |
| 165 | TaskAssignment assignment = new TaskAssignment(sessionId, holder); |
| 166 | |
| 167 | // bookkeeping |
| 168 | assignments.put(taskId, assignment); |
| 169 | activeTasksPerSession.incrementAndGet(sessionId); |
| 170 | |
| 171 | return holder.task(); |
| 172 | }) |
| 173 | .collect(Collectors.toCollection(ArrayList::new)); |
| 174 | } catch (Exception e) { |
| 175 | log.warn("State machine update failed", e); |
| 176 | throw Throwables.propagate(e); |
| 177 | } finally { |
| 178 | commit.close(); |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | protected void complete(Commit<? extends Complete> commit) { |
| 183 | long sessionId = commit.session().id(); |
| 184 | try { |
| 185 | commit.operation().taskIds().forEach(taskId -> { |
| 186 | TaskAssignment assignment = assignments.get(taskId); |
Madan Jampani | 819d61d | 2016-07-25 20:29:43 -0700 | [diff] [blame^] | 187 | if (assignment != null && assignment.sessionId() == sessionId) { |
Madan Jampani | 35708a9 | 2016-07-06 10:48:19 -0700 | [diff] [blame] | 188 | assignments.remove(taskId).markComplete(); |
| 189 | // bookkeeping |
| 190 | totalCompleted.incrementAndGet(); |
| 191 | activeTasksPerSession.decrementAndGet(sessionId); |
| 192 | } |
| 193 | }); |
| 194 | } catch (Exception e) { |
| 195 | log.warn("State machine update failed", e); |
| 196 | throw Throwables.propagate(e); |
| 197 | } finally { |
| 198 | commit.close(); |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | @Override |
| 203 | public void register(ServerSession session) { |
| 204 | } |
| 205 | |
| 206 | @Override |
| 207 | public void unregister(ServerSession session) { |
| 208 | evictWorker(session.id()); |
| 209 | } |
| 210 | |
| 211 | @Override |
| 212 | public void expire(ServerSession session) { |
| 213 | evictWorker(session.id()); |
| 214 | } |
| 215 | |
| 216 | @Override |
| 217 | public void close(ServerSession session) { |
| 218 | evictWorker(session.id()); |
| 219 | } |
| 220 | |
| 221 | @Override |
| 222 | public void snapshot(SnapshotWriter writer) { |
| 223 | writer.writeLong(totalCompleted.get()); |
| 224 | } |
| 225 | |
| 226 | @Override |
| 227 | public void install(SnapshotReader reader) { |
| 228 | totalCompleted.set(reader.readLong()); |
| 229 | } |
| 230 | |
| 231 | private void evictWorker(long sessionId) { |
| 232 | Commit<? extends Register> commit = registeredWorkers.remove(sessionId); |
| 233 | if (commit != null) { |
| 234 | commit.close(); |
| 235 | } |
| 236 | |
| 237 | // TODO: Maintain an index of tasks by session for efficient access. |
| 238 | Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator(); |
| 239 | while (iter.hasNext()) { |
| 240 | Map.Entry<String, TaskAssignment> entry = iter.next(); |
| 241 | TaskAssignment assignment = entry.getValue(); |
| 242 | if (assignment.sessionId() == sessionId) { |
| 243 | unassignedTasks.add(assignment.taskHolder()); |
| 244 | iter.remove(); |
| 245 | } |
| 246 | } |
| 247 | |
| 248 | // Bookkeeping |
| 249 | activeTasksPerSession.remove(sessionId); |
| 250 | activeTasksPerSession.removeAllZeros(); |
| 251 | } |
| 252 | |
| 253 | private class TaskHolder { |
| 254 | |
| 255 | private final Task<byte[]> task; |
| 256 | private final CountDownCompleter<Commit<? extends Add>> referenceTracker; |
| 257 | |
| 258 | public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) { |
| 259 | this.task = delegate; |
| 260 | this.referenceTracker = referenceTracker; |
| 261 | } |
| 262 | |
| 263 | public Task<byte[]> task() { |
| 264 | return task; |
| 265 | } |
| 266 | |
| 267 | public void complete() { |
| 268 | referenceTracker.countDown(); |
| 269 | } |
| 270 | } |
| 271 | |
| 272 | private class TaskAssignment { |
| 273 | private final long sessionId; |
| 274 | private final TaskHolder taskHolder; |
| 275 | |
| 276 | public TaskAssignment(long sessionId, TaskHolder taskHolder) { |
| 277 | this.sessionId = sessionId; |
| 278 | this.taskHolder = taskHolder; |
| 279 | } |
| 280 | |
| 281 | public long sessionId() { |
| 282 | return sessionId; |
| 283 | } |
| 284 | |
| 285 | public TaskHolder taskHolder() { |
| 286 | return taskHolder; |
| 287 | } |
| 288 | |
| 289 | public void markComplete() { |
| 290 | taskHolder.complete(); |
| 291 | } |
| 292 | |
| 293 | @Override |
| 294 | public String toString() { |
| 295 | return MoreObjects.toStringHelper(getClass()) |
| 296 | .add("sessionId", sessionId) |
| 297 | .add("taskHolder", taskHolder) |
| 298 | .toString(); |
| 299 | } |
| 300 | } |
| 301 | } |