blob: d38b5cb130a390604a306c1dc2abf51f40a8ccce [file] [log] [blame]
Jian Lica20b712021-01-18 00:19:31 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
Jian Lica20b712021-01-18 00:19:31 +090019import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
Jian Lica20b712021-01-18 00:19:31 +090024import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
25import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
26import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
27import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
28import org.onosproject.kubevirtnetworking.api.KubevirtPort;
29import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Lib6dc08f2021-03-24 15:24:18 +090031import org.onosproject.kubevirtnode.api.KubevirtNode;
Daniel Parkf3136042021-03-10 07:49:11 +090032import org.onosproject.kubevirtnode.api.KubevirtNodeService;
Jian Lica20b712021-01-18 00:19:31 +090033import org.onosproject.mastership.MastershipService;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.driver.DriverService;
36import org.osgi.service.component.annotations.Activate;
37import org.osgi.service.component.annotations.Component;
38import org.osgi.service.component.annotations.Deactivate;
39import org.osgi.service.component.annotations.Reference;
40import org.osgi.service.component.annotations.ReferenceCardinality;
41import org.slf4j.Logger;
42
43import java.util.Map;
44import java.util.Objects;
Jian Lid4296d02021-03-12 18:03:58 +090045import java.util.Set;
Jian Lica20b712021-01-18 00:19:31 +090046import java.util.concurrent.ExecutorService;
47
Jian Lib6dc08f2021-03-24 15:24:18 +090048import static java.lang.Thread.sleep;
Jian Lica20b712021-01-18 00:19:31 +090049import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lid4296d02021-03-12 18:03:58 +090052import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
Jian Lica20b712021-01-18 00:19:31 +090053import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Associates the kubevirt container port and pod.
57 */
58@Component(immediate = true)
59public class KubevirtPodPortMapper {
60
61 private final Logger log = getLogger(getClass());
62
63 private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
Jian Lib6dc08f2021-03-24 15:24:18 +090064 private static final long SLEEP_MS = 2000; // we wait 2s
Jian Lica20b712021-01-18 00:19:31 +090065
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected CoreService coreService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected MastershipService mastershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected ClusterService clusterService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected LeadershipService leadershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected DeviceService deviceService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected DriverService driverService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected KubevirtPortAdminService kubevirtPortAdminService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected KubevirtPodAdminService kubevirtPodAdminService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected KubevirtApiConfigService kubevirtApiConfigService;
95
Daniel Parkf3136042021-03-10 07:49:11 +090096 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtNodeService kubevirtNodeService;
98
Jian Lica20b712021-01-18 00:19:31 +090099 private final ExecutorService eventExecutor = newSingleThreadExecutor(
100 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
101
102 private final InternalKubevirtPodListener kubevirtPodListener =
103 new InternalKubevirtPodListener();
104
105 private ApplicationId appId;
106 private NodeId localNodeId;
107
108 @Activate
109 protected void activate() {
110 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
111 localNodeId = clusterService.getLocalNode().id();
112 leadershipService.runForLeadership(appId.name());
113 kubevirtPodAdminService.addListener(kubevirtPodListener);
114
115 log.info("Started");
116 }
117
118 @Deactivate
119 protected void deactivate() {
120 kubevirtPodAdminService.removeListener(kubevirtPodListener);
121 leadershipService.withdraw(appId.name());
122 eventExecutor.shutdown();
123
124 log.info("Stopped");
125 }
126
127 private class InternalKubevirtPodListener implements KubevirtPodListener {
128
129 private boolean isRelevantHelper() {
130 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
131 }
132
133 @Override
134 public void event(KubevirtPodEvent event) {
135 switch (event.type()) {
136 case KUBEVIRT_POD_UPDATED:
137 eventExecutor.execute(() -> processPodUpdate(event.subject()));
138 break;
Jian Lica20b712021-01-18 00:19:31 +0900139 case KUBEVIRT_POD_CREATED:
Jian Lib6dc08f2021-03-24 15:24:18 +0900140 case KUBEVIRT_POD_REMOVED:
Jian Lica20b712021-01-18 00:19:31 +0900141 default:
142 // do nothing
143 break;
144 }
145 }
146
Jian Lib6dc08f2021-03-24 15:24:18 +0900147 private void processPodUpdate(Pod pod) {
Jian Li7a581b12021-02-18 14:24:32 +0900148 if (!isRelevantHelper()) {
149 return;
150 }
151
Jian Li7a581b12021-02-18 14:24:32 +0900152 Map<String, String> annots = pod.getMetadata().getAnnotations();
153 if (annots == null) {
154 return;
155 }
156
157 if (!annots.containsKey(NETWORK_STATUS_KEY)) {
158 return;
159 }
160
Jian Lib6dc08f2021-03-24 15:24:18 +0900161 KubevirtNode node = kubevirtNodeService.node(pod.getSpec().getNodeName());
162
163 if (node == null) {
Jian Li46592cf2021-05-11 18:12:55 +0900164 log.warn("POD scheduled node name {} is not ready, " +
165 "we wait for a while...", pod.getSpec().getNodeName());
Jian Lib6dc08f2021-03-24 15:24:18 +0900166 try {
167 // we wait until all k8s nodes are available
168 sleep(SLEEP_MS);
169 } catch (InterruptedException e) {
170 e.printStackTrace();
Jian Li7a581b12021-02-18 14:24:32 +0900171 }
Jian Li7a581b12021-02-18 14:24:32 +0900172 }
173
Jian Lib6dc08f2021-03-24 15:24:18 +0900174 Set<KubevirtPort> ports =
175 getPorts(kubevirtNodeService, kubevirtNetworkAdminService.networks(), pod);
Jian Lid4296d02021-03-12 18:03:58 +0900176 if (ports.size() == 0) {
Jian Li7a581b12021-02-18 14:24:32 +0900177 return;
178 }
179
Jian Lid4296d02021-03-12 18:03:58 +0900180 ports.forEach(port -> {
Jian Lib6dc08f2021-03-24 15:24:18 +0900181 KubevirtPort existing = kubevirtPortAdminService.port(port.macAddress());
Jian Li7a581b12021-02-18 14:24:32 +0900182
Jian Lib6dc08f2021-03-24 15:24:18 +0900183 if (existing != null) {
184 if (port.deviceId() != null && existing.deviceId() == null) {
185 KubevirtPort updated = existing.updateDeviceId(port.deviceId());
186 // internal we update device ID of kubevirt port
187 kubevirtPortAdminService.updatePort(updated);
Jian Lid4296d02021-03-12 18:03:58 +0900188 }
189 }
Jian Lid4296d02021-03-12 18:03:58 +0900190 });
Jian Lica20b712021-01-18 00:19:31 +0900191 }
192 }
193}