Added a Hazelcast-based Leadership implementation that is used by SDN-IP.
This is a drop-in replacement until the generic ONOS Leadership service
is robust enough.

Change-Id: I72a84331dd948f98707eb59844dab425aa9d5c08
diff --git a/apps/sdnip/pom.xml b/apps/sdnip/pom.xml
index 4877d4f..a37c087 100644
--- a/apps/sdnip/pom.xml
+++ b/apps/sdnip/pom.xml
@@ -85,6 +85,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onos-core-dist</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.karaf.shell</groupId>
       <artifactId>org.apache.karaf.shell.console</artifactId>
     </dependency>
diff --git a/apps/sdnip/pom.xml.orig b/apps/sdnip/pom.xml.orig
new file mode 100644
index 0000000..4877d4f
--- /dev/null
+++ b/apps/sdnip/pom.xml.orig
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2014 Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.onlab.onos</groupId>
+    <artifactId>onos-apps</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>onos-app-sdnip</artifactId>
+  <packaging>bundle</packaging>
+
+  <description>SDN-IP peering application</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.4.2</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <version>4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onlab-thirdparty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onlab-misc</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onlab-junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onos-api</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.onlab.onos</groupId>
+      <artifactId>onos-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.karaf.shell</groupId>
+      <artifactId>org.apache.karaf.shell.console</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 5c19c33..d143b25 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -29,15 +29,17 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.LeadershipEvent;
 import org.onlab.onos.cluster.LeadershipEventListener;
-import org.onlab.onos.cluster.LeadershipService;
+// import org.onlab.onos.cluster.LeadershipService;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.EventDeliveryService;
 import org.onlab.onos.net.host.HostService;
 import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
 import org.onlab.onos.sdnip.bgp.BgpSession;
 import org.onlab.onos.sdnip.bgp.BgpSessionManager;
 import org.onlab.onos.sdnip.config.SdnIpConfigReader;
+import org.onlab.onos.store.hz.StoreService;
 
 import org.slf4j.Logger;
 
@@ -68,7 +70,13 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
+    protected StoreService storeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    //    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected SdnIpLeadershipService leadershipService;
 
     private IntentSynchronizer intentSynchronizer;
     private SdnIpConfigReader config;
@@ -77,7 +85,7 @@
     private BgpSessionManager bgpSessionManager;
     private LeadershipEventListener leadershipEventListener =
         new InnerLeadershipEventListener();
-    ApplicationId appId;
+    private ApplicationId appId;
     private ControllerNode localControllerNode;
 
     @Activate
@@ -106,6 +114,10 @@
                             interfaceService, hostService);
         router.start();
 
+        leadershipService = new SdnIpLeadershipService(clusterService,
+                                                       storeService,
+                                                       eventDispatcher);
+        leadershipService.start();
         leadershipService.addListener(leadershipEventListener);
         leadershipService.runForLeadership(appId.name());
 
@@ -126,6 +138,7 @@
 
         leadershipService.withdraw(appId.name());
         leadershipService.removeListener(leadershipEventListener);
+        leadershipService.stop();
 
         log.info("SDN-IP Stopped");
     }
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
new file mode 100644
index 0000000..81535f5
--- /dev/null
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.sdnip;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.store.hz.StoreService;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+import static org.onlab.util.Tools.namedThreads;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Distributed implementation of LeadershipService that is based on Hazelcast.
+ * <p>
+ * The election is eventually-consistent: if there is Hazelcast partitioning,
+ * and the partitioning is healed, there could be a short window of time
+ * until the leaders in each partition discover each other. If this happens,
+ * the leaders release the leadership and run again for election.
+ * </p>
+ * <p>
+ * The leader election is based on Hazelcast's Global Lock, which is strongly
+ * consistent. In addition, each leader periodically advertises events
+ * (using a Hazelcast Topic) that it is the elected leader. Those events are
+ * used for two purposes: (1) Discover multi-leader collisions (in case of
+ * healed Hazelcast partitions), and (2) Inform all listeners who is
+ * the current leader (e.g., for informational purpose).
+ * </p>
+ */
+public class SdnIpLeadershipService implements LeadershipService {
+    private static final Logger log =
+        LoggerFactory.getLogger(SdnIpLeadershipService.class);
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {    // pool built from KryoNamespaces.API only
+            serializerPool = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .build()
+                .populate(1);    // presumably pre-creates one Kryo instance - TODO confirm
+        }
+    };
+
+    private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
+    private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000;  // 15s
+
+    private ClusterService clusterService;
+    private StoreService storeService;
+    private EventDeliveryService eventDispatcher;
+
+    private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
+        listenerRegistry;
+    private final Map<String, Topic> topics = Maps.newConcurrentMap();
+    private ControllerNode localNode;
+
+    /**
+     * Stores the collaborators only; nothing runs until start() is called.
+     *
+     * @param clusterService the cluster service to use (supplies the local node)
+     * @param storeService the store service to use (supplies the Hazelcast instance)
+     * @param eventDispatcher the event dispatcher to use (receives LeadershipEvents)
+     */
+    SdnIpLeadershipService(ClusterService clusterService,
+                           StoreService storeService,
+                           EventDeliveryService eventDispatcher) {
+        this.clusterService = clusterService;
+        this.storeService = storeService;
+        this.eventDispatcher = eventDispatcher;
+    }
+
+    /**
+     * Starts operation: caches the local node and registers the event sink.
+     */
+    void start() {
+        localNode = clusterService.getLocalNode();
+        listenerRegistry = new AbstractListenerRegistry<>();    // fresh registry per start()
+        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+
+        log.info("SDN-IP Leadership Service started");
+    }
+
+    /**
+     * Stops operation: removes the event sink, then stops and forgets all topics.
+     */
+    void stop() {
+        eventDispatcher.removeSink(LeadershipEvent.class);
+        // NOTE(review): sink is removed before topics stop and may post LEADER_BOOTED - confirm
+        for (Topic topic : topics.values()) {
+            topic.stop();
+        }
+        topics.clear();
+
+        log.info("SDN-IP Leadership Service stopped");
+    }
+
+    @Override
+    public ControllerNode getLeader(String path) {
+        Topic topic = topics.get(path);
+        if (topic == null) {
+            return null;    // no election registered locally for this path
+        }
+        return topic.leader();    // may itself be null when no leader is known
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+        checkArgument(path != null);    // IllegalArgumentException on a null path
+        Topic topic = new Topic(path);
+        Topic oldTopic = topics.putIfAbsent(path, topic);    // an existing Topic is kept
+        if (oldTopic == null) {    // only the call that registered the Topic starts it
+            topic.start();
+        }
+    }
+
+    @Override
+    public void withdraw(String path) {
+        checkArgument(path != null);    // IllegalArgumentException on a null path
+        Topic topic = topics.get(path);
+        if (topic != null) {    // unknown paths are silently ignored
+            topic.stop();
+            topics.remove(path, topic);    // removes only if still mapped to this Topic
+        }
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        listenerRegistry.addListener(listener);    // registry is null until start() has run
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        listenerRegistry.removeListener(listener);    // registry is null until start() has run
+    }
+
+    /**
+     * Class for keeping per-topic information.
+     */
+    private final class Topic implements MessageListener<byte[]> {
+        private final String topicName;
+        private volatile boolean isShutdown = true;
+        private volatile long lastLeadershipUpdateMs = 0;
+        private ExecutorService leaderElectionExecutor;
+
+        private ControllerNode leader;
+        private Lock leaderLock;
+        private Future<?> getLockFuture;
+        private Future<?> periodicProcessingFuture;
+        private ITopic<byte[]> leaderTopic;
+        private String leaderTopicRegistrationId;
+
+        /**
+         * Creates a topic in the shut-down state; start() begins the election.
+         *
+         * @param topicName the topic name (also used in the Hazelcast lock/topic IDs)
+         */
+        private Topic(String topicName) {
+            this.topicName = topicName;
+        }
+
+        /**
+         * Gets the leader for the topic.
+         *
+         * @return the last known leader for the topic, or null if none is known
+         */
+        private ControllerNode leader() {
+            return leader;    // read without the monitor that guards every write
+        }
+
+        /**
+         * Starts leadership election for the topic on two background threads.
+         */
+        private void start() {
+            isShutdown = false;    // must be cleared before the worker loops start
+            String lockHzId = "LeadershipService/" + topicName + "/lock";
+            String topicHzId = "LeadershipService/" + topicName + "/topic";
+            leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+
+            String threadPoolName =
+                "sdnip-leader-election-" + topicName + "-%d";
+            leaderElectionExecutor = Executors.newScheduledThreadPool(2,    // one per task below
+                                        namedThreads(threadPoolName));
+
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setGlobalOrderingEnabled(true);
+            topicConfig.setName(topicHzId);
+            storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
+
+            leaderTopic =
+                storeService.getHazelcastInstance().getTopic(topicHzId);
+            leaderTopicRegistrationId = leaderTopic.addMessageListener(this);    // -> onMessage()
+
+            getLockFuture = leaderElectionExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        doLeaderElectionThread();    // loops until isShutdown
+                    }
+                });
+            periodicProcessingFuture =
+                leaderElectionExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        doPeriodicProcessing();    // loops until isShutdown
+                    }
+                });
+        }
+
+        /**
+         * Stops leadership election for the topic; requires a prior start().
+         */
+        private void stop() {
+            isShutdown = true;    // set first so the interrupted loops see it and exit
+            leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+            // getLockFuture.cancel(true);
+            // periodicProcessingFuture.cancel(true);
+            leaderElectionExecutor.shutdownNow();    // attempts to interrupt both workers
+        }
+
+        @Override
+        public void onMessage(Message<byte[]> message) {
+            LeadershipEvent leadershipEvent =
+                SERIALIZER.decode(message.getMessageObject());
+            NodeId eventLeaderId = leadershipEvent.subject().leader().id();
+
+            log.debug("SDN-IP Leadership Event: time = {} type = {} event = {}",
+                      leadershipEvent.time(), leadershipEvent.type(),
+                      leadershipEvent);
+            if (!leadershipEvent.subject().topic().equals(topicName)) {
+                return;         // Not our topic: ignore
+            }
+            if (eventLeaderId.equals(localNode.id())) {
+                return;         // My own message: ignore
+            }
+
+            synchronized (this) {    // same monitor as the two worker loops
+                switch (leadershipEvent.type()) {
+                case LEADER_ELECTED:
+                    // FALLTHROUGH
+                case LEADER_REELECTED:
+                    // Another leader: if we are also a leader, then give up
+                    // leadership and run for re-election. NOTE(review):
+                    // Future.cancel() fails on an already-cancelled future, so
+                    // a second collision may not interrupt again - confirm.
+                    if ((leader != null) &&
+                        leader.id().equals(localNode.id())) {
+                        getLockFuture.cancel(true);    // interrupts the election thread
+                    } else {
+                        // Just update the current leader
+                        leader = leadershipEvent.subject().leader();
+                        lastLeadershipUpdateMs = System.currentTimeMillis();
+                    }
+                    eventDispatcher.post(leadershipEvent);    // posted in both branches
+                    break;
+                case LEADER_BOOTED:
+                    // Remove the state for the current leader
+                    if ((leader != null) &&
+                        eventLeaderId.equals(leader.id())) {
+                        leader = null;
+                    }
+                    eventDispatcher.post(leadershipEvent);    // posted even if leader differed
+                    break;
+                default:
+                    break;
+                }
+            }
+        }
+
+        private void doPeriodicProcessing() {
+            // Runs on an executor thread (submitted by start()) until isShutdown is set.
+            while (!isShutdown) {
+
+                //
+                // Periodic tasks:
+                // (a) Advertise ourselves as the leader
+                //   OR
+                // (b) Expire a stale (remote) leader
+                //
+                synchronized (this) {
+                    LeadershipEvent leadershipEvent;
+                    if (leader != null) {    // nothing to do while no leader is known
+                        if (leader.id().equals(localNode.id())) {
+                            //
+                            // Advertise ourselves as the leader
+                            //
+                            leadershipEvent = new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_REELECTED,
+                                new Leadership(topicName, localNode, 0));
+                            // Dispatch to all remote instances
+                            leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                        } else {
+                            // Test if time to expire a stale leader: no
+                            // ELECTED/REELECTED message was received from a remote
+                            // leader within LEADERSHIP_REMOTE_TIMEOUT_MS
+                            long delta = System.currentTimeMillis() -
+                                lastLeadershipUpdateMs;
+                            if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
+                                leadershipEvent = new LeadershipEvent(
+                                        LeadershipEvent.Type.LEADER_BOOTED,
+                                        new Leadership(topicName, leader, 0));
+                                // Dispatch only to the local listener(s)
+                                eventDispatcher.post(leadershipEvent);
+                                leader = null;
+                            }
+                        }
+                    }
+                }
+
+                // Sleep before re-advertising
+                try {
+                    Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
+                } catch (InterruptedException e) {    // not rethrown; loop re-checks isShutdown
+                    log.debug("SDN-IP Leader Election periodic thread interrupted");
+                }
+            }
+        }
+
+        /**
+         * Election loop: take the Hazelcast lock, lead until interrupted, release.
+         */
+        private void doLeaderElectionThread() {
+            // Runs on an executor thread (submitted by start()) until isShutdown is set.
+            while (!isShutdown) {
+                LeadershipEvent leadershipEvent;
+                //
+                // Try to acquire the lock and keep it until the instance is
+                // shutdown.
+                //
+                log.debug("SDN-IP Leader Election begin for topic {}",
+                          topicName);
+                try {
+                    // Block until it becomes the leader
+                    leaderLock.lockInterruptibly();
+                } catch (InterruptedException e) {
+                    //
+                    // Thread interrupted. Either shutdown or run for
+                    // re-election.
+                    //
+                    log.debug("SDN-IP Election interrupted for topic {}",
+                              topicName);
+                    continue;    // lock not held here; loop exits if isShutdown is set
+                }
+
+                synchronized (this) {
+                    //
+                    // This instance is now the leader
+                    //
+                    log.info("SDN-IP Leader Elected for topic {}", topicName);
+                    leader = localNode;
+                    leadershipEvent = new LeadershipEvent(
+                        LeadershipEvent.Type.LEADER_ELECTED,
+                        new Leadership(topicName, localNode, 0));
+                    eventDispatcher.post(leadershipEvent);    // local listeners
+                    leaderTopic.publish(SERIALIZER.encode(leadershipEvent));    // Hazelcast topic
+                }
+
+                try {
+                    // Sleep forever until interrupted
+                    Thread.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException e) {
+                    //
+                    // Thread interrupted. Either shutdown or run for
+                    // re-election.
+                    //
+                    log.debug("SDN-IP Leader Interrupted for topic {}",
+                              topicName);
+                }
+
+                synchronized (this) {
+                    // If we reach here, we should release the leadership
+                    log.debug("SDN-IP Leader Lock Released for topic {}",
+                              topicName);
+                    if ((leader != null) &&
+                        leader.id().equals(localNode.id())) {
+                        leader = null;
+                    }
+                    leadershipEvent = new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_BOOTED,
+                                new Leadership(topicName, localNode, 0));
+                    eventDispatcher.post(leadershipEvent);
+                    leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                    leaderLock.unlock();    // not in a finally: skipped if post/publish throws
+                }
+            }
+        }
+    }
+}