[ONOS-6594] Upgrade to Atomix 2.0.0

Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
new file mode 100644
index 0000000..6458ec8
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.Take;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents.TASK_AVAILABLE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.ADD;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.COMPLETE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.REGISTER;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.STATS;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.TAKE;
+import static org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations.UNREGISTER;
+
+/**
+ * State machine for {@link AtomixWorkQueue} resource.
+ */
+public class AtomixWorkQueueService extends AbstractRaftService {
+
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .register(AtomixWorkQueueOperations.NAMESPACE)
+            .register(AtomixWorkQueueEvents.NAMESPACE)
+            .register(TaskAssignment.class)
+            .register(new HashMap().keySet().getClass())
+            .register(ArrayDeque.class)
+            .build());
+
+    private final AtomicLong totalCompleted = new AtomicLong(0);
+
+    private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
+    private Map<String, TaskAssignment> assignments = Maps.newHashMap();
+    private Map<Long, RaftSession> registeredWorkers = Maps.newHashMap();
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeObject(Sets.newHashSet(registeredWorkers.keySet()), SERIALIZER::encode);
+        writer.writeObject(assignments, SERIALIZER::encode);
+        writer.writeObject(unassignedTasks, SERIALIZER::encode);
+        writer.writeLong(totalCompleted.get());
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        registeredWorkers = Maps.newHashMap();
+        for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+            registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
+        }
+        assignments = reader.readObject(SERIALIZER::decode);
+        unassignedTasks = reader.readObject(SERIALIZER::decode);
+        totalCompleted.set(reader.readLong());
+    }
+
+    @Override
+    protected void configure(RaftServiceExecutor executor) {
+        executor.register(STATS, this::stats, SERIALIZER::encode);
+        executor.register(REGISTER, this::register);
+        executor.register(UNREGISTER, this::unregister);
+        executor.register(ADD, SERIALIZER::decode, this::add);
+        executor.register(TAKE, SERIALIZER::decode, this::take, SERIALIZER::encode);
+        executor.register(COMPLETE, SERIALIZER::decode, this::complete);
+        executor.register(CLEAR, this::clear);
+    }
+
+    protected WorkQueueStats stats(Commit<Void> commit) {
+        return WorkQueueStats.builder()
+                .withTotalCompleted(totalCompleted.get())
+                .withTotalPending(unassignedTasks.size())
+                .withTotalInProgress(assignments.size())
+                .build();
+    }
+
+    protected void clear(Commit<Void> commit) {
+        unassignedTasks.clear();
+        assignments.clear();
+        registeredWorkers.clear();
+        totalCompleted.set(0);
+    }
+
+    protected void register(Commit<Void> commit) {
+        registeredWorkers.put(commit.session().sessionId().id(), commit.session());
+    }
+
+    protected void unregister(Commit<Void> commit) {
+        registeredWorkers.remove(commit.session().sessionId().id());
+    }
+
+    protected void add(Commit<? extends Add> commit) {
+        Collection<byte[]> items = commit.value().items();
+
+        AtomicInteger itemIndex = new AtomicInteger(0);
+        items.forEach(item -> {
+            String taskId = String.format("%d:%d:%d", commit.session().sessionId().id(),
+                    commit.index(),
+                    itemIndex.getAndIncrement());
+            unassignedTasks.add(new Task<>(taskId, item));
+        });
+
+        // Send an event to all sessions that have expressed interest in task processing
+        // and are not actively processing a task.
+        registeredWorkers.values().forEach(session -> session.publish(TASK_AVAILABLE));
+        // FIXME: This generates a lot of event traffic.
+    }
+
+    protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
+        try {
+            if (unassignedTasks.isEmpty()) {
+                return ImmutableList.of();
+            }
+            long sessionId = commit.session().sessionId().id();
+            int maxTasks = commit.value().maxTasks();
+            return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
+                    .mapToObj(i -> {
+                        Task<byte[]> task = unassignedTasks.poll();
+                        String taskId = task.taskId();
+                        TaskAssignment assignment = new TaskAssignment(sessionId, task);
+
+                        // bookkeeping
+                        assignments.put(taskId, assignment);
+
+                        return task;
+                    })
+                    .collect(Collectors.toCollection(ArrayList::new));
+        } catch (Exception e) {
+            getLogger().warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    protected void complete(Commit<? extends Complete> commit) {
+        long sessionId = commit.session().sessionId().id();
+        try {
+            commit.value().taskIds().forEach(taskId -> {
+                TaskAssignment assignment = assignments.get(taskId);
+                if (assignment != null && assignment.sessionId() == sessionId) {
+                    assignments.remove(taskId);
+                    // bookkeeping
+                    totalCompleted.incrementAndGet();
+                }
+            });
+        } catch (Exception e) {
+            getLogger().warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public void onExpire(RaftSession session) {
+        evictWorker(session.sessionId().id());
+    }
+
+    @Override
+    public void onClose(RaftSession session) {
+        evictWorker(session.sessionId().id());
+    }
+
+    private void evictWorker(long sessionId) {
+        registeredWorkers.remove(sessionId);
+
+        // TODO: Maintain an index of tasks by session for efficient access.
+        Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, TaskAssignment> entry = iter.next();
+            TaskAssignment assignment = entry.getValue();
+            if (assignment.sessionId() == sessionId) {
+                unassignedTasks.add(assignment.task());
+                iter.remove();
+            }
+        }
+    }
+
+    private static class TaskAssignment {
+        private final long sessionId;
+        private final Task<byte[]> task;
+
+        public TaskAssignment(long sessionId, Task<byte[]> task) {
+            this.sessionId = sessionId;
+            this.task = task;
+        }
+
+        public long sessionId() {
+            return sessionId;
+        }
+
+        public Task<byte[]> task() {
+            return task;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("sessionId", sessionId)
+                    .add("task", task)
+                    .toString();
+        }
+    }
+}
\ No newline at end of file