Implement base Atomix primitive to handle status change events
Change-Id: Ia7541ddf562be5c1a05954afc4f7a5fa90289800
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 8116461..e404ddb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -26,7 +26,6 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipTermService;
import org.onosproject.net.Annotations;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.DefaultAnnotations;
@@ -164,9 +163,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipService mastershipService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipTermService termService;
-
private static final Timestamp DEFAULT_TIMESTAMP = new MastershipBasedTimestamp(0, 0);
protected static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
index dde25cb..f224df5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
@@ -24,19 +24,15 @@
/**
* Atomix atomic counter.
*/
-public class AtomixAtomicCounter implements AsyncAtomicCounter {
+public class AtomixAtomicCounter extends AtomixPrimitive implements AsyncAtomicCounter {
private final io.atomix.core.counter.AsyncAtomicCounter atomixCounter;
public AtomixAtomicCounter(io.atomix.core.counter.AsyncAtomicCounter atomixCounter) {
+ super(atomixCounter);
this.atomixCounter = atomixCounter;
}
@Override
- public String name() {
- return atomixCounter.name();
- }
-
- @Override
public CompletableFuture<Long> incrementAndGet() {
return adaptFuture(atomixCounter.incrementAndGet());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
index e60f893..dba4473 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
@@ -24,19 +24,15 @@
/**
* Atomix atomic counter map.
*/
-public class AtomixAtomicCounterMap<K> implements AsyncAtomicCounterMap<K> {
+public class AtomixAtomicCounterMap<K> extends AtomixPrimitive implements AsyncAtomicCounterMap<K> {
private final io.atomix.core.map.AsyncAtomicCounterMap<K> atomixMap;
public AtomixAtomicCounterMap(io.atomix.core.map.AsyncAtomicCounterMap<K> atomixMap) {
+ super(atomixMap);
this.atomixMap = atomixMap;
}
@Override
- public String name() {
- return atomixMap.name();
- }
-
- @Override
public CompletableFuture<Long> incrementAndGet(K key) {
return adaptMapFuture(atomixMap.incrementAndGet(key));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
index e762511..850ff49 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
@@ -24,19 +24,15 @@
/**
* Atomix atomic ID generator.
*/
-public class AtomixAtomicIdGenerator implements AsyncAtomicIdGenerator {
+public class AtomixAtomicIdGenerator extends AtomixPrimitive implements AsyncAtomicIdGenerator {
private final io.atomix.core.idgenerator.AsyncAtomicIdGenerator atomixIdGenerator;
public AtomixAtomicIdGenerator(io.atomix.core.idgenerator.AsyncAtomicIdGenerator atomixIdGenerator) {
+ super(atomixIdGenerator);
this.atomixIdGenerator = atomixIdGenerator;
}
@Override
- public String name() {
- return atomixIdGenerator.name();
- }
-
- @Override
public CompletableFuture<Long> nextId() {
return adaptFuture(atomixIdGenerator.nextId());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
index 48c524a..8e0987d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
@@ -28,21 +28,17 @@
/**
* Atomix atomic value.
*/
-public class AtomixAtomicValue<V> implements AsyncAtomicValue<V> {
+public class AtomixAtomicValue<V> extends AtomixPrimitive implements AsyncAtomicValue<V> {
private final io.atomix.core.value.AsyncAtomicValue<V> atomixValue;
private final Map<AtomicValueEventListener<V>, io.atomix.core.value.AtomicValueEventListener<V>> listenerMap =
Maps.newIdentityHashMap();
public AtomixAtomicValue(io.atomix.core.value.AsyncAtomicValue<V> atomixValue) {
+ super(atomixValue);
this.atomixValue = atomixValue;
}
@Override
- public String name() {
- return atomixValue.name();
- }
-
- @Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
return adaptFuture(atomixValue.compareAndSet(expect, update));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
index 6241855..33883fc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
@@ -42,21 +42,17 @@
/**
* Atomix consistent map.
*/
-public class AtomixConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
+public class AtomixConsistentMap<K, V> extends AtomixPrimitive implements AsyncConsistentMap<K, V> {
private final io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap;
private final Map<MapEventListener<K, V>, io.atomix.core.map.AtomicMapEventListener<K, V>> listenerMap =
Maps.newIdentityHashMap();
public AtomixConsistentMap(io.atomix.core.map.AsyncAtomicMap<K, V> atomixMap) {
+ super(atomixMap);
this.atomixMap = atomixMap;
}
@Override
- public String name() {
- return atomixMap.name();
- }
-
- @Override
public CompletableFuture<Integer> size() {
return atomixMap.size();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
index 96d7e8c..956d9b2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
@@ -37,21 +37,17 @@
/**
* Atomix consistent map.
*/
-public class AtomixConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
+public class AtomixConsistentMultimap<K, V> extends AtomixPrimitive implements AsyncConsistentMultimap<K, V> {
private final io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap;
private final Map<MultimapEventListener<K, V>, io.atomix.core.multimap.AtomicMultimapEventListener<K, V>>
listenerMap = Maps.newIdentityHashMap();
public AtomixConsistentMultimap(io.atomix.core.multimap.AsyncAtomicMultimap<K, V> atomixMultimap) {
+ super(atomixMultimap);
this.atomixMultimap = atomixMultimap;
}
@Override
- public String name() {
- return atomixMultimap.name();
- }
-
- @Override
public CompletableFuture<Integer> size() {
return adaptMapFuture(atomixMultimap.size());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
index cfee0e7..caaae88 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
@@ -45,21 +45,17 @@
/**
* Atomix consistent tree map.
*/
-public class AtomixConsistentTreeMap<V> implements AsyncConsistentTreeMap<V> {
+public class AtomixConsistentTreeMap<V> extends AtomixPrimitive implements AsyncConsistentTreeMap<V> {
private final io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap;
private final Map<MapEventListener<String, V>, io.atomix.core.map.AtomicMapEventListener<String, V>> listenerMap =
Maps.newIdentityHashMap();
public AtomixConsistentTreeMap(io.atomix.core.map.AsyncAtomicNavigableMap<String, V> atomixTreeMap) {
+ super(atomixTreeMap);
this.atomixTreeMap = atomixTreeMap;
}
@Override
- public String name() {
- return atomixTreeMap.name();
- }
-
- @Override
public CompletableFuture<Integer> size() {
return atomixTreeMap.size();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
index 038d995..b0d79f2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
@@ -27,19 +27,15 @@
/**
* Atomix distributed lock.
*/
-public class AtomixDistributedLock implements AsyncDistributedLock {
+public class AtomixDistributedLock extends AtomixPrimitive implements AsyncDistributedLock {
private final io.atomix.core.lock.AsyncAtomicLock atomixLock;
public AtomixDistributedLock(io.atomix.core.lock.AsyncAtomicLock atomixLock) {
+ super(atomixLock);
this.atomixLock = atomixLock;
}
@Override
- public String name() {
- return atomixLock.name();
- }
-
- @Override
public CompletableFuture<Version> lock() {
return adaptFuture(atomixLock.lock()).thenApply(this::toVersion);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
index 5483c74..6d22b73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
@@ -31,21 +31,17 @@
/**
* Atomix distributed set.
*/
-public class AtomixDistributedSet<E> implements AsyncDistributedSet<E> {
+public class AtomixDistributedSet<E> extends AtomixPrimitive implements AsyncDistributedSet<E> {
private final io.atomix.core.set.AsyncDistributedSet<E> atomixSet;
private final Map<SetEventListener<E>, io.atomix.core.collection.CollectionEventListener<E>> listenerMap =
Maps.newIdentityHashMap();
public AtomixDistributedSet(io.atomix.core.set.AsyncDistributedSet<E> atomixSet) {
+ super(atomixSet);
this.atomixSet = atomixSet;
}
@Override
- public String name() {
- return atomixSet.name();
- }
-
- @Override
public CompletableFuture<Integer> size() {
return adaptFuture(atomixSet.size());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
index a6d6536..87de225 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
@@ -33,21 +33,17 @@
*
* @param <T> topic message type.
*/
-public class AtomixDistributedTopic<T> implements Topic<T> {
+public class AtomixDistributedTopic<T> extends AtomixPrimitive implements Topic<T> {
private final AsyncAtomicValue<T> atomixValue;
private final Map<Consumer<T>, AtomicValueEventListener<T>> callbacks = Maps.newIdentityHashMap();
AtomixDistributedTopic(AsyncAtomicValue<T> atomixValue) {
+ super(atomixValue);
this.atomixValue = atomixValue;
}
@Override
- public String name() {
- return atomixValue.name();
- }
-
- @Override
public Type primitiveType() {
return DistributedPrimitive.Type.TOPIC;
}
@@ -75,9 +71,4 @@
}
return CompletableFuture.completedFuture(null);
}
-
- @Override
- public CompletableFuture<Void> destroy() {
- return atomixValue.close();
- }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
index 54d9034..a8577ff 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
@@ -37,21 +37,17 @@
/**
* Atomix document tree.
*/
-public class AtomixDocumentTree<V> implements AsyncDocumentTree<V> {
+public class AtomixDocumentTree<V> extends AtomixPrimitive implements AsyncDocumentTree<V> {
private final io.atomix.core.tree.AsyncAtomicDocumentTree<V> atomixTree;
private final Map<DocumentTreeListener<V>, io.atomix.core.tree.DocumentTreeEventListener<V>> listenerMap =
Maps.newIdentityHashMap();
public AtomixDocumentTree(io.atomix.core.tree.AsyncAtomicDocumentTree<V> atomixTree) {
+ super(atomixTree);
this.atomixTree = atomixTree;
}
@Override
- public String name() {
- return atomixTree.name();
- }
-
- @Override
public DocumentPath root() {
return toOnosPath(atomixTree.root());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
index e92eafb..1f2ab62 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
@@ -33,23 +33,19 @@
/**
* Atomix leader elector.
*/
-public class AtomixLeaderElector implements AsyncLeaderElector {
+public class AtomixLeaderElector extends AtomixPrimitive implements AsyncLeaderElector {
private final io.atomix.core.election.AsyncLeaderElector<NodeId> atomixElector;
private final NodeId localNodeId;
private final Map<Consumer<Change<Leadership>>, LeadershipEventListener<NodeId>> listenerMap =
Maps.newIdentityHashMap();
public AtomixLeaderElector(io.atomix.core.election.AsyncLeaderElector<NodeId> atomixElector, NodeId localNodeId) {
+ super(atomixElector);
this.atomixElector = atomixElector;
this.localNodeId = localNodeId;
}
@Override
- public String name() {
- return atomixElector.name();
- }
-
- @Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
return adaptFuture(atomixElector.run(topic, nodeId)).thenApply(leadership -> toLeadership(topic, leadership));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java
new file mode 100644
index 0000000..5b6b8b1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixPrimitive.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.atomix.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.PrimitiveState;
+import org.onosproject.store.service.DistributedPrimitive;
+
+/**
+ * Atomix distributed primitive.
+ */
+public abstract class AtomixPrimitive implements DistributedPrimitive {
+ private final io.atomix.primitive.AsyncPrimitive atomixPrimitive;
+ private final Map<Consumer<Status>, Consumer<PrimitiveState>> listenerMap = Maps.newIdentityHashMap();
+
+ protected AtomixPrimitive(io.atomix.primitive.AsyncPrimitive atomixPrimitive) {
+ this.atomixPrimitive = atomixPrimitive;
+ }
+
+ @Override
+ public String name() {
+ return atomixPrimitive.name();
+ }
+
+ private Status toStatus(PrimitiveState state) {
+ switch (state) {
+ case CONNECTED:
+ return Status.ACTIVE;
+ case SUSPENDED:
+ case EXPIRED:
+ return Status.SUSPENDED;
+ case CLOSED:
+ return Status.INACTIVE;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ @Override
+ public synchronized void addStatusChangeListener(Consumer<Status> listener) {
+ Consumer<PrimitiveState> atomixListener = state -> listener.accept(toStatus(state));
+ listenerMap.put(listener, atomixListener);
+ atomixPrimitive.addStateChangeListener(atomixListener);
+ }
+
+ @Override
+ public synchronized void removeStatusChangeListener(Consumer<Status> listener) {
+ Consumer<PrimitiveState> atomixListener = listenerMap.remove(listener);
+ if (atomixListener != null) {
+ atomixPrimitive.removeStateChangeListener(atomixListener);
+ }
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return listenerMap.keySet();
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return atomixPrimitive.close();
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
index 89ffd4c..8f6daf5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
@@ -30,19 +30,15 @@
/**
* Atomix work queue.
*/
-public class AtomixWorkQueue<E> implements WorkQueue<E> {
+public class AtomixWorkQueue<E> extends AtomixPrimitive implements WorkQueue<E> {
private final io.atomix.core.workqueue.AsyncWorkQueue<E> atomixWorkQueue;
public AtomixWorkQueue(io.atomix.core.workqueue.AsyncWorkQueue<E> atomixWorkQueue) {
+ super(atomixWorkQueue);
this.atomixWorkQueue = atomixWorkQueue;
}
@Override
- public String name() {
- return atomixWorkQueue.name();
- }
-
- @Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
return adaptFuture(atomixWorkQueue.addMultiple(items));
}