ECMP app fixes

Change-Id: Ib437f1e92cb90f7bc97539a447637066747c5532
diff --git a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
index 04cfc7a..ccab5cf 100644
--- a/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
+++ b/apps/pi-demo/common/src/main/java/org/onosproject/pi/demo/app/common/AbstractUpgradableFabricApp.java
@@ -42,17 +42,13 @@
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.pi.runtime.PiTableId;
 import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
 import org.onosproject.net.topology.TopologyGraph;
-import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.net.topology.TopologyVertex;
 import org.slf4j.Logger;
@@ -64,6 +60,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +73,6 @@
 import static java.util.stream.Stream.concat;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -90,19 +86,21 @@
     private static final int NUM_LEAFS = 2;
     private static final int NUM_SPINES = 2;
     protected static final int HASHED_LINKS = 2;
+    private static final boolean WITH_IMBALANCED_STRIPING = false;
     private static final int FLOW_PRIORITY = 100;
+    private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
 
     private static final int CLEANUP_SLEEP = 2000;
 
     protected final Logger log = getLogger(getClass());
 
-    private final TopologyListener topologyListener = new InternalTopologyListener();
     private final DeviceListener deviceListener = new InternalDeviceListener();
-    private final HostListener hostListener = new InternalHostListener();
 
     private final ExecutorService executorService = Executors
             .newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
 
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
     private final String appName;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -126,8 +124,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private PiPipeconfService piPipeconfService;
 
-    private boolean withImbalancedStriping = false;
-
     private boolean appActive = false;
     private boolean appFreezed = false;
 
@@ -151,8 +147,8 @@
     /**
      * Creates a new PI fabric app.
      *
-     * @param appName           app name
-     * @param appPipeconf       a P4Runtime device context to be used on devices
+     * @param appName     app name
+     * @param appPipeconf a P4Runtime device context to be used on devices
      */
     protected AbstractUpgradableFabricApp(String appName, PiPipeconf appPipeconf) {
         this.appName = checkNotNull(appName);
@@ -179,10 +175,7 @@
         APP_HANDLES.put(appName, this);
 
         appId = coreService.registerApplication(appName);
-
-        topologyService.addListener(topologyListener);
         deviceService.addListener(deviceListener);
-        hostService.addListener(hostListener);
         piPipeconfService.register(appPipeconf);
 
         init();
@@ -200,9 +193,8 @@
             List<Runnable> runningTasks = executorService.shutdownNow();
             log.warn("Unable to stop the following tasks: {}", runningTasks);
         }
+        scheduledExecutorService.shutdown();
         deviceService.removeListener(deviceListener);
-        topologyService.removeListener(topologyListener);
-        hostService.removeListener(hostListener);
         flowRuleService.removeFlowRulesById(appId);
         piPipeconfService.remove(appPipeconf.id());
 
@@ -224,8 +216,12 @@
             pipeconfFlags = Maps.newConcurrentMap();
         }
 
-        // Start flow rules generator...
-        spawnTask(() -> generateFlowRules(topologyService.currentTopology(), Sets.newHashSet(hostService.getHosts())));
+        /*
+        Schedules a thread that periodically checks the topology, as soon as it corresponds to the expected
+        one, it generates the necessary flow rules and starts the deploy process on each device.
+         */
+        scheduledExecutorService.scheduleAtFixedRate(this::checkTopologyAndGenerateFlowRules,
+                0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
     }
 
     private void setAppFreezed(boolean appFreezed) {
@@ -304,7 +300,7 @@
         lock.lock();
 
         try {
-            // Set pipeconfflag if not already done.
+            // Set pipeconf flag if not already done.
             if (!pipeconfFlags.getOrDefault(deviceId, false)) {
                 if (piPipeconfService.ofDevice(deviceId).isPresent() &&
                         appPipeconf.id().equals(piPipeconfService.ofDevice(deviceId).get())) {
@@ -324,7 +320,7 @@
             // Install rules.
             if (!ruleFlags.getOrDefault(deviceId, false) &&
                     deviceFlowRules.containsKey(deviceId)) {
-                log.info("Installing rules for {}...", deviceId);
+                log.info("Installing {} rules for {}...", deviceFlowRules.get(deviceId).size(), deviceId);
                 installFlowRules(deviceFlowRules.get(deviceId));
                 ruleFlags.put(deviceId, true);
             }
@@ -344,19 +340,13 @@
         flowRuleService.apply(opsBuilder.build());
     }
 
-    private void removeFlowRules(Collection<FlowRule> rules) {
-        FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
-        rules.forEach(opsBuilder::remove);
-        flowRuleService.apply(opsBuilder.build());
-    }
-
     /**
      * Generates flow rules to provide host-to-host connectivity for the given topology and hosts.
-     *
-     * @param topo  a topology
-     * @param hosts a collection of hosts
      */
-    private synchronized void generateFlowRules(Topology topo, Collection<Host> hosts) {
+    private synchronized void checkTopologyAndGenerateFlowRules() {
+
+        Topology topo = topologyService.currentTopology();
+        Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
 
         if (flowRuleGenerated) {
             log.debug("Flow rules have been already generated, aborting...");
@@ -381,7 +371,7 @@
         for (DeviceId did : spines) {
             int portCount = deviceService.getPorts(did).size();
             // Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
-            int expectedSpinePortCount = NUM_LEAFS + (withImbalancedStriping ? 1 : 0);
+            int expectedSpinePortCount = NUM_LEAFS + (WITH_IMBALANCED_STRIPING ? 1 : 0);
             if (portCount != expectedSpinePortCount) {
                 log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}", did, portCount);
                 return;
@@ -390,7 +380,7 @@
         for (DeviceId did : leafs) {
             int portCount = deviceService.getPorts(did).size();
             // Expected port count: num spines + host port + 1 redundant spine link
-            int expectedLeafPortCount = NUM_LEAFS + (withImbalancedStriping ? 2 : 1);
+            int expectedLeafPortCount = NUM_LEAFS + (WITH_IMBALANCED_STRIPING ? 2 : 1);
             if (portCount != expectedLeafPortCount) {
                 log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}", did, portCount);
                 return;
@@ -498,26 +488,6 @@
     }
 
     /**
-     * A listener of topology events that executes a flow rule generation task each time a device is added.
-     */
-    private class InternalTopologyListener implements TopologyListener {
-
-        @Override
-        public void event(TopologyEvent event) {
-            spawnTask(() -> generateFlowRules(event.subject(), Sets.newHashSet(hostService.getHosts())));
-        }
-
-        @Override
-        public boolean isRelevant(TopologyEvent event) {
-            return !appFreezed &&
-                    // If at least one reason is of type DEVICE_ADDED.
-                    event.reasons().stream()
-                            .filter(r -> r instanceof DeviceEvent)
-                            .anyMatch(r -> ((DeviceEvent) r).type() == DEVICE_ADDED);
-        }
-    }
-
-    /**
      * A listener of device events that executes a device deploy task each time a device is added, updated or
      * re-connects.
      */
@@ -538,22 +508,6 @@
     }
 
     /**
-     * A listener of host events that generates flow rules each time a new host is added.
-     */
-    private class InternalHostListener implements HostListener {
-        @Override
-        public void event(HostEvent event) {
-            spawnTask(() -> generateFlowRules(topologyService.currentTopology(),
-                                              Sets.newHashSet(hostService.getHosts())));
-        }
-
-        @Override
-        public boolean isRelevant(HostEvent event) {
-            return !appFreezed && event.type() == HOST_ADDED;
-        }
-    }
-
-    /**
      * An exception occurred while generating flow rules for this fabric.
      */
     public class FlowRuleGeneratorException extends Exception {
@@ -564,9 +518,5 @@
         public FlowRuleGeneratorException(String msg) {
             super(msg);
         }
-
-        public FlowRuleGeneratorException(Exception cause) {
-            super(cause);
-        }
     }
 }
\ No newline at end of file
diff --git a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
index 0781f52..d662231 100644
--- a/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
+++ b/apps/pi-demo/ecmp/src/main/java/org/onosproject/pi/demo/app/ecmp/EcmpInterpreter.java
@@ -27,7 +27,7 @@
  */
 public class EcmpInterpreter extends DefaultP4Interpreter {
 
-    protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata_t";
+    protected static final String ECMP_METADATA_HEADER_NAME = "ecmp_metadata";
     protected static final String ECMP_GROUP_ACTION_NAME = "ecmp_group";
     protected static final String GROUP_ID = "group_id";
     protected static final String SELECTOR = "selector";
diff --git a/tools/dev/mininet/bmv2.py b/tools/dev/mininet/bmv2.py
index e942646..fb3d9d8 100644
--- a/tools/dev/mininet/bmv2.py
+++ b/tools/dev/mininet/bmv2.py
@@ -18,7 +18,6 @@
 CPU_PORT = 255
 PKT_BYTES_TO_DUMP = 80
 VALGRIND_PREFIX = 'valgrind --leak-check=yes'
-VALGRIND_SLEEP = 10  # seconds
 
 
 def parseBoolean(value):
@@ -52,7 +51,7 @@
 
     def __init__(self, name, json=None, debugger=False, loglevel="warn", elogger=False,
                  persistent=False, grpcPort=None, thriftPort=None, netcfg=True, dryrun=False,
-                 pipeconfId="", pktdump=False, valgrind=False, **kwargs):
+                 pipeconfId="", pktdump=False, valgrind=False, netcfgSleep=1, **kwargs):
         Switch.__init__(self, name, **kwargs)
         self.grpcPort = ONOSBmv2Switch.pickUnusedPort() if not grpcPort else grpcPort
         self.thriftPort = ONOSBmv2Switch.pickUnusedPort() if not thriftPort else thriftPort
@@ -71,6 +70,7 @@
         self.netcfg = parseBoolean(netcfg)
         self.dryrun = parseBoolean(dryrun)
         self.valgrind = parseBoolean(valgrind)
+        self.netcfgSleep = netcfgSleep
         self.netcfgfile = '/tmp/bmv2-%d-netcfg.json' % self.deviceId
         self.pipeconfId = pipeconfId
         if persistent:
@@ -229,11 +229,12 @@
             out = self.cmd(cmdStr)
             if out:
                 print out
-            if self.valgrind:
-                # With valgrind, it takes some time before the gRPC server is available.
-                # Wait before pushing the netcfg.
-                info("\n*** Waiting %d seconds before pushing the config to ONOS...\n" % VALGRIND_SLEEP)
-                time.sleep(VALGRIND_SLEEP)
+
+        # Wait before pushing the netcfg.
+        if self.netcfgSleep > 0:
+            info("\n*** Waiting %d seconds before pushing the config to ONOS...\n"
+                 % self.netcfgSleep)
+        time.sleep(self.netcfgSleep)
 
         try:  # onos.py
             clist = controllers[0].nodes()
diff --git a/tools/test/topos/bmv2-demo.py b/tools/test/topos/bmv2-demo.py
index 171cc6f..2864dca 100755
--- a/tools/test/topos/bmv2-demo.py
+++ b/tools/test/topos/bmv2-demo.py
@@ -70,7 +70,10 @@
                                                     netcfg=False,
                                                     longitude=longitude,
                                                     latitude=latitude,
-                                                    pipeconfId=args.pipeconf_id)
+                                                    pipeconfId=args.pipeconf_id,
+                                                    valgrind=True,
+                                                    netcfgSleep=0,
+                                                    leglevel="debug")
 
         for i in range(1, args.size + 1):
             for j in range(1, args.size + 1):
@@ -233,8 +236,14 @@
 
     print "Network started"
 
+    generateNetcfg(onosIp, net, args)
+
+    sleep(30)
+    print "Uploading netcfg..."
+    call(("%s/onos-netcfg" % RUN_PACK_PATH, onosIp, TEMP_NETCFG_FILE))
+
     # Generate background traffic.
-    sleep(3)
+    sleep(5)
     for (h1, h2) in combinations(net.hosts, 2):
         h1.startPingBg(h2)
         h2.startPingBg(h1)
@@ -250,11 +259,6 @@
     # print "Starting traffic from h1 to h3..."
     # net.hosts[0].startIperfClient(net.hosts[-1], flowBw="200k", numFlows=100, duration=10)
 
-    generateNetcfg(onosIp, net, args)
-
-    print "Uploading netcfg..."
-    call(("%s/onos-netcfg" % RUN_PACK_PATH, onosIp, TEMP_NETCFG_FILE))
-
     if not args.onos_ip:
         ONOSCLI(net)
     else: