blob: 3d0045ddc63d535516b1d53a0faf185a5db7190b [file] [log] [blame]
Jian Li858ccd72021-02-04 17:25:01 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Li858ccd72021-02-04 17:25:01 +090018import org.onlab.packet.Ethernet;
19import org.onlab.packet.Ip4Address;
20import org.onlab.packet.IpPrefix;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
27import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Jian Li567b25c2021-05-27 15:17:59 +090028import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
Jian Li858ccd72021-02-04 17:25:01 +090030import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
Jian Li858ccd72021-02-04 17:25:01 +090031import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
32import org.onosproject.kubevirtnetworking.api.KubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090033import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
34import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
Jian Li858ccd72021-02-04 17:25:01 +090035import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
36import org.onosproject.kubevirtnode.api.KubevirtNode;
37import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
38import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
39import org.onosproject.kubevirtnode.api.KubevirtNodeService;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.PortNumber;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.driver.DriverService;
44import org.onosproject.net.flow.DefaultTrafficSelector;
45import org.onosproject.net.flow.DefaultTrafficTreatment;
46import org.onosproject.net.flow.TrafficSelector;
47import org.onosproject.net.flow.TrafficTreatment;
48import org.osgi.service.component.annotations.Activate;
49import org.osgi.service.component.annotations.Component;
50import org.osgi.service.component.annotations.Deactivate;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
53import org.slf4j.Logger;
54
55import java.util.Objects;
56import java.util.concurrent.ExecutorService;
57
58import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
61import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
62import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
63import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
Jian Li2ce718e2021-02-17 20:42:15 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
Jian Li858ccd72021-02-04 17:25:01 +090065import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
66import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
Jian Li0c656f02021-06-07 13:32:39 +090067import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
Jian Li858ccd72021-02-04 17:25:01 +090068import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
69import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
70import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
71import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * Populates switching flow rules on OVS for the tenant network (overlay).
75 */
76@Component(immediate = true)
77public class KubevirtSwitchingTenantHandler {
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected CoreService coreService;
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected MastershipService mastershipService;
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DeviceService deviceService;
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected DriverService driverService;
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected ClusterService clusterService;
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected LeadershipService leadershipService;
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtFlowRuleService flowRuleService;
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNodeService kubevirtNodeService;
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtNetworkService kubevirtNetworkService;
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtPortService kubevirtPortService;
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtPodService kubevirtPodService;
102
103 private final ExecutorService eventExecutor = newSingleThreadExecutor(
104 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
105
Jian Li567b25c2021-05-27 15:17:59 +0900106 private final InternalKubevirtNetworkListener kubevirtNetworkListener =
107 new InternalKubevirtNetworkListener();
Jian Lib6dc08f2021-03-24 15:24:18 +0900108 private final InternalKubevirtPortListener kubevirtPortListener =
109 new InternalKubevirtPortListener();
Jian Li858ccd72021-02-04 17:25:01 +0900110 private final InternalKubevirtNodeListener kubevirtNodeListener =
111 new InternalKubevirtNodeListener();
112
113 private ApplicationId appId;
114 private NodeId localNodeId;
115
116 @Activate
117 protected void activate() {
118 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
119 localNodeId = clusterService.getLocalNode().id();
120 leadershipService.runForLeadership(appId.name());
Jian Lib6dc08f2021-03-24 15:24:18 +0900121 kubevirtPortService.addListener(kubevirtPortListener);
Jian Li567b25c2021-05-27 15:17:59 +0900122 kubevirtNetworkService.addListener(kubevirtNetworkListener);
Jian Li858ccd72021-02-04 17:25:01 +0900123 kubevirtNodeService.addListener(kubevirtNodeListener);
124
125 log.info("Started");
126 }
127
128 @Deactivate
129 protected void deactivate() {
Jian Li567b25c2021-05-27 15:17:59 +0900130 kubevirtNetworkService.removeListener(kubevirtNetworkListener);
Jian Lib6dc08f2021-03-24 15:24:18 +0900131 kubevirtPortService.removeListener(kubevirtPortListener);
Jian Li858ccd72021-02-04 17:25:01 +0900132 kubevirtNodeService.removeListener(kubevirtNodeListener);
133 leadershipService.withdraw(appId.name());
134 eventExecutor.shutdown();
135
136 log.info("Stopped");
137 }
138
Jian Li567b25c2021-05-27 15:17:59 +0900139 private void setIngressRules(KubevirtNetwork network, boolean install) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900140 if (network == null) {
141 return;
142 }
143
144 if (network.type() == FLAT || network.type() == VLAN) {
145 return;
146 }
147
148 if (network.segmentId() == null) {
149 return;
150 }
151
Jian Li567b25c2021-05-27 15:17:59 +0900152 for (KubevirtNode localNode : kubevirtNodeService.completeNodes(WORKER)) {
153
154 while (true) {
Jian Li34fff802021-07-01 10:04:04 +0900155 KubevirtNode updatedNode = kubevirtNodeService.node(localNode.hostname());
156 if (tunnelToTenantPort(deviceService, updatedNode, network) != null) {
Jian Li567b25c2021-05-27 15:17:59 +0900157 break;
158 } else {
Jian Li34fff802021-07-01 10:04:04 +0900159 log.info("Waiting for tunnel to tenant patch port creation " +
160 "on ingress rule setup on node {}", updatedNode);
Jian Li0c656f02021-06-07 13:32:39 +0900161 waitFor(3);
Jian Li567b25c2021-05-27 15:17:59 +0900162 }
163 }
164
Jian Li34fff802021-07-01 10:04:04 +0900165 PortNumber patchPortNumber = tunnelToTenantPort(deviceService, localNode, network);
Jian Li567b25c2021-05-27 15:17:59 +0900166
167 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
168 .matchTunnelId(Long.parseLong(network.segmentId()));
169
170 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
171 .setOutput(patchPortNumber);
172
173 flowRuleService.setRule(
174 appId,
175 localNode.tunBridge(),
176 sBuilder.build(),
177 tBuilder.build(),
178 PRIORITY_TUNNEL_RULE,
179 TUNNEL_DEFAULT_TABLE,
180 install);
181
182 log.debug("Install ingress rules for segment ID {}", network.segmentId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900183 }
Jian Li567b25c2021-05-27 15:17:59 +0900184 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900185
Jian Li567b25c2021-05-27 15:17:59 +0900186 private void setIngressRules(KubevirtNode node, boolean install) {
187 for (KubevirtNetwork network : kubevirtNetworkService.tenantNetworks()) {
Jian Li34fff802021-07-01 10:04:04 +0900188
189 if (node == null || node.type() != WORKER) {
Jian Li567b25c2021-05-27 15:17:59 +0900190 return;
191 }
192
Jian Li34fff802021-07-01 10:04:04 +0900193 while (true) {
194 KubevirtNode updatedNode = kubevirtNodeService.node(node.hostname());
195 if (tunnelToTenantPort(deviceService, updatedNode, network) != null) {
196 break;
197 } else {
198 log.info("Waiting for tunnel to tenant patch port creation " +
199 "on ingress rule setup on node {}", updatedNode);
200 waitFor(3);
201 }
202 }
203
204 PortNumber patchPortNumber = tunnelToTenantPort(deviceService, node, network);
205
Jian Li567b25c2021-05-27 15:17:59 +0900206 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
207 .matchTunnelId(Long.parseLong(network.segmentId()));
208
209 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
210 .setOutput(patchPortNumber);
211
212 flowRuleService.setRule(
213 appId,
214 node.tunBridge(),
215 sBuilder.build(),
216 tBuilder.build(),
217 PRIORITY_TUNNEL_RULE,
218 TUNNEL_DEFAULT_TABLE,
219 install);
220
221 log.debug("Install ingress rules for segment ID {}", network.segmentId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900222 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900223 }
224
225 private void setEgressRules(KubevirtPort port, boolean install) {
226 if (port.ipAddress() == null) {
227 return;
228 }
229
230 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
231
232 if (network == null) {
233 return;
234 }
235
236 if (network.type() == FLAT || network.type() == VLAN) {
237 return;
238 }
239
240 if (network.segmentId() == null) {
241 return;
242 }
243
244 KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
245
246 if (localNode == null || localNode.type() == MASTER) {
247 return;
248 }
249
250 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
251 if (remoteNode.hostname().equals(localNode.hostname())) {
252 continue;
Jian Lid4296d02021-03-12 18:03:58 +0900253 }
254
Jian Li34fff802021-07-01 10:04:04 +0900255 while (true) {
256 KubevirtNode updatedNode = kubevirtNodeService.node(localNode.hostname());
257 if (tunnelToTenantPort(deviceService, updatedNode, network) != null) {
258 break;
259 } else {
260 log.info("Waiting for tunnel to tenant patch port creation " +
261 "on egress rule setup on node {}", updatedNode);
262 waitFor(3);
263 }
Jian Lid4296d02021-03-12 18:03:58 +0900264 }
265
Jian Li34fff802021-07-01 10:04:04 +0900266 PortNumber patchPortNumber = tunnelToTenantPort(deviceService, remoteNode, network);
267
Jian Lib6dc08f2021-03-24 15:24:18 +0900268 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
269 if (tunnelPortNumber == null) {
270 return;
271 }
272
273 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
274 .matchInPort(patchPortNumber)
275 .matchEthType(Ethernet.TYPE_IPV4)
276 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
277
278 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
279 .matchInPort(patchPortNumber)
280 .matchEthType(Ethernet.TYPE_ARP)
281 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
Jian Lid4296d02021-03-12 18:03:58 +0900282
283 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Lib6dc08f2021-03-24 15:24:18 +0900284 .setTunnelId(Long.parseLong(network.segmentId()))
285 .extension(buildExtension(
286 deviceService,
287 remoteNode.tunBridge(),
288 localNode.dataIp().getIp4Address()),
289 remoteNode.tunBridge())
290 .setOutput(tunnelPortNumber);
Jian Lid4296d02021-03-12 18:03:58 +0900291
292 flowRuleService.setRule(
293 appId,
Jian Lib6dc08f2021-03-24 15:24:18 +0900294 remoteNode.tunBridge(),
295 sIpBuilder.build(),
Jian Lid4296d02021-03-12 18:03:58 +0900296 tBuilder.build(),
297 PRIORITY_TUNNEL_RULE,
298 TUNNEL_DEFAULT_TABLE,
299 install);
300
Jian Lib6dc08f2021-03-24 15:24:18 +0900301 flowRuleService.setRule(
302 appId,
303 remoteNode.tunBridge(),
304 sArpBuilder.build(),
305 tBuilder.build(),
306 PRIORITY_TUNNEL_RULE,
307 TUNNEL_DEFAULT_TABLE,
308 install);
Jian Li858ccd72021-02-04 17:25:01 +0900309 }
310
Jian Lib6dc08f2021-03-24 15:24:18 +0900311 log.debug("Install egress rules for instance {}, segment ID {}",
312 port.ipAddress(), network.segmentId());
Jian Li858ccd72021-02-04 17:25:01 +0900313 }
314
315 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
316
317 private boolean isRelevantHelper() {
318 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
319 }
320
321 @Override
322 public void event(KubevirtNodeEvent event) {
323
324 switch (event.type()) {
325 case KUBEVIRT_NODE_COMPLETE:
326 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
327 break;
328 case KUBEVIRT_NODE_INCOMPLETE:
329 default:
330 // do nothing
331 break;
332 }
333 }
334
335 private void processNodeCompletion(KubevirtNode node) {
336 if (!isRelevantHelper()) {
337 return;
338 }
339
Jian Li567b25c2021-05-27 15:17:59 +0900340 setIngressRules(node, true);
Jian Lib6dc08f2021-03-24 15:24:18 +0900341 kubevirtPortService.ports().stream()
342 .filter(port -> node.equals(kubevirtNodeService.node(port.deviceId())))
343 .forEach(port -> {
Jian Lib6dc08f2021-03-24 15:24:18 +0900344 setEgressRules(port, true);
Jian Li858ccd72021-02-04 17:25:01 +0900345 });
346 }
347 }
348
Jian Li567b25c2021-05-27 15:17:59 +0900349 private class InternalKubevirtNetworkListener implements KubevirtNetworkListener {
350 private boolean isRelevantHelper() {
351 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
352 }
353
354 @Override
355 public void event(KubevirtNetworkEvent event) {
356 switch (event.type()) {
357 case KUBEVIRT_NETWORK_CREATED:
358 eventExecutor.execute(() -> processNetworkAddition(event.subject()));
359 break;
360 case KUBEVIRT_NETWORK_REMOVED:
361 eventExecutor.execute(() -> processNetworkRemoval(event.subject()));
362 break;
363 default:
364 // do nothing
365 break;
366 }
367 }
368
369 private void processNetworkAddition(KubevirtNetwork network) {
370 if (!isRelevantHelper()) {
371 return;
372 }
373
374 setIngressRules(network, true);
375 }
376
377 private void processNetworkRemoval(KubevirtNetwork network) {
378 if (!isRelevantHelper()) {
379 return;
380 }
381
382 setIngressRules(network, false);
383 }
384 }
385
Jian Lib6dc08f2021-03-24 15:24:18 +0900386 private class InternalKubevirtPortListener implements KubevirtPortListener {
387
388 @Override
389 public boolean isRelevant(KubevirtPortEvent event) {
390 return event.subject().deviceId() != null;
391 }
Jian Li858ccd72021-02-04 17:25:01 +0900392
393 private boolean isRelevantHelper() {
394 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
395 }
396
397 @Override
Jian Lib6dc08f2021-03-24 15:24:18 +0900398 public void event(KubevirtPortEvent event) {
Jian Li858ccd72021-02-04 17:25:01 +0900399
400 switch (event.type()) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900401 case KUBEVIRT_PORT_UPDATED:
402 eventExecutor.execute(() -> processPortUpdate(event.subject()));
Jian Li858ccd72021-02-04 17:25:01 +0900403 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900404 case KUBEVIRT_PORT_REMOVED:
405 eventExecutor.execute(() -> processPortRemoval(event.subject()));
Jian Li858ccd72021-02-04 17:25:01 +0900406 break;
407 default:
408 // do nothing
409 break;
410 }
411 }
412
Jian Lib6dc08f2021-03-24 15:24:18 +0900413 private void processPortUpdate(KubevirtPort port) {
Jian Li858ccd72021-02-04 17:25:01 +0900414 if (!isRelevantHelper()) {
415 return;
416 }
417
Jian Lib6dc08f2021-03-24 15:24:18 +0900418 setEgressRules(port, true);
Jian Li858ccd72021-02-04 17:25:01 +0900419 }
420
Jian Lib6dc08f2021-03-24 15:24:18 +0900421 private void processPortRemoval(KubevirtPort port) {
Jian Li858ccd72021-02-04 17:25:01 +0900422 if (!isRelevantHelper()) {
423 return;
424 }
425
Jian Lib6dc08f2021-03-24 15:24:18 +0900426 setEgressRules(port, false);
Jian Li858ccd72021-02-04 17:25:01 +0900427 }
428 }
429}