[ONOS-6810] Implement Mastership handling in general DeviceProvider

Change-Id: I14b706d364cf5124da248230fbcda65d0bd284ce
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index a77dc33..824ec79 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -30,7 +30,11 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
@@ -110,6 +114,10 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
     public static final String DRIVER = "driver";
+    public static final int REACHABILITY_TIMEOUT = 10;
+    public static final String DEPLOY = "deploy-";
+    public static final String PIPECONF_TOPIC = "-pipeconf";
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -131,8 +139,17 @@
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PiPipeconfService piPipeconfService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
     private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
     @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
             label = "Configure poll frequency for port status and statistics; " +
@@ -257,17 +274,26 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.debug("Received role {} for device {}", newRole, deviceId);
+        log.info("Received role {} for device {}", newRole, deviceId);
         CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
-        roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
+        roleReply.thenAcceptAsync(mastership -> {
+            providerService.receivedRoleReply(deviceId, newRole, mastership);
+            if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
+                scheduledTasks.get(deviceId).cancel(false);
+                scheduledTasks.remove(deviceId);
+            } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
+                scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+                updatePortStatistics(deviceId);
+            }
+        });
     }
 
     @Override
     public boolean isReachable(DeviceId deviceId) {
-        log.debug("Testing rechability for device {}", deviceId);
+        log.debug("Testing reachability for device {}", deviceId);
         CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
         try {
-            return reachable.get(10, TimeUnit.SECONDS);
+            return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Device {} is not reachable", deviceId, e);
             return false;
@@ -368,15 +394,7 @@
                 return;
             }
 
-            //Storing deviceKeyId and all other config values
-            // as data in the driver with protocol_<info>
-            // name as the key. e.g protocol_ip
-            providerConfig.protocolsInfo()
-                    .forEach((protocol, deviceInfoConfig) -> {
-                        deviceInfoConfig.configValues()
-                                .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                        driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                    });
+            addConfigData(providerConfig, driverData);
 
             //Connecting to the device
             CompletableFuture<Boolean> connected = handshaker.connect();
@@ -409,7 +427,7 @@
                         ports = deviceDiscovery.discoverPortDetails();
                     }
 
-                    if (!handlePipeconf(deviceId, driver, driverData)) {
+                    if (!handlePipeconf(deviceId, driver, driverData, true)) {
                         // Something went wrong during handling of pipeconf.
                         // We already logged the error.
                         handshaker.disconnect();
@@ -425,11 +443,37 @@
         }
     }
 
+    private void connectStandbyDevice(DeviceId deviceId) {
+
+        //if device is pipeline programmable we merge pipeconf + base driver for every other role
+        GeneralProviderDeviceConfig providerConfig =
+                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
+
+        Driver driver = getDriver(deviceId);
+
+
+        DriverData driverData = new DefaultDriverData(driver, deviceId);
+        DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+        if (handshaker == null) {
+            log.error("Device {}, with driver {} does not support DeviceHandshaker " +
+                    "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
+            return;
+        }
+        addConfigData(providerConfig, driverData);
+
+        //Connecting to the device
+        handshaker.connect().thenAcceptAsync(result -> {
+            if (result) {
+                handlePipeconf(deviceId, driver, driverData, false);
+            }
+        });
+    }
+
     /**
      * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
      * device can be registered to the core, false otherwise.
      */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
+    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
 
         PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
                 driverData);
@@ -439,6 +483,42 @@
             return true;
         }
 
+        PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+
+        if (pipeconf != null) {
+
+            PiPipeconfId pipeconfId = pipeconf.id();
+
+            try {
+                if (deployPipeconf) {
+                    if (!pipelineProg.deployPipeconf(pipeconf).get()) {
+                        log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
+                                pipeconfId, deviceId);
+                        return false;
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+                return false;
+            }
+            try {
+                if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
+                    log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
+                            driver.name(), deviceId, pipeconfId);
+                    return false;
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+                return false;
+            }
+        } else {
+            return false;
+        }
+
+        return true;
+    }
+
+    private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
         PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
             // No pipeconf has been associated with this device.
             // Check if device driver provides a default one.
@@ -453,33 +533,16 @@
 
         if (pipeconfId == null) {
             log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
-            return false;
+            return null;
         }
 
         if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
             log.warn("Pipeconf {} is not registered", pipeconfId);
-            return false;
+            return null;
         }
 
 
-        PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
-
-        try {
-            if (!pipelineProg.deployPipeconf(pipeconf).get()) {
-                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
-                return false;
-            }
-
-            if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
-                log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                        driver.name(), deviceId, pipeconfId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException(e);
-        }
-
-        return true;
+        return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
     private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
@@ -492,7 +555,6 @@
         DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker != null) {
             CompletableFuture<Boolean> disconnect = handshaker.disconnect();
-
             disconnect.thenAcceptAsync(result -> {
                 if (result) {
                     log.info("Disconnected device {}", deviceId);
@@ -560,6 +622,28 @@
                 log.info("Device {} is already connected to ONOS and is available", deviceId);
                 return;
             }
+            NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
+                    .leader().nodeId();
+            NodeId localNodeId = clusterService.getLocalNode().id();
+            if (localNodeId.equals(leaderNodeId)) {
+                if (processEvent(event, deviceId)) {
+                    log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
+                            deviceId);
+                    checkAndSubmitDeviceTask(deviceId);
+                }
+            } else {
+                if (processEvent(event, deviceId)) {
+                    log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
+                            localNodeId, deviceId, leaderNodeId);
+                    connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
+                    //FIXME this will be removed when config is synced
+                    cleanUpConfigInfo(deviceId);
+                }
+            }
+
+        }
+
+        private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
             //FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
             Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
@@ -590,7 +674,7 @@
                 // in the pipelineConfigured
                 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
                         && pipelineConfigured.contains(deviceId)) {
-                    checkAndSubmitDeviceTask(deviceId);
+                    return true;
                 } else {
                     if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
                         log.debug("Waiting for pipeline configuration for device {}", deviceId);
@@ -604,6 +688,7 @@
                         log.debug("Only device configuration for device {}", deviceId);
                     }
                 }
+                return false;
             } finally {
                 lock.unlock();
             }
@@ -622,10 +707,26 @@
     private void checkAndSubmitDeviceTask(DeviceId deviceId) {
         connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
         //FIXME this will be removed when configuration is synced.
+        cleanUpConfigInfo(deviceId);
+
+    }
+
+    private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
+        //Storing deviceKeyId and all other config values
+        // as data in the driver with protocol_<info>
+        // name as the key. e.g protocol_ip
+        providerConfig.protocolsInfo()
+                .forEach((protocol, deviceInfoConfig) -> {
+                    deviceInfoConfig.configValues()
+                            .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+                    driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+                });
+    }
+
+    private void cleanUpConfigInfo(DeviceId deviceId) {
         deviceConfigured.remove(deviceId);
         driverConfigured.remove(deviceId);
         pipelineConfigured.remove(deviceId);
-
     }
 
     private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
@@ -650,10 +751,13 @@
 
                 //For now this is scheduled periodically, when streaming API will
                 // be available we check and base it on the streaming API (e.g. gNMI)
-                scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
-                updatePortStatistics(deviceId);
+                if (mastershipService.isLocalMaster(deviceId)) {
+                    scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+                    updatePortStatistics(deviceId);
+                }
 
             } else if (type.equals(Type.DEVICE_REMOVED)) {
+
                 connectionExecutor.submit(exceptionSafe(() ->
                         disconnectDevice(deviceId)));
             }