Refactor: install rules in a separate thread to avoid blocking the event-delivery thread
Change-Id: I10ff88fb56f9358ec948f01176d6fe20d91e37c0
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index 6ae7100..298263c 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -70,7 +70,6 @@
import org.onosproject.openstacknode.api.OpenstackNodeEvent;
import org.onosproject.openstacknode.api.OpenstackNodeListener;
import org.onosproject.openstacknode.api.OpenstackNodeService;
-import org.openstack4j.model.network.ExternalGateway;
import org.openstack4j.model.network.IP;
import org.openstack4j.model.network.NetFloatingIP;
import org.openstack4j.model.network.Network;
@@ -98,11 +97,13 @@
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.externalPeerRouterForNetwork;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.floatingIpByInstancePort;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByInstancePort;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getPropertyValue;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
-import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGratuitousArpPacketForFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGarpPacketForFloatingIp;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
@@ -241,14 +242,16 @@
targetMac = instPort.macAddress();
}
- OpenstackNode gw = getGwByInstancePort(osNodeService.completeNodes(GATEWAY), instPort);
+ OpenstackNode gw =
+ getGwByInstancePort(osNodeService.completeNodes(GATEWAY), instPort);
if (gw == null) {
return;
}
// if the ARP packet_in received from non-relevant GWs, we simply ignore it
- if (!Objects.equals(gw.intgBridge(), context.inPacket().receivedFrom().deviceId())) {
+ if (!Objects.equals(gw.intgBridge(),
+ context.inPacket().receivedFrom().deviceId())) {
return;
}
}
@@ -286,8 +289,10 @@
try {
- Set<String> extRouterIps = osNetworkService.externalPeerRouters().
- stream().map(r -> r.ipAddress().toString()).collect(Collectors.toSet());
+ Set<String> extRouterIps = osNetworkService.externalPeerRouters()
+ .stream()
+ .map(r -> r.ipAddress().toString())
+ .collect(Collectors.toSet());
// if SPA is NOT contained in existing external router IP set, we ignore it
if (!extRouterIps.contains(spa.toString())) {
@@ -354,7 +359,8 @@
* @param gateway gateway node
* @param install flow rule installation flag
*/
- private void setFloatingIpArpRuleForGateway(OpenstackNode gateway, boolean install) {
+ private void setFloatingIpArpRuleForGateway(OpenstackNode gateway,
+ boolean install) {
if (ARP_BROADCAST_MODE.equals(getArpMode())) {
Set<OpenstackNode> completedGws = osNodeService.completeNodes(GATEWAY);
@@ -367,14 +373,16 @@
finalGws.remove(gateway);
osRouterAdminService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
+ setFloatingIpArpRule(fip, fip.getPortId(),
+ finalGws, false);
finalGws.add(gateway);
}
});
}
osRouterAdminService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
+ setFloatingIpArpRule(fip, fip.getPortId(),
+ finalGws, true);
}
});
} else {
@@ -385,14 +393,16 @@
finalGws.add(gateway);
osRouterAdminService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
+ setFloatingIpArpRule(fip, fip.getPortId(),
+ finalGws, false);
}
});
finalGws.remove(gateway);
if (completedGws.size() >= 1) {
osRouterAdminService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
+ setFloatingIpArpRule(fip, fip.getPortId(),
+ finalGws, true);
}
});
}
@@ -516,15 +526,16 @@
}
private void setFakeGatewayArpRuleByRouter(Router router, boolean install) {
- setFakeGatewayArpRuleByGateway(router.getId(), router.getExternalGatewayInfo(), install);
+ setFakeGatewayArpRuleByGateway(router.getId(), install);
}
- private Set<IP> getExternalGatewaySnatIps(String routerId, ExternalGateway extGw) {
+ private Set<IP> getExternalGatewaySnatIps(String routerId) {
if (routerId == null) {
return ImmutableSet.of();
}
- Set<String> portIds = osRouterAdminService.routerInterfaces(routerId).stream()
+ Set<String> portIds = osRouterAdminService.routerInterfaces(routerId)
+ .stream()
.map(RouterInterface::getPortId)
.collect(Collectors.toSet());
@@ -535,14 +546,9 @@
.collect(Collectors.toSet());
}
- private void setFakeGatewayArpRuleByGateway(String routerId, ExternalGateway extGw, boolean install) {
+ private void setFakeGatewayArpRuleByGateway(String routerId, boolean install) {
if (ARP_BROADCAST_MODE.equals(getArpMode())) {
-
- if (extGw == null) {
- return;
- }
-
- setFakeGatewayArpRuleByIps(getExternalGatewaySnatIps(routerId, extGw), install);
+ setFakeGatewayArpRuleByIps(getExternalGatewaySnatIps(routerId), install);
}
}
@@ -607,12 +613,14 @@
case OPENSTACK_PORT_CREATED:
case OPENSTACK_PORT_UPDATED:
eventExecutor.execute(() ->
- setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), true)
+ setFakeGatewayArpRuleByIps(
+ (Set<IP>) event.port().getFixedIps(), true)
);
break;
case OPENSTACK_PORT_REMOVED:
eventExecutor.execute(() ->
- setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), false)
+ setFakeGatewayArpRuleByIps(
+ (Set<IP>) event.port().getFixedIps(), false)
);
break;
default:
@@ -656,58 +664,60 @@
case OPENSTACK_ROUTER_GATEWAY_ADDED:
eventExecutor.execute(() ->
// add a gateway manually after adding a router
- setFakeGatewayArpRuleByGateway(event.subject().getId(),
- event.externalGateway(), true)
+ setFakeGatewayArpRuleByGateway(event.subject().getId(), true)
);
break;
case OPENSTACK_ROUTER_GATEWAY_REMOVED:
eventExecutor.execute(() ->
// remove a gateway from an existing router
- setFakeGatewayArpRuleByGateway(event.subject().getId(),
- event.externalGateway(), false)
+ setFakeGatewayArpRuleByGateway(event.subject().getId(), false)
);
break;
case OPENSTACK_FLOATING_IP_ASSOCIATED:
- if (getValidPortId(event) != null) {
- eventExecutor.execute(() -> {
- // associate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
- completedGws, true);
- });
- }
+ eventExecutor.execute(() -> {
+ if (getValidPortId(event) == null) {
+ return;
+ }
+ // associate a floating IP with an existing VM
+ setFloatingIpArpRule(event.floatingIp(),
+ getValidPortId(event), completedGws, true);
+ });
break;
case OPENSTACK_FLOATING_IP_DISASSOCIATED:
- if (getValidPortId(event) != null) {
- eventExecutor.execute(() -> {
- // disassociate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
- completedGws, false);
- });
- }
+ eventExecutor.execute(() -> {
+ if (getValidPortId(event) == null) {
+ return;
+ }
+ // disassociate a floating IP from an existing VM
+ setFloatingIpArpRule(event.floatingIp(),
+ getValidPortId(event), completedGws, false);
+ });
break;
case OPENSTACK_FLOATING_IP_CREATED:
// during floating IP creation, if the floating IP is
// associated with any port of VM, then we will set
// floating IP related ARP rules to gateway node
- if (getValidPortId(event) != null) {
- eventExecutor.execute(() -> {
- // associate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
- completedGws, true);
- });
- }
+ eventExecutor.execute(() -> {
+ if (getValidPortId(event) == null) {
+ return;
+ }
+ // associate a floating IP with an existing VM
+ setFloatingIpArpRule(event.floatingIp(),
+ getValidPortId(event), completedGws, true);
+ });
break;
case OPENSTACK_FLOATING_IP_REMOVED:
// during floating IP deletion, if the floating IP is
// still associated with any port of VM, then we will
// remove floating IP related ARP rules from gateway node
- if (getValidPortId(event) != null) {
- eventExecutor.execute(() -> {
- // associate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
- completedGws, false);
- });
- }
+ eventExecutor.execute(() -> {
+ if (getValidPortId(event) == null) {
+ return;
+ }
+ // remove the ARP rule of a floating IP that is still associated with a VM
+ setFloatingIpArpRule(event.floatingIp(),
+ getValidPortId(event), completedGws, false);
+ });
break;
default:
// do nothing for the other events
@@ -750,50 +760,52 @@
switch (event.type()) {
case OPENSTACK_INSTANCE_PORT_DETECTED:
-
- osRouterAdminService.floatingIps().stream()
- .filter(f -> f.getPortId() != null)
- .filter(f -> f.getPortId().equals(instPort.portId()))
- .forEach(f -> setFloatingIpArpRule(f, instPort.portId(), gateways, true));
-
+ eventExecutor.execute(() ->
+ osRouterAdminService.floatingIps().stream()
+ .filter(f -> f.getPortId() != null)
+ .filter(f -> f.getPortId().equals(instPort.portId()))
+ .forEach(f -> setFloatingIpArpRule(f,
+ instPort.portId(), gateways, true))
+ );
break;
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-
- if (gateways.size() == 1) {
- return;
- }
-
- if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
- eventExecutor.execute(() ->
- setFloatingIpArpRuleWithPortEvent(fip, event.subject(),
- gateways, true)
- );
- }
-
- break;
- case OPENSTACK_INSTANCE_MIGRATION_ENDED:
-
- InstancePort revisedInstPort = swapStaleLocation(event.subject());
-
- if (gateways.size() == 1) {
- return;
- }
-
- if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
- DeviceId newDeviceId = event.subject().deviceId();
- DeviceId oldDeviceId = revisedInstPort.deviceId();
-
- OpenstackNode oldGw = getGwByComputeDevId(gateways, oldDeviceId);
- OpenstackNode newGw = getGwByComputeDevId(gateways, newDeviceId);
-
- if (oldGw != null && oldGw.equals(newGw)) {
+ eventExecutor.execute(() -> {
+ if (gateways.size() == 1) {
return;
}
- eventExecutor.execute(() ->
- setFloatingIpArpRuleWithPortEvent(fip,
- revisedInstPort, gateways, false));
- }
+ if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
+ setFloatingIpArpRuleWithPortEvent(fip,
+ event.subject(), gateways, true);
+ }
+ });
+
+ break;
+ case OPENSTACK_INSTANCE_MIGRATION_ENDED:
+ eventExecutor.execute(() -> {
+ InstancePort revisedInstPort = swapStaleLocation(event.subject());
+
+ if (gateways.size() == 1) {
+ return;
+ }
+
+ if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
+ DeviceId newDeviceId = event.subject().deviceId();
+ DeviceId oldDeviceId = revisedInstPort.deviceId();
+
+ OpenstackNode oldGw =
+ getGwByComputeDevId(gateways, oldDeviceId);
+ OpenstackNode newGw =
+ getGwByComputeDevId(gateways, newDeviceId);
+
+ if (oldGw != null && oldGw.equals(newGw)) {
+ return;
+ }
+
+ setFloatingIpArpRuleWithPortEvent(fip,
+ revisedInstPort, gateways, false);
+ }
+ });
break;
default:
break;
@@ -815,44 +827,52 @@
OpenstackNode osNode = event.subject();
switch (event.type()) {
case OPENSTACK_NODE_COMPLETE:
- setDefaultArpRule(osNode, true);
- setFloatingIpArpRuleForGateway(osNode, true);
- sendGratuitousArpToSwitch(event.subject(), true);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, true);
+ setFloatingIpArpRuleForGateway(osNode, true);
+ sendGratuitousArpToSwitch(event.subject(), true);
+ });
break;
case OPENSTACK_NODE_INCOMPLETE:
- setDefaultArpRule(osNode, false);
- setFloatingIpArpRuleForGateway(osNode, false);
- sendGratuitousArpToSwitch(event.subject(), false);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, false);
+ setFloatingIpArpRuleForGateway(osNode, false);
+ sendGratuitousArpToSwitch(event.subject(), false);
+ });
break;
case OPENSTACK_NODE_REMOVED:
- sendGratuitousArpToSwitch(event.subject(), false);
+ eventExecutor.execute(() -> {
+ sendGratuitousArpToSwitch(event.subject(), false);
+ });
break;
-
default:
break;
}
}
- private void sendGratuitousArpToSwitch(OpenstackNode gatewayNode, boolean isCompleteCase) {
- Set<OpenstackNode> completeGws = ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY));
+ private void sendGratuitousArpToSwitch(OpenstackNode gatewayNode,
+ boolean isCompleteCase) {
+ Set<OpenstackNode> completeGws =
+ ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY));
if (isCompleteCase) {
- osNodeService.completeNodes(COMPUTE)
- .stream()
- .filter(node -> isGwSelectedByComputeNode(completeGws, node, gatewayNode))
- .forEach(node -> processGratuitousArpPacketForComputeNode(node, gatewayNode));
+ osNodeService.completeNodes(COMPUTE).stream()
+ .filter(node -> isGwSelectedByComputeNode(completeGws,
+ node, gatewayNode))
+ .forEach(node -> processGarpPacketForComputeNode(node, gatewayNode));
} else {
Set<OpenstackNode> oldCompleteGws = Sets.newConcurrentHashSet();
oldCompleteGws.addAll(ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY)));
oldCompleteGws.add(gatewayNode);
- osNodeService.completeNodes(COMPUTE)
- .stream()
- .filter(node -> isGwSelectedByComputeNode(oldCompleteGws, node, gatewayNode))
+ osNodeService.completeNodes(COMPUTE).stream()
+ .filter(node -> isGwSelectedByComputeNode(oldCompleteGws,
+ node, gatewayNode))
.forEach(node -> {
- OpenstackNode newSelectedGatewayNode = getGwByComputeDevId(completeGws, node.intgBridge());
- processGratuitousArpPacketForComputeNode(node, newSelectedGatewayNode);
+ OpenstackNode newSelectedGatewayNode =
+ getGwByComputeDevId(completeGws, node.intgBridge());
+ processGarpPacketForComputeNode(node, newSelectedGatewayNode);
});
}
}
@@ -865,16 +885,20 @@
.intgBridge().equals(gwNode.intgBridge());
}
- private void processGratuitousArpPacketForComputeNode(OpenstackNode computeNode, OpenstackNode gatewayNode) {
- instancePortService.instancePort(computeNode.intgBridge()).forEach(instancePort -> {
- NetFloatingIP floatingIP = OpenstackNetworkingUtil.floatingIpByInstancePort(instancePort,
- osRouterAdminService);
+ private void processGarpPacketForComputeNode(OpenstackNode computeNode,
+ OpenstackNode gatewayNode) {
+ instancePortService.instancePort(computeNode.intgBridge())
+ .forEach(instancePort -> {
+ NetFloatingIP floatingIP =
+ floatingIpByInstancePort(instancePort, osRouterAdminService);
Network network = osNetworkService.network(instancePort.networkId());
- ExternalPeerRouter externalPeerRouter = OpenstackNetworkingUtil.externalPeerRouterForNetwork(network,
- osNetworkService, osRouterAdminService);
+ ExternalPeerRouter externalPeerRouter =
+ externalPeerRouterForNetwork(network, osNetworkService,
+ osRouterAdminService);
if (floatingIP != null && externalPeerRouter != null) {
- processGratuitousArpPacketForFloatingIp(
- floatingIP, instancePort, externalPeerRouter.vlanId(), gatewayNode, packetService);
+ processGarpPacketForFloatingIp(
+ floatingIP, instancePort, externalPeerRouter.vlanId(),
+ gatewayNode, packetService);
}
});
}
@@ -899,7 +923,8 @@
}
}
- private void setDefaultArpRuleForProxyMode(OpenstackNode osNode, boolean install) {
+ private void setDefaultArpRuleForProxyMode(OpenstackNode osNode,
+ boolean install) {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(EthType.EtherType.ARP.ethType().toShort())
.build();
@@ -919,7 +944,8 @@
);
}
- private void setDefaultArpRuleForBroadcastMode(OpenstackNode osNode, boolean install) {
+ private void setDefaultArpRuleForBroadcastMode(OpenstackNode osNode,
+ boolean install) {
// we only match ARP_REPLY in gateway node, because controller
// somehow need to process ARP_REPLY which is issued from
// external router...
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index 732275c..15d2f96 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -85,7 +85,7 @@
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByInstancePort;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
-import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGratuitousArpPacketForFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGarpPacketForFloatingIp;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
@@ -440,7 +440,8 @@
}
private void setUpstreamRules(NetFloatingIP floatingIp, Network osNet,
- InstancePort instPort, ExternalPeerRouter externalPeerRouter,
+ InstancePort instPort,
+ ExternalPeerRouter externalPeerRouter,
boolean install) {
IpAddress floating = IpAddress.valueOf(floatingIp.getFloatingIpAddress());
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
@@ -515,12 +516,13 @@
ExternalPeerRouter externalPeerRouter =
externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
if (externalPeerRouter == null) {
- log.error("Failed to process GARP packet for floating ip {} because no external peer router found");
+ log.error("Failed to process GARP packet for floating ip {} " +
+ "because no external peer router found");
return;
}
- processGratuitousArpPacketForFloatingIp(floatingIP, instancePort, externalPeerRouter.vlanId(),
- selectedGw, packetService);
+ processGarpPacketForFloatingIp(floatingIP, instancePort,
+ externalPeerRouter.vlanId(), selectedGw, packetService);
}
@@ -604,7 +606,8 @@
preCommitPortService.unsubscribePreCommit(osFip.getPortId(),
OPENSTACK_PORT_PRE_REMOVE, instancePortService,
this.getClass().getName());
- log.info("Unsubscribed the port {} on listening pre-remove event", osFip.getPortId());
+ log.info("Unsubscribed the port {} on listening pre-remove event",
+ osFip.getPortId());
}
log.info("Removed floating IP {}", osFip.getFloatingIpAddress());
});
@@ -746,17 +749,19 @@
switch (event.type()) {
case OPENSTACK_INSTANCE_PORT_DETECTED:
- if (instPort != null && instPort.portId() != null) {
- osRouterAdminService.floatingIps().stream()
- .filter(f -> f.getPortId() != null)
- .filter(f -> f.getPortId().equals(instPort.portId()))
- .forEach(f -> setFloatingIpRules(f, instPort, null, true));
- }
+ eventExecutor.execute(() -> {
+ if (instPort != null && instPort.portId() != null) {
+ osRouterAdminService.floatingIps().stream()
+ .filter(f -> f.getPortId() != null)
+ .filter(f -> f.getPortId().equals(instPort.portId()))
+ .forEach(f -> setFloatingIpRules(f,
+ instPort, null, true));
+ }
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-
fip = associatedFloatingIp(event.subject(), ips);
if (fip == null) {
@@ -765,7 +770,8 @@
osPort = osNetworkService.port(fip.getPortId());
osNet = osNetworkService.network(osPort.getNetworkId());
- externalPeerRouter = externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
+ externalPeerRouter = externalPeerRouterForNetwork(osNet,
+ osNetworkService, osRouterAdminService);
if (externalPeerRouter == null) {
final String errorFormat = ERR_FLOW + "no external peer router found";
@@ -798,7 +804,8 @@
osPort = osNetworkService.port(fip.getPortId());
osNet = osNetworkService.network(osPort.getNetworkId());
- externalPeerRouter = externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
+ externalPeerRouter = externalPeerRouterForNetwork(osNet,
+ osNetworkService, osRouterAdminService);
if (externalPeerRouter == null) {
final String errorFormat = ERR_FLOW + "no external peer router found";
@@ -859,29 +866,31 @@
public void event(OpenstackNetworkEvent event) {
switch (event.type()) {
case OPENSTACK_PORT_PRE_REMOVE:
- InstancePort instPort =
- instancePortService.instancePort(event.port().getId());
-
- if (instPort == null) {
- break;
- }
-
- NetFloatingIP fip =
- associatedFloatingIp(instPort, osRouterAdminService.floatingIps());
-
- if (fip != null) {
- instancePortService.updateInstancePort(
- instPort.updateState(REMOVE_PENDING));
- eventExecutor.execute(() -> updateFipStore(event.port().getId()));
- } else {
- instancePortService.removeInstancePort(instPort.portId());
- }
+ eventExecutor.execute(() -> processPortPreRemove(event));
break;
default:
break;
}
}
+ private void processPortPreRemove(OpenstackNetworkEvent event) {
+ InstancePort instPort = instancePortService.instancePort(
+ event.port().getId());
+ if (instPort == null) {
+ return;
+ }
+ NetFloatingIP fip = associatedFloatingIp(instPort,
+ osRouterAdminService.floatingIps());
+
+ if (fip != null) {
+ instancePortService.updateInstancePort(
+ instPort.updateState(REMOVE_PENDING));
+ updateFipStore(event.port().getId());
+ } else {
+ instancePortService.removeInstancePort(instPort.portId());
+ }
+ }
+
private void updateFipStore(String portId) {
if (portId == null) {
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
index 28697ed..edc9e09 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
@@ -100,6 +100,7 @@
import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
+import static org.openstack4j.model.network.NetworkType.FLAT;
/**
* Handles OpenStack router events.
@@ -332,7 +333,7 @@
Subnet osSubnet = osNetworkAdminService.subnet(routerIface.getSubnetId());
Network osNet = osNetworkAdminService.network(osSubnet.getNetworkId());
- if (osNet.getNetworkType() == NetworkType.FLAT) {
+ if (osNet.getNetworkType() == FLAT) {
return;
}
@@ -996,10 +997,12 @@
event.routerIface()));
break;
case OPENSTACK_ROUTER_GATEWAY_ADDED:
- log.debug("Router external gateway {} added", event.externalGateway().getNetworkId());
+ log.debug("Router external gateway {} added",
+ event.externalGateway().getNetworkId());
break;
case OPENSTACK_ROUTER_GATEWAY_REMOVED:
- log.debug("Router external gateway {} removed", event.externalGateway().getNetworkId());
+ log.debug("Router external gateway {} removed",
+ event.externalGateway().getNetworkId());
break;
case OPENSTACK_FLOATING_IP_CREATED:
case OPENSTACK_FLOATING_IP_UPDATED:
@@ -1104,7 +1107,7 @@
}
private void instPortDetected(InstancePort instPort) {
- if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == NetworkType.FLAT) {
+ if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == FLAT) {
return;
}
@@ -1120,7 +1123,7 @@
}
private void instPortRemoved(InstancePort instPort) {
- if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == NetworkType.FLAT) {
+ if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == FLAT) {
return;
}
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
index 83f75aa..2f8b7db 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
@@ -767,20 +767,22 @@
case OPENSTACK_INSTANCE_PORT_UPDATED:
case OPENSTACK_INSTANCE_PORT_DETECTED:
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
- installSecurityGroupRules(event, instPort);
+ eventExecutor.execute(() ->
+ installSecurityGroupRules(event, instPort));
break;
case OPENSTACK_INSTANCE_PORT_VANISHED:
- Port osPort = removedOsPortStore.asJavaMap().get(instPort.portId());
- eventExecutor.execute(() ->
- setSecurityGroupRules(instPort, osPort, false)
- );
- removedOsPortStore.remove(instPort.portId());
+ eventExecutor.execute(() -> {
+ Port osPort = removedOsPortStore.asJavaMap().get(instPort.portId());
+ setSecurityGroupRules(instPort, osPort, false);
+ removedOsPortStore.remove(instPort.portId());
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_ENDED:
- InstancePort revisedInstPort = swapStaleLocation(instPort);
- Port port = osNetService.port(instPort.portId());
- eventExecutor.execute(() ->
- setSecurityGroupRules(revisedInstPort, port, false));
+ eventExecutor.execute(() -> {
+ InstancePort revisedInstPort = swapStaleLocation(instPort);
+ Port port = osNetService.port(instPort.portId());
+ setSecurityGroupRules(revisedInstPort, port, false);
+ });
break;
default:
break;
@@ -822,7 +824,8 @@
switch (event.type()) {
case OPENSTACK_PORT_PRE_REMOVE:
- removedOsPortStore.put(osPort.getId(), osPort);
+ eventExecutor.execute(() ->
+ removedOsPortStore.put(osPort.getId(), osPort));
break;
default:
// do nothing for the other events
@@ -945,7 +948,8 @@
public void event(OpenstackNodeEvent event) {
switch (event.type()) {
case OPENSTACK_NODE_COMPLETE:
- resetSecurityGroupRules();
+ eventExecutor.execute(OpenstackSecurityGroupHandler.this::
+ resetSecurityGroupRules);
break;
case OPENSTACK_NODE_CREATED:
case OPENSTACK_NODE_REMOVED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index 95661a3..3f72375 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -71,8 +71,11 @@
import java.util.Dictionary;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.openstacknetworking.api.Constants.ARP_BROADCAST_MODE;
import static org.onosproject.openstacknetworking.api.Constants.ARP_PROXY_MODE;
import static org.onosproject.openstacknetworking.api.Constants.ARP_TABLE;
@@ -148,6 +151,9 @@
private final InstancePortListener instancePortListener = new InternalInstancePortListener();
private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
private ApplicationId appId;
private NodeId localNodeId;
@@ -175,6 +181,7 @@
instancePortService.removeListener(instancePortListener);
leadershipService.withdraw(appId.name());
configService.unregisterProperties(getClass(), false);
+ eventExecutor.shutdown();
log.info("Stopped");
}
@@ -543,7 +550,8 @@
if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
return;
}
- processPacketIn(context, ethPacket);
+
+ eventExecutor.execute(() -> processPacketIn(context, ethPacket));
}
}
@@ -586,10 +594,14 @@
switch (event.type()) {
case OPENSTACK_SUBNET_CREATED:
case OPENSTACK_SUBNET_UPDATED:
- setFakeGatewayArpRule(event.subnet(), true, null);
+ eventExecutor.execute(() -> {
+ setFakeGatewayArpRule(event.subnet(), true, null);
+ });
break;
case OPENSTACK_SUBNET_REMOVED:
- setFakeGatewayArpRule(event.subnet(), false, null);
+ eventExecutor.execute(() -> {
+ setFakeGatewayArpRule(event.subnet(), false, null);
+ });
break;
case OPENSTACK_NETWORK_CREATED:
case OPENSTACK_NETWORK_UPDATED:
@@ -624,12 +636,16 @@
OpenstackNode osNode = event.subject();
switch (event.type()) {
case OPENSTACK_NODE_COMPLETE:
- setDefaultArpRule(osNode, true);
- setAllArpRules(osNode, true);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, true);
+ setAllArpRules(osNode, true);
+ });
break;
case OPENSTACK_NODE_INCOMPLETE:
- setDefaultArpRule(osNode, false);
- setAllArpRules(osNode, false);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, false);
+ setAllArpRules(osNode, false);
+ });
break;
default:
break;
@@ -745,20 +761,28 @@
switch (event.type()) {
case OPENSTACK_INSTANCE_PORT_DETECTED:
case OPENSTACK_INSTANCE_PORT_UPDATED:
- setArpRequestRule(event.subject(), true);
- setArpReplyRule(event.subject(), true);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), true);
+ setArpReplyRule(event.subject(), true);
+ });
break;
case OPENSTACK_INSTANCE_PORT_VANISHED:
- setArpRequestRule(event.subject(), false);
- setArpReplyRule(event.subject(), false);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), false);
+ setArpReplyRule(event.subject(), false);
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
- setArpRequestRule(event.subject(), true);
- setArpReplyRule(event.subject(), true);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), true);
+ setArpReplyRule(event.subject(), true);
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_ENDED:
- InstancePort revisedInstPort = swapStaleLocation(event.subject());
- setArpRequestRule(revisedInstPort, false);
+ eventExecutor.execute(() -> {
+ InstancePort revisedInstPort = swapStaleLocation(event.subject());
+ setArpRequestRule(revisedInstPort, false);
+ });
break;
default:
break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
index de4f8f2..68bf34f 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
@@ -72,8 +72,10 @@
import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_BroadcastAddress;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_Classless_Static_Route;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_DHCPServerIp;
@@ -85,6 +87,7 @@
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_SubnetMask;
import static org.onlab.packet.DHCP.MsgType.DHCPACK;
import static org.onlab.packet.DHCP.MsgType.DHCPOFFER;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_GATEWAY_MAC_STR;
import static org.onosproject.openstacknetworking.api.Constants.DHCP_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_DHCP_RULE;
@@ -150,6 +153,9 @@
private final PacketProcessor packetProcessor = new InternalPacketProcessor();
private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
private ApplicationId appId;
private NodeId localNodeId;
@@ -171,6 +177,7 @@
osNodeService.removeListener(osNodeListener);
configService.unregisterProperties(getClass(), false);
leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
log.info("Stopped");
}
@@ -212,7 +219,8 @@
}
DHCP dhcpPacket = (DHCP) udpPacket.getPayload();
- processDhcp(context, dhcpPacket);
+
+ eventExecutor.execute(() -> processDhcp(context, dhcpPacket));
}
private void processDhcp(PacketContext context, DHCP dhcpPacket) {
@@ -547,10 +555,10 @@
OpenstackNode osNode = event.subject();
switch (event.type()) {
case OPENSTACK_NODE_COMPLETE:
- setDhcpRule(osNode, true);
+ eventExecutor.execute(() -> setDhcpRule(osNode, true));
break;
case OPENSTACK_NODE_INCOMPLETE:
- setDhcpRule(osNode, false);
+ eventExecutor.execute(() -> setDhcpRule(osNode, false));
break;
case OPENSTACK_NODE_CREATED:
case OPENSTACK_NODE_UPDATED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
index e63444f..9039ed2 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
@@ -47,7 +47,6 @@
import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
import org.onosproject.openstacknetworking.api.OpenstackNetworkListener;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
-import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.openstack4j.model.network.Network;
@@ -129,9 +128,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackNodeService osNodeService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected OpenstackSecurityGroupService securityGroupService;
-
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final InstancePortListener instancePortListener = new InternalInstancePortListener();
@@ -797,7 +793,7 @@
boolean isNwAdminStateUp = event.subject().isAdminStateUp();
boolean isPortAdminStateUp = event.port().isAdminStateUp();
- InstancePort instPort = instancePortService.instancePort(event.port().getId());
+ String portId = event.port().getId();
switch (event.type()) {
case OPENSTACK_NETWORK_CREATED:
@@ -812,20 +808,20 @@
break;
case OPENSTACK_PORT_CREATED:
case OPENSTACK_PORT_UPDATED:
-
- if (instPort != null) {
- eventExecutor.execute(() ->
- setPortBlockRules(instPort, !isPortAdminStateUp));
- }
-
+ eventExecutor.execute(() -> {
+ InstancePort instPort = instancePortService.instancePort(portId);
+ if (instPort != null) {
+ setPortBlockRules(instPort, !isPortAdminStateUp);
+ }
+ });
break;
case OPENSTACK_PORT_REMOVED:
-
- if (instPort != null) {
- eventExecutor.execute(() ->
- setPortBlockRules(instPort, false));
- }
-
+ eventExecutor.execute(() -> {
+ InstancePort instPort = instancePortService.instancePort(portId);
+ if (instPort != null) {
+ setPortBlockRules(instPort, false);
+ }
+ });
break;
default:
break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
index 40413d7..92eab4f 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
@@ -306,11 +306,11 @@
@Override
public boolean isRelevant(DeviceEvent event) {
- Device device = event.subject();
- if (!mastershipService.isLocalMaster(device.id())) {
+ if (!mastershipService.isLocalMaster(event.subject().id())) {
// do not allow to proceed without mastership
return false;
}
+
Port port = event.port();
if (port == null) {
return false;
@@ -390,8 +390,10 @@
private void processCompleteNode(OpenstackNode osNode) {
deviceService.getPorts(osNode.intgBridge()).stream()
- .filter(port -> vnicType(port.annotations().value(PORT_NAME)).equals(Constants.VnicType.NORMAL) ||
- vnicType(port.annotations().value(PORT_NAME)).equals(Constants.VnicType.DIRECT))
+ .filter(port -> vnicType(port.annotations().value(PORT_NAME))
+ .equals(Constants.VnicType.NORMAL) ||
+ vnicType(port.annotations().value(PORT_NAME))
+ .equals(Constants.VnicType.DIRECT))
.filter(Port::isEnabled)
.forEach(port -> {
log.debug("Instance port {} is detected from {}",
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
index db15e15..25a7e41 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
@@ -884,11 +884,11 @@
* @param gatewayNode gateway node
* @param packetService packet service
*/
- public static void processGratuitousArpPacketForFloatingIp(NetFloatingIP floatingIP,
- InstancePort instancePort,
- VlanId vlanId,
- OpenstackNode gatewayNode,
- PacketService packetService) {
+ public static void processGarpPacketForFloatingIp(NetFloatingIP floatingIP,
+ InstancePort instancePort,
+ VlanId vlanId,
+ OpenstackNode gatewayNode,
+ PacketService packetService) {
Ethernet ethernet = buildGratuitousArpPacket(floatingIP, instancePort, vlanId);
TrafficTreatment treatment = DefaultTrafficTreatment.builder()