ONOS-6748 Final porting of ECMP application to new PI APIs
Change-Id: Ibf15e944003b61a77a9da9a188f1e6dc3031447f
diff --git a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
index f77763e..227ec24 100644
--- a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
+++ b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
@@ -42,17 +42,13 @@
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipelineInterpreter;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.pi.runtime.PiTableId;
import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyGraph;
-import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.net.topology.TopologyVertex;
import org.slf4j.Logger;
@@ -64,6 +60,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +73,6 @@
import static java.util.stream.Stream.concat;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -87,21 +83,25 @@
private static final Map<String, AbstractUpgradableFabricApp> APP_HANDLES = Maps.newConcurrentMap();
- private static final int NUM_LEAFS = 2;
- private static final int NUM_SPINES = 2;
+ // TOPO_SIZE should be the same of the --size argument when running bmv2-demo.py
+ private static final int TOPO_SIZE = 2;
+ private static final boolean WITH_IMBALANCED_STRIPING = false;
+ protected static final int HASHED_LINKS = TOPO_SIZE + (WITH_IMBALANCED_STRIPING ? 1 : 0);
+
private static final int FLOW_PRIORITY = 100;
+ private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
private static final int CLEANUP_SLEEP = 2000;
protected final Logger log = getLogger(getClass());
- private final TopologyListener topologyListener = new InternalTopologyListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
- private final HostListener hostListener = new InternalHostListener();
private final ExecutorService executorService = Executors
.newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
private final String appName;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -125,8 +125,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private PiPipeconfService piPipeconfService;
- private boolean withImbalancedStriping = false;
-
private boolean appActive = false;
private boolean appFreezed = false;
@@ -150,8 +148,8 @@
/**
* Creates a new PI fabric app.
*
- * @param appName app name
- * @param appPipeconf a P4Runtime device context to be used on devices
+ * @param appName app name
+ * @param appPipeconf a P4Runtime device context to be used on devices
*/
protected AbstractUpgradableFabricApp(String appName, PiPipeconf appPipeconf) {
this.appName = checkNotNull(appName);
@@ -178,10 +176,7 @@
APP_HANDLES.put(appName, this);
appId = coreService.registerApplication(appName);
-
- topologyService.addListener(topologyListener);
deviceService.addListener(deviceListener);
- hostService.addListener(hostListener);
piPipeconfService.register(appPipeconf);
init();
@@ -199,9 +194,8 @@
List<Runnable> runningTasks = executorService.shutdownNow();
log.warn("Unable to stop the following tasks: {}", runningTasks);
}
+ scheduledExecutorService.shutdown();
deviceService.removeListener(deviceListener);
- topologyService.removeListener(topologyListener);
- hostService.removeListener(hostListener);
flowRuleService.removeFlowRulesById(appId);
piPipeconfService.remove(appPipeconf.id());
@@ -223,8 +217,12 @@
pipeconfFlags = Maps.newConcurrentMap();
}
- // Start flow rules generator...
- spawnTask(() -> generateFlowRules(topologyService.currentTopology(), Sets.newHashSet(hostService.getHosts())));
+ /*
+ Schedules a thread that periodically checks the topology, as soon as it corresponds to the expected
+ one, it generates the necessary flow rules and starts the deploy process on each device.
+ */
+ scheduledExecutorService.scheduleAtFixedRate(this::checkTopologyAndGenerateFlowRules,
+ 0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
private void setAppFreezed(boolean appFreezed) {
@@ -303,7 +301,7 @@
lock.lock();
try {
- // Set pipeconfflag if not already done.
+ // Set pipeconf flag if not already done.
if (!pipeconfFlags.getOrDefault(deviceId, false)) {
if (piPipeconfService.ofDevice(deviceId).isPresent() &&
appPipeconf.id().equals(piPipeconfService.ofDevice(deviceId).get())) {
@@ -323,7 +321,7 @@
// Install rules.
if (!ruleFlags.getOrDefault(deviceId, false) &&
deviceFlowRules.containsKey(deviceId)) {
- log.info("Installing rules for {}...", deviceId);
+ log.info("Installing {} rules for {}...", deviceFlowRules.get(deviceId).size(), deviceId);
installFlowRules(deviceFlowRules.get(deviceId));
ruleFlags.put(deviceId, true);
}
@@ -343,19 +341,13 @@
flowRuleService.apply(opsBuilder.build());
}
- private void removeFlowRules(Collection<FlowRule> rules) {
- FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
- rules.forEach(opsBuilder::remove);
- flowRuleService.apply(opsBuilder.build());
- }
-
/**
* Generates flow rules to provide host-to-host connectivity for the given topology and hosts.
- *
- * @param topo a topology
- * @param hosts a collection of hosts
*/
- private synchronized void generateFlowRules(Topology topo, Collection<Host> hosts) {
+ private synchronized void checkTopologyAndGenerateFlowRules() {
+
+ Topology topo = topologyService.currentTopology();
+ Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
if (flowRuleGenerated) {
log.debug("Flow rules have been already generated, aborting...");
@@ -371,7 +363,7 @@
.map(TopologyVertex::deviceId)
.forEach(did -> (isSpine(did, topo) ? spines : leafs).add(did));
- if (spines.size() != NUM_SPINES || leafs.size() != NUM_LEAFS) {
+ if (spines.size() != TOPO_SIZE || leafs.size() != TOPO_SIZE) {
log.info("Invalid leaf/spine switches count, aborting... > leafCount={}, spineCount={}",
spines.size(), leafs.size());
return;
@@ -380,8 +372,7 @@
for (DeviceId did : spines) {
int portCount = deviceService.getPorts(did).size();
// Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
- int expectedSpinePortCount = NUM_LEAFS + (withImbalancedStriping ? 1 : 0);
- if (portCount != expectedSpinePortCount) {
+ if (portCount != HASHED_LINKS) {
log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}", did, portCount);
return;
}
@@ -389,8 +380,7 @@
for (DeviceId did : leafs) {
int portCount = deviceService.getPorts(did).size();
// Expected port count: num spines + host port + 1 redundant spine link
- int expectedLeafPortCount = NUM_LEAFS + (withImbalancedStriping ? 2 : 1);
- if (portCount != expectedLeafPortCount) {
+ if (portCount != HASHED_LINKS + 1) {
log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}", did, portCount);
return;
}
@@ -399,7 +389,7 @@
// Check hosts, number and exactly one per leaf
Map<DeviceId, Host> hostMap = Maps.newHashMap();
hosts.forEach(h -> hostMap.put(h.location().deviceId(), h));
- if (hosts.size() != NUM_LEAFS || !leafs.equals(hostMap.keySet())) {
+ if (hosts.size() != TOPO_SIZE || !leafs.equals(hostMap.keySet())) {
log.info("Wrong host configuration, aborting... > hostCount={}, hostMapz={}", hosts.size(), hostMap);
return;
}
@@ -416,7 +406,7 @@
newFlowRules.addAll(generateSpineRules(deviceId, hosts, topo));
}
} catch (FlowRuleGeneratorException e) {
- log.warn("Exception while executing flow rule generator: {}", e.toString());
+ log.warn("Exception while executing flow rule generator: {}", e.getMessage());
return;
}
@@ -497,26 +487,6 @@
}
/**
- * A listener of topology events that executes a flow rule generation task each time a device is added.
- */
- private class InternalTopologyListener implements TopologyListener {
-
- @Override
- public void event(TopologyEvent event) {
- spawnTask(() -> generateFlowRules(event.subject(), Sets.newHashSet(hostService.getHosts())));
- }
-
- @Override
- public boolean isRelevant(TopologyEvent event) {
- return !appFreezed &&
- // If at least one reason is of type DEVICE_ADDED.
- event.reasons().stream()
- .filter(r -> r instanceof DeviceEvent)
- .anyMatch(r -> ((DeviceEvent) r).type() == DEVICE_ADDED);
- }
- }
-
- /**
* A listener of device events that executes a device deploy task each time a device is added, updated or
* re-connects.
*/
@@ -537,22 +507,6 @@
}
/**
- * A listener of host events that generates flow rules each time a new host is added.
- */
- private class InternalHostListener implements HostListener {
- @Override
- public void event(HostEvent event) {
- spawnTask(() -> generateFlowRules(topologyService.currentTopology(),
- Sets.newHashSet(hostService.getHosts())));
- }
-
- @Override
- public boolean isRelevant(HostEvent event) {
- return !appFreezed && event.type() == HOST_ADDED;
- }
- }
-
- /**
* An exception occurred while generating flow rules for this fabric.
*/
public class FlowRuleGeneratorException extends Exception {
@@ -563,9 +517,5 @@
public FlowRuleGeneratorException(String msg) {
super(msg);
}
-
- public FlowRuleGeneratorException(Exception cause) {
- super(cause);
- }
}
}
\ No newline at end of file
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
index 1b6d48b..b66c160 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
import static org.onlab.packet.EthType.EtherType.IPV4;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
@@ -220,15 +221,19 @@
}
private Pair<PiTableAction, List<FlowRule>> provisionEcmpPiTableAction(DeviceId deviceId,
- Set<PortNumber> fabricPorts)
+ Set<PortNumber> fabricPorts)
throws FlowRuleGeneratorException {
// Install ECMP group table entries that map from hash values to actual fabric ports...
int groupId = groupIdOf(deviceId, fabricPorts);
- int groupSize = fabricPorts.size();
+ if (fabricPorts.size() != HASHED_LINKS) {
+ throw new FlowRuleGeneratorException(format(
+ "Invalid number of fabric ports for %s, expected %d but found %d",
+ deviceId, HASHED_LINKS, fabricPorts.size()));
+ }
Iterator<PortNumber> portIterator = fabricPorts.iterator();
List<FlowRule> rules = Lists.newArrayList();
- for (short i = 0; i < groupSize; i++) {
+ for (short i = 0; i < HASHED_LINKS; i++) {
FlowRule rule = flowRuleBuilder(deviceId, EcmpInterpreter.ECMP_GROUP_TABLE)
.withSelector(
buildEcmpTrafficSelector(groupId, i))
@@ -240,19 +245,17 @@
rules.add(rule);
}
- PiTableAction piTableAction = buildEcmpPiTableAction(groupId, groupSize);
+ PiTableAction piTableAction = buildEcmpPiTableAction(groupId);
return Pair.of(piTableAction, rules);
}
- private PiTableAction buildEcmpPiTableAction(int groupId, int groupSize) {
+ private PiTableAction buildEcmpPiTableAction(int groupId) {
return PiAction.builder()
.withId(PiActionId.of(ECMP_GROUP_ACTION_NAME))
.withParameter(new PiActionParam(PiActionParamId.of(GROUP_ID),
ImmutableByteSequence.copyFrom(groupId)))
- .withParameter(new PiActionParam(PiActionParamId.of(GROUP_SIZE),
- ImmutableByteSequence.copyFrom(groupSize)))
.build();
}
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
index 6b302f6..d662231 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
@@ -27,11 +27,10 @@
*/
public class EcmpInterpreter extends DefaultP4Interpreter {
- protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata_t";
+ protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata";
protected static final String ECMP_GROUP_ACTION_NAME = "ecmp_group";
protected static final String GROUP_ID = "group_id";
protected static final String SELECTOR = "selector";
- protected static final String GROUP_SIZE = "groupSize";
protected static final String ECMP_GROUP_TABLE = "ecmp_group_table";
private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = new ImmutableBiMap.Builder<Integer, PiTableId>()