Updates for SDN-IP:
* Use the new Leadership Service instead of Distributed Lock to
elect the SDN-IP Leader
* Reimplement the SDN-IP Intent Synchronizer. In the new implementation
the Point-to-Point Peer intents are also synchronized by and pushed
only by the Leader (same as the Multipoint-to-SinglePoint Route intents)
* Minor cleanups
Change-Id: I8e142781211a1d0f2d362875bc28fd05d843cd4b
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 85f9114..5c19c33 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -18,8 +18,6 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -27,6 +25,11 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.net.host.HostService;
@@ -35,11 +38,8 @@
import org.onlab.onos.sdnip.bgp.BgpSession;
import org.onlab.onos.sdnip.bgp.BgpSessionManager;
import org.onlab.onos.sdnip.config.SdnIpConfigReader;
-import org.onlab.onos.store.service.Lock;
-import org.onlab.onos.store.service.LockService;
-import org.slf4j.Logger;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
/**
* Component for the SDN-IP peering application.
@@ -65,55 +65,49 @@
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LockService lockService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
private IntentSynchronizer intentSynchronizer;
private SdnIpConfigReader config;
private PeerConnectivityManager peerConnectivity;
private Router router;
private BgpSessionManager bgpSessionManager;
-
- private ExecutorService leaderElectionExecutor;
- private Lock leaderLock;
- private volatile boolean isShutdown = true;
+ private LeadershipEventListener leadershipEventListener =
+ new InnerLeadershipEventListener();
+ ApplicationId appId;
+ private ControllerNode localControllerNode;
@Activate
protected void activate() {
log.info("SDN-IP started");
- isShutdown = false;
- ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
+ appId = coreService.registerApplication(SDN_IP_APP);
config = new SdnIpConfigReader();
config.init();
+ localControllerNode = clusterService.getLocalNode();
+
InterfaceService interfaceService =
new HostToInterfaceAdaptor(hostService);
intentSynchronizer = new IntentSynchronizer(appId, intentService);
intentSynchronizer.start();
- peerConnectivity = new PeerConnectivityManager(appId, config,
- interfaceService, intentService);
+ peerConnectivity = new PeerConnectivityManager(appId,
+ intentSynchronizer,
+ config,
+ interfaceService);
peerConnectivity.start();
- router = new Router(appId, intentSynchronizer, hostService, config,
- interfaceService);
+ router = new Router(appId, intentSynchronizer, config,
+ interfaceService, hostService);
router.start();
- leaderLock = lockService.create(SDN_IP_APP + "/sdnIpLeaderLock");
- leaderElectionExecutor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("sdnip-leader-election-%d").build());
- leaderElectionExecutor.execute(new Runnable() {
- @Override
- public void run() {
- doLeaderElectionThread();
- }
- });
-
- // Manually set the instance as the leader to allow testing
- // TODO change this when we get a leader election
- // intentSynchronizer.leaderChanged(true);
+ leadershipService.addListener(leadershipEventListener);
+ leadershipService.runForLeadership(appId.name());
bgpSessionManager = new BgpSessionManager(router);
// TODO: the local BGP listen port number should be configurable
@@ -124,17 +118,16 @@
@Deactivate
protected void deactivate() {
- isShutdown = true;
bgpSessionManager.stop();
router.stop();
peerConnectivity.stop();
intentSynchronizer.stop();
- // Stop the thread(s)
- leaderElectionExecutor.shutdownNow();
+ leadershipService.withdraw(appId.name());
+ leadershipService.removeListener(leadershipEventListener);
- log.info("Stopped");
+ log.info("SDN-IP Stopped");
}
@Override
@@ -162,63 +155,38 @@
}
/**
- * Performs the leader election.
+ * A listener for Leadership Events.
*/
- private void doLeaderElectionThread() {
+ private class InnerLeadershipEventListener
+ implements LeadershipEventListener {
- //
- // Try to acquire the lock and keep extending it until the instance
- // is shutdown.
- //
- while (!isShutdown) {
- log.debug("SDN-IP Leader Election begin");
+ @Override
+ public void event(LeadershipEvent event) {
+ log.debug("Leadership Event: time = {} type = {} event = {}",
+ event.time(), event.type(), event);
- // Block until it becomes the leader
- try {
- leaderLock.lock(LEASE_DURATION_MS);
+ if (!event.subject().topic().equals(appId.name())) {
+ return; // Not our topic: ignore
+ }
+ if (!event.subject().leader().id().equals(
+ localControllerNode.id())) {
+ return; // The event is not about this instance: ignore
+ }
- // This instance is the leader
+ switch (event.type()) {
+ case LEADER_ELECTED:
log.info("SDN-IP Leader Elected");
intentSynchronizer.leaderChanged(true);
-
- // Keep extending the expiration until shutdown
- int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
-
- //
- // Keep periodically extending the lock expiration.
- // If there are multiple back-to-back failures to extend (with
- // extra sleep time between retrials), then release the lock.
- //
- while (!isShutdown) {
- Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
- if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
- log.trace("SDN-IP Leader Extended");
- extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX;
- } else {
- log.debug("SDN-IP Leader Cannot Extend Election");
- if (!leaderLock.isLocked()) {
- log.debug("SDN-IP Leader Lock Lost");
- intentSynchronizer.leaderChanged(false);
- break; // Try again to get the lock
- }
- extensionFailedCountdown--;
- if (extensionFailedCountdown <= 0) {
- // Failed too many times to extend.
- // Release the lock.
- log.debug("SDN-IP Leader Lock Released");
- intentSynchronizer.leaderChanged(false);
- leaderLock.unlock();
- break; // Try again to get the lock
- }
- }
- }
- } catch (InterruptedException e) {
- // Thread interrupted. Time to shutdown
- log.debug("SDN-IP Leader Interrupted");
+ break;
+ case LEADER_BOOTED:
+ log.info("SDN-IP Leader Lost Election");
+ intentSynchronizer.leaderChanged(false);
+ break;
+ case LEADER_REELECTED:
+ break;
+ default:
+ break;
}
}
- // If we reach here, the instance was shutdown
- intentSynchronizer.leaderChanged(false);
- leaderLock.unlock();
}
}