Refactor: install rules in a separate thread to avoid thread blocking
Change-Id: I10ff88fb56f9358ec948f01176d6fe20d91e37c0
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index 95661a3..3f72375 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -71,8 +71,11 @@
import java.util.Dictionary;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.openstacknetworking.api.Constants.ARP_BROADCAST_MODE;
import static org.onosproject.openstacknetworking.api.Constants.ARP_PROXY_MODE;
import static org.onosproject.openstacknetworking.api.Constants.ARP_TABLE;
@@ -148,6 +151,9 @@
private final InstancePortListener instancePortListener = new InternalInstancePortListener();
private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
private ApplicationId appId;
private NodeId localNodeId;
@@ -175,6 +181,7 @@
instancePortService.removeListener(instancePortListener);
leadershipService.withdraw(appId.name());
configService.unregisterProperties(getClass(), false);
+ eventExecutor.shutdown();
log.info("Stopped");
}
@@ -543,7 +550,8 @@
if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
return;
}
- processPacketIn(context, ethPacket);
+
+ eventExecutor.execute(() -> processPacketIn(context, ethPacket));
}
}
@@ -586,10 +594,14 @@
switch (event.type()) {
case OPENSTACK_SUBNET_CREATED:
case OPENSTACK_SUBNET_UPDATED:
- setFakeGatewayArpRule(event.subnet(), true, null);
+ eventExecutor.execute(() -> {
+ setFakeGatewayArpRule(event.subnet(), true, null);
+ });
break;
case OPENSTACK_SUBNET_REMOVED:
- setFakeGatewayArpRule(event.subnet(), false, null);
+ eventExecutor.execute(() -> {
+ setFakeGatewayArpRule(event.subnet(), false, null);
+ });
break;
case OPENSTACK_NETWORK_CREATED:
case OPENSTACK_NETWORK_UPDATED:
@@ -624,12 +636,16 @@
OpenstackNode osNode = event.subject();
switch (event.type()) {
case OPENSTACK_NODE_COMPLETE:
- setDefaultArpRule(osNode, true);
- setAllArpRules(osNode, true);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, true);
+ setAllArpRules(osNode, true);
+ });
break;
case OPENSTACK_NODE_INCOMPLETE:
- setDefaultArpRule(osNode, false);
- setAllArpRules(osNode, false);
+ eventExecutor.execute(() -> {
+ setDefaultArpRule(osNode, false);
+ setAllArpRules(osNode, false);
+ });
break;
default:
break;
@@ -745,20 +761,28 @@
switch (event.type()) {
case OPENSTACK_INSTANCE_PORT_DETECTED:
case OPENSTACK_INSTANCE_PORT_UPDATED:
- setArpRequestRule(event.subject(), true);
- setArpReplyRule(event.subject(), true);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), true);
+ setArpReplyRule(event.subject(), true);
+ });
break;
case OPENSTACK_INSTANCE_PORT_VANISHED:
- setArpRequestRule(event.subject(), false);
- setArpReplyRule(event.subject(), false);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), false);
+ setArpReplyRule(event.subject(), false);
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
- setArpRequestRule(event.subject(), true);
- setArpReplyRule(event.subject(), true);
+ eventExecutor.execute(() -> {
+ setArpRequestRule(event.subject(), true);
+ setArpReplyRule(event.subject(), true);
+ });
break;
case OPENSTACK_INSTANCE_MIGRATION_ENDED:
- InstancePort revisedInstPort = swapStaleLocation(event.subject());
- setArpRequestRule(revisedInstPort, false);
+ eventExecutor.execute(() -> {
+ InstancePort revisedInstPort = swapStaleLocation(event.subject());
+ setArpRequestRule(revisedInstPort, false);
+ });
break;
default:
break;