Updates for SDN-IP:

 * Use the new Leadership Service instead of the Distributed Lock to
   elect the SDN-IP Leader
 * Reimplement the SDN-IP Intent Synchronizer. In the new implementation
   the Point-to-Point Peer intents are also synchronized and pushed
   only by the Leader (same as the Multipoint-to-SinglePoint Route intents)
 * Minor cleanups

Change-Id: I8e142781211a1d0f2d362875bc28fd05d843cd4b
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 85f9114..5c19c33 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -18,8 +18,6 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -27,6 +25,11 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
 import org.onlab.onos.net.host.HostService;
@@ -35,11 +38,8 @@
 import org.onlab.onos.sdnip.bgp.BgpSession;
 import org.onlab.onos.sdnip.bgp.BgpSessionManager;
 import org.onlab.onos.sdnip.config.SdnIpConfigReader;
-import org.onlab.onos.store.service.Lock;
-import org.onlab.onos.store.service.LockService;
-import org.slf4j.Logger;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
 
 /**
  * Component for the SDN-IP peering application.
@@ -65,55 +65,49 @@
     protected HostService hostService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LockService lockService;
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
 
     private IntentSynchronizer intentSynchronizer;
     private SdnIpConfigReader config;
     private PeerConnectivityManager peerConnectivity;
     private Router router;
     private BgpSessionManager bgpSessionManager;
-
-    private ExecutorService leaderElectionExecutor;
-    private Lock leaderLock;
-    private volatile boolean isShutdown = true;
+    private LeadershipEventListener leadershipEventListener =
+        new InnerLeadershipEventListener();
+    ApplicationId appId;
+    private ControllerNode localControllerNode;
 
     @Activate
     protected void activate() {
         log.info("SDN-IP started");
-        isShutdown = false;
 
-        ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
+        appId = coreService.registerApplication(SDN_IP_APP);
         config = new SdnIpConfigReader();
         config.init();
 
+        localControllerNode = clusterService.getLocalNode();
+
         InterfaceService interfaceService =
             new HostToInterfaceAdaptor(hostService);
 
         intentSynchronizer = new IntentSynchronizer(appId, intentService);
         intentSynchronizer.start();
 
-        peerConnectivity = new PeerConnectivityManager(appId, config,
-                interfaceService, intentService);
+        peerConnectivity = new PeerConnectivityManager(appId,
+                                                       intentSynchronizer,
+                                                       config,
+                                                       interfaceService);
         peerConnectivity.start();
 
-        router = new Router(appId, intentSynchronizer, hostService, config,
-                            interfaceService);
+        router = new Router(appId, intentSynchronizer, config,
+                            interfaceService, hostService);
         router.start();
 
-        leaderLock = lockService.create(SDN_IP_APP + "/sdnIpLeaderLock");
-        leaderElectionExecutor = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder()
-                .setNameFormat("sdnip-leader-election-%d").build());
-        leaderElectionExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-                doLeaderElectionThread();
-            }
-        });
-
-        // Manually set the instance as the leader to allow testing
-        // TODO change this when we get a leader election
-        // intentSynchronizer.leaderChanged(true);
+        leadershipService.addListener(leadershipEventListener);
+        leadershipService.runForLeadership(appId.name());
 
         bgpSessionManager = new BgpSessionManager(router);
         // TODO: the local BGP listen port number should be configurable
@@ -124,17 +118,16 @@
 
     @Deactivate
     protected void deactivate() {
-        isShutdown = true;
 
         bgpSessionManager.stop();
         router.stop();
         peerConnectivity.stop();
         intentSynchronizer.stop();
 
-        // Stop the thread(s)
-        leaderElectionExecutor.shutdownNow();
+        leadershipService.withdraw(appId.name());
+        leadershipService.removeListener(leadershipEventListener);
 
-        log.info("Stopped");
+        log.info("SDN-IP Stopped");
     }
 
     @Override
@@ -162,63 +155,38 @@
     }
 
     /**
-     * Performs the leader election.
+     * A listener for Leadership Events.
      */
-    private void doLeaderElectionThread() {
+    private class InnerLeadershipEventListener
+        implements LeadershipEventListener {
 
-        //
-        // Try to acquire the lock and keep extending it until the instance
-        // is shutdown.
-        //
-        while (!isShutdown) {
-            log.debug("SDN-IP Leader Election begin");
+        @Override
+        public void event(LeadershipEvent event) {
+            log.debug("Leadership Event: time = {} type = {} event = {}",
+                      event.time(), event.type(), event);
 
-            // Block until it becomes the leader
-            try {
-                leaderLock.lock(LEASE_DURATION_MS);
+            if (!event.subject().topic().equals(appId.name())) {
+                return;         // Not our topic: ignore
+            }
+            if (!event.subject().leader().id().equals(
+                        localControllerNode.id())) {
+                return;         // The event is not about this instance: ignore
+            }
 
-                // This instance is the leader
+            switch (event.type()) {
+            case LEADER_ELECTED:
                 log.info("SDN-IP Leader Elected");
                 intentSynchronizer.leaderChanged(true);
-
-                // Keep extending the expiration until shutdown
-                int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
-
-                //
-                // Keep periodically extending the lock expiration.
-                // If there are multiple back-to-back failures to extend (with
-                // extra sleep time between retrials), then release the lock.
-                //
-                while (!isShutdown) {
-                    Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
-                    if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
-                        log.trace("SDN-IP Leader Extended");
-                        extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX;
-                    } else {
-                        log.debug("SDN-IP Leader Cannot Extend Election");
-                        if (!leaderLock.isLocked()) {
-                            log.debug("SDN-IP Leader Lock Lost");
-                            intentSynchronizer.leaderChanged(false);
-                            break;              // Try again to get the lock
-                        }
-                        extensionFailedCountdown--;
-                        if (extensionFailedCountdown <= 0) {
-                            // Failed too many times to extend.
-                            // Release the lock.
-                            log.debug("SDN-IP Leader Lock Released");
-                            intentSynchronizer.leaderChanged(false);
-                            leaderLock.unlock();
-                            break;              // Try again to get the lock
-                        }
-                    }
-                }
-            } catch (InterruptedException e) {
-                // Thread interrupted. Time to shutdown
-                log.debug("SDN-IP Leader Interrupted");
+                break;
+            case LEADER_BOOTED:
+                log.info("SDN-IP Leader Lost Election");
+                intentSynchronizer.leaderChanged(false);
+                break;
+            case LEADER_REELECTED:
+                break;
+            default:
+                break;
             }
         }
-        // If we reach here, the instance was shutdown
-        intentSynchronizer.leaderChanged(false);
-        leaderLock.unlock();
     }
 }