[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
new file mode 100644
index 0000000..6458ec8
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER;
+
+/**
+ * State machine for {@link AtomixWorkQueue} resource.
+ */
+public class AtomixWorkQueueService extends AbstractRaftService {
+
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixWorkQueueOperations.NAMESPACE)
+ .register(AtomixWorkQueueEvents.NAMESPACE)
+ .register(TaskAssignment.class)
+ .register(new HashMap().keySet().getClass())
+ .register(ArrayDeque.class)
+ .build());
+
+ private final AtomicLong totalCompleted = new AtomicLong(0);
+
+ private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
+ private Map<String, TaskAssignment> assignments = Maps.newHashMap();
+ private Map<Long, RaftSession> registeredWorkers = Maps.newHashMap();
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeObject(Sets.newHashSet(registeredWorkers.keySet()), SERIALIZER::encode);
+ writer.writeObject(assignments, SERIALIZER::encode);
+ writer.writeObject(unassignedTasks, SERIALIZER::encode);
+ writer.writeLong(totalCompleted.get());
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ registeredWorkers = Maps.newHashMap();
+ for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+ registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
+ }
+ assignments = reader.readObject(SERIALIZER::decode);
+ unassignedTasks = reader.readObject(SERIALIZER::decode);
+ totalCompleted.set(reader.readLong());
+ }
+
+ @Override
+ protected void configure(RaftServiceExecutor executor) {
+ executor.register(STATS, this::stats, SERIALIZER::encode);
+ executor.register(REGISTER, this::register);
+ executor.register(UNREGISTER, this::unregister);
+ executor.register(ADD, SERIALIZER::decode, this::add);
+ executor.register(TAKE, SERIALIZER::decode, this::take, SERIALIZER::encode);
+ executor.register(COMPLETE, SERIALIZER::decode, this::complete);
+ executor.register(CLEAR, this::clear);
+ }
+
+ protected WorkQueueStats stats(Commit<Void> commit) {
+ return WorkQueueStats.builder()
+ .withTotalCompleted(totalCompleted.get())
+ .withTotalPending(unassignedTasks.size())
+ .withTotalInProgress(assignments.size())
+ .build();
+ }
+
+ protected void clear(Commit<Void> commit) {
+ unassignedTasks.clear();
+ assignments.clear();
+ registeredWorkers.clear();
+ totalCompleted.set(0);
+ }
+
+ protected void register(Commit<Void> commit) {
+ registeredWorkers.put(commit.session().sessionId().id(), commit.session());
+ }
+
+ protected void unregister(Commit<Void> commit) {
+ registeredWorkers.remove(commit.session().sessionId().id());
+ }
+
+ protected void add(Commit<? extends Add> commit) {
+ Collection<byte[]> items = commit.value().items();
+
+ AtomicInteger itemIndex = new AtomicInteger(0);
+ items.forEach(item -> {
+ String taskId = String.format("%d:%d:%d", commit.session().sessionId().id(),
+ commit.index(),
+ itemIndex.getAndIncrement());
+ unassignedTasks.add(new Task<>(taskId, item));
+ });
+
+ // Send an event to all sessions that have expressed interest in task processing
+ // and are not actively processing a task.
+ registeredWorkers.values().forEach(session -> session.publish(TASK_AVAILABLE));
+ // FIXME: This generates a lot of event traffic.
+ }
+
+ protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
+ try {
+ if (unassignedTasks.isEmpty()) {
+ return ImmutableList.of();
+ }
+ long sessionId = commit.session().sessionId().id();
+ int maxTasks = commit.value().maxTasks();
+ return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
+ .mapToObj(i -> {
+ Task<byte[]> task = unassignedTasks.poll();
+ String taskId = task.taskId();
+ TaskAssignment assignment = new TaskAssignment(sessionId, task);
+
+ // bookkeeping
+ assignments.put(taskId, assignment);
+
+ return task;
+ })
+ .collect(Collectors.toCollection(ArrayList::new));
+ } catch (Exception e) {
+ getLogger().warn("State machine update failed", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected void complete(Commit<? extends Complete> commit) {
+ long sessionId = commit.session().sessionId().id();
+ try {
+ commit.value().taskIds().forEach(taskId -> {
+ TaskAssignment assignment = assignments.get(taskId);
+ if (assignment != null && assignment.sessionId() == sessionId) {
+ assignments.remove(taskId);
+ // bookkeeping
+ totalCompleted.incrementAndGet();
+ }
+ });
+ } catch (Exception e) {
+ getLogger().warn("State machine update failed", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void onExpire(RaftSession session) {
+ evictWorker(session.sessionId().id());
+ }
+
+ @Override
+ public void onClose(RaftSession session) {
+ evictWorker(session.sessionId().id());
+ }
+
+ private void evictWorker(long sessionId) {
+ registeredWorkers.remove(sessionId);
+
+ // TODO: Maintain an index of tasks by session for efficient access.
+ Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, TaskAssignment> entry = iter.next();
+ TaskAssignment assignment = entry.getValue();
+ if (assignment.sessionId() == sessionId) {
+ unassignedTasks.add(assignment.task());
+ iter.remove();
+ }
+ }
+ }
+
+ private static class TaskAssignment {
+ private final long sessionId;
+ private final Task<byte[]> task;
+
+ public TaskAssignment(long sessionId, Task<byte[]> task) {
+ this.sessionId = sessionId;
+ this.task = task;
+ }
+
+ public long sessionId() {
+ return sessionId;
+ }
+
+ public Task<byte[]> task() {
+ return task;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("sessionId", sessionId)
+ .add("task", task)
+ .toString();
+ }
+ }
+}
\ No newline at end of file