Re-register listeners after a Copycat client recovers from a network partition
Change-Id: I1b2669011e1f229f8b6edc836eb89c39ea371a97
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index f1a44de..2e6e620 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -56,7 +56,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -85,6 +84,11 @@
@Override
public CompletableFuture<AtomixConsistentMap> open() {
return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isListening()) {
+ client.submit(new Listen());
+ }
+ });
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
@@ -308,4 +312,8 @@
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
+
+ private boolean isListening() {
+ return !mapEventListeners.isEmpty();
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 95f9d17..56511a1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -94,6 +94,11 @@
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isListening()) {
+ client.submit(new Listen());
+ }
+ });
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
@@ -183,4 +188,8 @@
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
+
+ private boolean isListening() {
+ return !leadershipChangeListeners.isEmpty();
+ }
}