Fix: add physnet patch-ports related rules on kubevirt node completion
Change-Id: I51caa6f923583d48ce7580e9f6b2e811ace6e055
(cherry picked from commit 6d2dd5aa6c8f1b98dcbc549d7bf0d2858cf981e0)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
index 9d9757d..b94c7c4 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
@@ -17,10 +17,13 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -41,6 +44,7 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -78,17 +82,21 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected KubevirtFlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtNodeService nodeService;
+ protected KubevirtNodeService kubevirtNodeService;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
+ private final InternalKubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
private ApplicationId appId;
+ private NodeId localNodeId;
@Activate
protected void activate() {
appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
deviceService.addListener(internalDeviceListener);
+ kubevirtNodeService.addListener(kubevirtNodeListener);
log.info("Started");
}
@@ -96,6 +104,7 @@
protected void deactivate() {
eventExecutor.shutdown();
deviceService.removeListener(internalDeviceListener);
+ kubevirtNodeService.removeListener(kubevirtNodeListener);
log.info("Stopped");
}
@@ -135,7 +144,7 @@
return false;
}
- KubevirtNode node = nodeService.node(event.subject().id());
+ KubevirtNode node = kubevirtNodeService.node(event.subject().id());
if (node == null) {
return false;
}
@@ -179,4 +188,35 @@
event.port().number(), false);
}
}
+
+ private class InternalKubevirtNodeListener implements KubevirtNodeListener {
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtNodeEvent event) {
+ switch (event.type()) {
+ case KUBEVIRT_NODE_COMPLETE:
+ eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+ break;
+ case KUBEVIRT_NODE_INCOMPLETE:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNodeCompletion(KubevirtNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ deviceService.getPorts(node.intgBridge()).forEach(p -> {
+ if (containsPhyPatchPort(node, p)) {
+ setIngressRuleForPatchPort(node.intgBridge(), p.number(), true);
+ }
+ });
+ }
+ }
}