Improves host programming introducing a pool of worker threads
Change-Id: I979693aa220e2666c13c4015435c66173624ea64
diff --git a/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java b/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
index 14df622..c62e395 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
@@ -21,6 +21,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
@@ -40,6 +41,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Handles host-related events.
@@ -49,6 +51,9 @@
protected final SegmentRoutingManager srManager;
private HostService hostService;
+ // Host workers - 0 will leverage available processors
+ private static final int DEFAULT_THREADS = 0;
+ protected PredictableExecutor hostWorkers;
/**
* Constructs the HostHandler.
@@ -58,19 +63,36 @@
HostHandler(SegmentRoutingManager srManager) {
this.srManager = srManager;
hostService = srManager.hostService;
+ this.hostWorkers = new PredictableExecutor(DEFAULT_THREADS,
+ groupedThreads("onos/sr", "h-worker-%d", log));
+ }
+
+ /**
+ * Shutdowns the workers.
+ */
+ void terminate() {
+ hostWorkers.shutdown();
}
protected void init(DeviceId devId) {
- hostService.getHosts().forEach(host ->
- host.locations().stream()
- .filter(location -> location.deviceId().equals(devId) ||
- location.deviceId().equals(srManager.getPairDeviceId(devId).orElse(null)))
- .forEach(location -> processHostAddedAtLocation(host, location))
+ // Init hosts in parallel using hostWorkers executor
+ hostService.getHosts().forEach(
+ host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
);
}
+ private void initHost(Host host, DeviceId deviceId) {
+ host.locations().forEach(location -> {
+ if (location.deviceId().equals(deviceId) ||
+ location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
+ processHostAddedAtLocation(host, location);
+ }
+ });
+ }
+
void processHostAddedEvent(HostEvent event) {
- processHostAdded(event.subject());
+ Host host = event.subject();
+ hostWorkers.execute(() -> processHostAdded(host), host.id().hashCode());
}
private void processHostAdded(Host host) {
@@ -141,7 +163,8 @@
}
void processHostRemovedEvent(HostEvent event) {
- processHostRemoved(event.subject());
+ Host host = event.subject();
+ hostWorkers.execute(() -> processHostRemoved(host), host.id().hashCode());
}
private void processHostRemoved(Host host) {
@@ -196,6 +219,11 @@
void processHostMovedEvent(HostEvent event) {
Host host = event.subject();
+ hostWorkers.execute(() -> processHostMovedEventInternal(event), host.id().hashCode());
+ }
+
+ private void processHostMovedEventInternal(HostEvent event) {
+ Host host = event.subject();
MacAddress hostMac = host.mac();
VlanId hostVlanId = host.vlan();
Set<HostLocation> prevLocations = event.prevSubject().locations();
@@ -349,6 +377,11 @@
void processHostUpdatedEvent(HostEvent event) {
Host host = event.subject();
+ hostWorkers.execute(() -> processHostUpdatedEventInternal(event), host.id().hashCode());
+ }
+
+ private void processHostUpdatedEventInternal(HostEvent event) {
+ Host host = event.subject();
MacAddress hostMac = host.mac();
VlanId hostVlanId = host.vlan();
EthType hostTpid = host.tpid();
@@ -424,10 +457,15 @@
}
if (srManager.activeProbing) {
srManager.getPairDeviceId(cp.deviceId())
- .ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).stream()
- .filter(host -> isHostInVlanOfPort(host, pairDeviceId, cp))
- .forEach(host -> srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER))
- );
+ .ifPresent(pairDeviceId -> srManager.hostService.getConnectedHosts(pairDeviceId).forEach(
+ host -> hostWorkers.execute(() -> probingIfNecessary(host, pairDeviceId, cp),
+ host.id().hashCode())));
+ }
+ }
+
+ private void probingIfNecessary(Host host, DeviceId pairDeviceId, ConnectPoint cp) {
+ if (isHostInVlanOfPort(host, pairDeviceId, cp)) {
+ srManager.probingService.probeHost(host, cp, ProbeMode.DISCOVER);
}
}
@@ -637,7 +675,7 @@
* @param install true to populate the objective, false to revoke
*/
void processIntfVlanUpdatedEvent(DeviceId deviceId, PortNumber portNum, VlanId vlanId,
- boolean popVlan, boolean install) {
+ boolean popVlan, boolean install) {
ConnectPoint connectPoint = new ConnectPoint(deviceId, portNum);
Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
@@ -646,22 +684,25 @@
return;
}
- hosts.forEach(host -> {
- MacAddress mac = host.mac();
- VlanId hostVlanId = host.vlan();
+ hosts.forEach(host -> hostWorkers.execute(() -> processIntfVlanUpdatedEventInternal(
+ host, deviceId, portNum, vlanId, popVlan, install), host.id().hashCode()));
+ }
- // Check whether the host vlan is valid for new interface configuration
- if ((!popVlan && hostVlanId.equals(vlanId)) ||
- (popVlan && hostVlanId.equals(VlanId.NONE))) {
- srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
- // Update Forwarding objective and corresponding simple Next objective
- // for each host and IP address connected to given port
- host.ipAddresses().forEach(ipAddress ->
- srManager.defaultRoutingHandler.updateFwdObj(deviceId, portNum, ipAddress.toIpPrefix(),
- mac, vlanId, popVlan, install)
- );
- }
- });
+ private void processIntfVlanUpdatedEventInternal(Host host, DeviceId deviceId, PortNumber portNum,
+ VlanId vlanId, boolean popVlan, boolean install) {
+ MacAddress mac = host.mac();
+ VlanId hostVlanId = host.vlan();
+
+ // Check whether the host vlan is valid for new interface configuration
+ if ((!popVlan && hostVlanId.equals(vlanId)) ||
+ (popVlan && hostVlanId.equals(VlanId.NONE))) {
+ srManager.defaultRoutingHandler.updateBridging(deviceId, portNum, mac, vlanId, popVlan, install);
+ // Update Forwarding objective and corresponding simple Next objective
+ // for each host and IP address connected to given port
+ host.ipAddresses().forEach(ipAddress -> srManager.defaultRoutingHandler.updateFwdObj(
+ deviceId, portNum, ipAddress.toIpPrefix(), mac, vlanId, popVlan, install)
+ );
+ }
}
/**
@@ -680,18 +721,23 @@
}
// Check whether the host IP address is in the interface's subnet
- hosts.forEach(host ->
- host.ipAddresses().forEach(hostIpAddress -> {
- ipPrefixSet.forEach(ipPrefix -> {
- if (install && ipPrefix.contains(hostIpAddress)) {
- srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
- host.mac(), host.vlan(), cp.port(), true);
- } else if (!install && ipPrefix.contains(hostIpAddress)) {
- srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
- host.mac(), host.vlan(), cp.port(), true);
- }
- });
- }));
+ hosts.forEach(host -> hostWorkers.execute(() -> processIntfIpUpdatedEventInternal(
+ host, cp, ipPrefixSet, install)));
+ }
+
+ private void processIntfIpUpdatedEventInternal(Host host, ConnectPoint cp, Set<IpPrefix> ipPrefixSet,
+ boolean install) {
+ host.ipAddresses().forEach(hostIpAddress -> {
+ ipPrefixSet.forEach(ipPrefix -> {
+ if (install && ipPrefix.contains(hostIpAddress)) {
+ srManager.defaultRoutingHandler.populateRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
+ host.mac(), host.vlan(), cp.port(), true);
+ } else if (!install && ipPrefix.contains(hostIpAddress)) {
+ srManager.defaultRoutingHandler.revokeRoute(cp.deviceId(), hostIpAddress.toIpPrefix(),
+ host.mac(), host.vlan(), cp.port(), true);
+ }
+ });
+ });
}
/**
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6cc0e74..4c66dd0 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -642,6 +642,7 @@
policyStore.destroy();
mcastHandler.terminate();
+ hostHandler.terminate();
log.info("Stopped");
}
diff --git a/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java b/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
index d0ca6cd..d97cbc7 100644
--- a/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
+++ b/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
@@ -28,6 +28,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
import org.onosproject.net.config.ConfigApplyDelegate;
import org.onosproject.net.host.HostProbingService;
import org.onosproject.net.host.ProbeMode;
@@ -65,8 +66,9 @@
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.*;
+import static org.onlab.util.Tools.groupedThreads;
-/**r
+/**
* Unit test for {@link HostHandler}.
*/
public class HostHandlerTest {
@@ -250,6 +252,8 @@
replay(srManager.routeService);
hostHandler = new HostHandler(srManager);
+ hostHandler.hostWorkers = new PredictableExecutor(
+ 0, groupedThreads("onos/sr", "h-worker-%d"), true);
ROUTING_TABLE.clear();
BRIDGING_TABLE.clear();