Completing Port Statistics scheduling in general device provider.
Adding empty shell for port statistics discovery with bmv2 and default.p4

Change-Id: I4a333e406d6df0c6f8041d53f21cbf10f8bb9782
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index c8e7ae1..260e41c 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -22,10 +22,13 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.ChassisId;
 import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.Tools;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
@@ -68,18 +71,23 @@
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
@@ -120,17 +128,23 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PiPipeconfService piPipeconfService;
 
+    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
+    @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+            label = "Configure poll frequency for port status and statistics; " +
+                    "default is 10 sec")
+    private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+
     protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     protected static final String URI_SCHEME = "device";
     protected static final String CFG_SCHEME = "generalprovider";
     private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
-    private static final int PORT_STATS_PERIOD_SECONDS = 10;
+
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
-    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
 
+    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -140,12 +154,13 @@
 
     protected ScheduledExecutorService connectionExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-                                     groupedThreads("onos/generaldeviceprovider-device",
-                                                    "connection-executor-%d", log));
+            groupedThreads("onos/generaldeviceprovider-device",
+                    "connection-executor-%d", log));
     protected ScheduledExecutorService portStatsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-                                     groupedThreads("onos/generaldeviceprovider-port-stats",
-                                                    "port-stats-executor-%d", log));
+            groupedThreads("onos/generaldeviceprovider-port-stats",
+                    "port-stats-executor-%d", log));
+    protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
     protected DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -177,6 +192,33 @@
         log.info("Started");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context != null) {
+            Dictionary<?, ?> properties = context.getProperties();
+            pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
+                    DEFAULT_POLL_FREQUENCY_SECONDS);
+            log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
+        }
+
+        if (!scheduledTasks.isEmpty()) {
+            //cancel all previous tasks
+            scheduledTasks.values().forEach(task -> task.cancel(false));
+            //resubmit the polling tasks with the new poll frequency.
+            Set<DeviceId> deviceSubjects =
+                    cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
+            deviceSubjects.forEach(deviceId -> {
+                if (!compareScheme(deviceId)) {
+                    // not under my scheme, skipping
+                    log.debug("{} is not my scheme, skipping", deviceId);
+                    return;
+                }
+                scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
+            });
+        }
+
+    }
+
 
     @Deactivate
     public void deactivate() {
@@ -240,7 +282,7 @@
             modified.thenAcceptAsync(result -> {
                 if (!result) {
                     log.warn("Your device {} port {} status can't be changed to {}",
-                             deviceId, portNumber, enable);
+                            deviceId, portNumber, enable);
                 }
             });
 
@@ -252,13 +294,13 @@
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, DeviceHandshaker.class,
-                            new DefaultDriverData(driver, deviceId));
+                new DefaultDriverData(driver, deviceId));
     }
 
     private PortAdmin getPortAdmin(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, PortAdmin.class,
-                            new DefaultDriverData(driver, deviceId));
+                new DefaultDriverData(driver, deviceId));
 
     }
 
@@ -268,7 +310,7 @@
             driver = driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
             log.debug("Falling back to configuration to fetch driver " +
-                              "for device {}", deviceId);
+                    "for device {}", deviceId);
             driver = driverService.getDriver(
                     cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
         }
@@ -299,7 +341,7 @@
 
         if (providerConfig == null || basicDeviceConfig == null) {
             log.error("Configuration is NULL: basic config {}, general provider " +
-                              "config {}", basicDeviceConfig, providerConfig);
+                    "config {}", basicDeviceConfig, providerConfig);
         } else {
             log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
@@ -311,7 +353,7 @@
 
             if (handshaker == null) {
                 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
-                                  "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                        "behaviour, {}", deviceId, driver.name(), driver.behaviours());
                 return;
             }
             //Storing deviceKeyId and all other config values
@@ -334,13 +376,13 @@
                     ChassisId cid = new ChassisId();
                     SparseAnnotations annotations = DefaultAnnotations.builder()
                             .set(AnnotationKeys.PROTOCOL,
-                                 providerConfig.protocolsInfo().keySet().toString())
+                                    providerConfig.protocolsInfo().keySet().toString())
                             .build();
                     DeviceDescription description =
                             new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                                         driver.manufacturer(), driver.hwVersion(),
-                                                         driver.swVersion(), UNKNOWN,
-                                                         cid, false, annotations);
+                                    driver.manufacturer(), driver.hwVersion(),
+                                    driver.swVersion(), UNKNOWN,
+                                    cid, false, annotations);
                     //Empty list of ports
                     List<PortDescription> ports = new ArrayList<>();
 
@@ -378,7 +420,7 @@
     private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
 
         PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
-                                                           driverData);
+                driverData);
 
         if (pipelineProg == null) {
             // Device is not pipeline programmable.
@@ -416,7 +458,7 @@
 
             if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
                 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                          driver.name(), deviceId, pipeconfId);
+                        driver.name(), deviceId, pipeconfId);
                 return false;
             }
         } catch (InterruptedException | ExecutionException e) {
@@ -449,6 +491,10 @@
             //gracefully ignoring.
             log.info("No DeviceHandshaker for device {}", deviceId);
         }
+        ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
+        if (pollingStatisticsTask != null) {
+            pollingStatisticsTask.cancel(true);
+        }
     }
 
     //Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -466,7 +512,14 @@
         Collection<PortStatistics> statistics = deviceService.getDevice(deviceId)
                 .as(PortStatisticsDiscovery.class)
                 .discoverPortStatistics();
-        providerService.updatePortStatistics(deviceId, statistics);
+        //updating statistics only if not empty
+        if (!statistics.isEmpty()) {
+            providerService.updatePortStatistics(deviceId, statistics);
+        }
+    }
+
+    private boolean compareScheme(DeviceId deviceId) {
+        return deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
     /**
@@ -479,7 +532,7 @@
         public void event(NetworkConfigEvent event) {
             DeviceId deviceId = (DeviceId) event.subject();
             //Assuming that the deviceId comes with uri 'device:'
-            if (!deviceId.uri().getScheme().equals(URI_SCHEME)) {
+            if (!compareScheme(deviceId)) {
                 // not under my scheme, skipping
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
@@ -498,7 +551,7 @@
                     //If we want to connect a p4runtime device with no pipeline
                     if (event.config().isPresent() &&
                             Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                                 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
                         pipelineConfigured.add(deviceId);
                     }
                     deviceConfigured.add(deviceId);
@@ -556,6 +609,16 @@
 
     }
 
+    private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
+        int delay = 0;
+        if (randomize) {
+            delay = new Random().nextInt(10);
+        }
+        return portStatsExecutor.scheduleAtFixedRate(
+                exceptionSafe(() -> updatePortStatistics(deviceId)),
+                delay, pollFrequency, TimeUnit.SECONDS);
+    }
+
     /**
      * Listener for core device events.
      */
@@ -563,29 +626,25 @@
         @Override
         public void event(DeviceEvent event) {
             Type type = event.type();
-
             if (type.equals((Type.DEVICE_ADDED))) {
 
                 //For now this is scheduled periodically, when streaming API will
                 // be available we check and base it on the streaming API (e.g. gNMI)
                 if (deviceService.getDevice(event.subject().id()).
                         is(PortStatisticsDiscovery.class)) {
-                    portStatsExecutor.scheduleAtFixedRate(
-                            exceptionSafe(() -> updatePortStatistics(event.subject().id())),
-                            0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
+                    scheduledTasks.put(event.subject().id(), schedulePolling(event.subject().id(), false));
                     updatePortStatistics(event.subject().id());
                 }
 
             } else if (type.equals(Type.DEVICE_REMOVED)) {
                 connectionExecutor.submit(exceptionSafe(() ->
-                                                                disconnectDevice(event.subject().id())));
+                        disconnectDevice(event.subject().id())));
             }
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            return URI_SCHEME.toUpperCase()
-                    .equals(event.subject().id().uri().toString());
+            return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
         }
     }
 }