Updates to ConsistentMap and LeaderElector state machines

Change-Id: I7734b253a56fef7300a8a094a3cfc8c1b45c2453
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 0dad15a..834d8e9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
@@ -35,6 +36,7 @@
 
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.PartitionInfo;
+import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -44,6 +46,8 @@
  */
 public class StoragePartitionServer implements Managed<StoragePartitionServer> {
 
+    private final Logger log = getLogger(getClass());
+
     private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
     private final StoragePartition partition;
     private final Address localAddress;
@@ -81,7 +85,13 @@
         } else {
             serverOpenFuture = CompletableFuture.completedFuture(null);
         }
-        return serverOpenFuture.thenApply(v -> null);
+        return serverOpenFuture.whenComplete((r, e) -> {
+            if (e == null) {
+                log.info("Successfully started server for partition {}", partition.getId());
+            } else {
+                log.info("Failed to start server for partition {}", partition.getId(), e);
+            }
+        }).thenApply(v -> null);
     }
 
     @Override
@@ -105,7 +115,6 @@
                 .withStorage(Storage.builder()
                          // FIXME: StorageLevel should be DISK
                         .withStorageLevel(StorageLevel.MEMORY)
-                        .withSerializer(serializer.clone())
                         .withDirectory(dataFolder)
                         .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
                         .build())
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index bd9690c..73bc8b7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -17,12 +17,12 @@
 
 import io.atomix.catalyst.util.Listener;
 import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.Consistency;
 import io.atomix.resource.Resource;
 import io.atomix.resource.ResourceTypeInfo;
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -49,7 +49,7 @@
 
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
 
-    private static final String CHANGE_SUBJECT = "change";
+    public static final String CHANGE_SUBJECT = "changeEvents";
 
     public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
         super(client, options);
@@ -68,14 +68,8 @@
         });
     }
 
-    private void handleEvent(MapEvent<String, byte[]> event) {
-        mapEventListeners.forEach(listener -> listener.event(event));
-    }
-
-    @Override
-    public AtomixConsistentMap with(Consistency consistency) {
-        super.with(consistency);
-        return this;
+    private void handleEvent(List<MapEvent<String, byte[]>> events) {
+        events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 458e5fb..913c3bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -18,7 +18,9 @@
 import io.atomix.catalyst.buffer.BufferInput;
 import io.atomix.catalyst.buffer.BufferOutput;
 import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
 import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
 import io.atomix.catalyst.util.Assert;
 import io.atomix.copycat.client.Command;
 import io.atomix.copycat.client.Query;
@@ -514,4 +516,28 @@
                     .toString();
         }
     }
+
+    /**
+     * Map command type resolver.
+     */
+    public static class TypeResolver implements SerializableTypeResolver {
+        @Override
+        public void resolve(SerializerRegistry registry) {
+            registry.register(ContainsKey.class, -761);
+            registry.register(ContainsValue.class, -762);
+            registry.register(Get.class, -763);
+            registry.register(EntrySet.class, -764);
+            registry.register(Values.class, -765);
+            registry.register(KeySet.class, -766);
+            registry.register(Clear.class, -767);
+            registry.register(IsEmpty.class, -768);
+            registry.register(Size.class, -769);
+            registry.register(Listen.class, -770);
+            registry.register(Unlisten.class, -771);
+            registry.register(TransactionPrepare.class, -772);
+            registry.register(TransactionCommit.class, -773);
+            registry.register(TransactionRollback.class, -774);
+            registry.register(UpdateAndGet.class, -775);
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index e580fed..72e52c2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,11 +40,26 @@
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -52,13 +68,11 @@
 /**
  * State Machine for {@link AtomixConsistentMap} resource.
  */
-public class AtomixConsistentMapState extends ResourceStateMachine implements
-        SessionListener, Snapshottable {
+public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
     private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
-    private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
-            .newHashMap();
+    private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
     private AtomicLong versionCounter = new AtomicLong(0);
 
     @Override
@@ -74,36 +88,23 @@
     @Override
     protected void configure(StateMachineExecutor executor) {
         // Listeners
-        executor.register(AtomixConsistentMapCommands.Listen.class,
-                this::listen);
-        executor.register(AtomixConsistentMapCommands.Unlisten.class,
-                this::unlisten);
+        executor.register(Listen.class, this::listen);
+        executor.register(Unlisten.class, this::unlisten);
         // Queries
-        executor.register(AtomixConsistentMapCommands.ContainsKey.class,
-                this::containsKey);
-        executor.register(AtomixConsistentMapCommands.ContainsValue.class,
-                this::containsValue);
-        executor.register(AtomixConsistentMapCommands.EntrySet.class,
-                this::entrySet);
-        executor.register(AtomixConsistentMapCommands.Get.class, this::get);
-        executor.register(AtomixConsistentMapCommands.IsEmpty.class,
-                this::isEmpty);
-        executor.register(AtomixConsistentMapCommands.KeySet.class,
-                this::keySet);
-        executor.register(AtomixConsistentMapCommands.Size.class, this::size);
-        executor.register(AtomixConsistentMapCommands.Values.class,
-                this::values);
+        executor.register(ContainsKey.class, this::containsKey);
+        executor.register(ContainsValue.class, this::containsValue);
+        executor.register(EntrySet.class, this::entrySet);
+        executor.register(Get.class, this::get);
+        executor.register(IsEmpty.class, this::isEmpty);
+        executor.register(KeySet.class, this::keySet);
+        executor.register(Size.class, this::size);
+        executor.register(Values.class, this::values);
         // Commands
-        executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
-                this::updateAndGet);
+        executor.register(UpdateAndGet.class, this::updateAndGet);
         executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
-        executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
-                this::prepare);
-        executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
-                this::commit);
-        executor.register(
-                AtomixConsistentMapCommands.TransactionRollback.class,
-                this::rollback);
+        executor.register(TransactionPrepare.class, this::prepare);
+        executor.register(TransactionCommit.class, this::commit);
+        executor.register(TransactionRollback.class, this::rollback);
     }
 
     @Override
@@ -120,12 +121,10 @@
     /**
      * Handles a contains key commit.
      *
-     * @param commit
-     *            containsKey commit
+     * @param commit containsKey commit
      * @return {@code true} if map contains key
      */
-    protected boolean containsKey(
-            Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
+    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
         try {
             return toVersioned(mapEntries.get(commit.operation().key())) != null;
         } finally {
@@ -136,12 +135,10 @@
     /**
      * Handles a contains value commit.
      *
-     * @param commit
-     *            containsValue commit
+     * @param commit containsValue commit
      * @return {@code true} if map contains value
      */
-    protected boolean containsValue(
-            Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
+    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         try {
             Match<byte[]> valueMatch = Match
                     .ifValue(commit.operation().value());
@@ -159,8 +156,7 @@
      *            get commit
      * @return value mapped to key
      */
-    protected Versioned<byte[]> get(
-            Commit<? extends AtomixConsistentMapCommands.Get> commit) {
+    protected Versioned<byte[]> get(Commit<? extends Get> commit) {
         try {
             return toVersioned(mapEntries.get(commit.operation().key()));
         } finally {
@@ -171,11 +167,10 @@
     /**
      * Handles a count commit.
      *
-     * @param commit
-     *            size commit
+     * @param commit size commit
      * @return number of entries in map
      */
-    protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
+    protected int size(Commit<? extends Size> commit) {
         try {
             return mapEntries.size();
         } finally {
@@ -186,12 +181,10 @@
     /**
      * Handles an is empty commit.
      *
-     * @param commit
-     *            isEmpty commit
+     * @param commit isEmpty commit
      * @return {@code true} if map is empty
      */
-    protected boolean isEmpty(
-            Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
+    protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
         try {
             return mapEntries.isEmpty();
         } finally {
@@ -202,14 +195,12 @@
     /**
      * Handles a keySet commit.
      *
-     * @param commit
-     *            keySet commit
+     * @param commit keySet commit
      * @return set of keys in map
      */
-    protected Set<String> keySet(
-            Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
+    protected Set<String> keySet(Commit<? extends KeySet> commit) {
         try {
-            return mapEntries.keySet();
+            return mapEntries.keySet().stream().collect(Collectors.toSet());
         } finally {
             commit.close();
         }
@@ -218,15 +209,12 @@
     /**
      * Handles a values commit.
      *
-     * @param commit
-     *            values commit
+     * @param commit values commit
      * @return collection of values in map
      */
-    protected Collection<Versioned<byte[]>> values(
-            Commit<? extends AtomixConsistentMapCommands.Values> commit) {
+    protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
         try {
-            return mapEntries.values().stream().map(this::toVersioned)
-                    .collect(Collectors.toList());
+            return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
         } finally {
             commit.close();
         }
@@ -239,8 +227,7 @@
      *            entrySet commit
      * @return set of map entries
      */
-    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
-            Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
         try {
             return mapEntries
                     .entrySet()
@@ -256,12 +243,10 @@
     /**
      * Handles a update and get commit.
      *
-     * @param commit
-     *            updateAndGet commit
+     * @param commit updateAndGet commit
      * @return update result
      */
-    protected MapEntryUpdateResult<String, byte[]> updateAndGet(
-            Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+    protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
         MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
         String key = commit.operation().key();
         MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
@@ -286,8 +271,10 @@
         }
         if (updateType == INSERT || updateType == UPDATE) {
             mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+        } else {
+            commit.close();
         }
-        notify(new MapEvent<>("", key, newMapValue, oldMapValue));
+        publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
         return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
                 newMapValue);
     }
@@ -295,12 +282,10 @@
     /**
      * Handles a clear commit.
      *
-     * @param commit
-     *            clear commit
+     * @param commit clear commit
      * @return clear result
      */
-    protected MapEntryUpdateResult.Status clear(
-            Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
+    protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
         try {
             Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
                     .entrySet().iterator();
@@ -310,7 +295,7 @@
                 MapEntryValue value = entry.getValue();
                 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
                         value.version());
-                notify(new MapEvent<>("", key, null, removedValue));
+                publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
                 value.discard();
                 iterator.remove();
             }
@@ -323,11 +308,9 @@
     /**
      * Handles a listen commit.
      *
-     * @param commit
-     *            listen commit
+     * @param commit listen commit
      */
-    protected void listen(
-            Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
+    protected void listen(Commit<? extends Listen> commit) {
         Long sessionId = commit.session().id();
         listeners.put(sessionId, commit);
         commit.session()
@@ -335,8 +318,7 @@
                         state -> {
                             if (state == Session.State.CLOSED
                                     || state == Session.State.EXPIRED) {
-                                Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
-                                        .remove(sessionId);
+                                Commit<? extends Listen> listener = listeners.remove(sessionId);
                                 if (listener != null) {
                                     listener.close();
                                 }
@@ -347,14 +329,12 @@
     /**
      * Handles an unlisten commit.
      *
-     * @param commit
-     *            unlisten commit
+     * @param commit unlisten commit
      */
     protected void unlisten(
-            Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
+            Commit<? extends Unlisten> commit) {
         try {
-            Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
-                    .remove(commit.session());
+            Commit<? extends Listen> listener = listeners.remove(commit.session());
             if (listener != null) {
                 listener.close();
             }
@@ -364,25 +344,12 @@
     }
 
     /**
-     * Triggers a change event.
-     *
-     * @param value
-     *            map event
-     */
-    private void notify(MapEvent<String, byte[]> value) {
-        listeners.values().forEach(
-                commit -> commit.session().publish("change", value));
-    }
-
-    /**
      * Handles an prepare commit.
      *
-     * @param commit
-     *            transaction prepare commit
+     * @param commit transaction prepare commit
      * @return prepare result
      */
-    protected PrepareResult prepare(
-            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
+    protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
         boolean ok = false;
         try {
             MapTransaction<String, byte[]> transaction = commit.operation().transaction();
@@ -403,8 +370,7 @@
                 }
             }
             // No violations detected. Add to pendingTranctions and mark
-            // modified keys as
-            // currently locked to updates.
+            // modified keys as locked for updates.
             pendingTransactions.put(transaction.transactionId(), commit);
             transaction.updates().forEach(u -> preparedKeys.add(u.key()));
             ok = true;
@@ -422,11 +388,10 @@
      * @param commit transaction commit commit
      * @return commit result
      */
-    protected CommitResult commit(
-            Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
+    protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
         TransactionId transactionId = commit.operation().transactionId();
         try {
-            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+            Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
                     .remove(transactionId);
             if (prepareCommit == null) {
                 return CommitResult.UNKNOWN_TRANSACTION_ID;
@@ -437,8 +402,9 @@
                     .stream()
                     .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                     .count();
-            CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
+            CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
                     new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+            List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
             for (MapUpdate<String, byte[]> update : transaction.updates()) {
                 String key = update.key();
                 MapEntryValue previousValue = mapEntries.remove(key);
@@ -448,11 +414,15 @@
                     newValue = new TransactionalCommit(key,
                             versionCounter.incrementAndGet(), completer);
                 }
-                mapEntries.put(key, newValue);
-                // Notify map listeners
-                notify(new MapEvent<>("", key, toVersioned(newValue),
-                        toVersioned(previousValue)));
+                eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+                if (newValue != null) {
+                    mapEntries.put(key, newValue);
+                }
+                if (previousValue != null) {
+                    previousValue.discard();
+                }
             }
+            publish(eventsToPublish);
             return CommitResult.OK;
         } finally {
             commit.close();
@@ -465,12 +435,10 @@
      * @param commit transaction rollback commit
      * @return rollback result
      */
-    protected RollbackResult rollback(
-            Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
+    protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
         TransactionId transactionId = commit.operation().transactionId();
         try {
-            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
-                    .remove(transactionId);
+            Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
             if (prepareCommit == null) {
                 return RollbackResult.UNKNOWN_TRANSACTION_ID;
             } else {
@@ -486,8 +454,14 @@
         }
     }
 
-    private MapEntryUpdateResult.Status validate(
-            AtomixConsistentMapCommands.UpdateAndGet update) {
+    /**
+     * Computes the update status that would result if the specified update were to applied to
+     * the state machine.
+     *
+     * @param update update
+     * @return status
+     */
+    private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
         MapEntryValue existingValue = mapEntries.get(update.key());
         if (existingValue == null && update.value() == null) {
             return MapEntryUpdateResult.Status.NOOP;
@@ -504,9 +478,22 @@
                 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
     }
 
+    /**
+     * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+     * @param value map entry value
+     * @return versioned instance
+     */
     private Versioned<byte[]> toVersioned(MapEntryValue value) {
-        return value == null ? null : new Versioned<>(value.value(),
-                value.version());
+        return value == null ? null : new Versioned<>(value.value(), value.version());
+    }
+
+    /**
+     * Publishes events to listeners.
+     *
+     * @param events list of map event to publish
+     */
+    private void publish(List<MapEvent<String, byte[]>> events) {
+        listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
     }
 
     @Override
@@ -529,8 +516,7 @@
     }
 
     private void closeListener(Long sessionId) {
-        Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
-                .remove(sessionId);
+        Commit<? extends Listen> commit = listeners.remove(sessionId);
         if (commit != null) {
             commit.close();
         }
@@ -566,11 +552,9 @@
      */
     private class NonTransactionalCommit implements MapEntryValue {
         private final long version;
-        private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
+        private final Commit<? extends UpdateAndGet> commit;
 
-        public NonTransactionalCommit(
-                long version,
-                Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+        public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
             this.version = version;
             this.commit = commit;
         }
@@ -598,12 +582,12 @@
     private class TransactionalCommit implements MapEntryValue {
         private final String key;
         private final long version;
-        private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
+        private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
 
         public TransactionalCommit(
                 String key,
                 long version,
-                CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
+                CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
             this.key = key;
             this.version = version;
             this.completer = commit;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index b18d3da..4e15a81 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -17,7 +17,6 @@
 
 import io.atomix.catalyst.util.Listener;
 import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.Consistency;
 import io.atomix.resource.Resource;
 import io.atomix.resource.ResourceTypeInfo;
 
@@ -26,6 +25,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
+import org.onlab.util.SharedExecutors;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
@@ -37,8 +37,8 @@
  * Distributed resource providing the {@link AsyncLeaderElector} primitive.
  */
 @ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
-public class AtomixLeaderElector
-    extends Resource<AtomixLeaderElector, Resource.Options> implements AsyncLeaderElector {
+public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
+    implements AsyncLeaderElector {
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newConcurrentHashSet();
 
@@ -62,13 +62,8 @@
     }
 
     private void handleEvent(Change<Leadership> change) {
-        leadershipChangeListeners.forEach(l -> l.accept(change));
-    }
-
-    @Override
-    public AtomixLeaderElector with(Consistency consistency) {
-        super.with(consistency);
-        return this;
+        SharedExecutors.getSingleThreadExecutor().execute(() ->
+            leadershipChangeListeners.forEach(l -> l.accept(change)));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 16f1769..e7de783 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -21,13 +21,16 @@
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 
+
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 
 import io.atomix.catalyst.buffer.BufferInput;
 import io.atomix.catalyst.buffer.BufferOutput;
 import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
 import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
 import io.atomix.catalyst.util.Assert;
 import io.atomix.copycat.client.Command;
 import io.atomix.copycat.client.Query;
@@ -232,6 +235,18 @@
                     .add("nodeId", nodeId)
                     .toString();
         }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeString(topic);
+            buffer.writeString(nodeId.toString());
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            topic = buffer.readString();
+            nodeId = new NodeId(buffer.readString());
+        }
     }
 
     /**
@@ -263,6 +278,16 @@
                     .add("topic", topic)
                     .toString();
         }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeString(topic);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            topic = buffer.readString();
+        }
     }
 
     /**
@@ -306,5 +331,34 @@
                     .add("nodeId", nodeId)
                     .toString();
         }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeString(topic);
+            buffer.writeString(nodeId.toString());
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            topic = buffer.readString();
+            nodeId = new NodeId(buffer.readString());
+        }
+    }
+
+    /**
+     * Map command type resolver.
+     */
+    public static class TypeResolver implements SerializableTypeResolver {
+        @Override
+        public void resolve(SerializerRegistry registry) {
+            registry.register(Run.class, -861);
+            registry.register(Withdraw.class, -862);
+            registry.register(Anoint.class, -863);
+            registry.register(GetAllLeaderships.class, -864);
+            registry.register(GetElectedTopics.class, -865);
+            registry.register(GetLeadership.class, -866);
+            registry.register(Listen.class, -867);
+            registry.register(Unlisten.class, -868);
+        }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 2ae6e68..e8abfac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -41,6 +41,14 @@
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
@@ -59,7 +67,7 @@
     private final Logger log = getLogger(getClass());
     private Map<String, AtomicLong> termCounters = new HashMap<>();
     private Map<String, ElectionState> elections = new HashMap<>();
-    private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
+    private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
     private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
                                                            ElectionState.class,
                                                            Registration.class);
@@ -67,16 +75,16 @@
     @Override
     protected void configure(StateMachineExecutor executor) {
         // Notification
-        executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
-        executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
+        executor.register(Listen.class, this::listen);
+        executor.register(Unlisten.class, this::unlisten);
         // Commands
-        executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
-        executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
-        executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
+        executor.register(Run.class, this::run);
+        executor.register(Withdraw.class, this::withdraw);
+        executor.register(Anoint.class, this::anoint);
         // Queries
-        executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
-        executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
-        executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
+        executor.register(GetLeadership.class, this::leadership);
+        executor.register(GetAllLeaderships.class, this::allLeaderships);
+        executor.register(GetElectedTopics.class, this::electedTopics);
     }
 
     private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
@@ -96,7 +104,7 @@
      *
      * @param commit listen commit
      */
-    public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
+    public void listen(Commit<? extends Listen> commit) {
         if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
             commit.close();
         }
@@ -107,9 +115,9 @@
      *
      * @param commit unlisten commit
      */
-    public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
+    public void unlisten(Commit<? extends Unlisten> commit) {
         try {
-            Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
+            Commit<? extends Listen> listener = listeners.remove(commit.session().id());
             if (listener != null) {
                 listener.close();
             }
@@ -123,7 +131,7 @@
      * @param commit commit entry
      * @return topic leader. If no previous leader existed this is the node that just entered the race.
      */
-    public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
+    public Leadership run(Commit<? extends Run> commit) {
         try {
             String topic = commit.operation().topic();
             Leadership oldLeadership = leadership(topic);
@@ -154,7 +162,7 @@
      * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
      * @param commit withdraw commit
      */
-    public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
+    public void withdraw(Commit<? extends Withdraw> commit) {
         try {
             String topic = commit.operation().topic();
             Leadership oldLeadership = leadership(topic);
@@ -174,7 +182,7 @@
      * @param commit anoint commit
      * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
      */
-    public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
+    public boolean anoint(Commit<? extends Anoint> commit) {
         try {
             String topic = commit.operation().topic();
             Leadership oldLeadership = leadership(topic);
@@ -197,7 +205,7 @@
      * @param commit GetLeadership commit
      * @return leader
      */
-    public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
+    public Leadership leadership(Commit<? extends GetLeadership> commit) {
         String topic = commit.operation().topic();
         try {
             return leadership(topic);
@@ -211,7 +219,7 @@
      * @param commit commit entry
      * @return set of topics for which the node is the leader
      */
-    public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
+    public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
         try {
             NodeId nodeId = commit.operation().nodeId();
             return Maps.filterEntries(elections, e -> {
@@ -228,8 +236,7 @@
      * @param commit GetAllLeaderships commit
      * @return topic to leader mapping
      */
-    public Map<String, Leadership> allLeaderships(
-            Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
+    public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
         try {
             return Maps.transformEntries(elections, (k, v) -> leadership(k));
         } finally {