Merge branch 'regsync'
Merging in the re-implementation of registering controllers using Curator's Service Discovery.
(Merged the wrong branch before, oops)
diff --git a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
index 36cde95..964eff1 100644
--- a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
+++ b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
@@ -102,7 +102,7 @@
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RegistryException e) {
- // TODO Auto-generated catch block
+ log.debug("Caught RegistryException trying to requestControl in cleanup thread");
e.printStackTrace();
}
}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ControllerService.java b/src/main/java/net/onrc/onos/registry/controller/ControllerService.java
new file mode 100644
index 0000000..c74e85d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/ControllerService.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.registry.controller;
+
+
+
+//@JsonRootName("controller")
+public class ControllerService {
+
+ private String controllerId;
+
+ public ControllerService(){
+ this("");
+ }
+
+ public ControllerService(String controllerId) {
+ this.controllerId = controllerId;
+ }
+
+ public void setControllerId(String controllerId) {
+ this.controllerId = controllerId;
+ }
+
+ public String getControllerId() {
+ return controllerId;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java b/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java
deleted file mode 100644
index 164f328..0000000
--- a/src/main/java/net/onrc/onos/registry/controller/RegistryRunner.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package net.onrc.onos.registry.controller;
-
-import net.floodlightcontroller.core.module.FloodlightModuleContext;
-import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
-
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used for lightweight testing of the mastership module without having
- * to load up the entire ONOS.
- * @author jono
- *
- */
-public class RegistryRunner {
- protected static Logger log = LoggerFactory.getLogger(RegistryRunner.class);
-
- public static void main(String args[]){
- FloodlightModuleContext fmc = new FloodlightModuleContext();
- ZookeeperRegistry rm = new ZookeeperRegistry();
-
- fmc.addConfigParam(rm, "enableZookeeper", "true");
-
- String id = null;
- if (args.length > 0){
- id = args[0];
- log.info("Using unique id: {}", id);
- }
-
- try {
- rm.init(fmc);
- rm.startUp(fmc);
-
- if (id == null){
- log.error("No unique ID supplied");
- System.exit(1);
- }
-
- rm.registerController(id);
- //rm.setMastershipId(id);
-
- rm.requestControl(1L,
- new ControlChangeCallback(){
- @Override
- public void controlChanged(long dpid, boolean isMaster) {
- if (isMaster){
- log.debug("Callback for becoming master for {}", HexString.toHexString(dpid));
- }
- else {
- log.debug("Callback for losing mastership for {}", HexString.toHexString(dpid));
- }
- }
- });
-
- Thread.sleep(1000);
-
- /*
- Map<String, List<ControllerRegistryEntry>> switches = rm.getAllSwitches();
- for (List<ControllerRegistryEntry> ls : switches.values()){
- for (ControllerRegistryEntry cre : ls){
- log.debug("ctrlr: {}", cre.getControllerId());
- }
- }
- */
- //"Server" loop
- while (true) {
- Thread.sleep(60000);
- }
-
- } catch (FloodlightModuleException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- log.debug("is master: {}", rm.hasControl(1L));
- }
-}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index b666db7..476f7b3 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -1,7 +1,6 @@
package net.onrc.onos.registry.controller;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -16,9 +15,6 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +33,10 @@
import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
import com.netflix.curator.framework.recipes.leader.Participant;
import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.utils.ZKPaths;
+import com.netflix.curator.x.discovery.ServiceCache;
+import com.netflix.curator.x.discovery.ServiceDiscovery;
+import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
+import com.netflix.curator.x.discovery.ServiceInstance;
/**
* A registry service that uses Zookeeper. All data is stored in Zookeeper,
@@ -57,11 +56,12 @@
private final String namespace = "onos";
private final String switchLatchesPath = "/switches";
- private final String controllerPath = "/controllers";
+
+ private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
+ private final String CONTROLLER_SERVICE_NAME = "controllers";
protected CuratorFramework client;
- protected PathChildrenCache controllerCache;
protected PathChildrenCache switchCache;
protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
@@ -96,7 +96,8 @@
HexString.toLong(dpid), latch.hasLeadership());
}
else {
- log.debug("Latch for {} has changed", dpid);
+ log.debug("Latch for {} has changed: old latch {} - new latch {}",
+ new Object[]{dpid, latch, swData.getLatch()});
}
}
}
@@ -123,26 +124,35 @@
case CHILD_ADDED:
case CHILD_UPDATED:
//Check we have a PathChildrenCache for this child, add one if not
- if (switchPathCaches.get(strSwitch) == null){
- PathChildrenCache pc = new PathChildrenCache(client,
- event.getData().getPath(), true);
- pc.start(StartMode.NORMAL);
- switchPathCaches.put(strSwitch, pc);
+ synchronized (switchPathCaches){
+ if (switchPathCaches.get(strSwitch) == null){
+ PathChildrenCache pc = new PathChildrenCache(client,
+ event.getData().getPath(), true);
+ pc.start(StartMode.NORMAL);
+ switchPathCaches.put(strSwitch, pc);
+ }
}
break;
case CHILD_REMOVED:
//Remove our PathChildrenCache for this child
- PathChildrenCache pc = switchPathCaches.remove(strSwitch);
- pc.close();
+ PathChildrenCache pc = null;
+ synchronized(switchPathCaches){
+ pc = switchPathCaches.remove(strSwitch);
+ }
+ if (pc != null){
+ pc.close();
+ }
break;
default:
- //All other events are connection status events. We need to do anything
- //as the path cache handles these on its own.
+ //All other events are connection status events. We don't need to
+ //do anything as the path cache handles these on its own.
break;
}
}
};
+ protected ServiceDiscovery<ControllerService> serviceDiscovery;
+ protected ServiceCache<ControllerService> serviceCache;
@Override
@@ -206,6 +216,8 @@
LeaderLatch latch = swData.getLatch();
+ latch.removeAllListeners();
+
try {
latch.close();
} catch (IOException e) {
@@ -238,17 +250,13 @@
log.debug("Getting all controllers");
List<String> controllers = new ArrayList<String>();
- for (ChildData data : controllerCache.getCurrentData()){
-
- String d = null;
- try {
- d = new String(data.getData(), "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RegistryException("Error encoding string", e);
+ for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()){
+ String id = instance.getPayload().getControllerId();
+ if (!controllers.contains(id)){
+ controllers.add(id);
}
-
- controllers.add(d);
}
+
return controllers;
}
@@ -261,52 +269,20 @@
controllerId = id;
- byte bytes[] = id.getBytes(Charsets.UTF_8);
- String path = ZKPaths.makePath(controllerPath, controllerId);
-
- log.info("Registering controller with id {}", id);
-
try {
- //We need to set a watch to recreate the node in the controller
- //registry if it gets deleted - e.g. on Zookeeper connection loss.
- Watcher watcher = new Watcher(){
- @Override
- public void process(WatchedEvent event) {
- log.debug("got any watch event {} ", event);
-
- String path = ZKPaths.makePath(controllerPath, controllerId);
- byte bytes[] = controllerId.getBytes(Charsets.UTF_8);
-
- try {
- if (event.getType() == Event.EventType.NodeDeleted){
- log.debug("got a node deleted event");
-
-
- client.create().withMode(CreateMode.EPHEMERAL)
- .forPath(path, bytes);
- }
- } catch (Exception e) {
- log.warn("Error recreating controller node for {}: {}",
- controllerId, e.getMessage());
- } finally {
- try {
- client.checkExists().usingWatcher(this).forPath(path);
- } catch (Exception e2){
- log.warn("Error resetting watch for {}: {}",
- controllerId, e2.getMessage());
- }
- }
- }
- };
+ ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
+ .name(CONTROLLER_SERVICE_NAME)
+ .payload(new ControllerService(controllerId))
+ //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
+ //.uriSpec(uriSpec)
+ .build();
- //Create ephemeral node in controller registry
- //TODO Use protection
- client.create().withMode(CreateMode.EPHEMERAL)
- .forPath(path, bytes);
- client.checkExists().usingWatcher(watcher).forPath(path);
+ serviceDiscovery.registerService(thisInstance);
} catch (Exception e) {
- throw new RegistryException("Error contacting the Zookeeper service", e);
+ // TODO Auto-generated catch block
+ e.printStackTrace();
}
+
}
@Override
@@ -421,7 +397,8 @@
restApi = context.getServiceImpl(IRestApiService.class);
switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
- switchPathCaches = new HashMap<String, PathChildrenCache>();
+ //switchPathCaches = new HashMap<String, PathChildrenCache>();
+ switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(this.connectionString,
@@ -431,12 +408,22 @@
client = client.usingNamespace(namespace);
- controllerCache = new PathChildrenCache(client, controllerPath, true);
switchCache = new PathChildrenCache(client, switchLatchesPath, true);
switchCache.getListenable().addListener(switchPathCacheListener);
+ //Build the service discovery object
+ serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
+ .client(client).basePath(SERVICES_PATH).build();
+
+ //We read the list of services very frequently (GUI periodically queries them)
+ //so we'll cache them to cut down on Zookeeper queries.
+ serviceCache = serviceDiscovery.serviceCacheBuilder()
+ .name(CONTROLLER_SERVICE_NAME).build();
+
+
try {
- controllerCache.start(StartMode.BUILD_INITIAL_CACHE);
+ serviceDiscovery.start();
+ serviceCache.start();
//Don't prime the cache, we want a notification for each child node in the path
switchCache.start(StartMode.NORMAL);