blob: 3967d6a225cc429b5d699e12fd1b4a616e28139f [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Li858ccd72021-02-04 17:25:01 +090018import org.onlab.packet.Ethernet;
19import org.onlab.packet.Ip4Address;
20import org.onlab.packet.IpPrefix;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
27import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Jian Li567b25c2021-05-27 15:17:59 +090028import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
Jian Li858ccd72021-02-04 17:25:01 +090030import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
Jian Li858ccd72021-02-04 17:25:01 +090031import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
32import org.onosproject.kubevirtnetworking.api.KubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090033import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
34import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
Jian Li858ccd72021-02-04 17:25:01 +090035import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
36import org.onosproject.kubevirtnode.api.KubevirtNode;
37import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
38import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
39import org.onosproject.kubevirtnode.api.KubevirtNodeService;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.PortNumber;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.driver.DriverService;
44import org.onosproject.net.flow.DefaultTrafficSelector;
45import org.onosproject.net.flow.DefaultTrafficTreatment;
46import org.onosproject.net.flow.TrafficSelector;
47import org.onosproject.net.flow.TrafficTreatment;
48import org.osgi.service.component.annotations.Activate;
49import org.osgi.service.component.annotations.Component;
50import org.osgi.service.component.annotations.Deactivate;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
53import org.slf4j.Logger;
54
55import java.util.Objects;
56import java.util.concurrent.ExecutorService;
57
58import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
61import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
62import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
63import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
Jian Li2ce718e2021-02-17 20:42:15 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
Jian Li858ccd72021-02-04 17:25:01 +090065import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
66import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
Jian Li0c656f02021-06-07 13:32:39 +090067import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
Jian Li858ccd72021-02-04 17:25:01 +090068import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
69import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
70import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
71import static org.slf4j.LoggerFactory.getLogger;
72
73/**
74 * Populates switching flow rules on OVS for the tenant network (overlay).
75 */
76@Component(immediate = true)
77public class KubevirtSwitchingTenantHandler {
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected CoreService coreService;
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected MastershipService mastershipService;
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DeviceService deviceService;
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected DriverService driverService;
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected ClusterService clusterService;
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected LeadershipService leadershipService;
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtFlowRuleService flowRuleService;
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNodeService kubevirtNodeService;
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtNetworkService kubevirtNetworkService;
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtPortService kubevirtPortService;
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtPodService kubevirtPodService;
102
103 private final ExecutorService eventExecutor = newSingleThreadExecutor(
104 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
105
Jian Li567b25c2021-05-27 15:17:59 +0900106 private final InternalKubevirtNetworkListener kubevirtNetworkListener =
107 new InternalKubevirtNetworkListener();
Jian Lib6dc08f2021-03-24 15:24:18 +0900108 private final InternalKubevirtPortListener kubevirtPortListener =
109 new InternalKubevirtPortListener();
Jian Li858ccd72021-02-04 17:25:01 +0900110 private final InternalKubevirtNodeListener kubevirtNodeListener =
111 new InternalKubevirtNodeListener();
112
113 private ApplicationId appId;
114 private NodeId localNodeId;
115
116 @Activate
117 protected void activate() {
118 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
119 localNodeId = clusterService.getLocalNode().id();
120 leadershipService.runForLeadership(appId.name());
Jian Lib6dc08f2021-03-24 15:24:18 +0900121 kubevirtPortService.addListener(kubevirtPortListener);
Jian Li567b25c2021-05-27 15:17:59 +0900122 kubevirtNetworkService.addListener(kubevirtNetworkListener);
Jian Li858ccd72021-02-04 17:25:01 +0900123 kubevirtNodeService.addListener(kubevirtNodeListener);
124
125 log.info("Started");
126 }
127
128 @Deactivate
129 protected void deactivate() {
Jian Li567b25c2021-05-27 15:17:59 +0900130 kubevirtNetworkService.removeListener(kubevirtNetworkListener);
Jian Lib6dc08f2021-03-24 15:24:18 +0900131 kubevirtPortService.removeListener(kubevirtPortListener);
Jian Li858ccd72021-02-04 17:25:01 +0900132 kubevirtNodeService.removeListener(kubevirtNodeListener);
133 leadershipService.withdraw(appId.name());
134 eventExecutor.shutdown();
135
136 log.info("Stopped");
137 }
138
Jian Li567b25c2021-05-27 15:17:59 +0900139 private void setIngressRules(KubevirtNetwork network, boolean install) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900140 if (network == null) {
141 return;
142 }
143
144 if (network.type() == FLAT || network.type() == VLAN) {
145 return;
146 }
147
148 if (network.segmentId() == null) {
149 return;
150 }
151
Jian Li567b25c2021-05-27 15:17:59 +0900152 for (KubevirtNode localNode : kubevirtNodeService.completeNodes(WORKER)) {
153
154 while (true) {
155 if (tunnelToTenantPort(localNode, network) != null) {
156 break;
157 } else {
Jian Li0c656f02021-06-07 13:32:39 +0900158 waitFor(3);
Jian Li567b25c2021-05-27 15:17:59 +0900159 }
160 }
161
162 PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
163
164 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
165 .matchTunnelId(Long.parseLong(network.segmentId()));
166
167 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
168 .setOutput(patchPortNumber);
169
170 flowRuleService.setRule(
171 appId,
172 localNode.tunBridge(),
173 sBuilder.build(),
174 tBuilder.build(),
175 PRIORITY_TUNNEL_RULE,
176 TUNNEL_DEFAULT_TABLE,
177 install);
178
179 log.debug("Install ingress rules for segment ID {}", network.segmentId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900180 }
Jian Li567b25c2021-05-27 15:17:59 +0900181 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900182
Jian Li567b25c2021-05-27 15:17:59 +0900183 private void setIngressRules(KubevirtNode node, boolean install) {
184 for (KubevirtNetwork network : kubevirtNetworkService.tenantNetworks()) {
185 PortNumber patchPortNumber = tunnelToTenantPort(node, network);
186 if (patchPortNumber == null) {
187 return;
188 }
189
190 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
191 .matchTunnelId(Long.parseLong(network.segmentId()));
192
193 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
194 .setOutput(patchPortNumber);
195
196 flowRuleService.setRule(
197 appId,
198 node.tunBridge(),
199 sBuilder.build(),
200 tBuilder.build(),
201 PRIORITY_TUNNEL_RULE,
202 TUNNEL_DEFAULT_TABLE,
203 install);
204
205 log.debug("Install ingress rules for segment ID {}", network.segmentId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900206 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900207 }
208
209 private void setEgressRules(KubevirtPort port, boolean install) {
210 if (port.ipAddress() == null) {
211 return;
212 }
213
214 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
215
216 if (network == null) {
217 return;
218 }
219
220 if (network.type() == FLAT || network.type() == VLAN) {
221 return;
222 }
223
224 if (network.segmentId() == null) {
225 return;
226 }
227
228 KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
229
230 if (localNode == null || localNode.type() == MASTER) {
231 return;
232 }
233
234 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
235 if (remoteNode.hostname().equals(localNode.hostname())) {
236 continue;
Jian Lid4296d02021-03-12 18:03:58 +0900237 }
238
Jian Lib6dc08f2021-03-24 15:24:18 +0900239 PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
Jian Lid4296d02021-03-12 18:03:58 +0900240 if (patchPortNumber == null) {
241 return;
242 }
243
Jian Lib6dc08f2021-03-24 15:24:18 +0900244 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
245 if (tunnelPortNumber == null) {
246 return;
247 }
248
249 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
250 .matchInPort(patchPortNumber)
251 .matchEthType(Ethernet.TYPE_IPV4)
252 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
253
254 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
255 .matchInPort(patchPortNumber)
256 .matchEthType(Ethernet.TYPE_ARP)
257 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
Jian Lid4296d02021-03-12 18:03:58 +0900258
259 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Lib6dc08f2021-03-24 15:24:18 +0900260 .setTunnelId(Long.parseLong(network.segmentId()))
261 .extension(buildExtension(
262 deviceService,
263 remoteNode.tunBridge(),
264 localNode.dataIp().getIp4Address()),
265 remoteNode.tunBridge())
266 .setOutput(tunnelPortNumber);
Jian Lid4296d02021-03-12 18:03:58 +0900267
268 flowRuleService.setRule(
269 appId,
Jian Lib6dc08f2021-03-24 15:24:18 +0900270 remoteNode.tunBridge(),
271 sIpBuilder.build(),
Jian Lid4296d02021-03-12 18:03:58 +0900272 tBuilder.build(),
273 PRIORITY_TUNNEL_RULE,
274 TUNNEL_DEFAULT_TABLE,
275 install);
276
Jian Lib6dc08f2021-03-24 15:24:18 +0900277 flowRuleService.setRule(
278 appId,
279 remoteNode.tunBridge(),
280 sArpBuilder.build(),
281 tBuilder.build(),
282 PRIORITY_TUNNEL_RULE,
283 TUNNEL_DEFAULT_TABLE,
284 install);
Jian Li858ccd72021-02-04 17:25:01 +0900285 }
286
Jian Lib6dc08f2021-03-24 15:24:18 +0900287 log.debug("Install egress rules for instance {}, segment ID {}",
288 port.ipAddress(), network.segmentId());
Jian Li858ccd72021-02-04 17:25:01 +0900289 }
290
291 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
292
293 private boolean isRelevantHelper() {
294 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
295 }
296
297 @Override
298 public void event(KubevirtNodeEvent event) {
299
300 switch (event.type()) {
301 case KUBEVIRT_NODE_COMPLETE:
302 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
303 break;
304 case KUBEVIRT_NODE_INCOMPLETE:
305 default:
306 // do nothing
307 break;
308 }
309 }
310
311 private void processNodeCompletion(KubevirtNode node) {
312 if (!isRelevantHelper()) {
313 return;
314 }
315
Jian Li567b25c2021-05-27 15:17:59 +0900316 setIngressRules(node, true);
Jian Lib6dc08f2021-03-24 15:24:18 +0900317 kubevirtPortService.ports().stream()
318 .filter(port -> node.equals(kubevirtNodeService.node(port.deviceId())))
319 .forEach(port -> {
Jian Lib6dc08f2021-03-24 15:24:18 +0900320 setEgressRules(port, true);
Jian Li858ccd72021-02-04 17:25:01 +0900321 });
322 }
323 }
324
Jian Li567b25c2021-05-27 15:17:59 +0900325 private class InternalKubevirtNetworkListener implements KubevirtNetworkListener {
326 private boolean isRelevantHelper() {
327 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
328 }
329
330 @Override
331 public void event(KubevirtNetworkEvent event) {
332 switch (event.type()) {
333 case KUBEVIRT_NETWORK_CREATED:
334 eventExecutor.execute(() -> processNetworkAddition(event.subject()));
335 break;
336 case KUBEVIRT_NETWORK_REMOVED:
337 eventExecutor.execute(() -> processNetworkRemoval(event.subject()));
338 break;
339 default:
340 // do nothing
341 break;
342 }
343 }
344
345 private void processNetworkAddition(KubevirtNetwork network) {
346 if (!isRelevantHelper()) {
347 return;
348 }
349
350 setIngressRules(network, true);
351 }
352
353 private void processNetworkRemoval(KubevirtNetwork network) {
354 if (!isRelevantHelper()) {
355 return;
356 }
357
358 setIngressRules(network, false);
359 }
360 }
361
Jian Lib6dc08f2021-03-24 15:24:18 +0900362 private class InternalKubevirtPortListener implements KubevirtPortListener {
363
364 @Override
365 public boolean isRelevant(KubevirtPortEvent event) {
366 return event.subject().deviceId() != null;
367 }
Jian Li858ccd72021-02-04 17:25:01 +0900368
369 private boolean isRelevantHelper() {
370 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
371 }
372
373 @Override
Jian Lib6dc08f2021-03-24 15:24:18 +0900374 public void event(KubevirtPortEvent event) {
Jian Li858ccd72021-02-04 17:25:01 +0900375
376 switch (event.type()) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900377 case KUBEVIRT_PORT_UPDATED:
378 eventExecutor.execute(() -> processPortUpdate(event.subject()));
Jian Li858ccd72021-02-04 17:25:01 +0900379 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900380 case KUBEVIRT_PORT_REMOVED:
381 eventExecutor.execute(() -> processPortRemoval(event.subject()));
Jian Li858ccd72021-02-04 17:25:01 +0900382 break;
383 default:
384 // do nothing
385 break;
386 }
387 }
388
Jian Lib6dc08f2021-03-24 15:24:18 +0900389 private void processPortUpdate(KubevirtPort port) {
Jian Li858ccd72021-02-04 17:25:01 +0900390 if (!isRelevantHelper()) {
391 return;
392 }
393
Jian Lib6dc08f2021-03-24 15:24:18 +0900394 setEgressRules(port, true);
Jian Li858ccd72021-02-04 17:25:01 +0900395 }
396
Jian Lib6dc08f2021-03-24 15:24:18 +0900397 private void processPortRemoval(KubevirtPort port) {
Jian Li858ccd72021-02-04 17:25:01 +0900398 if (!isRelevantHelper()) {
399 return;
400 }
401
Jian Lib6dc08f2021-03-24 15:24:18 +0900402 setEgressRules(port, false);
Jian Li858ccd72021-02-04 17:25:01 +0900403 }
404 }
405}