[ONOS-6810] Implement Mastership handling in general DeviceProvider

Change-Id: I14b706d364cf5124da248230fbcda65d0bd284ce
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index d60e9b3..212cf1c 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -28,6 +28,9 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -79,6 +82,9 @@
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -87,6 +93,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PiPipeconfMappingStore pipeconfMappingStore;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     // Registered pipeconf are replicated through the app subsystem and registered on app activated events.
     protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
 
@@ -200,21 +209,28 @@
                     // due to 1:1:1 pipeconf:driver:provider maybe find better way
                     DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
 
-                    //we register to the dirver susbystem the driver provider containing the merged driver
+                    //we register to the driver susbystem the driver provider containing the merged driver
                     driverAdminService.registerProvider(provider);
                 }
 
                 // Changing the configuration for the device to enforce the full driver with pipipeconf
-                // and base behaviours
-                ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-                newCfg = newCfg.put(DRIVER, completeDriverName);
-                ObjectMapper mapper = new ObjectMapper();
-                JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-                cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-                // Completable future is needed for when this method will also apply the pipeline to the device.
-                // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
-                // the association between device and pipeconf.
-                pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+                // and base behaviours, updating binding only first time something changes
+                NodeId leaderNodeId = leadershipService.getLeader("deploy-" +
+                        deviceId.toString() + "-pipeconf");
+                NodeId localNodeId = clusterService.getLocalNode().id();
+
+                if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) {
+                    ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+                    newCfg = newCfg.put(DRIVER, completeDriverName);
+                    ObjectMapper mapper = new ObjectMapper();
+                    JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+                    log.debug("New driver {} for device {}", completeDriverName, deviceId);
+                    cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+                    // Completable future is needed for when this method will also apply the pipeline to the device.
+                    // FIXME (maybe): the pipeline is currently applied by the general device provider.
+                    // But we store here the association between device and pipeconf.
+                    pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+                }
                 operationResult.complete(true);
             }
         });
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 7e41a99..f49ad18 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -66,7 +66,7 @@
 
         client = controller.getClient(deviceId);
         if (client == null || !controller.isReacheable(deviceId)) {
-            result.complete(MastershipRole.STANDBY);
+            result.complete(MastershipRole.NONE);
             return result;
         }
         if (newRole.equals(MastershipRole.MASTER)) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 91b9db0..269936a 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -168,6 +168,8 @@
                 doDummyMessage(channels.get(channelId));
                 return true;
             } catch (IOException e) {
+                log.warn("Error in sending dummy message to device {}", channelId);
+                log.debug("Exception ", e);
                 return false;
             }
         } finally {
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index a77dc33..824ec79 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -30,7 +30,11 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
@@ -110,6 +114,10 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
     public static final String DRIVER = "driver";
+    public static final int REACHABILITY_TIMEOUT = 10;
+    public static final String DEPLOY = "deploy-";
+    public static final String PIPECONF_TOPIC = "-pipeconf";
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -131,8 +139,17 @@
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PiPipeconfService piPipeconfService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
     private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
     @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
             label = "Configure poll frequency for port status and statistics; " +
@@ -257,17 +274,26 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.debug("Received role {} for device {}", newRole, deviceId);
+        log.info("Received role {} for device {}", newRole, deviceId);
         CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
-        roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
+        roleReply.thenAcceptAsync(mastership -> {
+            providerService.receivedRoleReply(deviceId, newRole, mastership);
+            if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
+                scheduledTasks.get(deviceId).cancel(false);
+                scheduledTasks.remove(deviceId);
+            } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
+                scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+                updatePortStatistics(deviceId);
+            }
+        });
     }
 
     @Override
     public boolean isReachable(DeviceId deviceId) {
-        log.debug("Testing rechability for device {}", deviceId);
+        log.debug("Testing reachability for device {}", deviceId);
         CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
         try {
-            return reachable.get(10, TimeUnit.SECONDS);
+            return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("Device {} is not reachable", deviceId, e);
             return false;
@@ -368,15 +394,7 @@
                 return;
             }
 
-            //Storing deviceKeyId and all other config values
-            // as data in the driver with protocol_<info>
-            // name as the key. e.g protocol_ip
-            providerConfig.protocolsInfo()
-                    .forEach((protocol, deviceInfoConfig) -> {
-                        deviceInfoConfig.configValues()
-                                .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                        driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                    });
+            addConfigData(providerConfig, driverData);
 
             //Connecting to the device
             CompletableFuture<Boolean> connected = handshaker.connect();
@@ -409,7 +427,7 @@
                         ports = deviceDiscovery.discoverPortDetails();
                     }
 
-                    if (!handlePipeconf(deviceId, driver, driverData)) {
+                    if (!handlePipeconf(deviceId, driver, driverData, true)) {
                         // Something went wrong during handling of pipeconf.
                         // We already logged the error.
                         handshaker.disconnect();
@@ -425,11 +443,37 @@
         }
     }
 
+    private void connectStandbyDevice(DeviceId deviceId) {
+
+        //if device is pipeline programmable we merge pipeconf + base driver for every other role
+        GeneralProviderDeviceConfig providerConfig =
+                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
+
+        Driver driver = getDriver(deviceId);
+
+
+        DriverData driverData = new DefaultDriverData(driver, deviceId);
+        DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+        if (handshaker == null) {
+            log.error("Device {}, with driver {} does not support DeviceHandshaker " +
+                    "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
+            return;
+        }
+        addConfigData(providerConfig, driverData);
+
+        //Connecting to the device
+        handshaker.connect().thenAcceptAsync(result -> {
+            if (result) {
+                handlePipeconf(deviceId, driver, driverData, false);
+            }
+        });
+    }
+
     /**
      * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
      * device can be registered to the core, false otherwise.
      */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
+    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
 
         PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
                 driverData);
@@ -439,6 +483,42 @@
             return true;
         }
 
+        PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+
+        if (pipeconf != null) {
+
+            PiPipeconfId pipeconfId = pipeconf.id();
+
+            try {
+                if (deployPipeconf) {
+                    if (!pipelineProg.deployPipeconf(pipeconf).get()) {
+                        log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
+                                pipeconfId, deviceId);
+                        return false;
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+                return false;
+            }
+            try {
+                if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
+                    log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
+                            driver.name(), deviceId, pipeconfId);
+                    return false;
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+                return false;
+            }
+        } else {
+            return false;
+        }
+
+        return true;
+    }
+
+    private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
         PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
             // No pipeconf has been associated with this device.
             // Check if device driver provides a default one.
@@ -453,33 +533,16 @@
 
         if (pipeconfId == null) {
             log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
-            return false;
+            return null;
         }
 
         if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
             log.warn("Pipeconf {} is not registered", pipeconfId);
-            return false;
+            return null;
         }
 
 
-        PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
-
-        try {
-            if (!pipelineProg.deployPipeconf(pipeconf).get()) {
-                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
-                return false;
-            }
-
-            if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
-                log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                        driver.name(), deviceId, pipeconfId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IllegalStateException(e);
-        }
-
-        return true;
+        return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
     private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
@@ -492,7 +555,6 @@
         DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker != null) {
             CompletableFuture<Boolean> disconnect = handshaker.disconnect();
-
             disconnect.thenAcceptAsync(result -> {
                 if (result) {
                     log.info("Disconnected device {}", deviceId);
@@ -560,6 +622,28 @@
                 log.info("Device {} is already connected to ONOS and is available", deviceId);
                 return;
             }
+            NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
+                    .leader().nodeId();
+            NodeId localNodeId = clusterService.getLocalNode().id();
+            if (localNodeId.equals(leaderNodeId)) {
+                if (processEvent(event, deviceId)) {
+                    log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
+                            deviceId);
+                    checkAndSubmitDeviceTask(deviceId);
+                }
+            } else {
+                if (processEvent(event, deviceId)) {
+                    log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
+                            localNodeId, deviceId, leaderNodeId);
+                    connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
+                    //FIXME this will be removed when config is synced
+                    cleanUpConfigInfo(deviceId);
+                }
+            }
+
+        }
+
+        private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
             //FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
             Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
@@ -590,7 +674,7 @@
                 // in the pipelineConfigured
                 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
                         && pipelineConfigured.contains(deviceId)) {
-                    checkAndSubmitDeviceTask(deviceId);
+                    return true;
                 } else {
                     if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
                         log.debug("Waiting for pipeline configuration for device {}", deviceId);
@@ -604,6 +688,7 @@
                         log.debug("Only device configuration for device {}", deviceId);
                     }
                 }
+                return false;
             } finally {
                 lock.unlock();
             }
@@ -622,10 +707,26 @@
     private void checkAndSubmitDeviceTask(DeviceId deviceId) {
         connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
         //FIXME this will be removed when configuration is synced.
+        cleanUpConfigInfo(deviceId);
+
+    }
+
+    private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
+        //Storing deviceKeyId and all other config values
+        // as data in the driver with protocol_<info>
+        // name as the key. e.g protocol_ip
+        providerConfig.protocolsInfo()
+                .forEach((protocol, deviceInfoConfig) -> {
+                    deviceInfoConfig.configValues()
+                            .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+                    driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+                });
+    }
+
+    private void cleanUpConfigInfo(DeviceId deviceId) {
         deviceConfigured.remove(deviceId);
         driverConfigured.remove(deviceId);
         pipelineConfigured.remove(deviceId);
-
     }
 
     private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
@@ -650,10 +751,13 @@
 
                 //For now this is scheduled periodically, when streaming API will
                 // be available we check and base it on the streaming API (e.g. gNMI)
-                scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
-                updatePortStatistics(deviceId);
+                if (mastershipService.isLocalMaster(deviceId)) {
+                    scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+                    updatePortStatistics(deviceId);
+                }
 
             } else if (type.equals(Type.DEVICE_REMOVED)) {
+
                 connectionExecutor.submit(exceptionSafe(() ->
                         disconnectDevice(deviceId)));
             }
diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
index fb4a97a..8693eff 100644
--- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
+++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
@@ -67,6 +68,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
     private PacketProviderService providerService;
 
     private InternalPacketListener packetListener = new InternalPacketListener();
@@ -98,7 +102,7 @@
         if (packet != null) {
             DeviceId deviceId = packet.sendThrough();
             Device device = deviceService.getDevice(deviceId);
-            if (device.is(PacketProgrammable.class)) {
+            if (device.is(PacketProgrammable.class) && mastershipService.isLocalMaster(deviceId)) {
                 PacketProgrammable packetProgrammable = device.as(PacketProgrammable.class);
                 packetProgrammable.emit(packet);
             } else {
@@ -148,7 +152,10 @@
 
         @Override
         public void event(P4RuntimeEvent event) {
-            if (event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+            //Masterhip message is sent to everybody but picked up only by master.
+            //FIXME we need the device ID into p4RuntimeEvnetSubject to check for mastsership
+            if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+                log.debug("Event type {}", event.type());
                 // Not a packet-in event, ignore it.
                 return;
             }
@@ -163,7 +170,7 @@
 
             if (!device.is(PiPipelineInterpreter.class)) {
                 log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
-                         deviceId);
+                        deviceId);
                 return;
             }
 
@@ -184,7 +191,7 @@
             log.debug("Processing inbound packet: {}", inPkt.toString());
 
             OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
-                                                              operation.data().asReadOnlyBuffer());
+                    operation.data().asReadOnlyBuffer());
             PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
 
             // Pushing the packet context up for processing.