Completing Port Statistics scheduling in general device provider.
Adding empty shell for port statistcs discovery with bmv2 and default.p4

Change-Id: I4a333e406d6df0c6f8041d53f21cbf10f8bb9782
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
index 0a1ebba..e04e8b3 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
@@ -19,6 +19,7 @@
 import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
 import org.onosproject.driver.pipeline.DefaultSingleTablePipeline;
 import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.device.PortStatisticsDiscovery;
 import org.onosproject.net.pi.model.DefaultPiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
@@ -58,6 +59,7 @@
                 .withPipelineModel(Bmv2PipelineModelParser.parse(jsonUrl))
                 .addBehaviour(PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class)
                 .addBehaviour(Pipeliner.class, DefaultSingleTablePipeline.class)
+                .addBehaviour(PortStatisticsDiscovery.class, Bmv2DefaultPortStatisticsDiscovery.class)
                 .addExtension(P4_INFO_TEXT, p4InfoUrl)
                 .addExtension(BMV2_JSON, jsonUrl)
                 .build();
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
new file mode 100644
index 0000000..96081be
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.bmv2;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Implementation of the behaviour for discovering the port statistics of a Bmv2 device with the default.p4 program.
+ */
+public class Bmv2DefaultPortStatisticsDiscovery extends AbstractHandlerBehaviour implements PortStatisticsDiscovery {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public Collection<PortStatistics> discoverPortStatistics() {
+        log.debug("Discovering Port Statistics for device {}", handler().data().deviceId());
+        return ImmutableList.of();
+    }
+}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index c8e7ae1..260e41c 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -22,10 +22,13 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.ChassisId;
 import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.Tools;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
@@ -68,18 +71,23 @@
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
@@ -120,17 +128,23 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PiPipeconfService piPipeconfService;
 
+    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
+    @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+            label = "Configure poll frequency for port status and statistics; " +
+                    "default is 10 sec")
+    private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+
     protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     protected static final String URI_SCHEME = "device";
     protected static final String CFG_SCHEME = "generalprovider";
     private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
-    private static final int PORT_STATS_PERIOD_SECONDS = 10;
+
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
-    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
 
+    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -140,12 +154,13 @@
 
     protected ScheduledExecutorService connectionExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-                                     groupedThreads("onos/generaldeviceprovider-device",
-                                                    "connection-executor-%d", log));
+            groupedThreads("onos/generaldeviceprovider-device",
+                    "connection-executor-%d", log));
     protected ScheduledExecutorService portStatsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
-                                     groupedThreads("onos/generaldeviceprovider-port-stats",
-                                                    "port-stats-executor-%d", log));
+            groupedThreads("onos/generaldeviceprovider-port-stats",
+                    "port-stats-executor-%d", log));
+    protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
     protected DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -177,6 +192,33 @@
         log.info("Started");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context != null) {
+            Dictionary<?, ?> properties = context.getProperties();
+            pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
+                    DEFAULT_POLL_FREQUENCY_SECONDS);
+            log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
+        }
+
+        if (!scheduledTasks.isEmpty()) {
+            //cancel all previous tasks
+            scheduledTasks.values().forEach(task -> task.cancel(false));
+            //resubmit task with new timeout.
+            Set<DeviceId> deviceSubjects =
+                    cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
+            deviceSubjects.forEach(deviceId -> {
+                if (!compareScheme(deviceId)) {
+                    // not under my scheme, skipping
+                    log.debug("{} is not my scheme, skipping", deviceId);
+                    return;
+                }
+                scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
+            });
+        }
+
+    }
+
 
     @Deactivate
     public void deactivate() {
@@ -240,7 +282,7 @@
             modified.thenAcceptAsync(result -> {
                 if (!result) {
                     log.warn("Your device {} port {} status can't be changed to {}",
-                             deviceId, portNumber, enable);
+                            deviceId, portNumber, enable);
                 }
             });
 
@@ -252,13 +294,13 @@
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, DeviceHandshaker.class,
-                            new DefaultDriverData(driver, deviceId));
+                new DefaultDriverData(driver, deviceId));
     }
 
     private PortAdmin getPortAdmin(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, PortAdmin.class,
-                            new DefaultDriverData(driver, deviceId));
+                new DefaultDriverData(driver, deviceId));
 
     }
 
@@ -268,7 +310,7 @@
             driver = driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
             log.debug("Falling back to configuration to fetch driver " +
-                              "for device {}", deviceId);
+                    "for device {}", deviceId);
             driver = driverService.getDriver(
                     cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
         }
@@ -299,7 +341,7 @@
 
         if (providerConfig == null || basicDeviceConfig == null) {
             log.error("Configuration is NULL: basic config {}, general provider " +
-                              "config {}", basicDeviceConfig, providerConfig);
+                    "config {}", basicDeviceConfig, providerConfig);
         } else {
             log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
@@ -311,7 +353,7 @@
 
             if (handshaker == null) {
                 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
-                                  "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                        "behaviour, {}", deviceId, driver.name(), driver.behaviours());
                 return;
             }
             //Storing deviceKeyId and all other config values
@@ -334,13 +376,13 @@
                     ChassisId cid = new ChassisId();
                     SparseAnnotations annotations = DefaultAnnotations.builder()
                             .set(AnnotationKeys.PROTOCOL,
-                                 providerConfig.protocolsInfo().keySet().toString())
+                                    providerConfig.protocolsInfo().keySet().toString())
                             .build();
                     DeviceDescription description =
                             new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                                         driver.manufacturer(), driver.hwVersion(),
-                                                         driver.swVersion(), UNKNOWN,
-                                                         cid, false, annotations);
+                                    driver.manufacturer(), driver.hwVersion(),
+                                    driver.swVersion(), UNKNOWN,
+                                    cid, false, annotations);
                     //Empty list of ports
                     List<PortDescription> ports = new ArrayList<>();
 
@@ -378,7 +420,7 @@
     private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
 
         PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
-                                                           driverData);
+                driverData);
 
         if (pipelineProg == null) {
             // Device is not pipeline programmable.
@@ -416,7 +458,7 @@
 
             if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
                 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                          driver.name(), deviceId, pipeconfId);
+                        driver.name(), deviceId, pipeconfId);
                 return false;
             }
         } catch (InterruptedException | ExecutionException e) {
@@ -449,6 +491,10 @@
             //gracefully ignoring.
             log.info("No DeviceHandshaker for device {}", deviceId);
         }
+        ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
+        if (pollingStatisticsTask != null) {
+            pollingStatisticsTask.cancel(true);
+        }
     }
 
     //Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -466,7 +512,14 @@
         Collection<PortStatistics> statistics = deviceService.getDevice(deviceId)
                 .as(PortStatisticsDiscovery.class)
                 .discoverPortStatistics();
-        providerService.updatePortStatistics(deviceId, statistics);
+        //updating statistcs only if not empty
+        if (!statistics.isEmpty()) {
+            providerService.updatePortStatistics(deviceId, statistics);
+        }
+    }
+
+    private boolean compareScheme(DeviceId deviceId) {
+        return deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
     /**
@@ -479,7 +532,7 @@
         public void event(NetworkConfigEvent event) {
             DeviceId deviceId = (DeviceId) event.subject();
             //Assuming that the deviceId comes with uri 'device:'
-            if (!deviceId.uri().getScheme().equals(URI_SCHEME)) {
+            if (!compareScheme(deviceId)) {
                 // not under my scheme, skipping
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
@@ -498,7 +551,7 @@
                     //If we want to connect a p4runtime device with no pipeline
                     if (event.config().isPresent() &&
                             Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                                 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
                         pipelineConfigured.add(deviceId);
                     }
                     deviceConfigured.add(deviceId);
@@ -556,6 +609,16 @@
 
     }
 
+    private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
+        int delay = 0;
+        if (randomize) {
+            delay = new Random().nextInt(10);
+        }
+        return portStatsExecutor.scheduleAtFixedRate(
+                exceptionSafe(() -> updatePortStatistics(deviceId)),
+                delay, pollFrequency, TimeUnit.SECONDS);
+    }
+
     /**
      * Listener for core device events.
      */
@@ -563,29 +626,25 @@
         @Override
         public void event(DeviceEvent event) {
             Type type = event.type();
-
             if (type.equals((Type.DEVICE_ADDED))) {
 
                 //For now this is scheduled periodically, when streaming API will
                 // be available we check and base it on the streaming API (e.g. gNMI)
                 if (deviceService.getDevice(event.subject().id()).
                         is(PortStatisticsDiscovery.class)) {
-                    portStatsExecutor.scheduleAtFixedRate(
-                            exceptionSafe(() -> updatePortStatistics(event.subject().id())),
-                            0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
+                    scheduledTasks.put(event.subject().id(), schedulePolling(event.subject().id(), false));
                     updatePortStatistics(event.subject().id());
                 }
 
             } else if (type.equals(Type.DEVICE_REMOVED)) {
                 connectionExecutor.submit(exceptionSafe(() ->
-                                                                disconnectDevice(event.subject().id())));
+                        disconnectDevice(event.subject().id())));
             }
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            return URI_SCHEME.toUpperCase()
-                    .equals(event.subject().id().uri().toString());
+            return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
         }
     }
 }
diff --git a/tools/test/configs/driver-cfg.json b/tools/test/configs/driver-cfg.json
new file mode 100644
index 0000000..86f0164
--- /dev/null
+++ b/tools/test/configs/driver-cfg.json
@@ -0,0 +1,14 @@
+{
+  "devices": {
+    "of:0000000000000001": {
+      "basic": {
+        "driver": "corsa"
+      }
+    },
+    "of:0000000000000002": {
+      "basic": {
+        "driver": "corsa"
+      }
+    }
+  }
+}
diff --git a/tools/test/topos/test-null b/tools/test/topos/test-null
new file mode 100755
index 0000000..98fe8b9
--- /dev/null
+++ b/tools/test/topos/test-null
@@ -0,0 +1,37 @@
+#!/bin/bash
+# -----------------------------------------------------------------------------
+# Creates a replica of the GEANT topology using ONOS null provider
+# -----------------------------------------------------------------------------
+
+# config
+host=${1:-localhost}
+nports=24
+sleepfor=5
+
+
+# start custom simulation..
+onos ${host} null-simulation start custom
+
+
+## unfortunately, it takes a time for the sim to start up
+#  this is not ideal...
+
+echo
+echo "Sleeping while sim starts up... (${sleepfor} seconds)..."
+echo
+sleep ${sleepfor}
+
+# Add devices, links, and hosts
+onos ${1:-localhost} <<-EOF
+
+null-create-device switch ATH ${nports} 37.984149 23.7279843
+null-create-device switch LIS ${nports} 38.707792 -9.1365069
+
+
+null-create-link direct ATH LIS
+null-create-link direct LIS ATH
+
+null-create-host ATH 192.168.1.1   34.984149  24.7279843
+null-create-host LIS 192.168.1.2   37.707792  -7.1365069
+
+EOF
\ No newline at end of file