Add a mechanism to elect a single leader in an ONOS cluster (ONOS-977).

Note: For now, leadership change callbacks are intentionally not implemented,
to discourage from liberally using this feature.
In the future we might have to add the callback support, but only if really
needed.

Change-Id: Ia70045aee737fbd67ec445ea7065476e3f4838aa
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 8706e85..b496321 100755
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -35,6 +35,7 @@
 import com.netflix.curator.framework.recipes.leader.LeaderLatch;
 import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
 import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
+import com.netflix.curator.framework.recipes.leader.Participant;
 import com.netflix.curator.retry.ExponentialBackoffRetry;
 import com.netflix.curator.retry.RetryOneTime;
 import com.netflix.curator.x.discovery.ServiceCache;
@@ -62,6 +63,7 @@
 	
 	private final String namespace = "onos";
 	private final String switchLatchesPath = "/switches";
+	private final String CLUSTER_LEADER_PATH = "/cluster/leader";
 
 	private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
 	private final String CONTROLLER_SERVICE_NAME = "controllers";
@@ -72,7 +74,11 @@
 
 	protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
 	protected Map<String, PathChildrenCache> switchPathCaches;
-	
+
+	protected LeaderLatch clusterLeaderLatch;
+	protected ClusterLeaderListener clusterLeaderListener;
+	private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
+
 	private final String ID_COUNTER_PATH = "/flowidcounter";
 	private final Long ID_BLOCK_SIZE = 0x100000000L;
 	protected DistributedAtomicLong distributedIdCounter;
@@ -95,7 +101,7 @@
 		@Override
 		public void leaderLatchEvent(CuratorFramework arg0,
 				LeaderLatchEvent arg1) {
-			log.debug("Leadership changed for {}, now {}",
+			log.debug("Switch leadership changed for {}, now {}",
 					dpid, latch.hasLeadership());
 			
 			//Check that the leadership request is still active - the client
@@ -161,6 +167,26 @@
 			
 		}
 	};
+
+	protected class ClusterLeaderListener implements LeaderLatchListener {
+		LeaderLatch latch;
+
+		public ClusterLeaderListener(LeaderLatch latch) {
+			this.latch = latch;
+		}
+
+		@Override
+		public void leaderLatchEvent(CuratorFramework arg0,
+				LeaderLatchEvent arg1) {
+			log.debug("Cluster leadership changed, now {}",
+					latch.hasLeadership());
+			//
+			// NOTE: If we need to support callbacks when the
+			// leadership changes, those should be called here.
+			//
+		}
+	}
+
 	/**
 	 * Listens for changes to the switch znodes in Zookeeper. This maintains
 	 * the second level of PathChildrenCaches that hold the controllers 
@@ -258,6 +284,11 @@
 	}
 
 	@Override
+	public boolean isClusterLeader() {
+	    return clusterLeaderLatch.hasLeadership();
+	}
+
+	@Override
 	public String getControllerId() {
 		return controllerId;
 	}
@@ -500,6 +531,37 @@
 	
 	@Override
 	public void startUp (FloodlightModuleContext context) {
+		//
+		// Cluster Leader election setup.
+		// NOTE: We have to do it here, because during the init stage
+		// we don't know the Controller ID.
+		//
+		if (controllerId == null) {
+		    log.error("Error on startup: unknown ControllerId");
+		}
+		clusterLeaderLatch = new LeaderLatch(client,
+						     CLUSTER_LEADER_PATH,
+						     controllerId);
+		clusterLeaderListener = new ClusterLeaderListener(clusterLeaderLatch);
+		clusterLeaderLatch.addListener(clusterLeaderListener);
+		try {
+			clusterLeaderLatch.start();
+		} catch (Exception e) {
+		    log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
+		}
+
+		// Keep trying until there is a cluster leader
+		do {
+			try {
+				Participant leader = clusterLeaderLatch.getLeader();
+				if (! leader.getId().isEmpty())
+					break;
+				Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
+			} catch (Exception e) {
+				log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
+			}
+		} while (true);
+
 		restApi.addRestletRoutable(new RegistryWebRoutable());
 	}
 }