Ported ZookeeperRegistry to new Curator version using their notifications

Change-Id: Ie2319d0f11a1ebd2c34ceb7c9e0fb9c5aa085708

Implemented asynchronous leadership change notification.
This is needed to prevent deadlock in original switch registry code
(this used to be provided by my Curator modifications, but the official
Curator leader latch implementation gives notifications on the calling thread).

Change-Id: Ic3db816c98e4fc2b7a85d0007740b28afc4566a6

Remove local (modified) curator libraries and clean up the pom.xml
Amended to fix tests that broke due to an API change.

Change-Id: I09de15de2dd448b06712e93724c1b75160768db3
diff --git a/curator/curator-client-1.3.5-SNAPSHOT.jar b/curator/curator-client-1.3.5-SNAPSHOT.jar
deleted file mode 100644
index c22d602..0000000
--- a/curator/curator-client-1.3.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/curator/curator-framework-1.3.5-SNAPSHOT.jar b/curator/curator-framework-1.3.5-SNAPSHOT.jar
deleted file mode 100644
index 1b89270..0000000
--- a/curator/curator-framework-1.3.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/curator/curator-recipes-1.3.5-SNAPSHOT.jar b/curator/curator-recipes-1.3.5-SNAPSHOT.jar
deleted file mode 100644
index 30efe51..0000000
--- a/curator/curator-recipes-1.3.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/curator/curator-x-discovery-1.3.5-SNAPSHOT.jar b/curator/curator-x-discovery-1.3.5-SNAPSHOT.jar
deleted file mode 100644
index 147417e..0000000
--- a/curator/curator-x-discovery-1.3.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/pom.xml b/pom.xml
index 2bd6030..18ea3db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,11 +12,11 @@
   <url>http://onlab.us/</url>
   <repositories>
     <!-- In Project repository -->
-    <repository>
+    <!--<repository>
       <id>in-project</id>
       <name>In Project Repo</name>
       <url>file://${project.basedir}/repo</url>
-    </repository>
+    </repository>-->
     <repository>
       <id>central</id>
       <name>Maven Central repository</name>
@@ -550,6 +550,26 @@
       <version>2.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>2.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>2.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>2.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-x-discovery</artifactId>
+      <version>2.4.1</version>
+    </dependency>
     <!--
     <dependency>
       <groupId>org.objenesis</groupId>
@@ -563,27 +583,7 @@
       <version>2.2.2</version>
     </dependency>
     -->
-    <!-- dependency to locally modified version -->
-    <dependency>
-      <groupId>com.netflix.curator</groupId>
-      <artifactId>curator-framework</artifactId>
-      <version>1.3.5-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>com.netflix.curator</groupId>
-      <artifactId>curator-client</artifactId>
-      <version>1.3.5-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>com.netflix.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <version>1.3.5-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>com.netflix.curator</groupId>
-      <artifactId>curator-x-discovery</artifactId>
-      <version>1.3.5-SNAPSHOT</version>
-    </dependency>
+
     <!--
     <dependency>
       <groupId>net.floodlightcontroller</groupId>
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
index eb513fd..0b29964 100644
--- a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -2,16 +2,20 @@
 
 import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
 
-import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 
 public class SwitchLeadershipData {
 	
-	private LeaderLatch latch;
-	private ControlChangeCallback cb;
+	private final LeaderLatch latch;
+	private final ControlChangeCallback cb;
+	private final LeaderLatchListener listener;
 
-	public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+	public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb,
+			LeaderLatchListener listener) {
 		this.latch = latch;
 		this.cb = cb;
+		this.listener = listener;
 	}
 	
 	public LeaderLatch getLatch(){
@@ -21,5 +25,9 @@
 	public ControlChangeCallback getCallback(){
 		return cb;
 	}
+	
+	public LeaderLatchListener getListener() {
+		return listener;
+	}
 
 }
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 8a2af13..b8a1021 100755
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,7 +8,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -18,33 +22,30 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.registry.controller.web.RegistryWebRoutable;
 
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
-import com.netflix.curator.RetryPolicy;
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.recipes.atomic.AtomicValue;
-import com.netflix.curator.framework.recipes.atomic.DistributedAtomicLong;
-import com.netflix.curator.framework.recipes.cache.ChildData;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
-import com.netflix.curator.framework.recipes.leader.LeaderLatch;
-import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
-import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
-import com.netflix.curator.framework.recipes.leader.Participant;
-import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.retry.RetryOneTime;
-import com.netflix.curator.x.discovery.ServiceCache;
-import com.netflix.curator.x.discovery.ServiceDiscovery;
-import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
-import com.netflix.curator.x.discovery.ServiceInstance;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 
 /**
  * A registry service that uses Zookeeper. All data is stored in Zookeeper,
@@ -87,8 +88,44 @@
 	//Zookeeper performance-related configuration
 	protected static final int sessionTimeout = 5000;
 	protected static final int connectionTimeout = 7000;
-        private volatile IdBlock idBlock = null;
 	
+    private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents = 
+    		new LinkedBlockingQueue<SwitchLeaderEvent>();
+    
+    private ExecutorService eventThreadExecutorService;
+    
+    private static class SwitchLeaderEvent {
+    	public final long dpid;
+    	public final boolean isLeader;
+    	
+    	public SwitchLeaderEvent(long dpid, boolean isLeader) {
+    		this.dpid = dpid;
+    		this.isLeader = isLeader;
+    	}
+    }
+    
+    /*
+     * Dispatcher thread for leadership change events coming from Curator.
+     */
+    private void dispatchEvents() {
+    	while (!Thread.currentThread().isInterrupted()) {
+    		try {
+    			SwitchLeaderEvent event = switchLeadershipEvents.take();
+    			SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
+    			if (swData == null) {
+    				log.debug("Leadership data {} not found", event.dpid);
+    				continue;
+    			}
+    			
+    			swData.getCallback().controlChanged(event.dpid, event.isLeader);
+    		} catch (InterruptedException e) {
+    			Thread.currentThread().interrupt();
+    			break;
+    		} catch (Exception e) {
+    			log.error("Exception in registry event thread", e);
+    		}
+    	}
+    }
 
 	protected class SwitchLeaderListener implements LeaderLatchListener{
 		String dpid;
@@ -98,30 +135,19 @@
 			this.dpid = dpid;
 			this.latch = latch;
 		}
-		
+
 		@Override
-		public void leaderLatchEvent(CuratorFramework arg0,
-				LeaderLatchEvent arg1) {
-			log.debug("Switch leadership changed for {}, now {}",
-					dpid, latch.hasLeadership());
+		public void isLeader() {
+		    log.debug("Became leader for {}", dpid);
+		    
+		    switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), true));
+		}
+
+		@Override
+		public void notLeader() {
+		    log.debug("Lost leadership for {}", dpid);
 			
-			//Check that the leadership request is still active - the client
-			//may have since released the request or even begun another request
-			//(this is why we use == to check the object instance is the same)
-			SwitchLeadershipData swData = switches.get(dpid);
-			if (swData == null) {
-				log.debug("Leadership data {} not found", dpid);
-				return;
-			}
-			
-			if (swData.getLatch() == latch){
-				swData.getCallback().controlChanged(
-						HexString.toLong(dpid), latch.hasLeadership());
-			}
-			else {
-				log.debug("Latch for {} has changed: old latch {} - new latch {}", 
-						new Object[]{dpid, latch, swData.getLatch()});
-			}
+			switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), false));
 		}
 	}
 	
@@ -129,7 +155,6 @@
 		@Override
 		public void childEvent(CuratorFramework client,
 				PathChildrenCacheEvent event) throws Exception {
-			//log.debug("Root switch path cache got {} event", event.getType());
 			
 			String strSwitch = null;
 			if (event.getData() != null){
@@ -161,7 +186,7 @@
 				}
 				break;
 			default:
-				//All other events are connection status events. We don't need to 
+				//All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to 
 				//do anything as the path cache handles these on its own.
 				break;
 			}
@@ -176,15 +201,19 @@
 			this.latch = latch;
 		}
 
+		//
+		// NOTE: If we need to support callbacks when the
+		// leadership changes, those should be called here.
+		//
+		
 		@Override
-		public void leaderLatchEvent(CuratorFramework arg0,
-				LeaderLatchEvent arg1) {
-			log.debug("Cluster leadership changed, now {}",
-					latch.hasLeadership());
-			//
-			// NOTE: If we need to support callbacks when the
-			// leadership changes, those should be called here.
-			//
+		public void isLeader() {
+			log.debug("Cluster leadership aquired");
+		}
+
+		@Override
+		public void notLeader() {
+			log.debug("Cluster leadership lost");
 		}
 	}
 
@@ -215,10 +244,11 @@
 		}
 		
 		LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
-		latch.addListener(new SwitchLeaderListener(dpidStr, latch));
+		SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch); 
+		latch.addListener(listener);
 		
 
-		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
 		SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
 		
 		if (oldData != null){
@@ -259,7 +289,7 @@
 
 		LeaderLatch latch = swData.getLatch();
 		
-		latch.removeAllListeners();
+		latch.removeListener(swData.getListener());
 		
 		try {
 			latch.close();
@@ -529,8 +559,17 @@
 			throw new FloodlightModuleException("Error initialising ZookeeperRegistry: " 
 					+ e.getMessage());
 		}
+		
+		eventThreadExecutorService = Executors.newSingleThreadExecutor();
+		eventThreadExecutorService.execute(
+				new Runnable() {
+					@Override
+					public void run(){
+						dispatchEvents();
+					}
+				});
 	}
-	
+
 	@Override
 	public void startUp (FloodlightModuleContext context) {
 		//
diff --git a/src/test/java/net/onrc/onos/registry/controller/ZookeeperRegistryTest.java b/src/test/java/net/onrc/onos/registry/controller/ZookeeperRegistryTest.java
index 3314ad2..774a9d4 100644
--- a/src/test/java/net/onrc/onos/registry/controller/ZookeeperRegistryTest.java
+++ b/src/test/java/net/onrc/onos/registry/controller/ZookeeperRegistryTest.java
@@ -10,6 +10,23 @@
 import net.onrc.onos.registry.controller.StandaloneRegistryTest.LoggingCallback;
 import net.onrc.onos.registry.controller.ZookeeperRegistry.SwitchLeaderListener;
 
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceCacheBuilder;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.After;
@@ -22,24 +39,6 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import com.netflix.curator.RetryPolicy;
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.listen.ListenerContainer;
-import com.netflix.curator.framework.recipes.atomic.AtomicValue;
-import com.netflix.curator.framework.recipes.atomic.DistributedAtomicLong;
-import com.netflix.curator.framework.recipes.cache.ChildData;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
-import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import com.netflix.curator.framework.recipes.leader.LeaderLatch;
-import com.netflix.curator.x.discovery.ServiceCache;
-import com.netflix.curator.x.discovery.ServiceCacheBuilder;
-import com.netflix.curator.x.discovery.ServiceDiscovery;
-import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
-import com.netflix.curator.x.discovery.ServiceInstance;
-
 /**
  * Unit test for {@link ZookeeperRegistry}.
  * NOTE: {@link FloodlightTestCase} conflicts with PowerMock. If FloodLight-related methods need to be tested,
@@ -205,7 +204,7 @@
 		EasyMock.expectLastCall().once();
 		latch.start();
 		EasyMock.expectLastCall().once();
-		latch.removeAllListeners();
+		latch.removeListener(EasyMock.anyObject(SwitchLeaderListener.class));
 		EasyMock.expectLastCall().once();
 		latch.close();
 		EasyMock.expectLastCall().once();
@@ -241,7 +240,7 @@
 		latch.start();
 		EasyMock.expectLastCall().once();
 		EasyMock.expect(latch.hasLeadership()).andReturn(true).anyTimes();
-		latch.removeAllListeners();
+		latch.removeListener(EasyMock.anyObject(SwitchLeaderListener.class));
 		EasyMock.expectLastCall().once();
 		latch.close();
 		EasyMock.expectLastCall().once();