Add initial support for multiple Kubernetes clusters to k8s nodes
Change-Id: I6ca132898f8e157e0583de38a637fdc135f21d6f
(cherry picked from commit e2a04cedde73618ef24575e70cb221e03854de1d)
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sApiConfigHandler.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sApiConfigHandler.java
index 01a5043..33415a8 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sApiConfigHandler.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sApiConfigHandler.java
@@ -25,6 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snode.api.DefaultK8sNode;
+import org.onosproject.k8snode.api.ExternalNetworkService;
+import org.onosproject.k8snode.api.HostNodesInfo;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigAdminService;
import org.onosproject.k8snode.api.K8sApiConfigEvent;
@@ -44,6 +46,9 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.Constants.DEFAULT_CLUSTER_NAME;
+import static org.onosproject.k8snode.api.Constants.EXTERNAL_TO_ROUTER;
+import static org.onosproject.k8snode.api.K8sApiConfig.Mode.PASSTHROUGH;
import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
@@ -65,6 +70,9 @@
private static final String EXT_GATEWAY_IP = "external.gateway.ip";
private static final String EXT_INTF_NAME = "external.interface.name";
+ private static final String DEFAULT_GATEWAY_IP = "127.0.0.1";
+ private static final String DEFAULT_BRIDGE_IP = "127.0.0.1";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -80,6 +88,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNodeAdminService k8sNodeAdminService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ExternalNetworkService extNetworkService;
+
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
@@ -127,21 +138,32 @@
}
k8sClient.nodes().list().getItems().forEach(n ->
- k8sNodeAdminService.createNode(buildK8sNode(n))
+ k8sNodeAdminService.createNode(buildK8sNode(n, config))
);
}
- private K8sNode buildK8sNode(Node node) {
+ private K8sNode buildK8sNode(Node node, K8sApiConfig config) {
String hostname = node.getMetadata().getName();
IpAddress managementIp = null;
IpAddress dataIp = null;
- for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
- // we need to consider assigning managementIp and dataIp differently
- // FIXME: ExternalIp is not considered currently
- if (nodeAddress.getType().equals(INTERNAL_IP)) {
- managementIp = IpAddress.valueOf(nodeAddress.getAddress());
- dataIp = IpAddress.valueOf(nodeAddress.getAddress());
+ // pass-through mode: we use host IP as the management and data IP
+ // normal mode: we use K8S node's internal IP as the management and data IP
+ if (config.mode() == PASSTHROUGH) {
+ HostNodesInfo info = config.infos().stream().filter(h -> h.nodes()
+ .contains(hostname)).findAny().orElse(null);
+ if (info == null) {
+ log.error("None of the nodes were found in the host nodes info mapping list");
+ } else {
+ managementIp = info.hostIp();
+ dataIp = info.hostIp();
+ }
+ } else {
+ for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
+ if (nodeAddress.getType().equals(INTERNAL_IP)) {
+ managementIp = IpAddress.valueOf(nodeAddress.getAddress());
+ dataIp = IpAddress.valueOf(nodeAddress.getAddress());
+ }
}
}
@@ -162,17 +184,37 @@
Map<String, String> annots = node.getMetadata().getAnnotations();
- String extIntf = annots.get(EXT_INTF_NAME);
- String extGatewayIpStr = annots.get(EXT_GATEWAY_IP);
- String extBridgeIpStr = annots.get(EXT_BRIDGE_IP);
+ String extIntf = "";
+ String extGatewayIpStr = DEFAULT_GATEWAY_IP;
+ String extBridgeIpStr = DEFAULT_BRIDGE_IP;
+
+ if (config.mode() == PASSTHROUGH) {
+ extNetworkService.registerNetwork(config.extNetworkCidr());
+ extIntf = EXTERNAL_TO_ROUTER + "-" + config.clusterShortName();
+ IpAddress gatewayIp = extNetworkService.getGatewayIp(config.extNetworkCidr());
+ IpAddress bridgeIp = extNetworkService.allocateIp(config.extNetworkCidr());
+ if (gatewayIp != null) {
+ extGatewayIpStr = gatewayIp.toString();
+ }
+ if (bridgeIp != null) {
+ extBridgeIpStr = bridgeIp.toString();
+ }
+ } else {
+ extIntf = annots.get(EXT_INTF_NAME);
+ extGatewayIpStr = annots.get(EXT_GATEWAY_IP);
+ extBridgeIpStr = annots.get(EXT_BRIDGE_IP);
+ }
return DefaultK8sNode.builder()
+ .clusterName(DEFAULT_CLUSTER_NAME)
.hostname(hostname)
.managementIp(managementIp)
.dataIp(dataIp)
.extIntf(extIntf)
.type(nodeType)
+ .segmentId(config.segmentId())
.state(PRE_ON_BOARD)
+ .mode(config.mode())
.extBridgeIp(IpAddress.valueOf(extBridgeIpStr))
.extGatewayIp(IpAddress.valueOf(extGatewayIpStr))
.podCidr(node.getSpec().getPodCIDR())
@@ -209,6 +251,7 @@
if (checkApiServerConfig(config)) {
K8sApiConfig newConfig = config.updateState(K8sApiConfig.State.CONNECTED);
k8sApiConfigAdminService.updateApiConfig(newConfig);
+
bootstrapK8sNodes(config);
}
}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
index e3ca070..7e99f47 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
@@ -42,7 +42,7 @@
import org.onosproject.net.behaviour.PatchDescription;
import org.onosproject.net.behaviour.TunnelDescription;
import org.onosproject.net.behaviour.TunnelEndPoints;
-import org.onosproject.net.behaviour.TunnelKeys;
+import org.onosproject.net.behaviour.TunnelKey;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
@@ -68,19 +68,10 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.TpPort.tpPort;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.k8snode.api.Constants.EXTERNAL_BRIDGE;
import static org.onosproject.k8snode.api.Constants.GENEVE;
-import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
import static org.onosproject.k8snode.api.Constants.GRE;
-import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
-import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
-import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_EXTERNAL_BRIDGE;
-import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_LOCAL_BRIDGE;
-import static org.onosproject.k8snode.api.Constants.LOCAL_BRIDGE;
-import static org.onosproject.k8snode.api.Constants.LOCAL_TO_INTEGRATION_BRIDGE;
-import static org.onosproject.k8snode.api.Constants.PHYSICAL_EXTERNAL_BRIDGE;
import static org.onosproject.k8snode.api.Constants.VXLAN;
-import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
+import static org.onosproject.k8snode.api.K8sApiConfig.Mode.NORMAL;
import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
@@ -196,13 +187,19 @@
return;
}
if (!deviceService.isAvailable(k8sNode.intgBridge())) {
- createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
+ createBridge(k8sNode, k8sNode.intgBridgeName(), k8sNode.intgBridge());
}
if (!deviceService.isAvailable(k8sNode.extBridge())) {
- createBridge(k8sNode, EXTERNAL_BRIDGE, k8sNode.extBridge());
+ createBridge(k8sNode, k8sNode.extBridgeName(), k8sNode.extBridge());
}
if (!deviceService.isAvailable(k8sNode.localBridge())) {
- createBridge(k8sNode, LOCAL_BRIDGE, k8sNode.localBridge());
+ createBridge(k8sNode, k8sNode.localBridgeName(), k8sNode.localBridge());
+ }
+
+ if (k8sNode.mode() == NORMAL) {
+ if (!deviceService.isAvailable(k8sNode.tunBridge())) {
+ createBridge(k8sNode, k8sNode.tunBridgeName(), k8sNode.tunBridge());
+ }
}
}
@@ -217,19 +214,21 @@
// create patch ports between integration and external bridges
createPatchInterfaces(k8sNode);
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
- createVxlanTunnelInterface(k8sNode);
- }
+ if (k8sNode.mode() == NORMAL) {
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.vxlanPortName())) {
+ createVxlanTunnelInterface(k8sNode);
+ }
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
- createGreTunnelInterface(k8sNode);
- }
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.grePortName())) {
+ createGreTunnelInterface(k8sNode);
+ }
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
- createGeneveTunnelInterface(k8sNode);
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.genevePortName())) {
+ createGeneveTunnelInterface(k8sNode);
+ }
}
} catch (Exception e) {
log.error("Exception occurred because of {}", e);
@@ -324,7 +323,7 @@
* @param k8sNode kubernetes node
*/
private void createVxlanTunnelInterface(K8sNode k8sNode) {
- createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
+ createTunnelInterface(k8sNode, VXLAN, k8sNode.vxlanPortName());
}
/**
@@ -333,7 +332,7 @@
* @param k8sNode kubernetes node
*/
private void createGreTunnelInterface(K8sNode k8sNode) {
- createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
+ createTunnelInterface(k8sNode, GRE, k8sNode.grePortName());
}
/**
@@ -342,7 +341,7 @@
* @param k8sNode kubernetes node
*/
private void createGeneveTunnelInterface(K8sNode k8sNode) {
- createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
+ createTunnelInterface(k8sNode, GENEVE, k8sNode.genevePortName());
}
private void createPatchInterfaces(K8sNode k8sNode) {
@@ -355,40 +354,61 @@
// integration bridge -> external bridge
PatchDescription brIntExtPatchDesc =
DefaultPatchDescription.builder()
- .deviceId(INTEGRATION_BRIDGE)
- .ifaceName(INTEGRATION_TO_EXTERNAL_BRIDGE)
- .peer(PHYSICAL_EXTERNAL_BRIDGE)
+ .deviceId(k8sNode.intgBridgeName())
+ .ifaceName(k8sNode.intgToExtPatchPortName())
+ .peer(k8sNode.extToIntgPatchPortName())
+ .build();
+
+ // integration bridge -> tunnel bridge
+ PatchDescription brIntTunPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(k8sNode.intgBridgeName())
+ .ifaceName(k8sNode.intgToTunPatchPortName())
+ .peer(k8sNode.tunToIntgPatchPortName())
.build();
// external bridge -> integration bridge
PatchDescription brExtIntPatchDesc =
DefaultPatchDescription.builder()
- .deviceId(EXTERNAL_BRIDGE)
- .ifaceName(PHYSICAL_EXTERNAL_BRIDGE)
- .peer(INTEGRATION_TO_EXTERNAL_BRIDGE)
+ .deviceId(k8sNode.extBridgeName())
+ .ifaceName(k8sNode.extToIntgPatchPortName())
+ .peer(k8sNode.intgToExtPatchPortName())
.build();
// integration bridge -> local bridge
PatchDescription brIntLocalPatchDesc =
DefaultPatchDescription.builder()
- .deviceId(INTEGRATION_BRIDGE)
- .ifaceName(INTEGRATION_TO_LOCAL_BRIDGE)
- .peer(LOCAL_TO_INTEGRATION_BRIDGE)
- .build();
+ .deviceId(k8sNode.intgBridgeName())
+ .ifaceName(k8sNode.intgToLocalPatchPortName())
+ .peer(k8sNode.localToIntgPatchPortName())
+ .build();
// local bridge -> integration bridge
PatchDescription brLocalIntPatchDesc =
DefaultPatchDescription.builder()
- .deviceId(LOCAL_BRIDGE)
- .ifaceName(LOCAL_TO_INTEGRATION_BRIDGE)
- .peer(INTEGRATION_TO_LOCAL_BRIDGE)
- .build();
+ .deviceId(k8sNode.localBridgeName())
+ .ifaceName(k8sNode.localToIntgPatchPortName())
+ .peer(k8sNode.intgToLocalPatchPortName())
+ .build();
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
- ifaceConfig.addPatchMode(INTEGRATION_TO_EXTERNAL_BRIDGE, brIntExtPatchDesc);
- ifaceConfig.addPatchMode(PHYSICAL_EXTERNAL_BRIDGE, brExtIntPatchDesc);
- ifaceConfig.addPatchMode(INTEGRATION_TO_LOCAL_BRIDGE, brIntLocalPatchDesc);
- ifaceConfig.addPatchMode(LOCAL_TO_INTEGRATION_BRIDGE, brLocalIntPatchDesc);
+ ifaceConfig.addPatchMode(k8sNode.intgToExtPatchPortName(), brIntExtPatchDesc);
+ ifaceConfig.addPatchMode(k8sNode.extToIntgPatchPortName(), brExtIntPatchDesc);
+ ifaceConfig.addPatchMode(k8sNode.intgToLocalPatchPortName(), brIntLocalPatchDesc);
+ ifaceConfig.addPatchMode(k8sNode.localToIntgPatchPortName(), brLocalIntPatchDesc);
+ ifaceConfig.addPatchMode(k8sNode.intgToTunPatchPortName(), brIntTunPatchDesc);
+
+ if (k8sNode.mode() == NORMAL) {
+ // tunnel bridge -> integration bridge
+ PatchDescription brTunIntPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(k8sNode.tunBridgeName())
+ .ifaceName(k8sNode.tunToIntgPatchPortName())
+ .peer(k8sNode.intgToTunPatchPortName())
+ .build();
+
+ ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
+ }
}
/**
@@ -408,7 +428,7 @@
return;
}
- TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
+ TunnelDescription tunnelDesc = buildTunnelDesc(k8sNode, type, intfName);
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
ifaceConfig.addTunnelMode(intfName, tunnelDesc);
@@ -420,14 +440,16 @@
* @param type network type
* @return tunnel description
*/
- private TunnelDescription buildTunnelDesc(String type, String intfName) {
+ private TunnelDescription buildTunnelDesc(K8sNode k8sNode,
+ String type, String intfName) {
+ TunnelKey<String> key = new TunnelKey<>(k8sNode.tunnelKey());
if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
TunnelDescription.Builder tdBuilder =
DefaultTunnelDescription.builder()
- .deviceId(INTEGRATION_BRIDGE)
+ .deviceId(k8sNode.tunBridgeName())
.ifaceName(intfName)
.remote(TunnelEndPoints.flowTunnelEndpoint())
- .key(TunnelKeys.flowTunnelKey());
+ .key(key);
switch (type) {
case VXLAN:
@@ -457,8 +479,8 @@
* @return true if the given interface is enabled, false otherwise
*/
private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
- return deviceService.isAvailable(k8sNode.intgBridge()) &&
- deviceService.getPorts(k8sNode.intgBridge()).stream()
+ return deviceService.isAvailable(k8sNode.tunBridge()) &&
+ deviceService.getPorts(k8sNode.tunBridge()).stream()
.anyMatch(port -> Objects.equals(
port.annotations().value(PORT_NAME), intf) &&
port.isEnabled());
@@ -504,10 +526,16 @@
log.error("Exception caused during init state checking...");
}
- return k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
+ boolean result = k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
deviceService.isAvailable(k8sNode.intgBridge()) &&
deviceService.isAvailable(k8sNode.extBridge()) &&
deviceService.isAvailable(k8sNode.localBridge());
+
+ if (k8sNode.mode() == NORMAL) {
+ return result && deviceService.isAvailable(k8sNode.tunBridge());
+ } else {
+ return result;
+ }
}
private boolean isDeviceCreatedStateDone(K8sNode k8sNode) {
@@ -520,17 +548,19 @@
log.error("Exception caused during init state checking...");
}
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
- return false;
- }
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
- return false;
- }
- if (k8sNode.dataIp() != null &&
- !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
- return false;
+ if (k8sNode.mode() == NORMAL) {
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.vxlanPortName())) {
+ return false;
+ }
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.grePortName())) {
+ return false;
+ }
+ if (k8sNode.dataIp() != null &&
+ !isIntfEnabled(k8sNode, k8sNode.genevePortName())) {
+ return false;
+ }
}
return true;
@@ -574,13 +604,18 @@
}
// delete integration bridge from the node
- client.dropBridge(INTEGRATION_BRIDGE);
+ client.dropBridge(k8sNode.intgBridgeName());
// delete external bridge from the node
- client.dropBridge(EXTERNAL_BRIDGE);
+ client.dropBridge(k8sNode.extBridgeName());
// delete local bridge from the node
- client.dropBridge(LOCAL_BRIDGE);
+ client.dropBridge(k8sNode.localBridgeName());
+
+ if (k8sNode.mode() == NORMAL) {
+ // delete tunnel bridge from the node
+ client.dropBridge(k8sNode.tunBridgeName());
+ }
// disconnect ovsdb
client.disconnect();
@@ -711,9 +746,9 @@
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sNode.state() == DEVICE_CREATED && (
- Objects.equals(portName, VXLAN_TUNNEL) ||
- Objects.equals(portName, GRE_TUNNEL) ||
- Objects.equals(portName, GENEVE_TUNNEL))) {
+ Objects.equals(portName, k8sNode.vxlanPortName()) ||
+ Objects.equals(portName, k8sNode.grePortName()) ||
+ Objects.equals(portName, k8sNode.genevePortName()))) {
log.info("Interface {} added or updated to {}",
portName, device.id());
bootstrapNode(k8sNode);
@@ -736,9 +771,9 @@
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sNode.state() == COMPLETE && (
- Objects.equals(portName, VXLAN_TUNNEL) ||
- Objects.equals(portName, GRE_TUNNEL) ||
- Objects.equals(portName, GENEVE_TUNNEL))) {
+ Objects.equals(portName, k8sNode.vxlanPortName()) ||
+ Objects.equals(portName, k8sNode.grePortName()) ||
+ Objects.equals(portName, k8sNode.genevePortName()))) {
log.warn("Interface {} removed from {}",
portName, event.subject().id());
setState(k8sNode, INCOMPLETE);
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sApiConfigStore.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sApiConfigStore.java
index e5a65ab..044d9ae 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sApiConfigStore.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sApiConfigStore.java
@@ -20,7 +20,9 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.DefaultHostNodesInfo;
import org.onosproject.k8snode.api.DefaultK8sApiConfig;
+import org.onosproject.k8snode.api.HostNodesInfo;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigEvent;
import org.onosproject.k8snode.api.K8sApiConfigStore;
@@ -72,8 +74,11 @@
.register(KryoNamespaces.API)
.register(K8sApiConfig.class)
.register(DefaultK8sApiConfig.class)
+ .register(K8sApiConfig.Mode.class)
.register(K8sApiConfig.Scheme.class)
.register(K8sApiConfig.State.class)
+ .register(HostNodesInfo.class)
+ .register(DefaultHostNodesInfo.class)
.register(Collection.class)
.build();
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sHostStore.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sHostStore.java
new file mode 100644
index 0000000..17b9fde
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sHostStore.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.DefaultK8sHost;
+import org.onosproject.k8snode.api.K8sHost;
+import org.onosproject.k8snode.api.K8sHostEvent;
+import org.onosproject.k8snode.api.K8sHostState;
+import org.onosproject.k8snode.api.K8sHostStore;
+import org.onosproject.k8snode.api.K8sHostStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_COMPLETE;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_CREATED;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_INCOMPLETE;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_REMOVED;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_HOST_UPDATED;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_NODES_ADDED;
+import static org.onosproject.k8snode.api.K8sHostEvent.Type.K8S_NODES_REMOVED;
+import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of kubernetes host store using consistent map.
+ */
+@Component(immediate = true, service = K8sHostStore.class)
+public class DistributedK8sHostStore
+ extends AbstractStore<K8sHostEvent, K8sHostStoreDelegate>
+ implements K8sHostStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String ERR_NOT_FOUND = " does not exist";
+ private static final String ERR_DUPLICATE = " already exists";
+ private static final String APP_ID = "org.onosproject.k8snode";
+
+ private static final KryoNamespace
+ SERIALIZER_K8S_HOST = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(K8sHost.class)
+ .register(DefaultK8sHost.class)
+ .register(K8sHostState.class)
+ .register(Collection.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private final MapEventListener<String, K8sHost> hostMapListener =
+ new K8sHostMapListener();
+ private ConsistentMap<String, K8sHost> hostStore;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(APP_ID);
+ hostStore = storageService.<String, K8sHost>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_K8S_HOST))
+ .withName("k8s-hoststore")
+ .withApplicationId(appId)
+ .build();
+ hostStore.addListener(hostMapListener);
+ log.info("Started");
+ }
+
+    /**
+     * Detaches the map listener and shuts down the event executor.
+     */
+    @Deactivate
+    protected void deactivate() {
+        hostStore.removeListener(hostMapListener);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Stores a new kubernetes host, keyed by the string form of its IP address.
+     * The insertion is rejected through {@code checkArgument} when an entry
+     * already exists under that key.
+     *
+     * @param host kubernetes host to create
+     */
+    @Override
+    public void createHost(K8sHost host) {
+        final String key = host.hostIp().toString();
+        hostStore.compute(key, (ip, current) -> {
+            final String duplicateMsg = host.hostIp().toString() + ERR_DUPLICATE;
+            checkArgument(current == null, duplicateMsg);
+            return host;
+        });
+    }
+
+    /**
+     * Replaces the stored kubernetes host that shares the given host's IP
+     * address. The update is rejected through {@code checkArgument} when no
+     * entry exists under that key.
+     *
+     * @param host kubernetes host carrying the new values
+     */
+    @Override
+    public void updateHost(K8sHost host) {
+        final String key = host.hostIp().toString();
+        hostStore.compute(key, (ip, current) -> {
+            final String missingMsg = host.hostIp().toString() + ERR_NOT_FOUND;
+            checkArgument(current != null, missingMsg);
+            return host;
+        });
+    }
+
+    /**
+     * Removes the kubernetes host stored under the given IP address.
+     *
+     * @param hostIp IP address of the host to remove
+     * @return the host that was removed
+     * @throws IllegalArgumentException if no host is stored under that address
+     */
+    @Override
+    public K8sHost removeHost(IpAddress hostIp) {
+        final Versioned<K8sHost> removed = hostStore.remove(hostIp.toString());
+        if (removed != null) {
+            return removed.value();
+        }
+        throw new IllegalArgumentException(hostIp.toString() + ERR_NOT_FOUND);
+    }
+
+    /**
+     * Returns an immutable snapshot of all stored kubernetes hosts.
+     *
+     * @return set of kubernetes hosts
+     */
+    @Override
+    public Set<K8sHost> hosts() {
+        final Collection<K8sHost> stored = hostStore.asJavaMap().values();
+        return ImmutableSet.copyOf(stored);
+    }
+
+    /**
+     * Looks up the kubernetes host stored under the given IP address.
+     *
+     * @param hostIp host IP address
+     * @return the stored host, or null when the map holds no entry for it
+     */
+    @Override
+    public K8sHost host(IpAddress hostIp) {
+        final String key = hostIp.toString();
+        return hostStore.asJavaMap().get(key);
+    }
+
+    /**
+     * Map listener that converts host map changes into kubernetes host events
+     * and delivers them to the store delegate through the event executor.
+     */
+    private class K8sHostMapListener
+            implements MapEventListener<String, K8sHost> {
+
+        @Override
+        public void event(MapEvent<String, K8sHost> event) {
+            switch (event.type()) {
+                case INSERT:
+                    log.debug("Kubernetes host created {}", event.newValue());
+                    eventExecutor.execute(() ->
+                            publish(K8S_HOST_CREATED, event.newValue().value()));
+                    break;
+                case UPDATE:
+                    log.debug("Kubernetes host updated {}", event.newValue());
+                    eventExecutor.execute(() -> publishUpdate(event));
+                    break;
+                case REMOVE:
+                    log.debug("Kubernetes host removed {}", event.oldValue());
+                    eventExecutor.execute(() ->
+                            publish(K8S_HOST_REMOVED, event.oldValue().value()));
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        /**
+         * Wraps the host into an event of the given type and notifies the delegate.
+         *
+         * @param type event type
+         * @param host event subject
+         */
+        private void publish(K8sHostEvent.Type type, K8sHost host) {
+            notifyDelegate(new K8sHostEvent(type, host));
+        }
+
+        /**
+         * Emits the events derived from a map update, in this order: the
+         * generic update event, a COMPLETE/INCOMPLETE event matching the new
+         * state, and node added/removed events computed from the difference
+         * between the old and the new node name sets.
+         *
+         * @param event map update event
+         */
+        private void publishUpdate(MapEvent<String, K8sHost> event) {
+            publish(K8S_HOST_UPDATED, event.newValue().value());
+
+            final K8sHost updated = event.newValue().value();
+            if (updated.state() == COMPLETE) {
+                publish(K8S_HOST_COMPLETE, event.newValue().value());
+            } else if (updated.state() == INCOMPLETE) {
+                publish(K8S_HOST_INCOMPLETE, event.newValue().value());
+            }
+
+            final K8sHost current = event.newValue().value();
+            final Set<String> before = event.oldValue().value().nodeNames();
+            final Set<String> after = event.newValue().value().nodeNames();
+
+            final Set<String> joined = new HashSet<>(after);
+            final Set<String> left = new HashSet<>(before);
+            joined.removeAll(before);
+            left.removeAll(after);
+
+            if (!joined.isEmpty()) {
+                publish(K8S_NODES_ADDED, copyWithNodes(current, joined));
+            }
+
+            if (!left.isEmpty()) {
+                publish(K8S_NODES_REMOVED, copyWithNodes(current, left));
+            }
+        }
+
+        /**
+         * Builds a host that keeps the IP and state of the template but
+         * carries only the given node names.
+         *
+         * @param template host providing IP address and state
+         * @param nodes node names to put into the new host
+         * @return newly built host
+         */
+        private K8sHost copyWithNodes(K8sHost template, Set<String> nodes) {
+            return DefaultK8sHost.builder()
+                    .hostIp(template.hostIp())
+                    .state(template.state())
+                    .nodeNames(nodes)
+                    .build();
+        }
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sNodeStore.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sNodeStore.java
index 15d2697..8d3e926 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sNodeStore.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DistributedK8sNodeStore.java
@@ -19,7 +19,10 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.DefaultK8sExternalNetwork;
import org.onosproject.k8snode.api.DefaultK8sNode;
+import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sExternalNetwork;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeEvent;
import org.onosproject.k8snode.api.K8sNodeState;
@@ -77,6 +80,9 @@
.register(DefaultK8sNode.class)
.register(K8sNode.Type.class)
.register(K8sNodeState.class)
+ .register(K8sApiConfig.Mode.class)
+ .register(K8sExternalNetwork.class)
+ .register(DefaultK8sExternalNetwork.class)
.register(Collection.class)
.build();
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/ExternalNetworkManager.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/ExternalNetworkManager.java
new file mode 100644
index 0000000..184998a
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/ExternalNetworkManager.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.impl;
+
+import org.apache.commons.net.util.SubnetUtils;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.ExternalNetworkService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * External network service implementation.
+ */
+@Component(
+ immediate = true,
+ service = { ExternalNetworkService.class }
+)
+public class ExternalNetworkManager implements ExternalNetworkService {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final KryoNamespace
+ SERIALIZER_EXTERNAL_NETWORK = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private ConsistentMap<String, Set<String>> networkIpPool;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(APP_ID);
+ networkIpPool = storageService.<String, Set<String>>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_EXTERNAL_NETWORK))
+ .withName("external-network-ip-pool")
+ .withApplicationId(appId)
+ .build();
+ log.info("Started");
+ }
+
+    /**
+     * Shuts down the event executor.
+     */
+    @Deactivate
+    protected void deactivate() {
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Registers the given network and seeds its pool of allocatable addresses.
+     * The pool starts with all addresses reported for the CIDR, minus the
+     * network, lowest, highest and broadcast addresses. Registering a network
+     * that is already known leaves its pool untouched.
+     *
+     * @param cidr external network CIDR
+     */
+    @Override
+    public void registerNetwork(IpPrefix cidr) {
+        final String key = cidr.toString();
+        if (networkIpPool.containsKey(key)) {
+            // fast path: skip enumerating the subnet for a known network
+            return;
+        }
+
+        SubnetUtils utils = new SubnetUtils(key);
+        utils.setInclusiveHostCount(false);
+        SubnetUtils.SubnetInfo info = utils.getInfo();
+
+        Set<String> all = Arrays.stream(info.getAllAddresses())
+                .collect(Collectors.toSet());
+        all.remove(info.getNetworkAddress());
+        all.remove(info.getHighAddress());
+        all.remove(info.getLowAddress());
+        all.remove(info.getBroadcastAddress());
+
+        // putIfAbsent instead of put: if another caller registered the same
+        // network after the containsKey check above, its pool (which may
+        // already have addresses allocated from it) must not be overwritten
+        networkIpPool.putIfAbsent(key, all);
+    }
+
+    /**
+     * Drops the address pool of the given network; logs a warning when the
+     * network is not registered.
+     *
+     * @param cidr external network CIDR
+     */
+    @Override
+    public void unregisterNetwork(IpPrefix cidr) {
+        if (networkIpPool.containsKey(cidr.toString())) {
+            networkIpPool.remove(cidr.toString());
+            return;
+        }
+        log.warn("The given network {} is not found!", cidr.toString());
+    }
+
+    /**
+     * Returns the gateway address of the given network, which is the lowest
+     * address reported for the CIDR with inclusive host count disabled.
+     *
+     * @param cidr external network CIDR
+     * @return gateway IP address
+     */
+    @Override
+    public IpAddress getGatewayIp(IpPrefix cidr) {
+        final SubnetUtils subnet = new SubnetUtils(cidr.toString());
+        subnet.setInclusiveHostCount(false);
+
+        final String lowest = subnet.getInfo().getLowAddress();
+        return IpAddress.valueOf(lowest);
+    }
+
+ @Override
+ public IpAddress allocateIp(IpPrefix cidr) {
+ if (!networkIpPool.containsKey(cidr.toString())) {
+ log.error("The given network {} is not found", cidr.toString());
+ return null;
+ } else {
+ Set<String> pool = networkIpPool.get(cidr.toString()).value();
+ String ipStr = pool.stream().findFirst().orElse(null);
+ if (ipStr == null) {
+ log.error("No IPs are found in the given network {}", cidr.toString());
+ return null;
+ }
+
+ pool.remove(ipStr);
+ networkIpPool.put(cidr.toString(), pool);
+ return IpAddress.valueOf(ipStr);
+ }
+ }
+
+ @Override
+ public void releaseIp(IpPrefix cidr, IpAddress ip) {
+ if (!networkIpPool.containsKey(cidr.toString())) {
+ log.error("The given network {} is not found", cidr.toString());
+ } else {
+ Set<String> pool = networkIpPool.get(cidr.toString()).value();
+ pool.add(ip.toString());
+ networkIpPool.put(cidr.toString(), pool);
+ }
+ }
+
+    /**
+     * Returns the addresses currently available in the pool of the given
+     * network.
+     *
+     * @param cidr external network CIDR
+     * @return available IP addresses; an empty set when the network is not
+     *         registered (previously this case raised a NullPointerException)
+     */
+    @Override
+    public Set<String> getAllIps(IpPrefix cidr) {
+        final Set<String> pool = networkIpPool.asJavaMap().get(cidr.toString());
+        return pool == null ? Collections.emptySet() : pool;
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sHostManager.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sHostManager.java
new file mode 100644
index 0000000..fd26763
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sHostManager.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.k8snode.api.K8sHost;
+import org.onosproject.k8snode.api.K8sHostAdminService;
+import org.onosproject.k8snode.api.K8sHostEvent;
+import org.onosproject.k8snode.api.K8sHostListener;
+import org.onosproject.k8snode.api.K8sHostService;
+import org.onosproject.k8snode.api.K8sHostStore;
+import org.onosproject.k8snode.api.K8sHostStoreDelegate;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Service administering the inventory of kubernetes hosts.
+ * <p>
+ * All host data lives in the {@link K8sHostStore}; this component checks
+ * arguments, delegates to the store and hands events received from the store
+ * to {@link ListenerRegistry#process}.
+ */
+@Component(
+    immediate = true,
+    service = {K8sHostService.class, K8sHostAdminService.class},
+    property = {
+        OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
+    }
+)
+public class K8sHostManager
+        extends ListenerRegistry<K8sHostEvent, K8sHostListener>
+        implements K8sHostService, K8sHostAdminService {
+
+    private final Logger log = getLogger(getClass());
+
+    // MSG_HOST is a String.format template: host IP, then one of the verbs below
+    private static final String MSG_HOST = "Kubernetes host %s %s";
+    private static final String MSG_CREATED = "created";
+    private static final String MSG_UPDATED = "updated";
+    private static final String MSG_REMOVED = "removed";
+
+    private static final String ERR_NULL_HOST = "Kubernetes host cannot be null";
+    private static final String ERR_NULL_HOST_IP = "Kubernetes host IP cannot be null";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sHostStore hostStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    /** OVSDB server listen port. */
+    private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final K8sHostStoreDelegate delegate = new K8sHostManager.InternalHostStoreDelegate();
+
+    private ApplicationId appId;
+
+    /**
+     * Registers the application, installs the store delegate and joins the
+     * leadership election for the application name.
+     */
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(APP_ID);
+        hostStore.setDelegate(delegate);
+
+        leadershipService.runForLeadership(appId.name());
+        log.info("Started");
+    }
+
+    /**
+     * Removes the store delegate, withdraws from the leadership election and
+     * shuts the event executor down.
+     */
+    @Deactivate
+    protected void deactivate() {
+        hostStore.unsetDelegate(delegate);
+
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Picks up a changed OVSDB port from the component configuration.
+     *
+     * @param context component context carrying the current properties
+     */
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        // getIntegerProperty yields null when the property is missing or not
+        // a number; keep the boxed type so that case does not fail on unboxing
+        // and the previously configured port stays in effect.
+        Integer updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
+        if (updatedOvsdbPort != null && updatedOvsdbPort != ovsdbPortNum) {
+            ovsdbPortNum = updatedOvsdbPort;
+        }
+
+        log.info("Modified");
+    }
+
+    /**
+     * Adds the given host to the host store.
+     *
+     * @param host kubernetes host to create
+     * @throws NullPointerException if {@code host} is null
+     */
+    @Override
+    public void createHost(K8sHost host) {
+        checkNotNull(host, ERR_NULL_HOST);
+
+        hostStore.createHost(host);
+
+        log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_CREATED));
+    }
+
+    /**
+     * Updates the given host in the host store.
+     *
+     * @param host kubernetes host to update
+     * @throws NullPointerException if {@code host} is null
+     */
+    @Override
+    public void updateHost(K8sHost host) {
+        checkNotNull(host, ERR_NULL_HOST);
+
+        hostStore.updateHost(host);
+
+        log.info(String.format(MSG_HOST, host.hostIp().toString(), MSG_UPDATED));
+    }
+
+    /**
+     * Removes the host with the given IP address from the host store.
+     *
+     * @param hostIp IP address of the host to remove
+     * @return whatever the store returns for the removal
+     * @throws IllegalArgumentException if {@code hostIp} is null
+     */
+    @Override
+    public K8sHost removeHost(IpAddress hostIp) {
+        checkArgument(hostIp != null, ERR_NULL_HOST_IP);
+
+        K8sHost host = hostStore.removeHost(hostIp);
+        log.info(String.format(MSG_HOST, hostIp.toString(), MSG_REMOVED));
+
+        return host;
+    }
+
+    /**
+     * Returns all hosts known to the host store.
+     *
+     * @return set of kubernetes hosts
+     */
+    @Override
+    public Set<K8sHost> hosts() {
+        return hostStore.hosts();
+    }
+
+    /**
+     * Returns the hosts whose state is {@code COMPLETE}.
+     *
+     * @return immutable set of complete kubernetes hosts
+     */
+    @Override
+    public Set<K8sHost> completeHosts() {
+        Set<K8sHost> hosts = hostStore.hosts().stream()
+                .filter(h -> Objects.equals(h.state(), COMPLETE))
+                .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(hosts);
+    }
+
+    /**
+     * Looks up a host by its IP address.
+     *
+     * @param hostIp IP address of the host
+     * @return matching host; null if the store holds no host with that IP
+     */
+    @Override
+    public K8sHost host(IpAddress hostIp) {
+        return hostStore.hosts().stream()
+                .filter(h -> Objects.equals(h.hostIp(), hostIp))
+                .findFirst().orElse(null);
+    }
+
+    /**
+     * Store delegate that forwards non-null host events to
+     * {@link ListenerRegistry#process}.
+     */
+    private class InternalHostStoreDelegate implements K8sHostStoreDelegate {
+
+        @Override
+        public void notify(K8sHostEvent event) {
+            if (event != null) {
+                log.trace("send kubernetes host event {}", event);
+                process(event);
+            }
+        }
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
index 5b3f353..bc789d3 100644
--- a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/K8sNodeManager.java
@@ -55,6 +55,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.K8sApiConfig.Mode.NORMAL;
import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
@@ -159,6 +160,7 @@
K8sNode intNode;
K8sNode extNode;
K8sNode localNode;
+ K8sNode tunNode;
if (node.intgBridge() == null) {
String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
@@ -196,7 +198,24 @@
NOT_DUPLICATED_MSG, localNode.localBridge());
}
- nodeStore.createNode(localNode);
+ if (node.mode() == NORMAL) {
+ if (node.tunBridge() == null) {
+ String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
+ checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
+ tunNode = localNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = localNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
+
+ nodeStore.createNode(tunNode);
+ } else {
+ nodeStore.createNode(localNode);
+ }
+
log.info(String.format(MSG_NODE, extNode.hostname(), MSG_CREATED));
}
@@ -207,6 +226,7 @@
K8sNode intNode;
K8sNode extNode;
K8sNode localNode;
+ K8sNode tunNode;
K8sNode existingNode = nodeStore.node(node.hostname());
checkNotNull(existingNode, ERR_NULL_NODE);
@@ -247,7 +267,22 @@
NOT_DUPLICATED_MSG, localNode.localBridge());
}
- nodeStore.updateNode(localNode);
+ if (node.mode() == NORMAL) {
+ DeviceId existTunBridge = nodeStore.node(node.hostname()).tunBridge();
+
+ if (localNode.tunBridge() == null) {
+ tunNode = localNode.updateTunBridge(existTunBridge);
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = localNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
+ nodeStore.updateNode(tunNode);
+ } else {
+ nodeStore.updateNode(localNode);
+ }
log.info(String.format(MSG_NODE, extNode.hostname(), MSG_UPDATED));
}
@@ -265,6 +300,13 @@
}
@Override
+    public Set<K8sNode> nodes(String clusterName) {
+        // Objects.equals keeps the filter null-safe: a node that carries no
+        // cluster name is skipped instead of aborting the lookup with an NPE.
+        // This is the same comparison style nodes(Type) uses.
+        return nodeStore.nodes().stream()
+                .filter(n -> Objects.equals(n.clusterName(), clusterName))
+                .collect(Collectors.toSet());
+    }
+
+ @Override
public Set<K8sNode> nodes(Type type) {
Set<K8sNode> nodes = nodeStore.nodes().stream()
.filter(node -> Objects.equals(node.type(), type))
@@ -330,6 +372,15 @@
return existNode.isPresent();
}
+    /**
+     * Checks whether a node other than the named one already has the given
+     * tunnel bridge.
+     *
+     * @param deviceId tunnel bridge device identifier to look for
+     * @param hostname hostname of the node excluded from the search
+     * @return true if a node with a different hostname has this tunnel bridge
+     */
+    private boolean hasTunBridge(DeviceId deviceId, String hostname) {
+        return nodeStore.nodes().stream()
+                .filter(other -> !other.hostname().equals(hostname))
+                .anyMatch(other -> deviceId.equals(other.tunBridge()));
+    }
+
private class InternalNodeStoreDelegate implements K8sNodeStoreDelegate {
@Override