blob: 91984afbb77a794b45121c88cec91f8e3f3c5cc5 [file] [log] [blame]
Jian Li543fe852021-02-04 17:25:01 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpPrefix;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
28import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
33import org.onosproject.kubevirtnetworking.api.KubevirtPort;
34import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
35import org.onosproject.kubevirtnode.api.KubevirtNode;
36import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
37import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
38import org.onosproject.kubevirtnode.api.KubevirtNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.driver.DriverService;
43import org.onosproject.net.flow.DefaultTrafficSelector;
44import org.onosproject.net.flow.DefaultTrafficTreatment;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Reference;
51import org.osgi.service.component.annotations.ReferenceCardinality;
52import org.slf4j.Logger;
53
54import java.util.Objects;
Jian Li3831f0c2021-03-12 18:03:58 +090055import java.util.Set;
Jian Li543fe852021-02-04 17:25:01 +090056import java.util.concurrent.ExecutorService;
57
58import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
61import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
62import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
63import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
Jian Li81b1aab2021-02-17 20:42:15 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
Jian Li3831f0c2021-03-12 18:03:58 +090065import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
Jian Li543fe852021-02-04 17:25:01 +090066import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
67import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
68import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
69import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
70import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
71import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * Populates switching flow rules on OVS for the tenant network (overlay).
75 */
76@Component(immediate = true)
77public class KubevirtSwitchingTenantHandler {
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected CoreService coreService;
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected MastershipService mastershipService;
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DeviceService deviceService;
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected DriverService driverService;
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected ClusterService clusterService;
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected LeadershipService leadershipService;
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtFlowRuleService flowRuleService;
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNodeService kubevirtNodeService;
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtNetworkService kubevirtNetworkService;
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtPortService kubevirtPortService;
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtPodService kubevirtPodService;
102
103 private final ExecutorService eventExecutor = newSingleThreadExecutor(
104 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
105
106 private final InternalKubevirtPodListener kubevirtPodListener =
107 new InternalKubevirtPodListener();
108 private final InternalKubevirtNodeListener kubevirtNodeListener =
109 new InternalKubevirtNodeListener();
110
111 private ApplicationId appId;
112 private NodeId localNodeId;
113
114 @Activate
115 protected void activate() {
116 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
117 localNodeId = clusterService.getLocalNode().id();
118 leadershipService.runForLeadership(appId.name());
119 kubevirtPodService.addListener(kubevirtPodListener);
120 kubevirtNodeService.addListener(kubevirtNodeListener);
121
122 log.info("Started");
123 }
124
125 @Deactivate
126 protected void deactivate() {
127 kubevirtPodService.removeListener(kubevirtPodListener);
128 kubevirtNodeService.removeListener(kubevirtNodeListener);
129 leadershipService.withdraw(appId.name());
130 eventExecutor.shutdown();
131
132 log.info("Stopped");
133 }
134
Jian Li3831f0c2021-03-12 18:03:58 +0900135 private Set<KubevirtPort> getPortByPod(Pod pod) {
136 return getPorts(kubevirtNetworkService.networks(), pod);
Jian Li543fe852021-02-04 17:25:01 +0900137 }
138
139 private void setIngressRules(Pod pod, boolean install) {
Jian Li3831f0c2021-03-12 18:03:58 +0900140 Set<KubevirtPort> ports = getPortByPod(pod);
Jian Li543fe852021-02-04 17:25:01 +0900141
Jian Li3831f0c2021-03-12 18:03:58 +0900142 if (ports.size() == 0) {
Jian Li543fe852021-02-04 17:25:01 +0900143 return;
144 }
145
Jian Li3831f0c2021-03-12 18:03:58 +0900146 for (KubevirtPort port : ports) {
147 if (port.ipAddress() == null) {
148 return;
149 }
150
151 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
152
153 if (network == null) {
154 return;
155 }
156
157 if (network.type() == FLAT || network.type() == VLAN) {
158 return;
159 }
160
161 if (network.segmentId() == null) {
162 return;
163 }
164
165 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
166 if (localNode == null || localNode.type() == MASTER) {
167 return;
168 }
169
170 PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
171 if (patchPortNumber == null) {
172 return;
173 }
174
175 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
176 .matchTunnelId(Long.parseLong(network.segmentId()));
177
178 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
179 .setOutput(patchPortNumber);
180
181 flowRuleService.setRule(
182 appId,
183 localNode.tunBridge(),
184 sBuilder.build(),
185 tBuilder.build(),
186 PRIORITY_TUNNEL_RULE,
187 TUNNEL_DEFAULT_TABLE,
188 install);
189
190 log.debug("Install ingress rules for instance {}, segment ID {}",
191 port.ipAddress(), network.segmentId());
Jian Li543fe852021-02-04 17:25:01 +0900192 }
Jian Li543fe852021-02-04 17:25:01 +0900193 }
194
195 private void setEgressRules(Pod pod, boolean install) {
196 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
197
198 if (localNode == null) {
199 return;
200 }
201
202 if (localNode.type() == MASTER) {
203 return;
204 }
205
Jian Li3831f0c2021-03-12 18:03:58 +0900206 Set<KubevirtPort> ports = getPortByPod(pod);
Jian Li543fe852021-02-04 17:25:01 +0900207
Jian Li3831f0c2021-03-12 18:03:58 +0900208 if (ports.size() == 0) {
Jian Li543fe852021-02-04 17:25:01 +0900209 return;
210 }
211
Jian Li3831f0c2021-03-12 18:03:58 +0900212 for (KubevirtPort port : ports) {
213 if (port.ipAddress() == null) {
Jian Li543fe852021-02-04 17:25:01 +0900214 return;
215 }
216
Jian Li3831f0c2021-03-12 18:03:58 +0900217 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
218
219 if (network == null) {
Jian Li543fe852021-02-04 17:25:01 +0900220 return;
221 }
222
Jian Li3831f0c2021-03-12 18:03:58 +0900223 if (network.type() == FLAT || network.type() == VLAN) {
224 return;
225 }
Jian Li543fe852021-02-04 17:25:01 +0900226
Jian Li3831f0c2021-03-12 18:03:58 +0900227 if (network.segmentId() == null) {
228 return;
229 }
Jian Li543fe852021-02-04 17:25:01 +0900230
Jian Li3831f0c2021-03-12 18:03:58 +0900231 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
232 if (remoteNode.hostname().equals(localNode.hostname())) {
233 continue;
234 }
Jian Li543fe852021-02-04 17:25:01 +0900235
Jian Li3831f0c2021-03-12 18:03:58 +0900236 PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
237 if (patchPortNumber == null) {
238 return;
239 }
Jian Li543fe852021-02-04 17:25:01 +0900240
Jian Li3831f0c2021-03-12 18:03:58 +0900241 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
242 if (tunnelPortNumber == null) {
243 return;
244 }
245
246 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
247 .matchInPort(patchPortNumber)
248 .matchEthType(Ethernet.TYPE_IPV4)
249 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
250
251 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
252 .matchInPort(patchPortNumber)
253 .matchEthType(Ethernet.TYPE_ARP)
254 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
255
256 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
257 .setTunnelId(Long.parseLong(network.segmentId()))
258 .extension(buildExtension(
259 deviceService,
260 remoteNode.tunBridge(),
261 localNode.dataIp().getIp4Address()),
262 remoteNode.tunBridge())
263 .setOutput(tunnelPortNumber);
264
265 flowRuleService.setRule(
266 appId,
267 remoteNode.tunBridge(),
268 sIpBuilder.build(),
269 tBuilder.build(),
270 PRIORITY_TUNNEL_RULE,
271 TUNNEL_DEFAULT_TABLE,
272 install);
273
274 flowRuleService.setRule(
275 appId,
276 remoteNode.tunBridge(),
277 sArpBuilder.build(),
278 tBuilder.build(),
279 PRIORITY_TUNNEL_RULE,
280 TUNNEL_DEFAULT_TABLE,
281 install);
282 }
283
284 log.debug("Install egress rules for instance {}, segment ID {}",
285 port.ipAddress(), network.segmentId());
Jian Li543fe852021-02-04 17:25:01 +0900286 }
Jian Li543fe852021-02-04 17:25:01 +0900287 }
288
289 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
290
291 private boolean isRelevantHelper() {
292 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
293 }
294
295 @Override
296 public void event(KubevirtNodeEvent event) {
297
298 switch (event.type()) {
299 case KUBEVIRT_NODE_COMPLETE:
300 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
301 break;
302 case KUBEVIRT_NODE_INCOMPLETE:
303 default:
304 // do nothing
305 break;
306 }
307 }
308
309 private void processNodeCompletion(KubevirtNode node) {
310 if (!isRelevantHelper()) {
311 return;
312 }
313
314 kubevirtPodService.pods().stream()
315 .filter(pod -> node.hostname().equals(pod.getSpec().getNodeName()))
316 .forEach(pod -> {
317 setIngressRules(pod, true);
318 setEgressRules(pod, true);
319 });
320 }
321 }
322
323 private class InternalKubevirtPodListener implements KubevirtPodListener {
324
325 private boolean isRelevantHelper() {
326 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
327 }
328
329 @Override
330 public void event(KubevirtPodEvent event) {
331
332 switch (event.type()) {
333 case KUBEVIRT_POD_UPDATED:
334 eventExecutor.execute(() -> processPodUpdate(event.subject()));
335 break;
336 case KUBEVIRT_POD_REMOVED:
337 eventExecutor.execute(() -> processPodRemoval(event.subject()));
338 break;
339 default:
340 // do nothing
341 break;
342 }
343 }
344
345 private void processPodUpdate(Pod pod) {
346 if (!isRelevantHelper()) {
347 return;
348 }
349
350 setIngressRules(pod, true);
351 setEgressRules(pod, true);
352 }
353
354 private void processPodRemoval(Pod pod) {
355 if (!isRelevantHelper()) {
356 return;
357 }
358
359 setIngressRules(pod, false);
360 setEgressRules(pod, false);
361 }
362 }
363}