Support Kubernetes LoadBalancer service type
Change-Id: Ife5a75a68a5b83dbf51787e686412b9318c27aa0
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNodePortHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNodePortHandler.java
index c87dfe7..387494a 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNodePortHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNodePortHandler.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
@@ -87,12 +88,15 @@
private final Logger log = getLogger(getClass());
private static final String NODE_PORT_TYPE = "NodePort";
+ private static final String LOAD_BALANCER_TYPE = "LoadBalancer";
private static final String TCP = "TCP";
private static final String UDP = "UDP";
private static final int HOST_CIDR = 32;
private static final String SERVICE_CIDR = "serviceCidr";
private static final String B_CLASS_SUFFIX = "0.0/16";
+ private static final long SLEEP_MS = 3000;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -364,10 +368,22 @@
return;
}
- if (NODE_PORT_TYPE.equals(service.getSpec().getType())) {
- k8sNodeService.completeNodes().forEach(n ->
- processNodePortEvent(n, service, true)
- );
+ if (NODE_PORT_TYPE.equals(service.getSpec().getType()) ||
+ LOAD_BALANCER_TYPE.equals(service.getSpec().getType())) {
+ k8sNodeService.completeNodes().forEach(n -> {
+ // we need to wait, until we resolve the valid MAC address of the node
+ while (k8sNodeService.node(n.hostname()).nodeMac() == null) {
+ log.warn("Node {} MAC address is not resolved, " +
+ "wait until resolving it", n.hostname());
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Exception caused by", e);
+ }
+ }
+ K8sNode updatedNode = k8sNodeService.node(n.hostname());
+ processNodePortEvent(updatedNode, service, true);
+ });
}
}
}
@@ -395,12 +411,26 @@
return;
}
- k8sServiceService.services().stream()
- .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()))
- .forEach(s -> processNodePortEvent(k8sNode, s, true));
+ // we need to wait, until we resolve the valid MAC address of the node
+ while (k8sNodeService.node(k8sNode.hostname()).nodeMac() == null) {
+ log.warn("Node {} MAC address is not resolved, " +
+ "wait until resolving it", k8sNode.hostname());
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Exception caused by", e);
+ }
+ }
- setIntgToExtRules(k8sNode, getServiceCidr(), true);
- setTunToIntgRules(k8sNode, true);
+ K8sNode updatedNode = k8sNodeService.node(k8sNode.hostname());
+
+ k8sServiceService.services().stream()
+ .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()) ||
+ LOAD_BALANCER_TYPE.equals(s.getSpec().getType()))
+ .forEach(s -> processNodePortEvent(updatedNode, s, true));
+
+ setIntgToExtRules(updatedNode, getServiceCidr(), true);
+ setTunToIntgRules(updatedNode, true);
}
}
}