Initial support for multiple Kubernetes clusters for k8s nodes
Change-Id: I6ca132898f8e157e0583de38a637fdc135f21d6f
(cherry picked from commit e2a04cedde73618ef24575e70cb221e03854de1d)
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
index 5b3f353..bc789d3 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
@@ -55,6 +55,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.K8sApiConfig.Mode.NORMAL;
import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
@@ -159,6 +160,7 @@
K8sNode intNode;
K8sNode extNode;
K8sNode localNode;
+ K8sNode tunNode;
if (node.intgBridge() == null) {
String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
@@ -196,7 +198,24 @@
NOT_DUPLICATED_MSG, localNode.localBridge());
}
- nodeStore.createNode(localNode);
+ if (node.mode() == NORMAL) {
+ if (node.tunBridge() == null) {
+ String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
+ checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
+ tunNode = localNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = localNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
+
+ nodeStore.createNode(tunNode);
+ } else {
+ nodeStore.createNode(localNode);
+ }
+
log.info(String.format(MSG_NODE, extNode.hostname(), MSG_CREATED));
}
@@ -207,6 +226,7 @@
K8sNode intNode;
K8sNode extNode;
K8sNode localNode;
+ K8sNode tunNode;
K8sNode existingNode = nodeStore.node(node.hostname());
checkNotNull(existingNode, ERR_NULL_NODE);
@@ -247,7 +267,22 @@
NOT_DUPLICATED_MSG, localNode.localBridge());
}
- nodeStore.updateNode(localNode);
+ if (node.mode() == NORMAL) {
+ DeviceId existTunBridge = nodeStore.node(node.hostname()).tunBridge();
+
+ if (localNode.tunBridge() == null) {
+ tunNode = localNode.updateTunBridge(existTunBridge);
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = localNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
+ nodeStore.updateNode(tunNode);
+ } else {
+ nodeStore.updateNode(localNode);
+ }
log.info(String.format(MSG_NODE, extNode.hostname(), MSG_UPDATED));
}
@@ -265,6 +300,13 @@
}
@Override
+ public Set<K8sNode> nodes(String clusterName) {
+     // Objects.equals keeps the comparison null-safe: a stored node whose
+     // cluster name is null no longer raises a NullPointerException here.
+     // This mirrors the null-safe filter used by nodes(Type).
+     return nodeStore.nodes().stream()
+             .filter(n -> Objects.equals(n.clusterName(), clusterName))
+             .collect(Collectors.toSet());
+ }
+
+ @Override
public Set<K8sNode> nodes(Type type) {
Set<K8sNode> nodes = nodeStore.nodes().stream()
.filter(node -> Objects.equals(node.type(), type))
@@ -330,6 +372,15 @@
return existNode.isPresent();
}
+ /**
+  * Checks whether a node other than the one with the given hostname
+  * already uses the given device ID as its tun bridge.
+  *
+  * @param deviceId device ID to look for; must not be null
+  * @param hostname hostname of the node excluded from the search
+  * @return true if another node in the store has this tun bridge
+  */
+ private boolean hasTunBridge(DeviceId deviceId, String hostname) {
+     return nodeStore.nodes().stream()
+             .anyMatch(other -> !other.hostname().equals(hostname)
+                     && deviceId.equals(other.tunBridge()));
+ }
+ }
+
private class InternalNodeStoreDelegate implements K8sNodeStoreDelegate {
@Override