Code clean up in ConsistentMap and LeaderElector resources

Change-Id: I1834188393f19e37394c32047538e6027522a13d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 14c24a6..0f0968c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import io.atomix.catalyst.util.Listener;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.Resource;
 import io.atomix.resource.ResourceTypeInfo;
@@ -32,6 +31,21 @@
 
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
@@ -76,48 +90,48 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return submit(new AtomixConsistentMapCommands.IsEmpty());
+        return submit(new IsEmpty());
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return submit(new AtomixConsistentMapCommands.Size());
+        return submit(new Size());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return submit(new AtomixConsistentMapCommands.ContainsKey(key));
+        return submit(new ContainsKey(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return submit(new AtomixConsistentMapCommands.ContainsValue(value));
+        return submit(new ContainsValue(value));
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return submit(new AtomixConsistentMapCommands.Get(key));
+        return submit(new Get(key));
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return submit(new AtomixConsistentMapCommands.KeySet());
+        return submit(new KeySet());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return submit(new AtomixConsistentMapCommands.Values());
+        return submit(new Values());
     }
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
-        return submit(new AtomixConsistentMapCommands.EntrySet());
+        return submit(new EntrySet());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -125,7 +139,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.newValue());
     }
@@ -133,14 +147,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -148,7 +162,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+        return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -156,7 +170,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -164,7 +178,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -172,7 +186,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+        return submit(new UpdateAndGet(key,
                 newValue,
                 Match.ifValue(oldValue),
                 Match.ANY))
@@ -183,7 +197,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+        return submit(new UpdateAndGet(key,
                 newValue,
                 Match.ANY,
                 Match.ifValue(oldVersion)))
@@ -193,7 +207,7 @@
 
     @Override
     public CompletableFuture<Void> clear() {
-        return submit(new AtomixConsistentMapCommands.Clear())
+        return submit(new Clear())
                 .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> null);
     }
@@ -224,7 +238,7 @@
             }
             Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
             Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+            return submit(new UpdateAndGet(key,
                     computedValue.get(),
                     valueMatch,
                     versionMatch))
@@ -235,18 +249,18 @@
 
     @Override
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
-        if (!mapEventListeners.isEmpty()) {
+        if (mapEventListeners.isEmpty()) {
+            return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
+        } else {
             mapEventListeners.add(listener);
             return CompletableFuture.completedFuture(null);
         }
-        mapEventListeners.add(listener);
-        return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
     }
 
     @Override
     public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
         if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
-            return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
+            return submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -259,45 +273,17 @@
 
     @Override
     public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
-        return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
-                .thenApply(v -> v == PrepareResult.OK);
+        return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
-                .thenApply(v -> null);
+        return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
+        return submit(new TransactionRollback(transactionId))
                 .thenApply(v -> null);
     }
-
-    /**
-     * Change listener context.
-     */
-    private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
-        private final MapEventListener<String, byte[]> listener;
-
-        private ChangeListener(MapEventListener<String, byte[]> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public void accept(MapEvent<String, byte[]> event) {
-            listener.event(event);
-        }
-
-        @Override
-        public void close() {
-            synchronized (AtomixConsistentMap.this) {
-                mapEventListeners.remove(listener);
-                if (mapEventListeners.isEmpty()) {
-                    submit(new AtomixConsistentMapCommands.Unlisten());
-                }
-            }
-        }
-    }
 }
\ No newline at end of file