Improves host programming introducing a pool of worker threads

Change-Id: I979693aa220e2666c13c4015435c66173624ea64
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
index 14df622..c62e395 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
@@ -21,6 +21,7 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
@@ -40,6 +41,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * Handles host-related events.
@@ -49,6 +51,9 @@
 
     protected final SegmentRoutingManager srManager;
     private HostService hostService;
+    // Host workers - 0 will leverage available processors
+    private static final int DEFAULT_THREADS = 0;
+    protected PredictableExecutor hostWorkers;
 
     /**
      * Constructs the HostHandler.
@@ -58,19 +63,36 @@
     HostHandler(SegmentRoutingManager srManager) {
         this.srManager = srManager;
         hostService = srManager.hostService;
+        this.hostWorkers = new PredictableExecutor(DEFAULT_THREADS,
+                                                   groupedThreads("onos/sr", "h-worker-%d", log));
+    }
+
+    /**
+     * Shutdowns the workers.
+     */
+    void terminate() {
+        hostWorkers.shutdown();
     }
 
     protected void init(DeviceId devId) {
-        hostService.getHosts().forEach(host ->
-            host.locations().stream()
-                    .filter(location -> location.deviceId().equals(devId) ||
-                            location.deviceId().equals(srManager.getPairDeviceId(devId).orElse(null)))
-                    .forEach(location -> processHostAddedAtLocation(host, location))
+        // Init hosts in parallel using hostWorkers executor
+        hostService.getHosts().forEach(
+                host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
         );
     }
 
+    private void initHost(Host host, DeviceId deviceId) {
+        host.locations().forEach(location -> {
+            if (location.deviceId().equals(deviceId) ||
+                    location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
+                processHostAddedAtLocation(host, location);
+            }
+        });
+    }
+
     void processHostAddedEvent(HostEvent event) {
-        processHostAdded(event.subject());
+        Host host = event.subject();
+        hostWorkers.execute(() -> processHostAdded(host), host.id().hashCode());
     }
 
     private void processHostAdded(Host host) {
@@ -141,7 +163,8 @@
     }
 
     void processHostRemovedEvent(HostEvent event) {
-        processHostRemoved(event.subject());
+        Host host = event.subject();
+        hostWorkers.execute(() -> processHostRemoved(host), host.id().hashCode());
     }
 
     private void processHostRemoved(Host host) {
@@ -196,6 +219,11 @@
 
     void processHostMovedEvent(HostEvent event) {
         Host host = event.subject();
+        hostWorkers.execute(() -> processHostMovedEventInternal(event), host.id().hashCode());
+    }
+
+    private void processHostMovedEventInternal(HostEvent event) {
+        Host host = event.subject();
         MacAddress hostMac = host.mac();
         VlanId hostVlanId = host.vlan();
         Set<HostLocation> prevLocations = event.prevSubject().locations();
@@ -349,6 +377,11 @@
 
     void processHostUpdatedEvent(HostEvent event) {
         Host host = event.subject();
+        hostWorkers.execute(() -> processHostUpdatedEventInternal(event), host.id().hashCode());
+    }
+
+    private void processHostUpdatedEventInternal(HostEvent event) {
+        Host host = event.subject();
         MacAddress hostMac = host.mac();
         VlanId hostVlanId = host.vlan();
         EthType hostTpid = host.tpid();
@@ -424,10 +457,15 @@
         }
         if (srManager.activeProbing) {
             srManager.getPairDeviceId(cp.deviceId())
-                    .ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).stream()
-                            .filter(host -> isHostInVlanOfPort(host, pairDeviceId, cp))
-                            .forEach(host -> srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER))
-                    );
+                    .ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).forEach(
+                            host -> hostWorkers.execute(() -> probingIfNecessary(host, pairDeviceId, cp),
+                                                        host.id().hashCode())));
+        }
+    }
+
+    private void probingIfNecessary(Host host, DeviceId pairDeviceId, ConnectPoint cp) {
+        if (isHostInVlanOfPort(host, pairDeviceId, cp)) {
+            srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER);
         }
     }
 
@@ -637,7 +675,7 @@
      * @param install true to populate the objective, false to revoke
      */
     void processIntfVlanUpdatedEvent(DeviceId deviceId, PortNumber portNum, VlanId vlanId,
-                                 boolean popVlan, boolean install) {
+                                     boolean popVlan, boolean install) {
         ConnectPoint connectPoint = new ConnectPoint(deviceId, portNum);
         Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
 
@@ -646,22 +684,25 @@
             return;
         }
 
-        hosts.forEach(host -> {
-            MacAddress mac = host.mac();
-            VlanId hostVlanId = host.vlan();
+        hosts.forEach(host -> hostWorkers.execute(() -> processIntfVlanUpdatedEventInternal(
+                host, deviceId, portNum, vlanId, popVlan, install), host.id().hashCode()));
+    }
 
-            // Check whether the host vlan is valid for new interface configuration
-            if ((!popVlan && hostVlanId.equals(vlanId)) ||
-                    (popVlan && hostVlanId.equals(VlanId.NONE))) {
-                srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
-                // Update Forwarding objective and corresponding simple Next objective
-                // for each host and IP address connected to given port
-                host.ipAddresses().forEach(ipAddress ->
-                    srManager.defaultRoutingHandler.updateFwdObj(deviceId, portNum, ipAddress.toIpPrefix(),
-                                                                mac, vlanId, popVlan, install)
-                );
-            }
-        });
+    private void processIntfVlanUpdatedEventInternal(Host host, DeviceId deviceId, PortNumber portNum,
+                                                     VlanId vlanId, boolean popVlan, boolean install) {
+        MacAddress mac = host.mac();
+        VlanId hostVlanId = host.vlan();
+
+        // Check whether the host vlan is valid for new interface configuration
+        if ((!popVlan && hostVlanId.equals(vlanId)) ||
+                (popVlan && hostVlanId.equals(VlanId.NONE))) {
+            srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
+            // Update Forwarding objective and corresponding simple Next objective
+            // for each host and IP address connected to given port
+            host.ipAddresses().forEach(ipAddress -> srManager.defaultRoutingHandler.updateFwdObj(
+                    deviceId, portNum, ipAddress.toIpPrefix(), mac, vlanId, popVlan, install)
+            );
+        }
     }
 
     /**
@@ -680,18 +721,23 @@
         }
 
         // Check whether the host IP address is in the interface's subnet
-        hosts.forEach(host ->
-            host.ipAddresses().forEach(hostIpAddress -> {
-                ipPrefixSet.forEach(ipPrefix -> {
-                    if (install && ipPrefix.contains(hostIpAddress)) {
-                            srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
-                                                                         host.mac(), host.vlan(), cp.port(), true);
-                    } else if (!install && ipPrefix.contains(hostIpAddress)) {
-                            srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
-                                                                       host.mac(), host.vlan(), cp.port(), true);
-                    }
-                });
-            }));
+        hosts.forEach(host -> hostWorkers.execute(() -> processIntfIpUpdatedEventInternal(
+                host, cp, ipPrefixSet, install)));
+    }
+
+    private void processIntfIpUpdatedEventInternal(Host host, ConnectPoint cp, Set<IpPrefix> ipPrefixSet,
+                                                   boolean install) {
+        host.ipAddresses().forEach(hostIpAddress -> {
+            ipPrefixSet.forEach(ipPrefix -> {
+                if (install && ipPrefix.contains(hostIpAddress)) {
+                    srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
+                                                                  host.mac(), host.vlan(), cp.port(), true);
+                } else if (!install && ipPrefix.contains(hostIpAddress)) {
+                    srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
+                                                                host.mac(), host.vlan(), cp.port(), true);
+                }
+            });
+        });
     }
 
     /**
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6cc0e74..4c66dd0 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -642,6 +642,7 @@
         policyStore.destroy();
 
         mcastHandler.terminate();
+        hostHandler.terminate();
         log.info("Stopped");
     }
 
diff --git a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
index d0ca6cd..d97cbc7 100644
--- a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
+++ b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
@@ -28,6 +28,7 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
 import org.onosproject.net.config.ConfigApplyDelegate;
 import org.onosproject.net.host.HostProbingService;
 import org.onosproject.net.host.ProbeMode;
@@ -65,8 +66,9 @@
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.*;
+import static org.onlab.util.Tools.groupedThreads;
 
-/**r
+/**
  * Unit test for {@link HostHandler}.
  */
 public class HostHandlerTest {
@@ -250,6 +252,8 @@
         replay(srManager.routeService);
 
         hostHandler = new HostHandler(srManager);
+        hostHandler.hostWorkers = new PredictableExecutor(
+                0, groupedThreads("onos/sr", "h-worker-%d"), true);
 
         ROUTING_TABLE.clear();
         BRIDGING_TABLE.clear();
diff --git a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
index 2be1ee2..279091c 100644
--- a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
+++ b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
@@ -15,6 +15,8 @@
  */
 package org.onlab.util;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -67,6 +69,18 @@
      * @param threadFactory {@link ThreadFactory} to use to create threads
      */
     public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
+        this(buckets, threadFactory, false);
+    }
+
+    /**
+     * Creates {@link PredictableExecutor} instance.
+     * Meant for testing purposes.
+     *
+     * @param buckets number of buckets or 0 to match available processors
+     * @param threadFactory {@link ThreadFactory} to use to create threads
+     * @param directExec direct executors
+     */
+    public PredictableExecutor(int buckets, ThreadFactory threadFactory, boolean directExec) {
         checkArgument(buckets >= 0, "number of buckets must be non zero");
         checkNotNull(threadFactory);
         if (buckets == 0) {
@@ -75,7 +89,7 @@
         this.backends = new ArrayList<>(buckets);
 
         for (int i = 0; i < buckets; ++i) {
-            this.backends.add(backendExecutorService(threadFactory));
+            this.backends.add(backendExecutorService(threadFactory, directExec));
         }
     }
 
@@ -93,10 +107,11 @@
      * Creates a single thread {@link ExecutorService} to use in the backend.
      *
      * @param threadFactory {@link ThreadFactory} to use to create threads
-     * @return single thread {@link ExecutorService}
+     * @param direct direct executors
+     * @return single thread {@link ExecutorService} or direct executor
      */
-    protected ExecutorService backendExecutorService(ThreadFactory threadFactory) {
-        return Executors.newSingleThreadExecutor(threadFactory);
+    protected ExecutorService backendExecutorService(ThreadFactory threadFactory, boolean direct) {
+        return direct ? MoreExecutors.newDirectExecutorService() : Executors.newSingleThreadExecutor(threadFactory);
     }