Create/remove tenant bridge per tenant network at kubevirt app
Change-Id: I43ccf21db561edb9c51c2d8017dc4f614c5f0a4d
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
new file mode 100644
index 0000000..e538ef7
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import com.google.common.collect.Lists;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.UDP;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultBridgeDescription;
+import org.onosproject.net.behaviour.DefaultPatchDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.behaviour.PatchDescription;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_DHCP_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_SWITCHING_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_DHCP_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_FORWARDING_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_INBOUND_TABLE;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.segmentIdHex;
+import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles kubevirt network events.
+ */
+@Component(immediate = true)
+public class KubevirtNetworkHandler {
+ protected final Logger log = getLogger(getClass());
+ private static final String DEFAULT_OF_PROTO = "tcp";
+ private static final int DEFAULT_OFPORT = 6653;
+ private static final int DPID_BEGIN = 3;
+ private static final long SLEEP_MS = 3000; // we wait 3s for init each node
+
+ public static final String INTEGRATION_TO_TUNNEL_PREFIX = "i-to-t-";
+ public static final String TUNNEL_TO_INTEGRATION_PREFIX = "t-to-i-";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceAdminService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService apiConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNetworkService networkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtFlowRuleService flowService;
+
+ private final KubevirtNetworkListener networkListener = new InternalNetworkEventListener();
+ private final KubevirtNodeListener nodeListener = new InternalNodeEventListener();
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ networkService.addListener(networkListener);
+ nodeService.addListener(nodeListener);
+ leadershipService.runForLeadership(appId.name());
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ networkService.removeListener(networkListener);
+ nodeService.removeListener(nodeListener);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private void createBridge(KubevirtNode node, KubevirtNetwork network) {
+
+ Device tunBridge = deviceService.getDevice(network.tenantDeviceId(node.hostname()));
+ if (tunBridge != null) {
+ log.warn("The tunnel bridge {} already exists at node {}",
+ network.tenantBridgeName(), node.hostname());
+ setDefaultRules(node, network);
+ return;
+ }
+
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ IpAddress serverIp = apiConfigService.apiConfig().ipAddress();
+ ControllerInfo controlInfo =
+ new ControllerInfo(serverIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
+ List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
+
+ String dpid = network.tenantDeviceId(
+ node.hostname()).toString().substring(DPID_BEGIN);
+
+ BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+ .name(network.tenantBridgeName())
+ .failMode(BridgeDescription.FailMode.SECURE)
+ .datapathId(dpid)
+ .disableInBand()
+ .controllers(controllers);
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(builder.build());
+ }
+
+ private void removeBridge(KubevirtNode node, KubevirtNetwork network) {
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ BridgeName bridgeName = BridgeName.bridgeName(network.tenantBridgeName());
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.deleteBridge(bridgeName);
+ deviceService.removeDevice(network.tenantDeviceId(node.hostname()));
+ }
+
+ private void createPatchInterface(KubevirtNode node, KubevirtNetwork network) {
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ if (device == null || !device.is(InterfaceConfig.class)) {
+ log.error("Failed to create patch interface on {}", node.ovsdb());
+ return;
+ }
+
+ InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
+
+ String intToTunIntf =
+ INTEGRATION_TO_TUNNEL_PREFIX + segmentIdHex(network.segmentId());
+ String tunToIntIntf =
+ TUNNEL_TO_INTEGRATION_PREFIX + segmentIdHex(network.segmentId());
+
+ // integration bridge -> tunnel bridge
+ PatchDescription brIntTunPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(network.tenantBridgeName())
+ .ifaceName(intToTunIntf)
+ .peer(tunToIntIntf)
+ .build();
+
+ ifaceConfig.addPatchMode(intToTunIntf, brIntTunPatchDesc);
+
+ // tunnel bridge -> integration bridge
+ PatchDescription brTunIntPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(TUNNEL_BRIDGE)
+ .ifaceName(tunToIntIntf)
+ .peer(intToTunIntf)
+ .build();
+ ifaceConfig.addPatchMode(tunToIntIntf, brTunIntPatchDesc);
+ }
+
+ private void removePatchInterface(KubevirtNode node, KubevirtNetwork network) {
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ if (device == null || !device.is(InterfaceConfig.class)) {
+ log.error("Failed to create patch interface on {}", node.ovsdb());
+ return;
+ }
+
+ InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
+
+ String tunToIntIntf = TUNNEL_TO_INTEGRATION_PREFIX + segmentIdHex(network.segmentId());
+
+ ifaceConfig.removePatchMode(tunToIntIntf);
+ }
+
+ private void setDefaultRules(KubevirtNode node, KubevirtNetwork network) {
+ DeviceId deviceId = network.tenantDeviceId(node.hostname());
+
+ while (!deviceService.isAvailable(deviceId)) {
+ log.warn("Device {} is not ready for installing rules", deviceId);
+
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Failed to check device availability", e);
+ }
+ }
+
+ flowService.connectTables(deviceId, TENANT_INBOUND_TABLE, TENANT_DHCP_TABLE);
+ flowService.connectTables(deviceId, TENANT_DHCP_TABLE, TENANT_FORWARDING_TABLE);
+
+ setDhcpRule(deviceId, true);
+ setForwardingRule(deviceId, true);
+
+ log.info("Install default flow rules for tenant bridge {}", network.tenantBridgeName());
+ }
+
+ private void setDhcpRule(DeviceId deviceId, boolean install) {
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPProtocol(IPv4.PROTOCOL_UDP)
+ .matchUdpDst(TpPort.tpPort(UDP.DHCP_SERVER_PORT))
+ .matchUdpSrc(TpPort.tpPort(UDP.DHCP_CLIENT_PORT))
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .punt()
+ .build();
+
+ flowService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ PRIORITY_DHCP_RULE,
+ TENANT_DHCP_TABLE,
+ install);
+ }
+
+ public void setForwardingRule(DeviceId deviceId, boolean install) {
+ TrafficSelector selector = DefaultTrafficSelector.builder().build();
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.NORMAL)
+ .build();
+
+ flowService.setRule(
+ appId,
+ deviceId,
+ selector,
+ treatment,
+ PRIORITY_SWITCHING_RULE,
+ TENANT_FORWARDING_TABLE,
+ install);
+ }
+
+ private class InternalNetworkEventListener implements KubevirtNetworkListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtNetworkEvent event) {
+ switch (event.type()) {
+ case KUBEVIRT_NETWORK_CREATED:
+ eventExecutor.execute(() -> processNetworkCreation(event.subject()));
+ break;
+ case KUBEVIRT_NETWORK_REMOVED:
+ eventExecutor.execute(() -> processNetworkRemoval(event.subject()));
+ break;
+ case KUBEVIRT_NETWORK_UPDATED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNetworkCreation(KubevirtNetwork network) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ switch (network.type()) {
+ case VXLAN:
+ case GRE:
+ case GENEVE:
+ initIntegrationTunnelBridge(network);
+ break;
+ case FLAT:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNetworkRemoval(KubevirtNetwork network) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ switch (network.type()) {
+ case VXLAN:
+ case GRE:
+ case GENEVE:
+ purgeIntegrationTunnelBridge(network);
+ break;
+ case FLAT:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void initIntegrationTunnelBridge(KubevirtNetwork network) {
+ if (network.segmentId() == null) {
+ return;
+ }
+
+ nodeService.completeNodes().forEach(n -> {
+ createBridge(n, network);
+ createPatchInterface(n, network);
+ setDefaultRules(n, network);
+ });
+ }
+
+ private void purgeIntegrationTunnelBridge(KubevirtNetwork network) {
+ if (network.segmentId() == null) {
+ return;
+ }
+
+ nodeService.completeNodes().forEach(n -> {
+ removePatchInterface(n, network);
+ removeBridge(n, network);
+ });
+ }
+ }
+
+ private class InternalNodeEventListener implements KubevirtNodeListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtNodeEvent event) {
+ switch (event.type()) {
+ case KUBEVIRT_NODE_COMPLETE:
+ eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+ break;
+ case KUBEVIRT_NODE_INCOMPLETE:
+ case KUBEVIRT_NODE_UPDATED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNodeCompletion(KubevirtNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ for (KubevirtNetwork network : networkService.networks()) {
+ switch (network.type()) {
+ case VXLAN:
+ case GRE:
+ case GENEVE:
+ if (network.segmentId() == null) {
+ continue;
+ }
+ createBridge(node, network);
+ createPatchInterface(node, network);
+ setDefaultRules(node, network);
+ break;
+ case FLAT:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+ }
+}