blob: 5da9624c1d62e82c719531288121186acf712e05 [file] [log] [blame]
/*
* Copyright 2020-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.k8snode.impl;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snode.api.K8sHost;
import org.onosproject.k8snode.api.K8sHostAdminService;
import org.onosproject.k8snode.api.K8sHostEvent;
import org.onosproject.k8snode.api.K8sHostHandler;
import org.onosproject.k8snode.api.K8sHostListener;
import org.onosproject.k8snode.api.K8sHostState;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeAdminService;
import org.onosproject.k8snode.api.K8sTunnelBridge;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeDescription;
import org.onosproject.net.behaviour.ControllerInfo;
import org.onosproject.net.behaviour.DefaultBridgeDescription;
import org.onosproject.net.behaviour.DefaultPatchDescription;
import org.onosproject.net.behaviour.DefaultTunnelDescription;
import org.onosproject.net.behaviour.InterfaceConfig;
import org.onosproject.net.behaviour.PatchDescription;
import org.onosproject.net.behaviour.TunnelDescription;
import org.onosproject.net.behaviour.TunnelEndPoints;
import org.onosproject.net.behaviour.TunnelKey;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.TpPort.tpPort;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snode.api.Constants.GENEVE;
import static org.onosproject.k8snode.api.Constants.GRE;
import static org.onosproject.k8snode.api.Constants.VXLAN;
import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
import static org.onosproject.k8snode.api.K8sHostState.DEVICE_CREATED;
import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
import static org.onosproject.k8snode.api.K8sHostState.INIT;
import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Service bootstraps kubernetes host.
*/
@Component(immediate = true)
public class DefaultK8sHostHandler implements K8sHostHandler {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_OF_PROTO = "tcp";
private static final int DEFAULT_OFPORT = 6653;
private static final int DPID_BEGIN = 3;
private static final long SLEEP_MS = 3000; // we wait 3s
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceAdminService deviceAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OvsdbController ovsdbController;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sHostAdminService k8sHostAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNodeAdminService k8sNodeAdminService;
private int ovsdbPortNum = 6640;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final DeviceListener ovsdbListener = new InternalOvsdbListener();
private final DeviceListener bridgeListener = new InternalBridgeListener();
private final K8sHostListener k8sHostListener = new InternalK8sHostListener();
private ApplicationId appId;
private NodeId localNode;
@Activate
protected void activate() {
appId = coreService.getAppId(APP_ID);
localNode = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
deviceService.addListener(ovsdbListener);
deviceService.addListener(bridgeListener);
k8sHostAdminService.addListener(k8sHostListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
k8sHostAdminService.removeListener(k8sHostListener);
deviceService.removeListener(bridgeListener);
deviceService.removeListener(ovsdbListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
@Override
public void processInitState(K8sHost k8sHost) {
if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
return;
}
for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
if (!deviceService.isAvailable(tunBridge.deviceId())) {
createBridge(k8sHost.ovsdb(), tunBridge);
}
}
}
@Override
public void processDeviceCreatedState(K8sHost k8sHost) {
try {
if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
return;
}
// create patch ports into tunnel bridge face to integration bridge
for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
for (String node : k8sHost.nodeNames()) {
K8sNode k8sNode = k8sNodeAdminService.node(node);
if (k8sNode.segmentId() == bridge.tunnelId()) {
createPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
}
}
}
// create tunnel ports
for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
createVxlanTunnelInterface(k8sHost.ovsdb(), bridge);
}
if (!isTunPortEnabled(bridge, bridge.grePortName())) {
createGreTunnelInterface(k8sHost.ovsdb(), bridge);
}
if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
createGeneveTunnelInterface(k8sHost.ovsdb(), bridge);
}
}
} catch (Exception e) {
log.error("Exception occurred because of {}", e);
}
}
@Override
public void processCompleteState(K8sHost k8sHost) {
// do something if needed
}
@Override
public void processIncompleteState(K8sHost k8sHost) {
// do something if needed
}
private void createBridge(DeviceId ovsdb, K8sTunnelBridge bridge) {
Device device = deviceService.getDevice(ovsdb);
List<ControllerInfo> controllers = clusterService.getNodes().stream()
.map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
.collect(Collectors.toList());
String dpid = bridge.dpid().substring(DPID_BEGIN);
BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
.name(bridge.name())
.failMode(BridgeDescription.FailMode.SECURE)
.datapathId(dpid)
.disableInBand()
.controllers(controllers);
BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
bridgeConfig.addBridge(builder.build());
}
private void createPatchInterfaces(DeviceId ovsdb, K8sTunnelBridge bridge, K8sNode k8sNode) {
Device device = deviceService.getDevice(ovsdb);
if (device == null || !device.is(InterfaceConfig.class)) {
log.error("Failed to create patch interface on {}", ovsdb);
return;
}
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
// tunnel bridge -> integration bridge
PatchDescription brTunIntPatchDesc =
DefaultPatchDescription.builder()
.deviceId(bridge.name())
.ifaceName(k8sNode.tunToIntgPatchPortName())
.peer(k8sNode.intgToTunPatchPortName())
.build();
ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
}
private void createVxlanTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
createTunnelInterface(ovsdb, bridge, VXLAN, bridge.vxlanPortName());
}
private void createGreTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
createTunnelInterface(ovsdb, bridge, GRE, bridge.grePortName());
}
private void createGeneveTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
createTunnelInterface(ovsdb, bridge, GENEVE, bridge.genevePortName());
}
private void createTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge,
String type, String intfName) {
if (isTunPortEnabled(bridge, intfName)) {
return;
}
Device device = deviceService.getDevice(ovsdb);
if (device == null || !device.is(InterfaceConfig.class)) {
log.error("Failed to create tunnel interface on {}", ovsdb);
return;
}
TunnelDescription tunnelDesc = buildTunnelDesc(bridge, type, intfName);
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
ifaceConfig.addTunnelMode(intfName, tunnelDesc);
}
private TunnelDescription buildTunnelDesc(K8sTunnelBridge bridge, String type, String intfName) {
TunnelKey<String> key = new TunnelKey<>(String.valueOf(bridge.tunnelId()));
if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
TunnelDescription.Builder tdBuilder =
DefaultTunnelDescription.builder()
.deviceId(bridge.name())
.ifaceName(intfName)
.remote(TunnelEndPoints.flowTunnelEndpoint())
.key(key);
switch (type) {
case VXLAN:
tdBuilder.type(TunnelDescription.Type.VXLAN);
break;
case GRE:
tdBuilder.type(TunnelDescription.Type.GRE);
break;
case GENEVE:
tdBuilder.type(TunnelDescription.Type.GENEVE);
break;
default:
return null;
}
return tdBuilder.build();
}
return null;
}
private boolean isOvsdbConnected(K8sHost host, int ovsdbPort,
OvsdbController ovsdbController,
DeviceService deviceService) {
OvsdbClientService client = getOvsdbClient(host, ovsdbPort, ovsdbController);
return deviceService.isAvailable(host.ovsdb()) &&
client != null &&
client.isConnected();
}
private OvsdbClientService getOvsdbClient(K8sHost host, int ovsdbPort,
OvsdbController ovsdbController) {
OvsdbNodeId ovsdb = new OvsdbNodeId(host.hostIp(), ovsdbPort);
return ovsdbController.getOvsdbClient(ovsdb);
}
private boolean isCurrentStateDone(K8sHost k8sHost) {
switch (k8sHost.state()) {
case INIT:
return isInitStateDone(k8sHost);
case DEVICE_CREATED:
return isDeviceCreatedStateDone(k8sHost);
case COMPLETE:
case INCOMPLETE:
return false;
default:
return true;
}
}
private boolean isInitStateDone(K8sHost k8sHost) {
if (!isOvsdbConnected(k8sHost, ovsdbPortNum,
ovsdbController, deviceService)) {
return false;
}
try {
// we need to wait a while, in case interface and bridge
// creation requires some time
sleep(SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during init state checking...");
}
for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
if (!deviceService.isAvailable(tunBridge.deviceId())) {
return false;
}
}
return true;
}
private boolean isDeviceCreatedStateDone(K8sHost k8sHost) {
try {
// we need to wait a while, in case interface and bridge
// creation requires some time
sleep(SLEEP_MS);
} catch (InterruptedException e) {
log.error("Exception caused during init state checking...");
}
for (K8sTunnelBridge bridge: k8sHost.tunBridges()) {
if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
return false;
}
if (!isTunPortEnabled(bridge, bridge.grePortName())) {
return false;
}
if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
return false;
}
}
return true;
}
private boolean isTunPortEnabled(K8sTunnelBridge tunBridge, String intf) {
return deviceService.isAvailable(tunBridge.deviceId()) &&
deviceService.getPorts(tunBridge.deviceId()).stream()
.anyMatch(port -> Objects.equals(
port.annotations().value(PORT_NAME), intf) &&
port.isEnabled());
}
/**
* Configures the kubernetes host with new state.
*
* @param k8sHost kubernetes host
* @param newState a new state
*/
private void setState(K8sHost k8sHost, K8sHostState newState) {
if (k8sHost.state() == newState) {
return;
}
K8sHost updated = k8sHost.updateState(newState);
k8sHostAdminService.updateHost(updated);
log.info("Changed {} state: {}", k8sHost.hostIp(), newState);
}
/**
* Bootstraps a new kubernetes host.
*
* @param k8sHost kubernetes host
*/
private void bootstrapHost(K8sHost k8sHost) {
if (isCurrentStateDone(k8sHost)) {
setState(k8sHost, k8sHost.state().nextState());
} else {
log.trace("Processing {} state for {}", k8sHost.state(),
k8sHost.hostIp());
k8sHost.state().process(this, k8sHost);
}
}
private class InternalOvsdbListener implements DeviceListener {
@Override
public boolean isRelevant(DeviceEvent event) {
return event.subject().type() == Device.Type.CONTROLLER;
}
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sHost k8sHost = k8sHostAdminService.host(device.id());
if (k8sHost == null) {
return;
}
if (deviceService.isAvailable(device.id())) {
log.debug("OVSDB {} detected", device.id());
bootstrapHost(k8sHost);
}
});
break;
case PORT_ADDED:
case PORT_REMOVED:
case DEVICE_REMOVED:
default:
// do nothing
break;
}
}
}
private class InternalBridgeListener implements DeviceListener {
@Override
public boolean isRelevant(DeviceEvent event) {
return event.subject().type() == Device.Type.SWITCH;
}
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
if (k8sHost == null) {
return;
}
if (deviceService.isAvailable(device.id())) {
log.debug("Tunnel bridge created on {}",
k8sHost.hostIp());
log.debug("OVSDB {} detected", device.id());
bootstrapHost(k8sHost);
} else if (k8sHost.state() == COMPLETE) {
log.info("Device {} disconnected", device.id());
setState(k8sHost, INCOMPLETE);
}
if (k8sHost.state() == INCOMPLETE ||
k8sHost.state() == DEVICE_CREATED) {
log.info("Device {} is reconnected", device.id());
k8sHostAdminService.updateHost(
k8sHost.updateState(INIT));
}
});
break;
case PORT_UPDATED:
case PORT_ADDED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
if (k8sHost == null) {
return;
}
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sHost.state() == DEVICE_CREATED) {
K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
br -> br.deviceId().equals(device.id())
).findAny().orElse(null);
if (tunBridge != null) {
if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
Objects.equals(portName, tunBridge.grePortName()) ||
Objects.equals(portName, tunBridge.genevePortName())) {
log.info("Interface {} added or updated to {}",
portName, device.id());
bootstrapHost(k8sHost);
}
}
}
});
break;
case PORT_REMOVED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
if (k8sHost == null) {
return;
}
Port port = event.port();
String portName = port.annotations().value(PORT_NAME);
if (k8sHost.state() == COMPLETE) {
K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
br -> br.deviceId().equals(device.id())
).findAny().orElse(null);
if (tunBridge != null) {
if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
Objects.equals(portName, tunBridge.grePortName()) ||
Objects.equals(portName, tunBridge.genevePortName())) {
log.warn("Interface {} removed from {}",
portName, event.subject().id());
setState(k8sHost, INCOMPLETE);
}
}
}
});
break;
case DEVICE_REMOVED:
default:
// do nothing
break;
}
}
}
private class InternalK8sHostListener implements K8sHostListener {
private boolean isRelevantHelper() {
return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
}
@Override
public void event(K8sHostEvent event) {
switch (event.type()) {
case K8S_HOST_CREATED:
case K8S_HOST_UPDATED:
eventExecutor.execute(() -> {
if (!isRelevantHelper()) {
return;
}
bootstrapHost(event.subject());
});
break;
case K8S_HOST_REMOVED:
case K8S_HOST_INCOMPLETE:
default:
break;
}
}
}
}