Use the latest Atomix release candidate and update CopycatTransport

Change-Id: I960af428ff733ee7467024811e3b3470e951ecb7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 73bc8b7..df44b48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -43,8 +43,10 @@
 /**
  * Distributed resource providing the {@link AsyncConsistentMap} primitive.
  */
-@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
-public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+@ResourceTypeInfo(id = -151,
+                  stateMachine = AtomixConsistentMapState.class,
+                  typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
     implements AsyncConsistentMap<String, byte[]> {
 
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 72e52c2..9c9b019 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -26,6 +26,7 @@
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -69,12 +70,17 @@
  * State Machine for {@link AtomixConsistentMap} resource.
  */
 public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
+
     private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
     private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
     private AtomicLong versionCounter = new AtomicLong(0);
 
+    public AtomixConsistentMapState() {
+        super(new ResourceType(AtomixConsistentMap.class));
+    }
+
     @Override
     public void snapshot(SnapshotWriter writer) {
         writer.writeLong(versionCounter.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 4e15a81..b7e48fa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.onlab.util.SharedExecutors;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
@@ -36,8 +35,10 @@
 /**
  * Distributed resource providing the {@link AsyncLeaderElector} primitive.
  */
-@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
-public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
+@ResourceTypeInfo(id = -152,
+                  stateMachine = AtomixLeaderElectorState.class,
+                  typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
+public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
     implements AsyncLeaderElector {
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newConcurrentHashSet();
@@ -62,8 +63,7 @@
     }
 
     private void handleEvent(Change<Leadership> change) {
-        SharedExecutors.getSingleThreadExecutor().execute(() ->
-            leadershipChangeListeners.forEach(l -> l.accept(change)));
+        leadershipChangeListeners.forEach(l -> l.accept(change));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index e8abfac..9b58226 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -24,6 +24,7 @@
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -72,6 +73,10 @@
                                                            ElectionState.class,
                                                            Registration.class);
 
+    public AtomixLeaderElectorState() {
+        super(new ResourceType(AtomixLeaderElector.class));
+    }
+
     @Override
     protected void configure(StateMachineExecutor executor) {
         // Notification
@@ -261,7 +266,7 @@
     }
 
     private void onSessionEnd(Session session) {
-        Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+        Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
         if (listener != null) {
             listener.close();
         }