Completing Port Statistics scheduling in general device provider.
Adding empty shell for port statistcs discovery with bmv2 and default.p4
Change-Id: I4a333e406d6df0c6f8041d53f21cbf10f8bb9782
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
index 0a1ebba..e04e8b3 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconfFactory.java
@@ -19,6 +19,7 @@
import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
import org.onosproject.driver.pipeline.DefaultSingleTablePipeline;
import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.pi.model.DefaultPiPipeconf;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
@@ -58,6 +59,7 @@
.withPipelineModel(Bmv2PipelineModelParser.parse(jsonUrl))
.addBehaviour(PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class)
.addBehaviour(Pipeliner.class, DefaultSingleTablePipeline.class)
+ .addBehaviour(PortStatisticsDiscovery.class, Bmv2DefaultPortStatisticsDiscovery.class)
.addExtension(P4_INFO_TEXT, p4InfoUrl)
.addExtension(BMV2_JSON, jsonUrl)
.build();
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
new file mode 100644
index 0000000..96081be
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPortStatisticsDiscovery.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.bmv2;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Implementation of the behaviour for discovering the port statistics of a Bmv2 device with the default.p4 program.
+ */
+public class Bmv2DefaultPortStatisticsDiscovery extends AbstractHandlerBehaviour implements PortStatisticsDiscovery {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public Collection<PortStatistics> discoverPortStatistics() {
+ log.debug("Discovering Port Statistics for device {}", handler().data().deviceId());
+ return ImmutableList.of();
+ }
+}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index c8e7ae1..260e41c 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -22,10 +22,13 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.Tools;
import org.onosproject.core.CoreService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
@@ -68,18 +71,23 @@
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.List;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
@@ -120,17 +128,23 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PiPipeconfService piPipeconfService;
+ private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
+ @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+ label = "Configure poll frequency for port status and statistics; " +
+ "default is 10 sec")
+ private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+
protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
protected static final String URI_SCHEME = "device";
protected static final String CFG_SCHEME = "generalprovider";
private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
- private static final int PORT_STATS_PERIOD_SECONDS = 10;
+
//FIXME this will be removed when the configuration is synced at the source.
private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
- private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+ private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
//FIXME to be removed when netcfg will issue device events in a bundle or
//ensures all configuration needed is present
private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -140,12 +154,13 @@
protected ScheduledExecutorService connectionExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-device",
- "connection-executor-%d", log));
+ groupedThreads("onos/generaldeviceprovider-device",
+ "connection-executor-%d", log));
protected ScheduledExecutorService portStatsExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-port-stats",
- "port-stats-executor-%d", log));
+ groupedThreads("onos/generaldeviceprovider-port-stats",
+ "port-stats-executor-%d", log));
+ protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
protected DeviceProviderService providerService;
private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -177,6 +192,33 @@
log.info("Started");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ if (context != null) {
+ Dictionary<?, ?> properties = context.getProperties();
+ pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
+ DEFAULT_POLL_FREQUENCY_SECONDS);
+ log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
+ }
+
+ if (!scheduledTasks.isEmpty()) {
+ //cancel all previous tasks
+ scheduledTasks.values().forEach(task -> task.cancel(false));
+ //resubmit task with new timeout.
+ Set<DeviceId> deviceSubjects =
+ cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
+ deviceSubjects.forEach(deviceId -> {
+ if (!compareScheme(deviceId)) {
+ // not under my scheme, skipping
+ log.debug("{} is not my scheme, skipping", deviceId);
+ return;
+ }
+ scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
+ });
+ }
+
+ }
+
@Deactivate
public void deactivate() {
@@ -240,7 +282,7 @@
modified.thenAcceptAsync(result -> {
if (!result) {
log.warn("Your device {} port {} status can't be changed to {}",
- deviceId, portNumber, enable);
+ deviceId, portNumber, enable);
}
});
@@ -252,13 +294,13 @@
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Driver driver = getDriver(deviceId);
return getBehaviour(driver, DeviceHandshaker.class,
- new DefaultDriverData(driver, deviceId));
+ new DefaultDriverData(driver, deviceId));
}
private PortAdmin getPortAdmin(DeviceId deviceId) {
Driver driver = getDriver(deviceId);
return getBehaviour(driver, PortAdmin.class,
- new DefaultDriverData(driver, deviceId));
+ new DefaultDriverData(driver, deviceId));
}
@@ -268,7 +310,7 @@
driver = driverService.getDriver(deviceId);
} catch (ItemNotFoundException e) {
log.debug("Falling back to configuration to fetch driver " +
- "for device {}", deviceId);
+ "for device {}", deviceId);
driver = driverService.getDriver(
cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
}
@@ -299,7 +341,7 @@
if (providerConfig == null || basicDeviceConfig == null) {
log.error("Configuration is NULL: basic config {}, general provider " +
- "config {}", basicDeviceConfig, providerConfig);
+ "config {}", basicDeviceConfig, providerConfig);
} else {
log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
@@ -311,7 +353,7 @@
if (handshaker == null) {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
- "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+ "behaviour, {}", deviceId, driver.name(), driver.behaviours());
return;
}
//Storing deviceKeyId and all other config values
@@ -334,13 +376,13 @@
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
.set(AnnotationKeys.PROTOCOL,
- providerConfig.protocolsInfo().keySet().toString())
+ providerConfig.protocolsInfo().keySet().toString())
.build();
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
- driver.manufacturer(), driver.hwVersion(),
- driver.swVersion(), UNKNOWN,
- cid, false, annotations);
+ driver.manufacturer(), driver.hwVersion(),
+ driver.swVersion(), UNKNOWN,
+ cid, false, annotations);
//Empty list of ports
List<PortDescription> ports = new ArrayList<>();
@@ -378,7 +420,7 @@
private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
- driverData);
+ driverData);
if (pipelineProg == null) {
// Device is not pipeline programmable.
@@ -416,7 +458,7 @@
if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
- driver.name(), deviceId, pipeconfId);
+ driver.name(), deviceId, pipeconfId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
@@ -449,6 +491,10 @@
//gracefully ignoring.
log.info("No DeviceHandshaker for device {}", deviceId);
}
+ ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
+ if (pollingStatisticsTask != null) {
+ pollingStatisticsTask.cancel(true);
+ }
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -466,7 +512,14 @@
Collection<PortStatistics> statistics = deviceService.getDevice(deviceId)
.as(PortStatisticsDiscovery.class)
.discoverPortStatistics();
- providerService.updatePortStatistics(deviceId, statistics);
+ //updating statistcs only if not empty
+ if (!statistics.isEmpty()) {
+ providerService.updatePortStatistics(deviceId, statistics);
+ }
+ }
+
+ private boolean compareScheme(DeviceId deviceId) {
+ return deviceId.uri().getScheme().equals(URI_SCHEME);
}
/**
@@ -479,7 +532,7 @@
public void event(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
//Assuming that the deviceId comes with uri 'device:'
- if (!deviceId.uri().getScheme().equals(URI_SCHEME)) {
+ if (!compareScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
@@ -498,7 +551,7 @@
//If we want to connect a p4runtime device with no pipeline
if (event.config().isPresent() &&
Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
- PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+ PIPELINE_CONFIGURABLE_PROTOCOLS)) {
pipelineConfigured.add(deviceId);
}
deviceConfigured.add(deviceId);
@@ -556,6 +609,16 @@
}
+ private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
+ int delay = 0;
+ if (randomize) {
+ delay = new Random().nextInt(10);
+ }
+ return portStatsExecutor.scheduleAtFixedRate(
+ exceptionSafe(() -> updatePortStatistics(deviceId)),
+ delay, pollFrequency, TimeUnit.SECONDS);
+ }
+
/**
* Listener for core device events.
*/
@@ -563,29 +626,25 @@
@Override
public void event(DeviceEvent event) {
Type type = event.type();
-
if (type.equals((Type.DEVICE_ADDED))) {
//For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (deviceService.getDevice(event.subject().id()).
is(PortStatisticsDiscovery.class)) {
- portStatsExecutor.scheduleAtFixedRate(
- exceptionSafe(() -> updatePortStatistics(event.subject().id())),
- 0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
+ scheduledTasks.put(event.subject().id(), schedulePolling(event.subject().id(), false));
updatePortStatistics(event.subject().id());
}
} else if (type.equals(Type.DEVICE_REMOVED)) {
connectionExecutor.submit(exceptionSafe(() ->
- disconnectDevice(event.subject().id())));
+ disconnectDevice(event.subject().id())));
}
}
@Override
public boolean isRelevant(DeviceEvent event) {
- return URI_SCHEME.toUpperCase()
- .equals(event.subject().id().uri().toString());
+ return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
}
}
}
diff --git a/tools/test/configs/driver-cfg.json b/tools/test/configs/driver-cfg.json
new file mode 100644
index 0000000..86f0164
--- /dev/null
+++ b/tools/test/configs/driver-cfg.json
@@ -0,0 +1,14 @@
+{
+ "devices": {
+ "of:0000000000000001": {
+ "basic": {
+ "driver": "corsa"
+ }
+ },
+ "of:0000000000000002": {
+ "basic": {
+ "driver": "corsa"
+ }
+ }
+ }
+}
diff --git a/tools/test/topos/test-null b/tools/test/topos/test-null
new file mode 100755
index 0000000..98fe8b9
--- /dev/null
+++ b/tools/test/topos/test-null
@@ -0,0 +1,37 @@
+#!/bin/bash
+# -----------------------------------------------------------------------------
+# Creates a replica of the GEANT topology using ONOS null provider
+# -----------------------------------------------------------------------------
+
+# config
+host=${1:-localhost}
+nports=24
+sleepfor=5
+
+
+# start custom simulation..
+onos ${host} null-simulation start custom
+
+
+## unfortunately, it takes a time for the sim to start up
+# this is not ideal...
+
+echo
+echo "Sleeping while sim starts up... (${sleepfor} seconds)..."
+echo
+sleep ${sleepfor}
+
+# Add devices, links, and hosts
+onos ${1:-localhost} <<-EOF
+
+null-create-device switch ATH ${nports} 37.984149 23.7279843
+null-create-device switch LIS ${nports} 38.707792 -9.1365069
+
+
+null-create-link direct ATH LIS
+null-create-link direct LIS ATH
+
+null-create-host ATH 192.168.1.1 34.984149 24.7279843
+null-create-host LIS 192.168.1.2 37.707792 -7.1365069
+
+EOF
\ No newline at end of file