DatabaseManager: add periodic leader advertisements

Change-Id: I6e9244a06191fe0f2dd5eaed7e043e84d704bfcd
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index af3f6ac..dd0b379 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.service.impl;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
@@ -11,6 +13,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
@@ -105,6 +108,10 @@
 
     private boolean autoAddMember = false;
 
+    private ScheduledExecutorService executor;
+
+    private volatile LeaderElectEvent myLeaderEvent = null;
+
     @Activate
     public void activate() throws InterruptedException, ExecutionException {
 
@@ -176,6 +183,11 @@
         // of the Raft cluster.
         if (copycat != null) {
             copycat.start().get();
+
+            executor =
+                    newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
+            executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
+
         }
 
         client.waitForLeader();
@@ -189,6 +201,9 @@
 
     @Deactivate
     public void deactivate() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
         clusterService.removeListener(clusterEventListener);
         // TODO: ClusterCommunicationService must support more than one
         // handler per message subject.
@@ -231,9 +246,9 @@
                     throw e;
                 } else {
                     log.debug("Failed to listTables. Will retry...", e);
-                    retries++;
                 }
             }
+            retries++;
         } while (true);
     }
 
@@ -395,11 +410,14 @@
         }
     }
 
-    private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+    private final class LeaderAdvertiser implements Runnable {
+
         @Override
-        public void handle(LeaderElectEvent event) {
+        public void run() {
             try {
-                if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+                LeaderElectEvent event = myLeaderEvent;
+                if (event != null) {
+                    log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
                     // This node just became the leader.
                     clusterCommunicator.broadcastIncludeSelf(
                             new ClusterMessage(
@@ -407,6 +425,33 @@
                                     RAFT_LEADER_ELECTION_EVENT,
                                     ClusterMessagingProtocol.SERIALIZER.encode(event)));
                 }
+            } catch (Exception e) {
+                log.debug("LeaderAdvertiser failed with exception", e);
+            }
+        }
+
+    }
+
+    private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+        @Override
+        public void handle(LeaderElectEvent event) {
+            try {
+                log.debug("Received LeaderElectEvent: {}", event);
+                if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+                    log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
+                    myLeaderEvent = event;
+                    // This node just became the leader.
+                    clusterCommunicator.broadcastIncludeSelf(
+                            new ClusterMessage(
+                                    clusterService.getLocalNode().id(),
+                                    RAFT_LEADER_ELECTION_EVENT,
+                                    ClusterMessagingProtocol.SERIALIZER.encode(event)));
+                } else {
+                    if (myLeaderEvent != null) {
+                        log.debug("This node is no longer the Leader");
+                    }
+                    myLeaderEvent = null;
+                }
             } catch (IOException e) {
                 log.error("Failed to broadcast raft leadership change event", e);
             }