Ported ZookeeperRegistry to new Curator version using their notifications

Change-Id: Ie2319d0f11a1ebd2c34ceb7c9e0fb9c5aa085708

Implemented asynchronous leadership change notification.
This is needed to prevent deadlock in original switch registry code
(this used to be provided by my Curator modifications, but the official
Curator leader latch implementation gives notifications on the calling thread).

Change-Id: Ic3db816c98e4fc2b7a85d0007740b28afc4566a6

Remove local (modified) curator libraries and clean up the pom.xml
Amended to fix tests that broke due to an API change.

Change-Id: I09de15de2dd448b06712e93724c1b75160768db3
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 8a2af13..b8a1021 100755
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,7 +8,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -18,33 +22,30 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.registry.controller.web.RegistryWebRoutable;
 
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
-import com.netflix.curator.RetryPolicy;
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.recipes.atomic.AtomicValue;
-import com.netflix.curator.framework.recipes.atomic.DistributedAtomicLong;
-import com.netflix.curator.framework.recipes.cache.ChildData;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
-import com.netflix.curator.framework.recipes.leader.LeaderLatch;
-import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
-import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
-import com.netflix.curator.framework.recipes.leader.Participant;
-import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.retry.RetryOneTime;
-import com.netflix.curator.x.discovery.ServiceCache;
-import com.netflix.curator.x.discovery.ServiceDiscovery;
-import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
-import com.netflix.curator.x.discovery.ServiceInstance;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 
 /**
  * A registry service that uses Zookeeper. All data is stored in Zookeeper,
@@ -87,8 +88,44 @@
 	//Zookeeper performance-related configuration
 	protected static final int sessionTimeout = 5000;
 	protected static final int connectionTimeout = 7000;
-        private volatile IdBlock idBlock = null;
 	
+    private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents = 
+    		new LinkedBlockingQueue<SwitchLeaderEvent>();
+    
+    private ExecutorService eventThreadExecutorService;
+    
+    private static class SwitchLeaderEvent { // immutable record of one switch leadership change, queued for dispatchEvents()
+    	public final long dpid; // switch datapath ID as a long (listeners convert it with HexString.toLong)
+    	public final boolean isLeader; // true when leadership was gained, false when it was lost
+    	
+    	public SwitchLeaderEvent(long dpid, boolean isLeader) {
+    		this.dpid = dpid;
+    		this.isLeader = isLeader;
+    	}
+    }
+    
+    /* Dispatch loop for the switch leadership events queued by SwitchLeaderListener.
+     * Looks up each event's switch and invokes its controlChanged callback, so callbacks do not run on the thread that delivered the Curator notification.
+     * Exits when the thread is interrupted; any other exception is logged and the loop continues. */
+    private void dispatchEvents() {
+    	while (!Thread.currentThread().isInterrupted()) {
+    		try {
+    			SwitchLeaderEvent event = switchLeadershipEvents.take(); // blocks until an event is available
+    			SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid)); // switches is keyed by the dpid string
+    			if (swData == null) { // no leadership data registered under this dpid: drop the event
+    				log.debug("Leadership data {} not found", event.dpid);
+    				continue;
+    			}
+    			
+    			swData.getCallback().controlChanged(event.dpid, event.isLeader); // NOTE(review): the removed leaderLatchEvent handler first checked swData.getLatch() == latch; no such check here - confirm stale events are acceptable
+    		} catch (InterruptedException e) {
+    			Thread.currentThread().interrupt(); // restore the interrupt status before leaving the loop
+    			break;
+    		} catch (Exception e) { // keep the dispatcher running if a callback throws
+    			log.error("Exception in registry event thread", e);
+    		}
+    	}
+    }
 
 	protected class SwitchLeaderListener implements LeaderLatchListener{
 		String dpid;
@@ -98,30 +135,19 @@
 			this.dpid = dpid;
 			this.latch = latch;
 		}
-		
+
 		@Override
-		public void leaderLatchEvent(CuratorFramework arg0,
-				LeaderLatchEvent arg1) {
-			log.debug("Switch leadership changed for {}, now {}",
-					dpid, latch.hasLeadership());
+		public void isLeader() {
+		    log.debug("Became leader for {}", dpid);
+		    
+		    switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), true));
+		}
+
+		@Override
+		public void notLeader() {
+		    log.debug("Lost leadership for {}", dpid);
 			
-			//Check that the leadership request is still active - the client
-			//may have since released the request or even begun another request
-			//(this is why we use == to check the object instance is the same)
-			SwitchLeadershipData swData = switches.get(dpid);
-			if (swData == null) {
-				log.debug("Leadership data {} not found", dpid);
-				return;
-			}
-			
-			if (swData.getLatch() == latch){
-				swData.getCallback().controlChanged(
-						HexString.toLong(dpid), latch.hasLeadership());
-			}
-			else {
-				log.debug("Latch for {} has changed: old latch {} - new latch {}", 
-						new Object[]{dpid, latch, swData.getLatch()});
-			}
+			switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), false));
 		}
 	}
 	
@@ -129,7 +155,6 @@
 		@Override
 		public void childEvent(CuratorFramework client,
 				PathChildrenCacheEvent event) throws Exception {
-			//log.debug("Root switch path cache got {} event", event.getType());
 			
 			String strSwitch = null;
 			if (event.getData() != null){
@@ -161,7 +186,7 @@
 				}
 				break;
 			default:
-				//All other events are connection status events. We don't need to 
+				//All other events are connection status events. We don't need to 
 				//do anything as the path cache handles these on its own.
 				break;
 			}
@@ -176,15 +201,19 @@
 			this.latch = latch;
 		}
 
+		//
+		// NOTE: If we need to support callbacks when the
+		// leadership changes, those should be called here.
+		//
+		
 		@Override
-		public void leaderLatchEvent(CuratorFramework arg0,
-				LeaderLatchEvent arg1) {
-			log.debug("Cluster leadership changed, now {}",
-					latch.hasLeadership());
-			//
-			// NOTE: If we need to support callbacks when the
-			// leadership changes, those should be called here.
-			//
+		public void isLeader() { // LeaderLatchListener callback invoked when the latch gains leadership; only logs
+			log.debug("Cluster leadership acquired");
+		}
+
+		@Override
+		public void notLeader() {
+			log.debug("Cluster leadership lost");
 		}
 	}
 
@@ -215,10 +244,11 @@
 		}
 		
 		LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
-		latch.addListener(new SwitchLeaderListener(dpidStr, latch));
+		SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch); 
+		latch.addListener(listener);
 		
 
-		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
 		SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
 		
 		if (oldData != null){
@@ -259,7 +289,7 @@
 
 		LeaderLatch latch = swData.getLatch();
 		
-		latch.removeAllListeners();
+		latch.removeListener(swData.getListener());
 		
 		try {
 			latch.close();
@@ -529,8 +559,17 @@
 			throw new FloodlightModuleException("Error initialising ZookeeperRegistry: " 
 					+ e.getMessage());
 		}
+		
+		eventThreadExecutorService = Executors.newSingleThreadExecutor();
+		eventThreadExecutorService.execute(
+				new Runnable() {
+					@Override
+					public void run(){
+						dispatchEvents();
+					}
+				});
 	}
-	
+
 	@Override
 	public void startUp (FloodlightModuleContext context) {
 		//