Merge branch 'regsync'

Merge the re-implementation of controller registration, which now uses Curator's Service Discovery.
(An earlier merge pulled in the wrong branch by mistake.)
diff --git a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
index 36cde95..964eff1 100644
--- a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
+++ b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
@@ -102,7 +102,7 @@
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			} catch (RegistryException e) {
-				// TODO Auto-generated catch block
+				log.debug("Caught RegistryException trying to requestControl in cleanup thread");
 				e.printStackTrace();
 			}			
 		}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ControllerService.java b/src/main/java/net/onrc/onos/registry/controller/ControllerService.java
new file mode 100644
index 0000000..c74e85d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/ControllerService.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.registry.controller;
+
+/** Payload published with a controller's service-discovery registration;
+ *  it carries the controller's ID (see its use in ZookeeperRegistry). */
+//@JsonRootName("controller")
+public class ControllerService {
+
+	private String controllerId;
+	// Starts with an empty ID; presumably kept for bean-style (de)serialization of the payload - TODO confirm.
+	public ControllerService(){
+		this("");
+	}
+	// Creates a payload for the controller with the given ID.
+	public ControllerService(String controllerId) {
+		this.controllerId = controllerId;
+	}
+    // Replaces the stored controller ID.
+    public void setControllerId(String controllerId) {
+        this.controllerId = controllerId;
+    }
+    // Returns the stored controller ID ("" if the no-arg constructor was used and no ID was set).
+    public String getControllerId() {
+        return controllerId;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java b/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java
deleted file mode 100644
index 164f328..0000000
--- a/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package net.onrc.onos.registry.controller;
-
-import net.floodlightcontroller.core.module.FloodlightModuleContext;
-import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
-
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used for lightweight testing of the mastership module without having
- * to load up the entire ONOS.
- * @author jono
- *
- */
-public class RegistryRunner {
-	protected static Logger log = LoggerFactory.getLogger(RegistryRunner.class);
-
-	public static void main(String args[]){
-		FloodlightModuleContext fmc = new FloodlightModuleContext();
-		ZookeeperRegistry rm = new ZookeeperRegistry();
-		
-		fmc.addConfigParam(rm, "enableZookeeper", "true");
-		
-		String id = null;
-		if (args.length > 0){
-			id = args[0];
-			log.info("Using unique id: {}", id);
-		}
-		
-		try {
-			rm.init(fmc);
-			rm.startUp(fmc);
-			
-			if (id == null){
-				log.error("No unique ID supplied");
-				System.exit(1);
-			}
-			
-			rm.registerController(id);
-			//rm.setMastershipId(id);
-				
-			rm.requestControl(1L, 
-				new ControlChangeCallback(){
-					@Override
-					public void controlChanged(long dpid, boolean isMaster) {
-						if (isMaster){
-							log.debug("Callback for becoming master for {}", HexString.toHexString(dpid));
-						}
-						else {
-							log.debug("Callback for losing mastership for {}", HexString.toHexString(dpid));
-						}
-					}
-				});
-			
-			Thread.sleep(1000);
-			
-			/*
-			Map<String, List<ControllerRegistryEntry>> switches = rm.getAllSwitches();
-			for (List<ControllerRegistryEntry> ls : switches.values()){
-				for (ControllerRegistryEntry cre : ls){
-					log.debug("ctrlr: {}", cre.getControllerId());
-				}
-			}
-			*/
-			//"Server" loop
-			while (true) {
-				Thread.sleep(60000);
-			}
-			
-		} catch (FloodlightModuleException e) {
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		
-		log.debug("is master: {}", rm.hasControl(1L));
-	}
-}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index b666db7..476f7b3 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -1,7 +1,6 @@
 package net.onrc.onos.registry.controller;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -16,9 +15,6 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.restserver.IRestApiService;
 
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +33,10 @@
 import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
 import com.netflix.curator.framework.recipes.leader.Participant;
 import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.utils.ZKPaths;
+import com.netflix.curator.x.discovery.ServiceCache;
+import com.netflix.curator.x.discovery.ServiceDiscovery;
+import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
+import com.netflix.curator.x.discovery.ServiceInstance;
 
 /**
  * A registry service that uses Zookeeper. All data is stored in Zookeeper,
@@ -57,11 +56,12 @@
 	
 	private final String namespace = "onos";
 	private final String switchLatchesPath = "/switches";
-	private final String controllerPath = "/controllers";
+
+	private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
+	private final String CONTROLLER_SERVICE_NAME = "controllers";
 	
 	protected CuratorFramework client;
 	
-	protected PathChildrenCache controllerCache;
 	protected PathChildrenCache switchCache;
 
 	protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
@@ -96,7 +96,8 @@
 						HexString.toLong(dpid), latch.hasLeadership());
 			}
 			else {
-				log.debug("Latch for {} has changed", dpid);
+				log.debug("Latch for {} has changed: old latch {} - new latch {}", 
+						new Object[]{dpid, latch, swData.getLatch()});
 			}
 		}
 	}
@@ -123,26 +124,35 @@
 			case CHILD_ADDED:
 			case CHILD_UPDATED:
 				//Check we have a PathChildrenCache for this child, add one if not
-				if (switchPathCaches.get(strSwitch) == null){
-					PathChildrenCache pc = new PathChildrenCache(client, 
-							event.getData().getPath(), true);
-					pc.start(StartMode.NORMAL);
-					switchPathCaches.put(strSwitch, pc);
+				synchronized (switchPathCaches){
+					if (switchPathCaches.get(strSwitch) == null){
+						PathChildrenCache pc = new PathChildrenCache(client, 
+								event.getData().getPath(), true);
+						pc.start(StartMode.NORMAL);
+						switchPathCaches.put(strSwitch, pc);
+					}
 				}
 				break;
 			case CHILD_REMOVED:
 				//Remove our PathChildrenCache for this child
-				PathChildrenCache pc = switchPathCaches.remove(strSwitch);
-				pc.close();
+				PathChildrenCache pc = null;
+				synchronized(switchPathCaches){
+					pc = switchPathCaches.remove(strSwitch);
+				}
+				if (pc != null){
+					pc.close();
+				}
 				break;
 			default:
-				//All other events are connection status events. We need to do anything
-				//as the path cache handles these on its own.
+				//All other events are connection status events. We don't need to 
+				//do anything as the path cache handles these on its own.
 				break;
 			}
 			
 		}
 	};
+	protected ServiceDiscovery<ControllerService> serviceDiscovery;
+	protected ServiceCache<ControllerService> serviceCache;
 
 	
 	@Override
@@ -206,6 +216,8 @@
 
 		LeaderLatch latch = swData.getLatch();
 		
+		latch.removeAllListeners();
+		
 		try {
 			latch.close();
 		} catch (IOException e) {
@@ -238,17 +250,13 @@
 		log.debug("Getting all controllers");
 		
 		List<String> controllers = new ArrayList<String>();
-		for (ChildData data : controllerCache.getCurrentData()){
-
-			String d = null;
-			try {
-				d = new String(data.getData(), "UTF-8");
-			} catch (UnsupportedEncodingException e) {
-				throw new RegistryException("Error encoding string", e);
+		for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()){
+			String id = instance.getPayload().getControllerId();
+			if (!controllers.contains(id)){
+				controllers.add(id);
 			}
-
-			controllers.add(d);
 		}
+
 		return controllers;
 	}
 
@@ -261,52 +269,20 @@
 		
 		controllerId = id;
 		
-		byte bytes[] = id.getBytes(Charsets.UTF_8);
-		String path = ZKPaths.makePath(controllerPath, controllerId);
-		
-		log.info("Registering controller with id {}", id);
-		
 		try {
-			//We need to set a watch to recreate the node in the controller
-			//registry if it gets deleted - e.g. on Zookeeper connection loss.
-			Watcher watcher = new Watcher(){
-				@Override
-				public void process(WatchedEvent event) {
-					log.debug("got any watch event {} ", event);
-					
-					String path = ZKPaths.makePath(controllerPath, controllerId);
-					byte bytes[] = controllerId.getBytes(Charsets.UTF_8);
-					
-					try {
-						if (event.getType() == Event.EventType.NodeDeleted){
-							log.debug("got a node deleted event");
-							
-							
-							client.create().withMode(CreateMode.EPHEMERAL)
-								.forPath(path, bytes);
-						}
-					} catch (Exception e) {
-						log.warn("Error recreating controller node for {}: {}",
-								controllerId, e.getMessage());
-					} finally {
-						try {
-							client.checkExists().usingWatcher(this).forPath(path);
-						} catch (Exception e2){
-							log.warn("Error resetting watch for {}: {}", 
-									controllerId, e2.getMessage());
-						}
-					}
-				}
-			};
+			ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
+			        .name(CONTROLLER_SERVICE_NAME)
+			        .payload(new ControllerService(controllerId))
+			        //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
+			        //.uriSpec(uriSpec)
+			        .build();
 			
-			//Create ephemeral node in controller registry
-			//TODO Use protection
-			client.create().withMode(CreateMode.EPHEMERAL)
-					.forPath(path, bytes);
-			client.checkExists().usingWatcher(watcher).forPath(path);
+			serviceDiscovery.registerService(thisInstance);
 		} catch (Exception e) {
-			throw new RegistryException("Error contacting the Zookeeper service", e);
+			// FIXME: the failure is only printed here; the code this replaces threw a RegistryException instead.
+			e.printStackTrace();
 		}
+		
 	}
 	
 	@Override
@@ -421,7 +397,8 @@
 		restApi = context.getServiceImpl(IRestApiService.class);
 
 		switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
-		switchPathCaches = new HashMap<String, PathChildrenCache>();
+		//switchPathCaches = new HashMap<String, PathChildrenCache>();
+		switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
 		
 		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 		client = CuratorFrameworkFactory.newClient(this.connectionString, 
@@ -431,12 +408,22 @@
 		client = client.usingNamespace(namespace);
 
 		
-		controllerCache = new PathChildrenCache(client, controllerPath, true);
 		switchCache = new PathChildrenCache(client, switchLatchesPath, true);
 		switchCache.getListenable().addListener(switchPathCacheListener);
 		
+		//Build the service discovery object
+	    serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
+	            .client(client).basePath(SERVICES_PATH).build();
+	    
+	    //We read the list of services very frequently (GUI periodically queries them)
+	    //so we'll cache them to cut down on Zookeeper queries.
+	    serviceCache = serviceDiscovery.serviceCacheBuilder()
+				.name(CONTROLLER_SERVICE_NAME).build();
+	    
+	    
 		try {
-			controllerCache.start(StartMode.BUILD_INITIAL_CACHE);
+			serviceDiscovery.start();
+			serviceCache.start();
 			
 			//Don't prime the cache, we want a notification for each child node in the path
 			switchCache.start(StartMode.NORMAL);