Use latest Atomix release candidate and update CopycatTransport
Change-Id: I960af428ff733ee7467024811e3b3470e951ecb7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 73bc8b7..df44b48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -43,8 +43,10 @@
/**
* Distributed resource providing the {@link AsyncConsistentMap} primitive.
*/
-@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
-public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+@ResourceTypeInfo(id = -151,
+ stateMachine = AtomixConsistentMapState.class,
+ typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 72e52c2..9c9b019 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -26,6 +26,7 @@
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
import java.util.Collection;
import java.util.HashMap;
@@ -69,12 +70,17 @@
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
+
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
+ public AtomixConsistentMapState() {
+ super(new ResourceType(AtomixConsistentMap.class));
+ }
+
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(versionCounter.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 4e15a81..b7e48fa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -25,7 +25,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import org.onlab.util.SharedExecutors;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
@@ -36,8 +35,10 @@
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
-@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
-public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
+@ResourceTypeInfo(id = -152,
+ stateMachine = AtomixLeaderElectorState.class,
+ typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
+public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
@@ -62,8 +63,7 @@
}
private void handleEvent(Change<Leadership> change) {
- SharedExecutors.getSingleThreadExecutor().execute(() ->
- leadershipChangeListeners.forEach(l -> l.accept(change)));
+ leadershipChangeListeners.forEach(l -> l.accept(change));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index e8abfac..9b58226 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -24,6 +24,7 @@
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
import java.util.Arrays;
import java.util.HashMap;
@@ -72,6 +73,10 @@
ElectionState.class,
Registration.class);
+ public AtomixLeaderElectorState() {
+ super(new ResourceType(AtomixLeaderElector.class));
+ }
+
@Override
protected void configure(StateMachineExecutor executor) {
// Notification
@@ -261,7 +266,7 @@
}
private void onSessionEnd(Session session) {
- Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+ Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
if (listener != null) {
listener.close();
}