Added Hazelcast-based Leadership implementation that is needed/used by SDN-IP.
This is a drop-in replacement until the generic ONOS Leadership service
is robust enough.
Change-Id: I72a84331dd948f98707eb59844dab425aa9d5c08
diff --git a/apps/sdnip/pom.xml b/apps/sdnip/pom.xml
index 4877d4f..a37c087 100644
--- a/apps/sdnip/pom.xml
+++ b/apps/sdnip/pom.xml
@@ -85,6 +85,12 @@
</dependency>
<dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onos-core-dist</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
diff --git a/apps/sdnip/pom.xml.orig b/apps/sdnip/pom.xml.orig
new file mode 100644
index 0000000..4877d4f
--- /dev/null
+++ b/apps/sdnip/pom.xml.orig
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2014 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onos-apps</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>onos-app-sdnip</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>SDN-IP peering application</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.4.2</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onlab-thirdparty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onlab-misc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onlab.onos</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.console</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 5c19c33..d143b25 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -29,15 +29,17 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
-import org.onlab.onos.cluster.LeadershipService;
+// import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
import org.onlab.onos.sdnip.bgp.BgpSession;
import org.onlab.onos.sdnip.bgp.BgpSessionManager;
import org.onlab.onos.sdnip.config.SdnIpConfigReader;
+import org.onlab.onos.store.hz.StoreService;
import org.slf4j.Logger;
@@ -68,7 +70,13 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
+ protected StoreService storeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ // @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SdnIpLeadershipService leadershipService;
private IntentSynchronizer intentSynchronizer;
private SdnIpConfigReader config;
@@ -77,7 +85,7 @@
private BgpSessionManager bgpSessionManager;
private LeadershipEventListener leadershipEventListener =
new InnerLeadershipEventListener();
- ApplicationId appId;
+ private ApplicationId appId;
private ControllerNode localControllerNode;
@Activate
@@ -106,6 +114,10 @@
interfaceService, hostService);
router.start();
+ leadershipService = new SdnIpLeadershipService(clusterService,
+ storeService,
+ eventDispatcher);
+ leadershipService.start();
leadershipService.addListener(leadershipEventListener);
leadershipService.runForLeadership(appId.name());
@@ -126,6 +138,7 @@
leadershipService.withdraw(appId.name());
leadershipService.removeListener(leadershipEventListener);
+ leadershipService.stop();
log.info("SDN-IP Stopped");
}
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
new file mode 100644
index 0000000..81535f5
--- /dev/null
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.sdnip;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.store.hz.StoreService;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+import static org.onlab.util.Tools.namedThreads;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Distributed implementation of LeadershipService that is based on Hazelcast.
+ * <p>
+ * The election is eventually-consistent: if there is Hazelcast partitioning,
+ * and the partitioning is healed, there could be a short window of time
+ * until the leaders in each partition discover each other. If this happens,
+ * the leaders release the leadership and run again for election.
+ * </p>
+ * <p>
+ * The leader election is based on Hazelcast's Global Lock, which is stongly
+ * consistent. In addition, each leader periodically advertises events
+ * (using a Hazelcast Topic) that it is the elected leader. Those events are
+ * used for two purposes: (1) Discover multi-leader collisions (in case of
+ * healed Hazelcast partitions), and (2) Inform all listeners who is
+ * the current leader (e.g., for informational purpose).
+ * </p>
+ */
+public class SdnIpLeadershipService implements LeadershipService {
+ private static final Logger log =
+ LoggerFactory.getLogger(SdnIpLeadershipService.class);
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
+ private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
+
+ private ClusterService clusterService;
+ private StoreService storeService;
+ private EventDeliveryService eventDispatcher;
+
+ private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
+ listenerRegistry;
+ private final Map<String, Topic> topics = Maps.newConcurrentMap();
+ private ControllerNode localNode;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterService the cluster service to use
+ * @param storeService the store service to use
+ * @param eventDispatcher the event dispacher to use
+ */
+ SdnIpLeadershipService(ClusterService clusterService,
+ StoreService storeService,
+ EventDeliveryService eventDispatcher) {
+ this.clusterService = clusterService;
+ this.storeService = storeService;
+ this.eventDispatcher = eventDispatcher;
+ }
+
+ /**
+ * Starts operation.
+ */
+ void start() {
+ localNode = clusterService.getLocalNode();
+ listenerRegistry = new AbstractListenerRegistry<>();
+ eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+
+ log.info("SDN-IP Leadership Service started");
+ }
+
+ /**
+ * Stops operation.
+ */
+ void stop() {
+ eventDispatcher.removeSink(LeadershipEvent.class);
+
+ for (Topic topic : topics.values()) {
+ topic.stop();
+ }
+ topics.clear();
+
+ log.info("SDN-IP Leadership Service stopped");
+ }
+
+ @Override
+ public ControllerNode getLeader(String path) {
+ Topic topic = topics.get(path);
+ if (topic == null) {
+ return null;
+ }
+ return topic.leader();
+ }
+
+ @Override
+ public void runForLeadership(String path) {
+ checkArgument(path != null);
+ Topic topic = new Topic(path);
+ Topic oldTopic = topics.putIfAbsent(path, topic);
+ if (oldTopic == null) {
+ topic.start();
+ }
+ }
+
+ @Override
+ public void withdraw(String path) {
+ checkArgument(path != null);
+ Topic topic = topics.get(path);
+ if (topic != null) {
+ topic.stop();
+ topics.remove(path, topic);
+ }
+ }
+
+ @Override
+ public void addListener(LeadershipEventListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(LeadershipEventListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
+ /**
+ * Class for keeping per-topic information.
+ */
+ private final class Topic implements MessageListener<byte[]> {
+ private final String topicName;
+ private volatile boolean isShutdown = true;
+ private volatile long lastLeadershipUpdateMs = 0;
+ private ExecutorService leaderElectionExecutor;
+
+ private ControllerNode leader;
+ private Lock leaderLock;
+ private Future<?> getLockFuture;
+ private Future<?> periodicProcessingFuture;
+ private ITopic<byte[]> leaderTopic;
+ private String leaderTopicRegistrationId;
+
+ /**
+ * Constructor.
+ *
+ * @param topicName the topic name
+ */
+ private Topic(String topicName) {
+ this.topicName = topicName;
+ }
+
+ /**
+ * Gets the leader for the topic.
+ *
+ * @return the leader for the topic
+ */
+ private ControllerNode leader() {
+ return leader;
+ }
+
+ /**
+ * Starts leadership election for the topic.
+ */
+ private void start() {
+ isShutdown = false;
+ String lockHzId = "LeadershipService/" + topicName + "/lock";
+ String topicHzId = "LeadershipService/" + topicName + "/topic";
+ leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+
+ String threadPoolName =
+ "sdnip-leader-election-" + topicName + "-%d";
+ leaderElectionExecutor = Executors.newScheduledThreadPool(2,
+ namedThreads(threadPoolName));
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setGlobalOrderingEnabled(true);
+ topicConfig.setName(topicHzId);
+ storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
+
+ leaderTopic =
+ storeService.getHazelcastInstance().getTopic(topicHzId);
+ leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
+
+ getLockFuture = leaderElectionExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ doLeaderElectionThread();
+ }
+ });
+ periodicProcessingFuture =
+ leaderElectionExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ doPeriodicProcessing();
+ }
+ });
+ }
+
+ /**
+ * Stops leadership election for the topic.
+ */
+ private void stop() {
+ isShutdown = true;
+ leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+ // getLockFuture.cancel(true);
+ // periodicProcessingFuture.cancel(true);
+ leaderElectionExecutor.shutdownNow();
+ }
+
+ @Override
+ public void onMessage(Message<byte[]> message) {
+ LeadershipEvent leadershipEvent =
+ SERIALIZER.decode(message.getMessageObject());
+ NodeId eventLeaderId = leadershipEvent.subject().leader().id();
+
+ log.debug("SDN-IP Leadership Event: time = {} type = {} event = {}",
+ leadershipEvent.time(), leadershipEvent.type(),
+ leadershipEvent);
+ if (!leadershipEvent.subject().topic().equals(topicName)) {
+ return; // Not our topic: ignore
+ }
+ if (eventLeaderId.equals(localNode.id())) {
+ return; // My own message: ignore
+ }
+
+ synchronized (this) {
+ switch (leadershipEvent.type()) {
+ case LEADER_ELECTED:
+ // FALLTHROUGH
+ case LEADER_REELECTED:
+ //
+ // Another leader: if we are also a leader, then give up
+ // leadership and run for re-election.
+ //
+ if ((leader != null) &&
+ leader.id().equals(localNode.id())) {
+ getLockFuture.cancel(true);
+ } else {
+ // Just update the current leader
+ leader = leadershipEvent.subject().leader();
+ lastLeadershipUpdateMs = System.currentTimeMillis();
+ }
+ eventDispatcher.post(leadershipEvent);
+ break;
+ case LEADER_BOOTED:
+ // Remove the state for the current leader
+ if ((leader != null) &&
+ eventLeaderId.equals(leader.id())) {
+ leader = null;
+ }
+ eventDispatcher.post(leadershipEvent);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void doPeriodicProcessing() {
+
+ while (!isShutdown) {
+
+ //
+ // Periodic tasks:
+ // (a) Advertise ourselves as the leader
+ // OR
+ // (b) Expire a stale (remote) leader
+ //
+ synchronized (this) {
+ LeadershipEvent leadershipEvent;
+ if (leader != null) {
+ if (leader.id().equals(localNode.id())) {
+ //
+ // Advertise ourselves as the leader
+ //
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_REELECTED,
+ new Leadership(topicName, localNode, 0));
+ // Dispatch to all remote instances
+ leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ } else {
+ //
+ // Test if time to expire a stale leader
+ //
+ long delta = System.currentTimeMillis() -
+ lastLeadershipUpdateMs;
+ if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(topicName, leader, 0));
+ // Dispatch only to the local listener(s)
+ eventDispatcher.post(leadershipEvent);
+ leader = null;
+ }
+ }
+ }
+ }
+
+ // Sleep before re-advertising
+ try {
+ Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ log.debug("SDN-IP Leader Election periodic thread interrupted");
+ }
+ }
+ }
+
+ /**
+ * Performs the leader election by using Hazelcast.
+ */
+ private void doLeaderElectionThread() {
+
+ while (!isShutdown) {
+ LeadershipEvent leadershipEvent;
+ //
+ // Try to acquire the lock and keep it until the instance is
+ // shutdown.
+ //
+ log.debug("SDN-IP Leader Election begin for topic {}",
+ topicName);
+ try {
+ // Block until it becomes the leader
+ leaderLock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ //
+ // Thread interrupted. Either shutdown or run for
+ // re-election.
+ //
+ log.debug("SDN-IP Election interrupted for topic {}",
+ topicName);
+ continue;
+ }
+
+ synchronized (this) {
+ //
+ // This instance is now the leader
+ //
+ log.info("SDN-IP Leader Elected for topic {}", topicName);
+ leader = localNode;
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ new Leadership(topicName, localNode, 0));
+ eventDispatcher.post(leadershipEvent);
+ leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ }
+
+ try {
+ // Sleep forever until interrupted
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ //
+ // Thread interrupted. Either shutdown or run for
+ // re-election.
+ //
+ log.debug("SDN-IP Leader Interrupted for topic {}",
+ topicName);
+ }
+
+ synchronized (this) {
+ // If we reach here, we should release the leadership
+ log.debug("SDN-IP Leader Lock Released for topic {}",
+ topicName);
+ if ((leader != null) &&
+ leader.id().equals(localNode.id())) {
+ leader = null;
+ }
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(topicName, localNode, 0));
+ eventDispatcher.post(leadershipEvent);
+ leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ leaderLock.unlock();
+ }
+ }
+ }
+ }
+}