DatabaseManager: add periodic leader advertisements
Change-Id: I6e9244a06191fe0f2dd5eaed7e043e84d704bfcd
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index af3f6ac..dd0b379 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.service.impl;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
@@ -11,6 +13,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
@@ -105,6 +108,10 @@
private boolean autoAddMember = false;
+ private ScheduledExecutorService executor;
+
+ private volatile LeaderElectEvent myLeaderEvent = null;
+
@Activate
public void activate() throws InterruptedException, ExecutionException {
@@ -176,6 +183,11 @@
// of the Raft cluster.
if (copycat != null) {
copycat.start().get();
+
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
+ executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
+
}
client.waitForLeader();
@@ -189,6 +201,9 @@
@Deactivate
public void deactivate() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
clusterService.removeListener(clusterEventListener);
// TODO: ClusterCommunicationService must support more than one
// handler per message subject.
@@ -231,9 +246,9 @@
throw e;
} else {
log.debug("Failed to listTables. Will retry...", e);
- retries++;
}
}
+ retries++;
} while (true);
}
@@ -395,11 +410,14 @@
}
}
- private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+ private final class LeaderAdvertiser implements Runnable {
+
@Override
- public void handle(LeaderElectEvent event) {
+ public void run() {
try {
- if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+ LeaderElectEvent event = myLeaderEvent;
+ if (event != null) {
+ log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
// This node just became the leader.
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
@@ -407,6 +425,33 @@
RAFT_LEADER_ELECTION_EVENT,
ClusterMessagingProtocol.SERIALIZER.encode(event)));
}
+ } catch (Exception e) {
+ log.debug("LeaderAdvertiser failed with exception", e);
+ }
+ }
+
+ }
+
+ private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+ @Override
+ public void handle(LeaderElectEvent event) {
+ try {
+ log.debug("Received LeaderElectEvent: {}", event);
+ if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+ log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
+ myLeaderEvent = event;
+ // This node just became the leader.
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ RAFT_LEADER_ELECTION_EVENT,
+ ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ } else {
+ if (myLeaderEvent != null) {
+ log.debug("This node is no longer the Leader");
+ }
+ myLeaderEvent = null;
+ }
} catch (IOException e) {
log.error("Failed to broadcast raft leadership change event", e);
}