State machine implementations for various distributed primitives based on latest Copycat APIs

Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
new file mode 100644
index 0000000..100941f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -0,0 +1,626 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.onlab.util.CountDownCompleter;
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * State Machine for {@link AtomixConsistentMap} resource.
+ */
+public class AtomixConsistentMapState extends ResourceStateMachine implements
+        SessionListener, Snapshottable {
+    private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
+    private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
+    private final Set<String> preparedKeys = Sets.newHashSet();
+    private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
+            .newHashMap();
+    private AtomicLong versionCounter = new AtomicLong(0);
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeLong(versionCounter.get());
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        versionCounter = new AtomicLong(reader.readLong());
+    }
+
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        // Listeners
+        executor.register(AtomixConsistentMapCommands.Listen.class,
+                this::listen);
+        executor.register(AtomixConsistentMapCommands.Unlisten.class,
+                this::unlisten);
+        // Queries
+        executor.register(AtomixConsistentMapCommands.ContainsKey.class,
+                this::containsKey);
+        executor.register(AtomixConsistentMapCommands.ContainsValue.class,
+                this::containsValue);
+        executor.register(AtomixConsistentMapCommands.EntrySet.class,
+                this::entrySet);
+        executor.register(AtomixConsistentMapCommands.Get.class, this::get);
+        executor.register(AtomixConsistentMapCommands.IsEmpty.class,
+                this::isEmpty);
+        executor.register(AtomixConsistentMapCommands.KeySet.class,
+                this::keySet);
+        executor.register(AtomixConsistentMapCommands.Size.class, this::size);
+        executor.register(AtomixConsistentMapCommands.Values.class,
+                this::values);
+        // Commands
+        executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
+                this::updateAndGet);
+        executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
+        executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
+                this::prepare);
+        executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
+                this::commit);
+        executor.register(
+                AtomixConsistentMapCommands.TransactionRollback.class,
+                this::rollback);
+    }
+
+    @Override
+    public void delete() {
+        // Delete Listeners
+        listeners.values().forEach(Commit::close);
+        listeners.clear();
+
+        // Delete Map entries
+        mapEntries.values().forEach(MapEntryValue::discard);
+        mapEntries.clear();
+    }
+
+    /**
+     * Handles a contains key commit.
+     *
+     * @param commit
+     *            containsKey commit
+     * @return {@code true} if map contains key
+     */
+    protected boolean containsKey(
+            Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
+        try {
+            return toVersioned(mapEntries.get(commit.operation().key())) != null;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a contains value commit.
+     *
+     * @param commit
+     *            containsValue commit
+     * @return {@code true} if map contains value
+     */
+    protected boolean containsValue(
+            Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
+        try {
+            Match<byte[]> valueMatch = Match
+                    .ifValue(commit.operation().value());
+            return mapEntries.values().stream()
+                    .anyMatch(value -> valueMatch.matches(value.value()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a get commit.
+     *
+     * @param commit
+     *            get commit
+     * @return value mapped to key
+     */
+    protected Versioned<byte[]> get(
+            Commit<? extends AtomixConsistentMapCommands.Get> commit) {
+        try {
+            return toVersioned(mapEntries.get(commit.operation().key()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a count commit.
+     *
+     * @param commit
+     *            size commit
+     * @return number of entries in map
+     */
+    protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
+        try {
+            return mapEntries.size();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an is empty commit.
+     *
+     * @param commit
+     *            isEmpty commit
+     * @return {@code true} if map is empty
+     */
+    protected boolean isEmpty(
+            Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
+        try {
+            return mapEntries.isEmpty();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a keySet commit.
+     *
+     * @param commit
+     *            keySet commit
+     * @return set of keys in map
+     */
+    protected Set<String> keySet(
+            Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
+        try {
+            return mapEntries.keySet();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a values commit.
+     *
+     * @param commit
+     *            values commit
+     * @return collection of values in map
+     */
+    protected Collection<Versioned<byte[]>> values(
+            Commit<? extends AtomixConsistentMapCommands.Values> commit) {
+        try {
+            return mapEntries.values().stream().map(this::toVersioned)
+                    .collect(Collectors.toList());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a entry set commit.
+     *
+     * @param commit
+     *            entrySet commit
+     * @return set of map entries
+     */
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
+            Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
+        try {
+            return mapEntries
+                    .entrySet()
+                    .stream()
+                    .map(e -> Maps.immutableEntry(e.getKey(),
+                            toVersioned(e.getValue())))
+                    .collect(Collectors.toSet());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a update and get commit.
+     *
+     * @param commit
+     *            updateAndGet commit
+     * @return update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> updateAndGet(
+            Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+        MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
+        String key = commit.operation().key();
+        MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
+        Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+
+        if (updateStatus != MapEntryUpdateResult.Status.OK) {
+            commit.close();
+            return new MapEntryUpdateResult<>(updateStatus, "", key,
+                    oldMapValue, oldMapValue);
+        }
+
+        byte[] newValue = commit.operation().value();
+        long newVersion = versionCounter.incrementAndGet();
+        Versioned<byte[]> newMapValue = newValue == null ? null
+                : new Versioned<>(newValue, newVersion);
+
+        MapEvent.Type updateType = newValue == null ? REMOVE
+                : oldCommitValue == null ? INSERT : UPDATE;
+        if (updateType == REMOVE || updateType == UPDATE) {
+            mapEntries.remove(key);
+            oldCommitValue.discard();
+        }
+        if (updateType == INSERT || updateType == UPDATE) {
+            mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+        }
+        notify(new MapEvent<>("", key, newMapValue, oldMapValue));
+        return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
+                newMapValue);
+    }
+
+    /**
+     * Handles a clear commit.
+     *
+     * @param commit
+     *            clear commit
+     * @return clear result
+     */
+    protected MapEntryUpdateResult.Status clear(
+            Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
+        try {
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
+                    .entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, MapEntryValue> entry = iterator.next();
+                String key = entry.getKey();
+                MapEntryValue value = entry.getValue();
+                Versioned<byte[]> removedValue = new Versioned<>(value.value(),
+                        value.version());
+                notify(new MapEvent<>("", key, null, removedValue));
+                value.discard();
+                iterator.remove();
+            }
+            return MapEntryUpdateResult.Status.OK;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a listen commit.
+     *
+     * @param commit
+     *            listen commit
+     */
+    protected void listen(
+            Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
+        Long sessionId = commit.session().id();
+        listeners.put(sessionId, commit);
+        commit.session()
+                .onStateChange(
+                        state -> {
+                            if (state == Session.State.CLOSED
+                                    || state == Session.State.EXPIRED) {
+                                Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+                                        .remove(sessionId);
+                                if (listener != null) {
+                                    listener.close();
+                                }
+                            }
+                        });
+    }
+
+    /**
+     * Handles an unlisten commit.
+     *
+     * @param commit
+     *            unlisten commit
+     */
+    protected void unlisten(
+            Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
+        try {
+            Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+                    .remove(commit.session());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Triggers a change event.
+     *
+     * @param value
+     *            map event
+     */
+    private void notify(MapEvent<String, byte[]> value) {
+        listeners.values().forEach(
+                commit -> commit.session().publish("change", value));
+    }
+
+    /**
+     * Handles an prepare commit.
+     *
+     * @param commit
+     *            transaction prepare commit
+     * @return prepare result
+     */
+    protected PrepareResult prepare(
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
+        boolean ok = false;
+        try {
+            TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
+                    .operation().transactionUpdate();
+            for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
+                String key = update.key();
+                if (preparedKeys.contains(key)) {
+                    return PrepareResult.CONCURRENT_TRANSACTION;
+                }
+                MapEntryValue existingValue = mapEntries.get(key);
+                if (existingValue == null) {
+                    if (update.currentValue() != null) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                } else {
+                    if (existingValue.version() != update.currentVersion()) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                }
+            }
+            // No violations detected. Add to pendingTranctions and mark
+            // modified keys as
+            // currently locked to updates.
+            pendingTransactions.put(transactionUpdate.transactionId(), commit);
+            transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
+            ok = true;
+            return PrepareResult.OK;
+        } finally {
+            if (!ok) {
+                commit.close();
+            }
+        }
+    }
+
+    /**
+     * Handles an commit commit (ha!).
+     *
+     * @param commit transaction commit commit
+     * @return commit result
+     */
+    protected CommitResult commit(
+            Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
+        TransactionId transactionId = commit.operation().transactionId();
+        try {
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+                    .remove(transactionId);
+            if (prepareCommit == null) {
+                return CommitResult.UNKNOWN_TRANSACTION_ID;
+            }
+            TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
+                    .operation().transactionUpdate();
+            long totalReferencesToCommit = transactionalUpdate
+                    .batch()
+                    .stream()
+                    .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                    .count();
+            CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
+                    new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+            for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
+                String key = update.key();
+                MapEntryValue previousValue = mapEntries.remove(key);
+                MapEntryValue newValue = null;
+                checkState(preparedKeys.remove(key), "key is not prepared");
+                if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                    newValue = new TransactionalCommit(key,
+                            versionCounter.incrementAndGet(), completer);
+                }
+                mapEntries.put(key, newValue);
+                // Notify map listeners
+                notify(new MapEvent<>("", key, toVersioned(newValue),
+                        toVersioned(previousValue)));
+            }
+            return CommitResult.OK;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an rollback commit (ha!).
+     *
+     * @param commit transaction rollback commit
+     * @return rollback result
+     */
+    protected RollbackResult rollback(
+            Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
+        TransactionId transactionId = commit.operation().transactionId();
+        try {
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+                    .remove(transactionId);
+            if (prepareCommit == null) {
+                return RollbackResult.UNKNOWN_TRANSACTION_ID;
+            } else {
+                prepareCommit.operation().transactionUpdate().batch()
+                        .forEach(u -> preparedKeys.remove(u.key()));
+                prepareCommit.close();
+                return RollbackResult.OK;
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    private MapEntryUpdateResult.Status validate(
+            AtomixConsistentMapCommands.UpdateAndGet update) {
+        MapEntryValue existingValue = mapEntries.get(update.key());
+        if (existingValue == null && update.value() == null) {
+            return MapEntryUpdateResult.Status.NOOP;
+        }
+        if (preparedKeys.contains(update.key())) {
+            return MapEntryUpdateResult.Status.WRITE_LOCK;
+        }
+        byte[] existingRawValue = existingValue == null ? null : existingValue
+                .value();
+        Long existingVersion = existingValue == null ? null : existingValue
+                .version();
+        return update.valueMatch().matches(existingRawValue)
+                && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
+                : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
+    }
+
+    private Versioned<byte[]> toVersioned(MapEntryValue value) {
+        return value == null ? null : new Versioned<>(value.value(),
+                value.version());
+    }
+
+    @Override
+    public void register(Session session) {
+    }
+
+    @Override
+    public void unregister(Session session) {
+        closeListener(session.id());
+    }
+
+    @Override
+    public void expire(Session session) {
+        closeListener(session.id());
+    }
+
+    @Override
+    public void close(Session session) {
+        closeListener(session.id());
+    }
+
+    private void closeListener(Long sessionId) {
+        Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
+                .remove(sessionId);
+        if (commit != null) {
+            commit.close();
+        }
+    }
+
+    /**
+     * Interface implemented by map values.
+     */
+    private interface MapEntryValue {
+        /**
+         * Returns the raw {@code byte[]}.
+         *
+         * @return raw value
+         */
+        byte[] value();
+
+        /**
+         * Returns the version of the value.
+         *
+         * @return version
+         */
+        long version();
+
+        /**
+         * Discards the value by invoke appropriate clean up actions.
+         */
+        void discard();
+    }
+
+    /**
+     * A {@code MapEntryValue} that is derived from a non-transactional update
+     * i.e. via any standard map update operation.
+     */
+    private class NonTransactionalCommit implements MapEntryValue {
+        private final long version;
+        private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
+
+        public NonTransactionalCommit(
+                long version,
+                Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+            this.version = version;
+            this.commit = commit;
+        }
+
+        @Override
+        public byte[] value() {
+            return commit.operation().value();
+        }
+
+        @Override
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public void discard() {
+            commit.close();
+        }
+    }
+
+    /**
+     * A {@code MapEntryValue} that is derived from updates submitted via a
+     * transaction.
+     */
+    private class TransactionalCommit implements MapEntryValue {
+        private final String key;
+        private final long version;
+        private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
+
+        public TransactionalCommit(
+                String key,
+                long version,
+                CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
+            this.key = key;
+            this.version = version;
+            this.completer = commit;
+        }
+
+        @Override
+        public byte[] value() {
+            TransactionalMapUpdate<String, byte[]> update = completer.object()
+                    .operation().transactionUpdate();
+            return update.valueForKey(key);
+        }
+
+        @Override
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public void discard() {
+            completer.countDown();
+        }
+    }
+}