ONOS-6748 Final porting of ECMP application to new PI APIs

Change-Id: Ibf15e944003b61a77a9da9a188f1e6dc3031447f
diff --git a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
index f77763e..227ec24 100644
--- a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
+++ b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
@@ -42,17 +42,13 @@
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.pi.runtime.PiTableId;
 import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
 import org.onosproject.net.topology.TopologyGraph;
-import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.net.topology.TopologyVertex;
 import org.slf4j.Logger;
@@ -64,6 +60,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +73,6 @@
 import static java.util.stream.Stream.concat;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -87,21 +83,25 @@
 
     private static final Map<String, AbstractUpgradableFabricApp> APP_HANDLES = Maps.newConcurrentMap();
 
-    private static final int NUM_LEAFS = 2;
-    private static final int NUM_SPINES = 2;
+    // TOPO_SIZE should be the same of the --size argument when running bmv2-demo.py
+    private static final int TOPO_SIZE = 2;
+    private static final boolean WITH_IMBALANCED_STRIPING = false;
+    protected static final int HASHED_LINKS = TOPO_SIZE + (WITH_IMBALANCED_STRIPING ? 1 : 0);
+
     private static final int FLOW_PRIORITY = 100;
+    private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
 
     private static final int CLEANUP_SLEEP = 2000;
 
     protected final Logger log = getLogger(getClass());
 
-    private final TopologyListener topologyListener = new InternalTopologyListener();
     private final DeviceListener deviceListener = new InternalDeviceListener();
-    private final HostListener hostListener = new InternalHostListener();
 
     private final ExecutorService executorService = Executors
             .newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
 
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
     private final String appName;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -125,8 +125,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private PiPipeconfService piPipeconfService;
 
-    private boolean withImbalancedStriping = false;
-
     private boolean appActive = false;
     private boolean appFreezed = false;
 
@@ -150,8 +148,8 @@
     /**
      * Creates a new PI fabric app.
      *
-     * @param appName           app name
-     * @param appPipeconf       a P4Runtime device context to be used on devices
+     * @param appName     app name
+     * @param appPipeconf a P4Runtime device context to be used on devices
      */
     protected AbstractUpgradableFabricApp(String appName, PiPipeconf appPipeconf) {
         this.appName = checkNotNull(appName);
@@ -178,10 +176,7 @@
         APP_HANDLES.put(appName, this);
 
         appId = coreService.registerApplication(appName);
-
-        topologyService.addListener(topologyListener);
         deviceService.addListener(deviceListener);
-        hostService.addListener(hostListener);
         piPipeconfService.register(appPipeconf);
 
         init();
@@ -199,9 +194,8 @@
             List<Runnable> runningTasks = executorService.shutdownNow();
             log.warn("Unable to stop the following tasks: {}", runningTasks);
         }
+        scheduledExecutorService.shutdown();
         deviceService.removeListener(deviceListener);
-        topologyService.removeListener(topologyListener);
-        hostService.removeListener(hostListener);
         flowRuleService.removeFlowRulesById(appId);
         piPipeconfService.remove(appPipeconf.id());
 
@@ -223,8 +217,12 @@
             pipeconfFlags = Maps.newConcurrentMap();
         }
 
-        // Start flow rules generator...
-        spawnTask(() -> generateFlowRules(topologyService.currentTopology(), Sets.newHashSet(hostService.getHosts())));
+        /*
+        Schedules a thread that periodically checks the topology, as soon as it corresponds to the expected
+        one, it generates the necessary flow rules and starts the deploy process on each device.
+         */
+        scheduledExecutorService.scheduleAtFixedRate(this::checkTopologyAndGenerateFlowRules,
+                0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
     }
 
     private void setAppFreezed(boolean appFreezed) {
@@ -303,7 +301,7 @@
         lock.lock();
 
         try {
-            // Set pipeconfflag if not already done.
+            // Set pipeconf flag if not already done.
             if (!pipeconfFlags.getOrDefault(deviceId, false)) {
                 if (piPipeconfService.ofDevice(deviceId).isPresent() &&
                         appPipeconf.id().equals(piPipeconfService.ofDevice(deviceId).get())) {
@@ -323,7 +321,7 @@
             // Install rules.
             if (!ruleFlags.getOrDefault(deviceId, false) &&
                     deviceFlowRules.containsKey(deviceId)) {
-                log.info("Installing rules for {}...", deviceId);
+                log.info("Installing {} rules for {}...", deviceFlowRules.get(deviceId).size(), deviceId);
                 installFlowRules(deviceFlowRules.get(deviceId));
                 ruleFlags.put(deviceId, true);
             }
@@ -343,19 +341,13 @@
         flowRuleService.apply(opsBuilder.build());
     }
 
-    private void removeFlowRules(Collection<FlowRule> rules) {
-        FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
-        rules.forEach(opsBuilder::remove);
-        flowRuleService.apply(opsBuilder.build());
-    }
-
     /**
      * Generates flow rules to provide host-to-host connectivity for the given topology and hosts.
-     *
-     * @param topo  a topology
-     * @param hosts a collection of hosts
      */
-    private synchronized void generateFlowRules(Topology topo, Collection<Host> hosts) {
+    private synchronized void checkTopologyAndGenerateFlowRules() {
+
+        Topology topo = topologyService.currentTopology();
+        Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
 
         if (flowRuleGenerated) {
             log.debug("Flow rules have been already generated, aborting...");
@@ -371,7 +363,7 @@
                 .map(TopologyVertex::deviceId)
                 .forEach(did -> (isSpine(did, topo) ? spines : leafs).add(did));
 
-        if (spines.size() != NUM_SPINES || leafs.size() != NUM_LEAFS) {
+        if (spines.size() != TOPO_SIZE || leafs.size() != TOPO_SIZE) {
             log.info("Invalid leaf/spine switches count, aborting... > leafCount={}, spineCount={}",
                      spines.size(), leafs.size());
             return;
@@ -380,8 +372,7 @@
         for (DeviceId did : spines) {
             int portCount = deviceService.getPorts(did).size();
             // Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
-            int expectedSpinePortCount = NUM_LEAFS + (withImbalancedStriping ? 1 : 0);
-            if (portCount != expectedSpinePortCount) {
+            if (portCount != HASHED_LINKS) {
                 log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}", did, portCount);
                 return;
             }
@@ -389,8 +380,7 @@
         for (DeviceId did : leafs) {
             int portCount = deviceService.getPorts(did).size();
             // Expected port count: num spines + host port + 1 redundant spine link
-            int expectedLeafPortCount = NUM_LEAFS + (withImbalancedStriping ? 2 : 1);
-            if (portCount != expectedLeafPortCount) {
+            if (portCount != HASHED_LINKS + 1) {
                 log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}", did, portCount);
                 return;
             }
@@ -399,7 +389,7 @@
         // Check hosts, number and exactly one per leaf
         Map<DeviceId, Host> hostMap = Maps.newHashMap();
         hosts.forEach(h -> hostMap.put(h.location().deviceId(), h));
-        if (hosts.size() != NUM_LEAFS || !leafs.equals(hostMap.keySet())) {
+        if (hosts.size() != TOPO_SIZE || !leafs.equals(hostMap.keySet())) {
             log.info("Wrong host configuration, aborting... > hostCount={}, hostMapz={}", hosts.size(), hostMap);
             return;
         }
@@ -416,7 +406,7 @@
                 newFlowRules.addAll(generateSpineRules(deviceId, hosts, topo));
             }
         } catch (FlowRuleGeneratorException e) {
-            log.warn("Exception while executing flow rule generator: {}", e.toString());
+            log.warn("Exception while executing flow rule generator: {}", e.getMessage());
             return;
         }
 
@@ -497,26 +487,6 @@
     }
 
     /**
-     * A listener of topology events that executes a flow rule generation task each time a device is added.
-     */
-    private class InternalTopologyListener implements TopologyListener {
-
-        @Override
-        public void event(TopologyEvent event) {
-            spawnTask(() -> generateFlowRules(event.subject(), Sets.newHashSet(hostService.getHosts())));
-        }
-
-        @Override
-        public boolean isRelevant(TopologyEvent event) {
-            return !appFreezed &&
-                    // If at least one reason is of type DEVICE_ADDED.
-                    event.reasons().stream()
-                            .filter(r -> r instanceof DeviceEvent)
-                            .anyMatch(r -> ((DeviceEvent) r).type() == DEVICE_ADDED);
-        }
-    }
-
-    /**
      * A listener of device events that executes a device deploy task each time a device is added, updated or
      * re-connects.
      */
@@ -537,22 +507,6 @@
     }
 
     /**
-     * A listener of host events that generates flow rules each time a new host is added.
-     */
-    private class InternalHostListener implements HostListener {
-        @Override
-        public void event(HostEvent event) {
-            spawnTask(() -> generateFlowRules(topologyService.currentTopology(),
-                                              Sets.newHashSet(hostService.getHosts())));
-        }
-
-        @Override
-        public boolean isRelevant(HostEvent event) {
-            return !appFreezed && event.type() == HOST_ADDED;
-        }
-    }
-
-    /**
      * An exception occurred while generating flow rules for this fabric.
      */
     public class FlowRuleGeneratorException extends Exception {
@@ -563,9 +517,5 @@
         public FlowRuleGeneratorException(String msg) {
             super(msg);
         }
-
-        public FlowRuleGeneratorException(Exception cause) {
-            super(cause);
-        }
     }
 }
\ No newline at end of file
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
index 1b6d48b..b66c160 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpFabricApp.java
@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.lang.String.format;
 import static java.util.stream.Collectors.toSet;
 import static org.onlab.packet.EthType.EtherType.IPV4;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
@@ -220,15 +221,19 @@
     }
 
     private Pair<PiTableAction, List<FlowRule>> provisionEcmpPiTableAction(DeviceId deviceId,
-                                                                            Set<PortNumber> fabricPorts)
+                                                                           Set<PortNumber> fabricPorts)
             throws FlowRuleGeneratorException {
 
         // Install ECMP group table entries that map from hash values to actual fabric ports...
         int groupId = groupIdOf(deviceId, fabricPorts);
-        int groupSize = fabricPorts.size();
+        if (fabricPorts.size() != HASHED_LINKS) {
+            throw new FlowRuleGeneratorException(format(
+                    "Invalid number of fabric ports for %s, expected %d but found %d",
+                    deviceId, HASHED_LINKS, fabricPorts.size()));
+        }
         Iterator<PortNumber> portIterator = fabricPorts.iterator();
         List<FlowRule> rules = Lists.newArrayList();
-        for (short i = 0; i < groupSize; i++) {
+        for (short i = 0; i < HASHED_LINKS; i++) {
             FlowRule rule = flowRuleBuilder(deviceId, EcmpInterpreter.ECMP_GROUP_TABLE)
                     .withSelector(
                             buildEcmpTrafficSelector(groupId, i))
@@ -240,19 +245,17 @@
             rules.add(rule);
         }
 
-        PiTableAction piTableAction = buildEcmpPiTableAction(groupId, groupSize);
+        PiTableAction piTableAction = buildEcmpPiTableAction(groupId);
 
         return Pair.of(piTableAction, rules);
     }
 
-    private PiTableAction buildEcmpPiTableAction(int groupId, int groupSize) {
+    private PiTableAction buildEcmpPiTableAction(int groupId) {
 
         return PiAction.builder()
                 .withId(PiActionId.of(ECMP_GROUP_ACTION_NAME))
                 .withParameter(new PiActionParam(PiActionParamId.of(GROUP_ID),
                                                  ImmutableByteSequence.copyFrom(groupId)))
-                .withParameter(new PiActionParam(PiActionParamId.of(GROUP_SIZE),
-                                                 ImmutableByteSequence.copyFrom(groupSize)))
                 .build();
     }
 
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
index 6b302f6..d662231 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
@@ -27,11 +27,10 @@
  */
 public class EcmpInterpreter extends DefaultP4Interpreter {
 
-    protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata_t";
+    protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata";
     protected static final String ECMP_GROUP_ACTION_NAME = "ecmp_group";
     protected static final String GROUP_ID = "group_id";
     protected static final String SELECTOR = "selector";
-    protected static final String GROUP_SIZE = "groupSize";
     protected static final String ECMP_GROUP_TABLE = "ecmp_group_table";
 
     private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = new ImmutableBiMap.Builder<Integer, PiTableId>()