Implement base Atomix primitive to handle status change events

Change-Id: Ia7541ddf562be5c1a05954afc4f7a5fa90289800
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 8116461..e404ddb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -26,7 +26,6 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipTermService;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.AnnotationsUtil;
 import org.onosproject.net.DefaultAnnotations;
@@ -164,9 +163,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipTermService termService;
-
     private static final Timestamp DEFAULT_TIMESTAMP = new MastershipBasedTimestamp(0, 0);
 
     protected static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
index dde25cb..f224df5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
@@ -24,19 +24,15 @@
 /**
  * Atomix atomic counter.
  */
-public class AtomixAtomicCounter implements AsyncAtomicCounter {
+public class AtomixAtomicCounter extends AtomixPrimitive implements AsyncAtomicCounter {
     private final io.atomix.core.counter.AsyncAtomicCounter atomixCounter;
 
     public AtomixAtomicCounter(io.atomix.core.counter.AsyncAtomicCounter atomixCounter) {
+        super(atomixCounter);
         this.atomixCounter = atomixCounter;
     }
 
     @Override
-    public String name() {
-        return atomixCounter.name();
-    }
-
-    @Override
     public CompletableFuture<Long> incrementAndGet() {
         return adaptFuture(atomixCounter.incrementAndGet());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
index e60f893..dba4473 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
@@ -24,19 +24,15 @@
 /**
  * Atomix atomic counter map.
  */
-public class AtomixAtomicCounterMap<K> implements AsyncAtomicCounterMap<K> {
+public class AtomixAtomicCounterMap<K> extends AtomixPrimitive implements AsyncAtomicCounterMap<K> {
     private final io.atomix.core.map.AsyncAtomicCounterMap<K> atomixMap;
 
     public AtomixAtomicCounterMap(io.atomix.core.map.AsyncAtomicCounterMap<K> atomixMap) {
+        super(atomixMap);
         this.atomixMap = atomixMap;
     }
 
     @Override
-    public String name() {
-        return atomixMap.name();
-    }
-
-    @Override
     public CompletableFuture<Long> incrementAndGet(K key) {
         return adaptMapFuture(atomixMap.incrementAndGet(key));
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
index e762511..850ff49 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
@@ -24,19 +24,15 @@
 /**
  * Atomix atomic ID generator.
  */
-public class AtomixAtomicIdGenerator implements AsyncAtomicIdGenerator {
+public class AtomixAtomicIdGenerator extends AtomixPrimitive implements AsyncAtomicIdGenerator {
     private final io.atomix.core.idgenerator.AsyncAtomicIdGenerator atomixIdGenerator;
 
     public AtomixAtomicIdGenerator(io.atomix.core.idgenerator.AsyncAtomicIdGenerator atomixIdGenerator) {
+        super(atomixIdGenerator);
         this.atomixIdGenerator = atomixIdGenerator;
     }
 
     @Override
-    public String name() {
-        return atomixIdGenerator.name();
-    }
-
-    @Override
     public CompletableFuture<Long> nextId() {
         return adaptFuture(atomixIdGenerator.nextId());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
index 48c524a..8e0987d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
@@ -28,21 +28,17 @@
 /**
  * Atomix atomic value.
  */
-public class AtomixAtomicValue<V> implements AsyncAtomicValue<V> {
+public class AtomixAtomicValue<V> extends AtomixPrimitive implements AsyncAtomicValue<V> {
     private final io.atomix.core.value.AsyncAtomicValue<V> atomixValue;
     private final Map<AtomicValueEventListener<V>, io.atomix.core.value.AtomicValueEventListener<V>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixAtomicValue(io.atomix.core.value.AsyncAtomicValue<V> atomixValue) {
+        super(atomixValue);
         this.atomixValue = atomixValue;
     }
 
     @Override
-    public String name() {
-        return atomixValue.name();
-    }
-
-    @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
         return adaptFuture(atomixValue.compareAndSet(expect, update));
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
index 6241855..33883fc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
@@ -42,21 +42,17 @@
 /**
  * Atomix consistent map.
  */
-public class AtomixConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
+public class AtomixConsistentMap<K, V> extends AtomixPrimitive implements AsyncConsistentMap<K, V> {
     private final io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap;
     private final Map<MapEventListener<K, V>, io.atomix.core.map.AtomicMapEventListener<K, V>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixConsistentMap(io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap) {
+        super(atomixMap);
         this.atomixMap = atomixMap;
     }
 
     @Override
-    public String name() {
-        return atomixMap.name();
-    }
-
-    @Override
     public CompletableFuture<Integer> size() {
         return atomixMap.size();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
index 96d7e8c..956d9b2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
@@ -37,21 +37,17 @@
 /**
  * Atomix consistent map.
  */
-public class AtomixConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
+public class AtomixConsistentMultimap<K, V> extends AtomixPrimitive implements AsyncConsistentMultimap<K, V> {
     private final io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap;
     private final Map<MultimapEventListener<K, V>, io.atomix.core.multimap.AtomicMultimapEventListener<K, V>>
         listenerMap = Maps.newIdentityHashMap();
 
     public AtomixConsistentMultimap(io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap) {
+        super(atomixMultimap);
         this.atomixMultimap = atomixMultimap;
     }
 
     @Override
-    public String name() {
-        return atomixMultimap.name();
-    }
-
-    @Override
     public CompletableFuture<Integer> size() {
         return adaptMapFuture(atomixMultimap.size());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
index cfee0e7..caaae88 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
@@ -45,21 +45,17 @@
 /**
  * Atomix consistent tree map.
  */
-public class AtomixConsistentTreeMap<V> implements AsyncConsistentTreeMap<V> {
+public class AtomixConsistentTreeMap<V> extends AtomixPrimitive implements AsyncConsistentTreeMap<V> {
     private final io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap;
     private final Map<MapEventListener<String, V>, io.atomix.core.map.AtomicMapEventListener<String, V>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixConsistentTreeMap(io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap) {
+        super(atomixTreeMap);
         this.atomixTreeMap = atomixTreeMap;
     }
 
     @Override
-    public String name() {
-        return atomixTreeMap.name();
-    }
-
-    @Override
     public CompletableFuture<Integer> size() {
         return atomixTreeMap.size();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
index 038d995..b0d79f2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
@@ -27,19 +27,15 @@
 /**
  * Atomix distributed lock.
  */
-public class AtomixDistributedLock implements AsyncDistributedLock {
+public class AtomixDistributedLock extends AtomixPrimitive implements AsyncDistributedLock {
     private final io.atomix.core.lock.AsyncAtomicLock atomixLock;
 
     public AtomixDistributedLock(io.atomix.core.lock.AsyncAtomicLock atomixLock) {
+        super(atomixLock);
         this.atomixLock = atomixLock;
     }
 
     @Override
-    public String name() {
-        return atomixLock.name();
-    }
-
-    @Override
     public CompletableFuture<Version> lock() {
         return adaptFuture(atomixLock.lock()).thenApply(this::toVersion);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
index 5483c74..6d22b73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
@@ -31,21 +31,17 @@
 /**
  * Atomix distributed set.
  */
-public class AtomixDistributedSet<E> implements AsyncDistributedSet<E> {
+public class AtomixDistributedSet<E> extends AtomixPrimitive implements AsyncDistributedSet<E> {
     private final io.atomix.core.set.AsyncDistributedSet<E> atomixSet;
     private final Map<SetEventListener<E>, io.atomix.core.collection.CollectionEventListener<E>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixDistributedSet(io.atomix.core.set.AsyncDistributedSet<E> atomixSet) {
+        super(atomixSet);
         this.atomixSet = atomixSet;
     }
 
     @Override
-    public String name() {
-        return atomixSet.name();
-    }
-
-    @Override
     public CompletableFuture<Integer> size() {
         return adaptFuture(atomixSet.size());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
index a6d6536..87de225 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
@@ -33,21 +33,17 @@
  *
  * @param <T> topic message type.
  */
-public class AtomixDistributedTopic<T> implements Topic<T> {
+public class AtomixDistributedTopic<T> extends AtomixPrimitive implements Topic<T> {
 
     private final AsyncAtomicValue<T> atomixValue;
     private final Map<Consumer<T>, AtomicValueEventListener<T>> callbacks = Maps.newIdentityHashMap();
 
     AtomixDistributedTopic(AsyncAtomicValue<T> atomixValue) {
+        super(atomixValue);
         this.atomixValue = atomixValue;
     }
 
     @Override
-    public String name() {
-        return atomixValue.name();
-    }
-
-    @Override
     public Type primitiveType() {
         return DistributedPrimitive.Type.TOPIC;
     }
@@ -75,9 +71,4 @@
         }
         return CompletableFuture.completedFuture(null);
     }
-
-    @Override
-    public CompletableFuture<Void> destroy() {
-        return atomixValue.close();
-    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
index 54d9034..a8577ff 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
@@ -37,21 +37,17 @@
 /**
  * Atomix document tree.
  */
-public class AtomixDocumentTree<V> implements AsyncDocumentTree<V> {
+public class AtomixDocumentTree<V> extends AtomixPrimitive implements AsyncDocumentTree<V> {
     private final io.atomix.core.tree.AsyncAtomicDocumentTree<V> atomixTree;
     private final Map<DocumentTreeListener<V>, io.atomix.core.tree.DocumentTreeEventListener<V>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixDocumentTree(io.atomix.core.tree.AsyncAtomicDocumentTree<V> atomixTree) {
+        super(atomixTree);
         this.atomixTree = atomixTree;
     }
 
     @Override
-    public String name() {
-        return atomixTree.name();
-    }
-
-    @Override
     public DocumentPath root() {
         return toOnosPath(atomixTree.root());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
index e92eafb..1f2ab62 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
@@ -33,23 +33,19 @@
 /**
  * Atomix leader elector.
  */
-public class AtomixLeaderElector implements AsyncLeaderElector {
+public class AtomixLeaderElector extends AtomixPrimitive implements AsyncLeaderElector {
     private final io.atomix.core.election.AsyncLeaderElector<NodeId> atomixElector;
     private final NodeId localNodeId;
     private final Map<Consumer<Change<Leadership>>, LeadershipEventListener<NodeId>> listenerMap =
         Maps.newIdentityHashMap();
 
     public AtomixLeaderElector(io.atomix.core.election.AsyncLeaderElector<NodeId> atomixElector, NodeId localNodeId) {
+        super(atomixElector);
         this.atomixElector = atomixElector;
         this.localNodeId = localNodeId;
     }
 
     @Override
-    public String name() {
-        return atomixElector.name();
-    }
-
-    @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
         return adaptFuture(atomixElector.run(topic, nodeId)).thenApply(leadership -> toLeadership(topic, leadership));
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java
new file mode 100644
index 0000000..5b6b8b1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.atomix.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.PrimitiveState;
+import org.onosproject.store.service.DistributedPrimitive;
+
+/**
+ * Atomix distributed primitive.
+ */
+public abstract class AtomixPrimitive implements DistributedPrimitive {
+    private final io.atomix.primitive.AsyncPrimitive atomixPrimitive;
+    private final Map<Consumer<Status>, Consumer<PrimitiveState>> listenerMap = Maps.newIdentityHashMap();
+
+    protected AtomixPrimitive(io.atomix.primitive.AsyncPrimitive atomixPrimitive) {
+        this.atomixPrimitive = atomixPrimitive;
+    }
+
+    @Override
+    public String name() {
+        return atomixPrimitive.name();
+    }
+
+    private Status toStatus(PrimitiveState state) {
+        switch (state) {
+            case CONNECTED:
+                return Status.ACTIVE;
+            case SUSPENDED:
+            case EXPIRED:
+                return Status.SUSPENDED;
+            case CLOSED:
+                return Status.INACTIVE;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    @Override
+    public synchronized void addStatusChangeListener(Consumer<Status> listener) {
+        Consumer<PrimitiveState> atomixListener = state -> listener.accept(toStatus(state));
+        listenerMap.put(listener, atomixListener);
+        atomixPrimitive.addStateChangeListener(atomixListener);
+    }
+
+    @Override
+    public synchronized void removeStatusChangeListener(Consumer<Status> listener) {
+        Consumer<PrimitiveState> atomixListener = listenerMap.remove(listener);
+        if (atomixListener != null) {
+            atomixPrimitive.removeStateChangeListener(atomixListener);
+        }
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return listenerMap.keySet();
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        return atomixPrimitive.close();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
index 89ffd4c..8f6daf5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
@@ -30,19 +30,15 @@
 /**
  * Atomix work queue.
  */
-public class AtomixWorkQueue<E> implements WorkQueue<E> {
+public class AtomixWorkQueue<E> extends AtomixPrimitive implements WorkQueue<E> {
     private final io.atomix.core.workqueue.AsyncWorkQueue<E> atomixWorkQueue;
 
     public AtomixWorkQueue(io.atomix.core.workqueue.AsyncWorkQueue<E> atomixWorkQueue) {
+        super(atomixWorkQueue);
         this.atomixWorkQueue = atomixWorkQueue;
     }
 
     @Override
-    public String name() {
-        return atomixWorkQueue.name();
-    }
-
-    @Override
     public CompletableFuture<Void> addMultiple(Collection<E> items) {
         return adaptFuture(atomixWorkQueue.addMultiple(items));
     }