blob: b226860ca44d1cb83a23464431455c084e80c669 [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
/**
 * State machine for {@link AtomixWorkQueue} resource.
 * <p>
 * Tasks submitted with {@link Add} wait in a FIFO queue until a session claims them with
 * {@link Take}. A claimed task stays assigned to that session until the session reports it
 * with {@link Complete}, or until the session is unregistered, expired or closed, in which
 * case the task is appended to the queue again.
 * <p>
 * An {@code Add} commit is kept open until every task created from it has been completed
 * (or the queue has been cleared); a {@code Register} commit is kept open for as long as the
 * session stays registered. Only the completed-task counter is written to snapshots.
 * NOTE(review): presumably the remaining state is rebuilt from those retained commits; confirm.
 */
public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable {

    private final Logger log = getLogger(getClass());

    /** Running count of completed tasks; reset by {@code clear} and restored from snapshots. */
    private final AtomicLong totalCompleted = new AtomicLong(0);

    /** Tasks waiting to be claimed, oldest first. */
    private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();

    /** Tasks that have been claimed but not yet completed, keyed by task id. */
    private final Map<String, TaskAssignment> assignments = Maps.newHashMap();

    /** Open {@code Register} commit of each session registered as a worker, keyed by session id. */
    private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();

    /** Number of tasks currently assigned to each session, keyed by session id. */
    private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();

    /**
     * Creates the state machine.
     *
     * @param config resource configuration, handed to the superclass unchanged
     */
    protected AtomixWorkQueueState(Properties config) {
        super(config);
    }

    /**
     * Binds every work queue operation to its handler method.
     *
     * @param executor executor the handlers are registered with
     */
    @Override
    protected void configure(StateMachineExecutor executor) {
        executor.register(Stats.class, this::stats);
        // The casts select the Consumer overload of register() for handlers that return nothing.
        executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
        executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
        executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
        executor.register(Take.class, this::take);
        executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
        executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
    }

    /**
     * Reports the number of completed, pending (unclaimed) and in-progress (claimed) tasks.
     *
     * @param commit stats commit; always closed before returning
     * @return current queue statistics
     */
    protected WorkQueueStats stats(Commit<? extends Stats> commit) {
        try {
            return WorkQueueStats.builder()
                    .withTotalCompleted(totalCompleted.get())
                    .withTotalPending(unassignedTasks.size())
                    .withTotalInProgress(assignments.size())
                    .build();
        } finally {
            commit.close();
        }
    }

    /**
     * Discards all state: pending and assigned tasks are marked complete (which releases the
     * {@code Add} commits they came from), every worker registration is dropped and its commit
     * closed, and the counters are reset.
     *
     * @param commit clear commit; always closed before returning
     */
    protected void clear(Commit<? extends Clear> commit) {
        try {
            unassignedTasks.forEach(TaskHolder::complete);
            unassignedTasks.clear();
            assignments.values().forEach(TaskAssignment::markComplete);
            assignments.clear();
            registeredWorkers.values().forEach(Commit::close);
            registeredWorkers.clear();
            activeTasksPerSession.clear();
            totalCompleted.set(0);
        } finally {
            // The clear commit carries no state worth retaining; release it like every other
            // handler does instead of leaving it open.
            commit.close();
        }
    }

    /**
     * Registers the committing session as a worker that is notified when tasks are added.
     * <p>
     * The commit is retained while the session stays registered. If the session is already
     * registered, the earlier commit is kept and this one is closed immediately.
     *
     * @param commit register commit
     */
    protected void register(Commit<? extends Register> commit) {
        long sessionId = commit.session().id();
        if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
            commit.close();
        }
    }

    /**
     * Removes the committing session from the set of registered workers and closes the
     * {@code Register} commit that was retained for it, if any. Tasks assigned to the session
     * are left untouched.
     *
     * @param commit unregister commit; always closed before returning
     */
    protected void unregister(Commit<? extends Unregister> commit) {
        try {
            Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
            if (registerCommit != null) {
                registerCommit.close();
            }
        } finally {
            commit.close();
        }
    }

    /**
     * Appends one task per submitted item to the pending queue and notifies registered workers.
     * <p>
     * Task ids have the form {@code <sessionId>:<commitIndex>:<itemPosition>}. The commit is
     * closed once all tasks created from it have been completed, or right away when it
     * carries no items.
     *
     * @param commit add commit
     */
    protected void add(Commit<? extends Add> commit) {
        Collection<byte[]> items = commit.operation().items();
        if (items.isEmpty()) {
            // No task is created, so nothing would ever count the tracker down and the commit
            // would stay open forever. Release it now and skip the pointless notification.
            // NOTE(review): CountDownCompleter may additionally reject a zero count; confirm.
            commit.close();
            return;
        }

        // Create a CountDownCompleter that will close the commit when all tasks
        // submitted as part of it are completed.
        CountDownCompleter<Commit<? extends Add>> referenceTracker =
                new CountDownCompleter<>(commit, items.size(), Commit::close);

        AtomicInteger itemIndex = new AtomicInteger(0);
        items.forEach(item -> {
            String taskId = String.format("%d:%d:%d", commit.session().id(),
                                                      commit.index(),
                                                      itemIndex.getAndIncrement());
            unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
        });

        // Send an event to every session that has registered as a worker, whether or not it
        // is currently processing tasks.
        registeredWorkers.values()
                         .stream()
                         .map(Commit::session)
                         .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
        // FIXME: This generates a lot of event traffic.
    }

    /**
     * Assigns up to {@code maxTasks} pending tasks, oldest first, to the committing session.
     *
     * @param commit take commit; always closed before returning
     * @return the tasks now assigned to the session; empty if nothing is pending
     */
    protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
        try {
            if (unassignedTasks.isEmpty()) {
                return ImmutableList.of();
            }
            long sessionId = commit.session().id();
            int maxTasks = commit.operation().maxTasks();
            // The upper bound is computed once, before any task is polled, so the queue
            // cannot run dry inside the loop.
            return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
                            .mapToObj(i -> {
                                TaskHolder holder = unassignedTasks.poll();
                                String taskId = holder.task().taskId();
                                TaskAssignment assignment = new TaskAssignment(sessionId, holder);

                                // bookkeeping
                                assignments.put(taskId, assignment);
                                activeTasksPerSession.incrementAndGet(sessionId);

                                return holder.task();
                            })
                            .collect(Collectors.toCollection(ArrayList::new));
        } catch (Exception e) {
            log.warn("State machine update failed", e);
            throw Throwables.propagate(e);
        } finally {
            commit.close();
        }
    }

    /**
     * Marks the listed tasks as completed.
     * <p>
     * Task ids that are unknown, or that are assigned to a different session than the one
     * committing, are silently ignored.
     *
     * @param commit complete commit; always closed before returning
     */
    protected void complete(Commit<? extends Complete> commit) {
        long sessionId = commit.session().id();
        try {
            commit.operation().taskIds().forEach(taskId -> {
                TaskAssignment assignment = assignments.get(taskId);
                if (assignment != null && assignment.sessionId() == sessionId) {
                    assignments.remove(taskId).markComplete();
                    // bookkeeping
                    totalCompleted.incrementAndGet();
                    activeTasksPerSession.decrementAndGet(sessionId);
                }
            });
        } catch (Exception e) {
            log.warn("State machine update failed", e);
            throw Throwables.propagate(e);
        } finally {
            commit.close();
        }
    }

    /**
     * Session-registered callback; intentionally does nothing because sessions become
     * workers only through an explicit {@link Register} operation.
     *
     * @param session session that was registered
     */
    @Override
    public void register(ServerSession session) {
    }

    /**
     * Evicts the session's worker registration and requeues its assigned tasks.
     *
     * @param session session that was unregistered
     */
    @Override
    public void unregister(ServerSession session) {
        evictWorker(session.id());
    }

    /**
     * Evicts the session's worker registration and requeues its assigned tasks.
     *
     * @param session session that expired
     */
    @Override
    public void expire(ServerSession session) {
        evictWorker(session.id());
    }

    /**
     * Evicts the session's worker registration and requeues its assigned tasks.
     *
     * @param session session that was closed
     */
    @Override
    public void close(ServerSession session) {
        evictWorker(session.id());
    }

    /**
     * Writes the completed-task counter to the snapshot.
     *
     * @param writer snapshot writer
     */
    @Override
    public void snapshot(SnapshotWriter writer) {
        writer.writeLong(totalCompleted.get());
    }

    /**
     * Restores the completed-task counter from a snapshot.
     *
     * @param reader snapshot reader
     */
    @Override
    public void install(SnapshotReader reader) {
        totalCompleted.set(reader.readLong());
    }

    /**
     * Drops everything held on behalf of a session: its worker registration (closing the
     * retained {@code Register} commit) and its task assignments, which are appended to the
     * pending queue so another session can claim them. Does nothing for a session that holds
     * no state, so repeated calls for the same session are harmless.
     *
     * @param sessionId id of the session to evict
     */
    private void evictWorker(long sessionId) {
        Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
        if (commit != null) {
            commit.close();
        }

        // TODO: Maintain an index of tasks by session for efficient access.
        Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, TaskAssignment> entry = iter.next();
            TaskAssignment assignment = entry.getValue();
            if (assignment.sessionId() == sessionId) {
                unassignedTasks.add(assignment.taskHolder());
                // Remove through the iterator; removing via the map would invalidate it.
                iter.remove();
            }
        }

        // Bookkeeping
        activeTasksPerSession.remove(sessionId);
        activeTasksPerSession.removeAllZeros();
    }

    /**
     * A task paired with the completion tracker of the {@code Add} commit it was created from.
     */
    private static final class TaskHolder {

        private final Task<byte[]> task;
        private final CountDownCompleter<Commit<? extends Add>> referenceTracker;

        /**
         * Creates a holder.
         *
         * @param delegate the task
         * @param referenceTracker tracker shared by all tasks created from the same commit
         */
        public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
            this.task = delegate;
            this.referenceTracker = referenceTracker;
        }

        /**
         * Returns the wrapped task.
         *
         * @return the task
         */
        public Task<byte[]> task() {
            return task;
        }

        /**
         * Records that this task is finished by counting the shared tracker down once.
         */
        public void complete() {
            referenceTracker.countDown();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("taskId", task.taskId())
                    .toString();
        }
    }

    /**
     * Association between a claimed task and the session that claimed it.
     */
    private static final class TaskAssignment {

        private final long sessionId;
        private final TaskHolder taskHolder;

        /**
         * Creates an assignment.
         *
         * @param sessionId id of the session the task is assigned to
         * @param taskHolder the assigned task
         */
        public TaskAssignment(long sessionId, TaskHolder taskHolder) {
            this.sessionId = sessionId;
            this.taskHolder = taskHolder;
        }

        /**
         * Returns the id of the session the task is assigned to.
         *
         * @return session id
         */
        public long sessionId() {
            return sessionId;
        }

        /**
         * Returns the assigned task.
         *
         * @return task holder
         */
        public TaskHolder taskHolder() {
            return taskHolder;
        }

        /**
         * Marks the assigned task as finished.
         */
        public void markComplete() {
            taskHolder.complete();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("sessionId", sessionId)
                    .add("taskHolder", taskHolder)
                    .toString();
        }
    }
}