Various changes in BMv2 driver and provider modules (onos1.6 cherry-pick)

Driver notable changes:
- Implemented new behaviors, removed deprecated ones
- Removed flow rule translator classes (now under protocol module)
- Improved FlowRuleProgrammable: it now uses BMv2TableEntryService
	to look up/bind flow rules with BMv2 table entries, retrieves flow
	statistics, and handles exceptions better when adding/replacing/
	removing table entries.
- Improved PacketProgrammable: better exception handling and logging

Provider notable changes:
- Bmv2DeviceProvider: detects device configuration changes and
	reboots and reports them to Bmv2DeviceContextService; added
	support for periodic polling of port statistics
- Bmv2PacketProvider: implemented workaround for OutboundPackets with
	flood treatment

Change-Id: I79b756b533d4afb6b70025a137b2e811fd42a4e8
diff --git a/providers/bmv2/device/pom.xml b/providers/bmv2/device/pom.xml
index cc0b8d0..95227dd 100644
--- a/providers/bmv2/device/pom.xml
+++ b/providers/bmv2/device/pom.xml
@@ -22,6 +22,7 @@
         <artifactId>onos-bmv2-providers</artifactId>
         <groupId>org.onosproject</groupId>
         <version>1.7.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -33,12 +34,12 @@
     <dependencies>
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-drivers-bmv2</artifactId>
+            <artifactId>onos-core-common</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-common</artifactId>
+            <artifactId>onos-bmv2-protocol-api</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
index 69d9a43..30fcb99 100644
--- a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
@@ -23,36 +23,38 @@
 import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timeout;
 import org.jboss.netty.util.TimerTask;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.HexString;
 import org.onlab.util.Timer;
-import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
 import org.onosproject.bmv2.api.runtime.Bmv2Device;
 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
-import org.onosproject.bmv2.ctl.Bmv2ThriftClient;
+import org.onosproject.bmv2.api.service.Bmv2Controller;
+import org.onosproject.bmv2.api.service.Bmv2DeviceContextService;
+import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
 import org.onosproject.common.net.AbstractDeviceProvider;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.incubator.net.config.basics.ConfigException;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.behaviour.PortDiscovery;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.driver.DefaultDriverData;
+import org.onosproject.net.driver.DefaultDriverHandler;
+import org.onosproject.net.driver.Driver;
 import org.onosproject.net.provider.ProviderId;
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -60,9 +62,9 @@
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.bmv2.api.runtime.Bmv2Device.*;
-import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.forceDisconnectOf;
-import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.ping;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics;
+import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -71,11 +73,24 @@
 @Component(immediate = true)
 public class Bmv2DeviceProvider extends AbstractDeviceProvider {
 
-    private static final Logger LOG = getLogger(Bmv2DeviceProvider.class);
-
     private static final String APP_NAME = "org.onosproject.bmv2";
-    private static final String UNKNOWN = "unknown";
-    private static final int POLL_INTERVAL = 5; // seconds
+
+    private static final int POLL_INTERVAL = 5_000; // milliseconds
+
+    private final Logger log = getLogger(this.getClass());
+
+    private final ExecutorService executorService = Executors
+            .newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log));
+
+    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+
+    private final ConfigFactory cfgFactory = new InternalConfigFactory();
+
+    private final ConcurrentMap<DeviceId, DeviceDescription> activeDevices = Maps.newConcurrentMap();
+
+    private final DevicePoller devicePoller = new DevicePoller();
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry netCfgService;
@@ -87,16 +102,11 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected Bmv2ControlPlaneServer controlPlaneServer;
+    protected Bmv2Controller controller;
 
-    private final ExecutorService deviceDiscoveryExecutor = Executors
-            .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", LOG));
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected Bmv2DeviceContextService contextService;
 
-    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-    private final ConfigFactory cfgFactory = new InternalConfigFactory();
-    private final ConcurrentMap<DeviceId, Boolean> activeDevices = Maps.newConcurrentMap();
-    private final DevicePoller devicePoller = new DevicePoller();
-    private final InternalHelloListener helloListener = new InternalHelloListener();
     private ApplicationId appId;
 
     /**
@@ -111,7 +121,7 @@
         appId = coreService.registerApplication(APP_NAME);
         netCfgService.registerConfigFactory(cfgFactory);
         netCfgService.addListener(cfgListener);
-        controlPlaneServer.addHelloListener(helloListener);
+        controller.addDeviceListener(deviceListener);
         devicePoller.start();
         super.activate();
     }
@@ -119,16 +129,16 @@
     @Override
     protected void deactivate() {
         devicePoller.stop();
-        controlPlaneServer.removeHelloListener(helloListener);
+        controller.removeDeviceListener(deviceListener);
         try {
             activeDevices.forEach((did, value) -> {
-                deviceDiscoveryExecutor.execute(() -> disconnectDevice(did));
+                executorService.execute(() -> disconnectDevice(did));
             });
-            deviceDiscoveryExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+            executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            LOG.error("Device discovery threads did not terminate");
+            log.error("Device discovery threads did not terminate");
         }
-        deviceDiscoveryExecutor.shutdownNow();
+        executorService.shutdownNow();
         netCfgService.unregisterConfigFactory(cfgFactory);
         netCfgService.removeListener(cfgListener);
         super.deactivate();
@@ -137,14 +147,12 @@
     @Override
     public void triggerProbe(DeviceId deviceId) {
         // Asynchronously trigger probe task.
-        deviceDiscoveryExecutor.execute(() -> executeProbe(deviceId));
+        executorService.execute(() -> executeProbe(deviceId));
     }
 
     private void executeProbe(DeviceId did) {
         boolean reachable = isReachable(did);
-        LOG.debug("Probed device: id={}, reachable={}",
-                  did.toString(),
-                  reachable);
+        log.debug("Probed device: id={}, reachable={}", did.toString(), reachable);
         if (reachable) {
             discoverDevice(did);
         } else {
@@ -154,85 +162,108 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        LOG.debug("roleChanged() is not yet implemented");
+        log.debug("roleChanged() is not yet implemented");
         // TODO: implement mastership handling
     }
 
     @Override
     public boolean isReachable(DeviceId deviceId) {
-        return ping(deviceId);
+        return controller.isReacheable(deviceId);
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
-        LOG.debug("changePortState() is not yet implemented");
-        // TODO: implement port handling
+        log.warn("changePortState() not supported");
     }
 
     private void discoverDevice(DeviceId did) {
-        LOG.debug("Starting device discovery... deviceId={}", did);
-
-        // Atomically notify device to core and update port information.
-        activeDevices.compute(did, (k, v) -> {
-            if (!deviceService.isAvailable(did)) {
-                // Device not available in the core, connect it now.
-                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                        .set(AnnotationKeys.PROTOCOL, SCHEME);
-                dumpJsonConfigToAnnotations(did, annotationsBuilder);
-                DeviceDescription descr = new DefaultDeviceDescription(
-                        did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
-                        UNKNOWN, UNKNOWN, new ChassisId(), annotationsBuilder.build());
-                // Reset device state (cleanup entries, etc.)
-                resetDeviceState(did);
-                providerService.deviceConnected(did, descr);
+        log.debug("Starting device discovery... deviceId={}", did);
+        activeDevices.compute(did, (k, lastDescription) -> {
+            DeviceDescription thisDescription = getDeviceDescription(did);
+            if (thisDescription != null) {
+                boolean descriptionChanged = lastDescription != null &&
+                                (!Objects.equals(thisDescription, lastDescription) ||
+                                        !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
+                if (descriptionChanged || !deviceService.isAvailable(did)) {
+                    // Device description changed or device not available in the core.
+                    if (contextService.notifyDeviceChange(did)) {
+                        // Configuration is the expected one, we can proceed notifying the core.
+                        resetDeviceState(did);
+                        initPortCounters(did);
+                        providerService.deviceConnected(did, thisDescription);
+                        updatePortsAndStats(did);
+                    }
+                }
+                return thisDescription;
+            } else {
+                log.warn("Unable to get device description for {}", did);
+                return lastDescription;
             }
-            updatePorts(did);
-            return true;
         });
     }
 
-    private void dumpJsonConfigToAnnotations(DeviceId did, DefaultAnnotations.Builder builder) {
-        // TODO: store json config string somewhere else, possibly in a Bmv2Controller (see ONOS-4419)
-        try {
-            String md5 = Bmv2ThriftClient.of(did).getJsonConfigMd5();
-            // Convert to hex string for readability.
-            md5 = HexString.toHexString(md5.getBytes());
-            String jsonString = Bmv2ThriftClient.of(did).dumpJsonConfig();
-            builder.set("bmv2JsonConfigMd5", md5);
-            builder.set("bmv2JsonConfigValue", jsonString);
-        } catch (Bmv2RuntimeException e) {
-            LOG.warn("Unable to dump device JSON config from device {}: {}", did, e.toString());
+    private DeviceDescription getDeviceDescription(DeviceId did) {
+        Device device = deviceService.getDevice(did);
+        DeviceDescriptionDiscovery discovery = null;
+        if (device == null) {
+            // Device not yet in the core. Manually get a driver.
+            Driver driver = driverService.getDriver(MANUFACTURER, HW_VERSION, SW_VERSION);
+            if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+                discovery = driver.createBehaviour(new DefaultDriverHandler(new DefaultDriverData(driver, did)),
+                                                   DeviceDescriptionDiscovery.class);
+            }
+        } else if (device.is(DeviceDescriptionDiscovery.class)) {
+            discovery = device.as(DeviceDescriptionDiscovery.class);
+        }
+        if (discovery == null) {
+            log.warn("No DeviceDescriptionDiscovery behavior for device {}", did);
+            return null;
+        } else {
+            return discovery.discoverDeviceDetails();
         }
     }
 
     private void resetDeviceState(DeviceId did) {
         try {
-            Bmv2ThriftClient.of(did).resetState();
+            controller.getAgent(did).resetState();
         } catch (Bmv2RuntimeException e) {
-            LOG.warn("Unable to reset {}: {}", did, e.toString());
+            log.warn("Unable to reset {}: {}", did, e.toString());
         }
     }
 
-    private void updatePorts(DeviceId did) {
+    private void initPortCounters(DeviceId did) {
+        try {
+            initCounters(controller.getAgent(did));
+        } catch (Bmv2RuntimeException e) {
+            log.warn("Unable to init counter on {}: {}", did, e.explain());
+        }
+    }
+
+    private void updatePortsAndStats(DeviceId did) {
         Device device = deviceService.getDevice(did);
-        if (device.is(PortDiscovery.class)) {
-            PortDiscovery portConfig = device.as(PortDiscovery.class);
-            List<PortDescription> portDescriptions = portConfig.getPorts();
-            providerService.updatePorts(did, portDescriptions);
+        if (device.is(DeviceDescriptionDiscovery.class)) {
+            DeviceDescriptionDiscovery discovery = device.as(DeviceDescriptionDiscovery.class);
+            List<PortDescription> portDescriptions = discovery.discoverPortDetails();
+            if (portDescriptions != null) {
+                providerService.updatePorts(did, portDescriptions);
+            }
         } else {
-            LOG.warn("No PortDiscovery behavior for device {}", did);
+            log.warn("No DeviceDescriptionDiscovery behavior for device {}", did);
+        }
+        try {
+            Collection<PortStatistics> portStats = getPortStatistics(controller.getAgent(did),
+                                                                     deviceService.getPorts(did));
+            providerService.updatePortStatistics(did, portStats);
+        } catch (Bmv2RuntimeException e) {
+            log.warn("Unable to get port statistics for {}: {}", did, e.explain());
         }
     }
 
     private void disconnectDevice(DeviceId did) {
-        LOG.debug("Trying to disconnect device from core... deviceId={}", did);
-
-        // Atomically disconnect device.
+        log.debug("Trying to disconnect device from core... deviceId={}", did);
         activeDevices.compute(did, (k, v) -> {
             if (deviceService.isAvailable(did)) {
                 providerService.deviceDisconnected(did);
-                // Make sure to close the transport session with device. Do we really need this?
-                forceDisconnectOf(did);
             }
             return null;
         });
@@ -269,10 +300,10 @@
                         triggerProbe(bmv2Device.asDeviceId());
                     });
                 } catch (ConfigException e) {
-                    LOG.error("Unable to read config: " + e);
+                    log.error("Unable to read config: " + e);
                 }
             } else {
-                LOG.error("Unable to read config (was null)");
+                log.error("Unable to read config (was null)");
             }
         }
 
@@ -285,18 +316,18 @@
     }
 
     /**
-     * Listener triggered by Bmv2ControlPlaneServer each time a hello message is received.
+     * Listener triggered by the BMv2 controller each time a hello message is received.
      */
-    private class InternalHelloListener implements Bmv2ControlPlaneServer.HelloListener {
+    private class InternalDeviceListener implements Bmv2DeviceListener {
         @Override
-        public void handleHello(Bmv2Device device) {
+        public void handleHello(Bmv2Device device, int instanceId, String jsonConfigMd5) {
             log.debug("Received hello from {}", device);
             triggerProbe(device.asDeviceId());
         }
     }
 
     /**
-     * Task that periodically trigger device probes.
+     * Task that periodically triggers device probes to check for device status and update port information.
      */
     private class DevicePoller implements TimerTask {
 
@@ -304,20 +335,31 @@
         private Timeout timeout;
 
         @Override
-        public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled()) {
+        public void run(Timeout tout) throws Exception {
+            if (tout.isCancelled()) {
                 return;
             }
-            log.debug("Executing polling on {} devices...", activeDevices.size());
-            activeDevices.forEach((did, value) -> triggerProbe(did));
-            timeout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.SECONDS);
+            activeDevices.keySet()
+                    .stream()
+                    // Filter out devices not yet created in the core.
+                    .filter(did -> deviceService.getDevice(did) != null)
+                    .forEach(did -> executorService.execute(() -> pollingTask(did)));
+            tout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.MILLISECONDS);
+        }
+
+        private void pollingTask(DeviceId deviceId) {
+            if (isReachable(deviceId)) {
+                updatePortsAndStats(deviceId);
+            } else {
+                disconnectDevice(deviceId);
+            }
         }
 
         /**
          * Starts the collector.
          */
-         synchronized void start() {
-            LOG.info("Starting device poller...");
+        synchronized void start() {
+            log.info("Starting device poller...");
             timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
         }
 
@@ -325,7 +367,7 @@
          * Stops the collector.
          */
         synchronized void stop() {
-            LOG.info("Stopping device poller...");
+            log.info("Stopping device poller...");
             timeout.cancel();
         }
     }
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2PortStatisticsGetter.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2PortStatisticsGetter.java
new file mode 100644
index 0000000..ff94ab0
--- /dev/null
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2PortStatisticsGetter.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.bmv2.device.impl;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.bmv2.api.runtime.Bmv2Action;
+import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
+import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DefaultPortStatistics;
+import org.onosproject.net.device.PortStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Utility class to read port statistics from a BMv2 device.
+ */
+final class Bmv2PortStatisticsGetter {
+
+    // TODO: make counters configuration dependent
+
+    private static final String TABLE_NAME = "port_count_table";
+    private static final String ACTION_NAME = "count_packet";
+    private static final String EGRESS_COUNTER = "egress_port_counter";
+    private static final String INGRESS_COUNTER = "ingress_port_counter";
+
+    private static final Logger log = LoggerFactory.getLogger(Bmv2PortStatisticsGetter.class);
+
+    private Bmv2PortStatisticsGetter() {
+        // ban constructor.
+    }
+
+    /**
+     * Returns a collection of port statistics for given ports using the given BMv2 device agent.
+     *
+     * @param deviceAgent a device agent
+     * @param ports       a collection of ports
+     * @return a collection of port statistics
+     */
+    static Collection<PortStatistics> getPortStatistics(Bmv2DeviceAgent deviceAgent, Collection<Port> ports) {
+
+        List<PortStatistics> ps = Lists.newArrayList();
+
+        for (Port port : ports) {
+            int portNumber = (int) port.number().toLong();
+            try {
+                Pair<Long, Long> egressCounter = deviceAgent.readCounter(EGRESS_COUNTER, portNumber);
+                Pair<Long, Long> ingressCounter = deviceAgent.readCounter(INGRESS_COUNTER, portNumber);
+                ps.add(DefaultPortStatistics.builder()
+                               .setPort(portNumber)
+                               .setBytesSent(egressCounter.getLeft())
+                               .setPacketsSent(egressCounter.getRight())
+                               .setBytesReceived(ingressCounter.getLeft())
+                               .setPacketsReceived(ingressCounter.getRight())
+                               .build());
+            } catch (Bmv2RuntimeException e) {
+                log.info("Unable to read port statistics from {}: {}", port, e.explain());
+            }
+        }
+
+        return ps;
+    }
+
+    /**
+     * Initializes port counters on the given device agent.
+     *
+     * @param deviceAgent a device agent
+     */
+    static void initCounters(Bmv2DeviceAgent deviceAgent) {
+        try {
+            deviceAgent.setTableDefaultAction(TABLE_NAME, Bmv2Action.builder().withName(ACTION_NAME).build());
+        } catch (Bmv2RuntimeException e) {
+            log.debug("Failed to provision counters on {}: {}", deviceAgent.deviceId(), e.explain());
+        }
+    }
+}
+