| /* |
| * Copyright 2016-present Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.store.primitives.resources.impl; |
| |
| import static org.slf4j.LoggerFactory.getLogger; |
| import io.atomix.copycat.server.Commit; |
| import io.atomix.copycat.server.Snapshottable; |
| import io.atomix.copycat.server.StateMachineExecutor; |
| import io.atomix.copycat.server.session.ServerSession; |
| import io.atomix.copycat.server.session.SessionListener; |
| import io.atomix.copycat.server.storage.snapshot.SnapshotReader; |
| import io.atomix.copycat.server.storage.snapshot.SnapshotWriter; |
| import io.atomix.resource.ResourceStateMachine; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Queue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import org.onlab.util.CountDownCompleter; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take; |
| import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister; |
| import org.onosproject.store.service.Task; |
| import org.onosproject.store.service.WorkQueueStats; |
| import org.slf4j.Logger; |
| |
| import com.google.common.base.MoreObjects; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Queues; |
| import com.google.common.util.concurrent.AtomicLongMap; |
| |
| /** |
| * State machine for {@link AtomixWorkQueue} resource. |
| */ |
| public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable { |
| |
| private final Logger log = getLogger(getClass()); |
| |
| private final AtomicLong totalCompleted = new AtomicLong(0); |
| |
| private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque(); |
| private final Map<String, TaskAssignment> assignments = Maps.newHashMap(); |
| private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap(); |
| private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create(); |
| |
| protected AtomixWorkQueueState(Properties config) { |
| super(config); |
| } |
| |
| @Override |
| protected void configure(StateMachineExecutor executor) { |
| executor.register(Stats.class, this::stats); |
| executor.register(Register.class, (Consumer<Commit<Register>>) this::register); |
| executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister); |
| executor.register(Add.class, (Consumer<Commit<Add>>) this::add); |
| executor.register(Take.class, this::take); |
| executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete); |
| executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear); |
| } |
| |
| protected WorkQueueStats stats(Commit<? extends Stats> commit) { |
| try { |
| return WorkQueueStats.builder() |
| .withTotalCompleted(totalCompleted.get()) |
| .withTotalPending(unassignedTasks.size()) |
| .withTotalInProgress(assignments.size()) |
| .build(); |
| } finally { |
| commit.close(); |
| } |
| } |
| |
| protected void clear(Commit<? extends Clear> commit) { |
| unassignedTasks.forEach(TaskHolder::complete); |
| unassignedTasks.clear(); |
| assignments.values().forEach(TaskAssignment::markComplete); |
| assignments.clear(); |
| registeredWorkers.values().forEach(Commit::close); |
| registeredWorkers.clear(); |
| activeTasksPerSession.clear(); |
| totalCompleted.set(0); |
| } |
| |
| protected void register(Commit<? extends Register> commit) { |
| long sessionId = commit.session().id(); |
| if (registeredWorkers.putIfAbsent(sessionId, commit) != null) { |
| commit.close(); |
| } |
| } |
| |
| protected void unregister(Commit<? extends Unregister> commit) { |
| try { |
| Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id()); |
| if (registerCommit != null) { |
| registerCommit.close(); |
| } |
| } finally { |
| commit.close(); |
| } |
| } |
| |
| protected void add(Commit<? extends Add> commit) { |
| Collection<byte[]> items = commit.operation().items(); |
| |
| // Create a CountDownCompleter that will close the commit when all tasks |
| // submitted as part of it are completed. |
| CountDownCompleter<Commit<? extends Add>> referenceTracker = |
| new CountDownCompleter<>(commit, items.size(), Commit::close); |
| |
| AtomicInteger itemIndex = new AtomicInteger(0); |
| items.forEach(item -> { |
| String taskId = String.format("%d:%d:%d", commit.session().id(), |
| commit.index(), |
| itemIndex.getAndIncrement()); |
| unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker)); |
| }); |
| |
| // Send an event to all sessions that have expressed interest in task processing |
| // and are not actively processing a task. |
| registeredWorkers.values() |
| .stream() |
| .map(Commit::session) |
| .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE)); |
| // FIXME: This generates a lot of event traffic. |
| } |
| |
| protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) { |
| try { |
| if (unassignedTasks.isEmpty()) { |
| return ImmutableList.of(); |
| } |
| long sessionId = commit.session().id(); |
| int maxTasks = commit.operation().maxTasks(); |
| return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size())) |
| .mapToObj(i -> { |
| TaskHolder holder = unassignedTasks.poll(); |
| String taskId = holder.task().taskId(); |
| TaskAssignment assignment = new TaskAssignment(sessionId, holder); |
| |
| // bookkeeping |
| assignments.put(taskId, assignment); |
| activeTasksPerSession.incrementAndGet(sessionId); |
| |
| return holder.task(); |
| }) |
| .collect(Collectors.toCollection(ArrayList::new)); |
| } catch (Exception e) { |
| log.warn("State machine update failed", e); |
| throw Throwables.propagate(e); |
| } finally { |
| commit.close(); |
| } |
| } |
| |
| protected void complete(Commit<? extends Complete> commit) { |
| long sessionId = commit.session().id(); |
| try { |
| commit.operation().taskIds().forEach(taskId -> { |
| TaskAssignment assignment = assignments.get(taskId); |
| if (assignment != null && assignment.sessionId() == sessionId) { |
| assignments.remove(taskId).markComplete(); |
| // bookkeeping |
| totalCompleted.incrementAndGet(); |
| activeTasksPerSession.decrementAndGet(sessionId); |
| } |
| }); |
| } catch (Exception e) { |
| log.warn("State machine update failed", e); |
| throw Throwables.propagate(e); |
| } finally { |
| commit.close(); |
| } |
| } |
| |
| @Override |
| public void register(ServerSession session) { |
| } |
| |
| @Override |
| public void unregister(ServerSession session) { |
| evictWorker(session.id()); |
| } |
| |
| @Override |
| public void expire(ServerSession session) { |
| evictWorker(session.id()); |
| } |
| |
| @Override |
| public void close(ServerSession session) { |
| evictWorker(session.id()); |
| } |
| |
| @Override |
| public void snapshot(SnapshotWriter writer) { |
| writer.writeLong(totalCompleted.get()); |
| } |
| |
| @Override |
| public void install(SnapshotReader reader) { |
| totalCompleted.set(reader.readLong()); |
| } |
| |
| private void evictWorker(long sessionId) { |
| Commit<? extends Register> commit = registeredWorkers.remove(sessionId); |
| if (commit != null) { |
| commit.close(); |
| } |
| |
| // TODO: Maintain an index of tasks by session for efficient access. |
| Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<String, TaskAssignment> entry = iter.next(); |
| TaskAssignment assignment = entry.getValue(); |
| if (assignment.sessionId() == sessionId) { |
| unassignedTasks.add(assignment.taskHolder()); |
| iter.remove(); |
| } |
| } |
| |
| // Bookkeeping |
| activeTasksPerSession.remove(sessionId); |
| activeTasksPerSession.removeAllZeros(); |
| } |
| |
| private class TaskHolder { |
| |
| private final Task<byte[]> task; |
| private final CountDownCompleter<Commit<? extends Add>> referenceTracker; |
| |
| public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) { |
| this.task = delegate; |
| this.referenceTracker = referenceTracker; |
| } |
| |
| public Task<byte[]> task() { |
| return task; |
| } |
| |
| public void complete() { |
| referenceTracker.countDown(); |
| } |
| } |
| |
| private class TaskAssignment { |
| private final long sessionId; |
| private final TaskHolder taskHolder; |
| |
| public TaskAssignment(long sessionId, TaskHolder taskHolder) { |
| this.sessionId = sessionId; |
| this.taskHolder = taskHolder; |
| } |
| |
| public long sessionId() { |
| return sessionId; |
| } |
| |
| public TaskHolder taskHolder() { |
| return taskHolder; |
| } |
| |
| public void markComplete() { |
| taskHolder.complete(); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("sessionId", sessionId) |
| .add("taskHolder", taskHolder) |
| .toString(); |
| } |
| } |
| } |