LeaderShip implementation in L3vpn app.

Change-Id: Ifd109ca19b5559d109f8bf1e8edb56eaa7dd4f0e
diff --git a/apps/l3vpn/BUCK b/apps/l3vpn/BUCK
index 1c6d6ef..06da696 100644
--- a/apps/l3vpn/BUCK
+++ b/apps/l3vpn/BUCK
@@ -3,8 +3,8 @@
 ]
 
 BUNDLES = [
-  '//apps/l3vpn/netl3vpn:onos-apps-l3vpn-netl3vpn',
   '//apps/l3vpn/yangmodel:onos-apps-l3vpn-yangmodel',
+  '//apps/l3vpn/netl3vpn:onos-apps-l3vpn-netl3vpn',
 ]
 
 onos_app(
diff --git a/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java b/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
index 536a573..e13057e 100644
--- a/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
+++ b/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
@@ -20,9 +20,15 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.config.DynamicConfigEvent;
 import org.onosproject.config.DynamicConfigListener;
 import org.onosproject.config.DynamicConfigService;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.l3vpn.netl3vpn.AccessInfo;
@@ -99,6 +105,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.config.DynamicConfigEvent.Type.NODE_ADDED;
 import static org.onosproject.config.DynamicConfigEvent.Type.NODE_DELETED;
+import static org.onosproject.l3vpn.netl3vpn.VpnType.HUB;
 import static org.onosproject.l3vpn.netl3vpn.impl.BgpConstructionUtil.createBgpInfo;
 import static org.onosproject.l3vpn.netl3vpn.impl.InsConstructionUtil.createInstance;
 import static org.onosproject.l3vpn.netl3vpn.impl.IntConstructionUtil.createInterface;
@@ -126,7 +133,6 @@
 import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.getResourceData;
 import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.getRole;
 import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.getVpnCreateModObj;
-import static org.onosproject.l3vpn.netl3vpn.VpnType.HUB;
 import static org.onosproject.yang.runtime.helperutils.YangApacheUtils.getYangModel;
 
 /**
@@ -164,10 +170,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetL3VpnStore l3VpnStore;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     protected IdGenerator l3VpnIdGen;
+    private NodeId localNodeId;
+    private ApplicationId appId;
 
     private DynamicConfigListener configListener = new InternalConfigListener();
 
+    private final InternalLeadershipListener leadershipEventListener =
+            new InternalLeadershipListener();
+
     private ModelRegistrationParam regParam;
 
     private ExecutorService executor = Executors.newSingleThreadExecutor(
@@ -176,11 +193,19 @@
     private ResourceId id;
     private ResourceId module;
     private ResourceId sites;
+    private boolean isElectedLeader = false;
+    private boolean isActivatedLeader = false;
 
     @Activate
     protected void activate() {
-        coreService.registerApplication(APP_ID);
+        appId = coreService.registerApplication(APP_ID);
         l3VpnIdGen = coreService.getIdGenerator(L3_VPN_ID_TOPIC);
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        leadershipService.addListener(leadershipEventListener);
+        leadershipService.runForLeadership(appId.name());
+
         registerModel();
         getResourceId();
         configService.addListener(configListener);
@@ -191,11 +216,15 @@
     protected void deactivate() {
         modelRegistry.unregisterModel(regParam);
         configService.removeListener(configListener);
+
+        leadershipService.withdraw(appId.name());
+        leadershipService.removeListener(leadershipEventListener);
+
         log.info("Stopped");
     }
 
     private void registerModel() {
-        YangModel model = getYangModel(getClass());
+        YangModel model = getYangModel(IetfInetTypes.class);
         Iterator<YangModuleId> it = model.getYangModulesId().iterator();
 
         //Create model registration param.
@@ -278,7 +307,9 @@
         DefaultModelObjectData.Builder data = DefaultModelObjectData.builder()
                 .identifier(modelId);
         ResourceData resData = modelConverter.createDataNode(data.build());
-        return resData.resourceId();
+        //TODO: change this when yang runtime manager implements model
+        // converter api.
+        return null;
     }
 
     /**
@@ -288,16 +319,30 @@
      * @param storeId store resource id
      */
     private void processCreateFromStore(ResourceId storeId) {
-        List<ModelObject> objects = getModelObjects(storeId, module);
-        for (ModelObject obj : objects) {
-            if (obj instanceof DefaultL3VpnSvc) {
-                DefaultL3VpnSvc l3VpnSvc = (DefaultL3VpnSvc) obj;
-                createGlobalConfig(l3VpnSvc);
+        if (isElectedLeader && isActivatedLeader) {
+            List<ModelObject> objects = getModelObjects(storeId, module);
+            for (ModelObject obj : objects) {
+                if (obj instanceof DefaultL3VpnSvc) {
+                    DefaultL3VpnSvc l3VpnSvc = (DefaultL3VpnSvc) obj;
+                    createGlobalConfig(l3VpnSvc);
+                }
             }
         }
     }
 
     /**
+     * Processes delete request from the store, by taking the root object.
+     * The root object is then used for l3VPN processing.
+     *
+     * @param storeId store resource id
+     */
+    private void processDeleteFromStore(ResourceId storeId) {
+        if (isElectedLeader && isActivatedLeader) {
+            //TODO: add delete logic here.
+        }
+    }
+
+    /**
      * Returns model objects of the store. The data node is read from the
      * config store. This returns the resource id's node. So the node's
      * parent resource id is taken and the data node is given to model
@@ -863,4 +908,52 @@
             });
         }
     }
+
+    /**
+     * Signals that the leadership has changed.
+     *
+     * @param isLeader true if this instance is now the leader, otherwise false
+     */
+    private void leaderChanged(boolean isLeader) {
+        log.debug("Leader changed: {}", isLeader);
+
+        if (!isLeader) {
+            isElectedLeader = false;
+            isActivatedLeader = false;
+            // Nothing to do
+            return;
+        }
+        isActivatedLeader = false;
+        isElectedLeader = true;
+    }
+
+    /**
+     * A listener for leadership events.
+     */
+    private class InternalLeadershipListener implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            return event.subject().topic().equals(appId.name());
+        }
+
+        @Override
+        public void event(LeadershipEvent event) {
+            switch (event.type()) {
+                case LEADER_CHANGED:
+                case LEADER_AND_CANDIDATES_CHANGED:
+                    if (localNodeId.equals(event.subject().leaderNodeId())) {
+                        log.info("Net l3vpn manager gained leadership");
+                        leaderChanged(true);
+                    } else {
+                        log.info("Net l3vpn manager leader changed. New " +
+                                         "leader is {}", event.subject()
+                                         .leaderNodeId());
+                        leaderChanged(false);
+                    }
+                default:
+                    break;
+            }
+        }
+    }
 }