ONOS-6561 BMv2 handshaker via P4Runtime

+ support fort device-specific default pipeconf
+ improvements to P4runtime and gRPC protocol stuff

Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 1b3c459..ad48e1e 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -60,7 +60,9 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineProgrammable;
 import org.onosproject.net.pi.runtime.PiPipeconfConfig;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.provider.AbstractProvider;
@@ -72,7 +74,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
@@ -139,12 +140,12 @@
 
     protected ScheduledExecutorService connectionExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-device",
-                    "connection-executor-%d", log));
+                                     groupedThreads("onos/generaldeviceprovider-device",
+                                                    "connection-executor-%d", log));
     protected ScheduledExecutorService portStatsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-port-stats",
-                    "port-stats-executor-%d", log));
+                                     groupedThreads("onos/generaldeviceprovider-port-stats",
+                                                    "port-stats-executor-%d", log));
 
     protected DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -239,7 +240,7 @@
             modified.thenAcceptAsync(result -> {
                 if (!result) {
                     log.warn("Your device {} port {} status can't be changed to {}",
-                            deviceId, portNumber, enable);
+                             deviceId, portNumber, enable);
                 }
             });
 
@@ -251,13 +252,13 @@
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, DeviceHandshaker.class,
-                new DefaultDriverData(driver, deviceId));
+                            new DefaultDriverData(driver, deviceId));
     }
 
     private PortAdmin getPortAdmin(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, PortAdmin.class,
-                new DefaultDriverData(driver, deviceId));
+                            new DefaultDriverData(driver, deviceId));
 
     }
 
@@ -267,7 +268,7 @@
             driver = driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
             log.debug("Falling back to configuration to fetch driver " +
-                    "for device {}", deviceId);
+                              "for device {}", deviceId);
             driver = driverService.getDriver(
                     cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
         }
@@ -298,7 +299,7 @@
 
         if (providerConfig == null || basicDeviceConfig == null) {
             log.error("Configuration is NULL: basic config {}, general provider " +
-                    "config {}", basicDeviceConfig, providerConfig);
+                              "config {}", basicDeviceConfig, providerConfig);
         } else {
             log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
@@ -310,7 +311,7 @@
 
             if (handshaker == null) {
                 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
-                        "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                                  "behaviour, {}", deviceId, driver.name(), driver.behaviours());
                 return;
             }
             //Storing deviceKeyId and all other config values
@@ -333,13 +334,13 @@
                     ChassisId cid = new ChassisId();
                     SparseAnnotations annotations = DefaultAnnotations.builder()
                             .set(AnnotationKeys.PROTOCOL,
-                                    providerConfig.protocolsInfo().keySet().toString())
+                                 providerConfig.protocolsInfo().keySet().toString())
                             .build();
                     DeviceDescription description =
                             new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                    driver.manufacturer(), driver.hwVersion(),
-                                    driver.swVersion(), UNKNOWN,
-                                    cid, false, annotations);
+                                                         driver.manufacturer(), driver.hwVersion(),
+                                                         driver.swVersion(), UNKNOWN,
+                                                         cid, false, annotations);
                     //Empty list of ports
                     List<PortDescription> ports = new ArrayList<>();
 
@@ -354,27 +355,15 @@
                         ports = deviceDiscovery.discoverPortDetails();
                     }
 
-                    Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
-                    //Apply the Pipeline configuration and then connect the device
-                    if (pipeconfId.isPresent()) {
-                        DeviceDescription finalDescription = description;
-                        List<PortDescription> finalPorts = ports;
-                        piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
-                            if (success) {
-                                advertiseDevice(deviceId, finalDescription, finalPorts);
-                            } else {
-                                log.error("Can't merge driver {} with pipeconf {} for device {}, " +
-                                                "not reporting it to the device manager",
-                                        driver.name(), pipeconfId.get(), deviceId);
-                            }
-                        }).exceptionally(ex -> {
-                            throw new IllegalStateException(ex);
-                        });
-                    } else {
-                        //No other operation is needed, advertise the device to the core.
-                        advertiseDevice(deviceId, description, ports);
+                    if (!handlePipeconf(deviceId, driver, driverData)) {
+                        // Something went wrong during handling of pipeconf.
+                        // We already logged the error.
+                        handshaker.disconnect();
+                        return;
                     }
 
+                    advertiseDevice(deviceId, description, ports);
+
                 } else {
                     log.warn("Can't connect to device {}", deviceId);
                 }
@@ -382,6 +371,66 @@
         }
     }
 
+    /**
+     * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
+     * device can be registered to the core, false otherwise.
+     */
+    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
+
+        PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
+                                                           driverData);
+
+        if (pipelineProg == null) {
+            // Device is not pipeline programmable.
+            return true;
+        }
+
+        PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
+            // No pipeconf has been associated with this device.
+            // Check if device driver provides a default one.
+            if (pipelineProg.getDefaultPipeconf().isPresent()) {
+                PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+                log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+                // Register default one if it is not.
+                // TODO: this should be performed at driver loading.
+                if (!piPipeconfService.getPipeconf(defaultPipeconf.id()).isPresent()) {
+                    piPipeconfService.register(defaultPipeconf);
+                }
+                return defaultPipeconf.id();
+            } else {
+                return null;
+            }
+        });
+
+        if (pipeconfId == null) {
+            log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
+            return true;
+        }
+
+
+        PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).orElseThrow(
+                () -> new IllegalStateException("Pipeconf is not registered: " + pipeconfId)
+        );
+
+
+        try {
+            if (!pipelineProg.deployPipeconf(pipeconf).get()) {
+                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
+                return false;
+            }
+
+            if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
+                log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
+                          driver.name(), deviceId, pipeconfId);
+                return false;
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException(e);
+        }
+
+        return true;
+    }
+
     private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
         providerService.deviceConnected(deviceId, description);
         providerService.updatePorts(deviceId, ports);
@@ -454,7 +503,7 @@
                     //If we want to connect a p4runtime device with no pipeline
                     if (event.config().isPresent() &&
                             Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+                                                 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
                         pipelineConfigured.add(deviceId);
                     }
                     deviceConfigured.add(deviceId);
@@ -526,15 +575,15 @@
                 // be available we check and base it on the streaming API (e.g. gNMI)
                 if (deviceService.getDevice(event.subject().id()).
                         is(PortStatisticsDiscovery.class)) {
-                    portStatsExecutor.scheduleAtFixedRate(exceptionSafe(() ->
-                                    updatePortStatistics(event.subject().id())),
+                    portStatsExecutor.scheduleAtFixedRate(
+                            exceptionSafe(() -> updatePortStatistics(event.subject().id())),
                             0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
                     updatePortStatistics(event.subject().id());
                 }
 
             } else if (type.equals(Type.DEVICE_REMOVED)) {
                 connectionExecutor.submit(exceptionSafe(() ->
-                        disconnectDevice(event.subject().id())));
+                                                                disconnectDevice(event.subject().id())));
             }
         }