ECMP app fixes
Change-Id: Ib437f1e92cb90f7bc97539a447637066747c5532
diff --git a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
index 04cfc7a..ccab5cf 100644
--- a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
+++ b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
@@ -42,17 +42,13 @@
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.pi.runtime.PiTableId;
import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyGraph;
-import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.net.topology.TopologyVertex;
import org.slf4j.Logger;
@@ -64,6 +60,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +73,6 @@
import static java.util.stream.Stream.concat;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -90,19 +86,21 @@
private static final int NUM_LEAFS = 2;
private static final int NUM_SPINES = 2;
protected static final int HASHED_LINKS = 2;
+ private static final boolean WITH_IMBALANCED_STRIPING = false;
private static final int FLOW_PRIORITY = 100;
+ private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
private static final int CLEANUP_SLEEP = 2000;
protected final Logger log = getLogger(getClass());
- private final TopologyListener topologyListener = new InternalTopologyListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
- private final HostListener hostListener = new InternalHostListener();
private final ExecutorService executorService = Executors
.newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
private final String appName;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -126,8 +124,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private PiPipeconfService piPipeconfService;
- private boolean withImbalancedStriping = false;
-
private boolean appActive = false;
private boolean appFreezed = false;
@@ -151,8 +147,8 @@
/**
* Creates a new PI fabric app.
*
- * @param appName app name
- * @param appPipeconf a P4Runtime device context to be used on devices
+ * @param appName app name
+ * @param appPipeconf a P4Runtime device context to be used on devices
*/
protected AbstractUpgradableFabricApp(String appName, PiPipeconf appPipeconf) {
this.appName = checkNotNull(appName);
@@ -179,10 +175,7 @@
APP_HANDLES.put(appName, this);
appId = coreService.registerApplication(appName);
-
- topologyService.addListener(topologyListener);
deviceService.addListener(deviceListener);
- hostService.addListener(hostListener);
piPipeconfService.register(appPipeconf);
init();
@@ -200,9 +193,8 @@
List<Runnable> runningTasks = executorService.shutdownNow();
log.warn("Unable to stop the following tasks: {}", runningTasks);
}
+ scheduledExecutorService.shutdown();
deviceService.removeListener(deviceListener);
- topologyService.removeListener(topologyListener);
- hostService.removeListener(hostListener);
flowRuleService.removeFlowRulesById(appId);
piPipeconfService.remove(appPipeconf.id());
@@ -224,8 +216,12 @@
pipeconfFlags = Maps.newConcurrentMap();
}
- // Start flow rules generator...
- spawnTask(() -> generateFlowRules(topologyService.currentTopology(), Sets.newHashSet(hostService.getHosts())));
+ /*
+ Schedules a thread that periodically checks the topology, as soon as it corresponds to the expected
+ one, it generates the necessary flow rules and starts the deploy process on each device.
+ */
+ scheduledExecutorService.scheduleAtFixedRate(this::checkTopologyAndGenerateFlowRules,
+ 0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
private void setAppFreezed(boolean appFreezed) {
@@ -304,7 +300,7 @@
lock.lock();
try {
- // Set pipeconfflag if not already done.
+ // Set pipeconf flag if not already done.
if (!pipeconfFlags.getOrDefault(deviceId, false)) {
if (piPipeconfService.ofDevice(deviceId).isPresent() &&
appPipeconf.id().equals(piPipeconfService.ofDevice(deviceId).get())) {
@@ -324,7 +320,7 @@
// Install rules.
if (!ruleFlags.getOrDefault(deviceId, false) &&
deviceFlowRules.containsKey(deviceId)) {
- log.info("Installing rules for {}...", deviceId);
+ log.info("Installing {} rules for {}...", deviceFlowRules.get(deviceId).size(), deviceId);
installFlowRules(deviceFlowRules.get(deviceId));
ruleFlags.put(deviceId, true);
}
@@ -344,19 +340,13 @@
flowRuleService.apply(opsBuilder.build());
}
- private void removeFlowRules(Collection<FlowRule> rules) {
- FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
- rules.forEach(opsBuilder::remove);
- flowRuleService.apply(opsBuilder.build());
- }
-
/**
* Generates flow rules to provide host-to-host connectivity for the given topology and hosts.
- *
- * @param topo a topology
- * @param hosts a collection of hosts
*/
- private synchronized void generateFlowRules(Topology topo, Collection<Host> hosts) {
+ private synchronized void checkTopologyAndGenerateFlowRules() {
+
+ Topology topo = topologyService.currentTopology();
+ Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
if (flowRuleGenerated) {
log.debug("Flow rules have been already generated, aborting...");
@@ -381,7 +371,7 @@
for (DeviceId did : spines) {
int portCount = deviceService.getPorts(did).size();
// Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
- int expectedSpinePortCount = NUM_LEAFS + (withImbalancedStriping ? 1 : 0);
+ int expectedSpinePortCount = NUM_LEAFS + (WITH_IMBALANCED_STRIPING ? 1 : 0);
if (portCount != expectedSpinePortCount) {
log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}", did, portCount);
return;
@@ -390,7 +380,7 @@
for (DeviceId did : leafs) {
int portCount = deviceService.getPorts(did).size();
// Expected port count: num spines + host port + 1 redundant spine link
- int expectedLeafPortCount = NUM_LEAFS + (withImbalancedStriping ? 2 : 1);
+ int expectedLeafPortCount = NUM_LEAFS + (WITH_IMBALANCED_STRIPING ? 2 : 1);
if (portCount != expectedLeafPortCount) {
log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}", did, portCount);
return;
@@ -498,26 +488,6 @@
}
/**
- * A listener of topology events that executes a flow rule generation task each time a device is added.
- */
- private class InternalTopologyListener implements TopologyListener {
-
- @Override
- public void event(TopologyEvent event) {
- spawnTask(() -> generateFlowRules(event.subject(), Sets.newHashSet(hostService.getHosts())));
- }
-
- @Override
- public boolean isRelevant(TopologyEvent event) {
- return !appFreezed &&
- // If at least one reason is of type DEVICE_ADDED.
- event.reasons().stream()
- .filter(r -> r instanceof DeviceEvent)
- .anyMatch(r -> ((DeviceEvent) r).type() == DEVICE_ADDED);
- }
- }
-
- /**
* A listener of device events that executes a device deploy task each time a device is added, updated or
* re-connects.
*/
@@ -538,22 +508,6 @@
}
/**
- * A listener of host events that generates flow rules each time a new host is added.
- */
- private class InternalHostListener implements HostListener {
- @Override
- public void event(HostEvent event) {
- spawnTask(() -> generateFlowRules(topologyService.currentTopology(),
- Sets.newHashSet(hostService.getHosts())));
- }
-
- @Override
- public boolean isRelevant(HostEvent event) {
- return !appFreezed && event.type() == HOST_ADDED;
- }
- }
-
- /**
* An exception occurred while generating flow rules for this fabric.
*/
public class FlowRuleGeneratorException extends Exception {
@@ -564,9 +518,5 @@
public FlowRuleGeneratorException(String msg) {
super(msg);
}
-
- public FlowRuleGeneratorException(Exception cause) {
- super(cause);
- }
}
}
\ No newline at end of file
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
index 0781f52..d662231 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
@@ -27,7 +27,7 @@
*/
public class EcmpInterpreter extends DefaultP4Interpreter {
- protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata_t";
+ protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata";
protected static final String ECMP_GROUP_ACTION_NAME = "ecmp_group";
protected static final String GROUP_ID = "group_id";
protected static final String SELECTOR = "selector";
diff --git a/tools/dev/mininet/bmv2.py b/tools/dev/mininet/bmv2.py
index e942646..fb3d9d8 100644
--- a/tools/dev/mininet/bmv2.py
+++ b/tools/dev/mininet/bmv2.py
@@ -18,7 +18,6 @@
CPU_PORT = 255
PKT_BYTES_TO_DUMP = 80
VALGRIND_PREFIX = 'valgrind --leak-check=yes'
-VALGRIND_SLEEP = 10 # seconds
def parseBoolean(value):
@@ -52,7 +51,7 @@
def __init__(self, name, json=None, debugger=False, loglevel="warn", elogger=False,
persistent=False, grpcPort=None, thriftPort=None, netcfg=True, dryrun=False,
- pipeconfId="", pktdump=False, valgrind=False, **kwargs):
+ pipeconfId="", pktdump=False, valgrind=False, netcfgSleep=1, **kwargs):
Switch.__init__(self, name, **kwargs)
self.grpcPort = ONOSBmv2Switch.pickUnusedPort() if not grpcPort else grpcPort
self.thriftPort = ONOSBmv2Switch.pickUnusedPort() if not thriftPort else thriftPort
@@ -71,6 +70,7 @@
self.netcfg = parseBoolean(netcfg)
self.dryrun = parseBoolean(dryrun)
self.valgrind = parseBoolean(valgrind)
+ self.netcfgSleep = netcfgSleep
self.netcfgfile = '/tmp/bmv2-%d-netcfg.json' % self.deviceId
self.pipeconfId = pipeconfId
if persistent:
@@ -229,11 +229,12 @@
out = self.cmd(cmdStr)
if out:
print out
- if self.valgrind:
- # With valgrind, it takes some time before the gRPC server is available.
- # Wait before pushing the netcfg.
- info("\n*** Waiting %d seconds before pushing the config to ONOS...\n" % VALGRIND_SLEEP)
- time.sleep(VALGRIND_SLEEP)
+
+ # Wait before pushing the netcfg.
+ if self.netcfgSleep > 0:
+ info("\n*** Waiting %d seconds before pushing the config to ONOS...\n"
+ % self.netcfgSleep)
+ time.sleep(self.netcfgSleep)
try: # onos.py
clist = controllers[0].nodes()
diff --git a/tools/test/topos/bmv2-demo.py b/tools/test/topos/bmv2-demo.py
index 171cc6f..2864dca 100755
--- a/tools/test/topos/bmv2-demo.py
+++ b/tools/test/topos/bmv2-demo.py
@@ -70,7 +70,10 @@
netcfg=False,
longitude=longitude,
latitude=latitude,
- pipeconfId=args.pipeconf_id)
+ pipeconfId=args.pipeconf_id,
+ valgrind=True,
+ netcfgSleep=0,
+ leglevel="debug")
for i in range(1, args.size + 1):
for j in range(1, args.size + 1):
@@ -233,8 +236,14 @@
print "Network started"
+ generateNetcfg(onosIp, net, args)
+
+ sleep(30)
+ print "Uploading netcfg..."
+ call(("%s/onos-netcfg" % RUN_PACK_PATH, onosIp, TEMP_NETCFG_FILE))
+
# Generate background traffic.
- sleep(3)
+ sleep(5)
for (h1, h2) in combinations(net.hosts, 2):
h1.startPingBg(h2)
h2.startPingBg(h1)
@@ -250,11 +259,6 @@
# print "Starting traffic from h1 to h3..."
# net.hosts[0].startIperfClient(net.hosts[-1], flowBw="200k", numFlows=100, duration=10)
- generateNetcfg(onosIp, net, args)
-
- print "Uploading netcfg..."
- call(("%s/onos-netcfg" % RUN_PACK_PATH, onosIp, TEMP_NETCFG_FILE))
-
if not args.onos_ip:
ONOSCLI(net)
else: