blob: 01c32dd4665c087b4ae4f1e21036a0d399f53f7d [file] [log] [blame]
Jian Li543fe852021-02-04 17:25:01 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpPrefix;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
28import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
33import org.onosproject.kubevirtnetworking.api.KubevirtPort;
34import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
35import org.onosproject.kubevirtnode.api.KubevirtNode;
36import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
37import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
38import org.onosproject.kubevirtnode.api.KubevirtNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.driver.DriverService;
43import org.onosproject.net.flow.DefaultTrafficSelector;
44import org.onosproject.net.flow.DefaultTrafficTreatment;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Reference;
51import org.osgi.service.component.annotations.ReferenceCardinality;
52import org.slf4j.Logger;
53
54import java.util.Objects;
55import java.util.concurrent.ExecutorService;
56
57import static java.util.concurrent.Executors.newSingleThreadExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
60import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
61import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
62import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
Jian Li81b1aab2021-02-17 20:42:15 +090063import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
Jian Li543fe852021-02-04 17:25:01 +090064import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPort;
65import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
66import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
67import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
68import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
69import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Populates switching flow rules on OVS for the tenant network (overlay).
74 */
75@Component(immediate = true)
76public class KubevirtSwitchingTenantHandler {
77 private final Logger log = getLogger(getClass());
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected CoreService coreService;
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected MastershipService mastershipService;
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected DeviceService deviceService;
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected DriverService driverService;
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected ClusterService clusterService;
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected LeadershipService leadershipService;
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected KubevirtFlowRuleService flowRuleService;
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected KubevirtNodeService kubevirtNodeService;
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected KubevirtNetworkService kubevirtNetworkService;
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected KubevirtPortService kubevirtPortService;
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected KubevirtPodService kubevirtPodService;
101
102 private final ExecutorService eventExecutor = newSingleThreadExecutor(
103 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
104
105 private final InternalKubevirtPodListener kubevirtPodListener =
106 new InternalKubevirtPodListener();
107 private final InternalKubevirtNodeListener kubevirtNodeListener =
108 new InternalKubevirtNodeListener();
109
110 private ApplicationId appId;
111 private NodeId localNodeId;
112
113 @Activate
114 protected void activate() {
115 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
116 localNodeId = clusterService.getLocalNode().id();
117 leadershipService.runForLeadership(appId.name());
118 kubevirtPodService.addListener(kubevirtPodListener);
119 kubevirtNodeService.addListener(kubevirtNodeListener);
120
121 log.info("Started");
122 }
123
124 @Deactivate
125 protected void deactivate() {
126 kubevirtPodService.removeListener(kubevirtPodListener);
127 kubevirtNodeService.removeListener(kubevirtNodeListener);
128 leadershipService.withdraw(appId.name());
129 eventExecutor.shutdown();
130
131 log.info("Stopped");
132 }
133
134 private KubevirtPort getPortByPod(Pod pod) {
135 return getPort(kubevirtNetworkService.networks(), pod);
136 }
137
138 private void setIngressRules(Pod pod, boolean install) {
139 KubevirtPort port = getPortByPod(pod);
140
141 if (port == null) {
142 return;
143 }
144
145 if (port.ipAddress() == null) {
146 return;
147 }
148
149 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
150
151 if (network == null) {
152 return;
153 }
154
Jian Li81b1aab2021-02-17 20:42:15 +0900155 if (network.type() == FLAT || network.type() == VLAN) {
Jian Li543fe852021-02-04 17:25:01 +0900156 return;
157 }
158
159 if (network.segmentId() == null) {
160 return;
161 }
162
163 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
164 if (localNode == null || localNode.type() == MASTER) {
165 return;
166 }
167
168 PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
169 if (patchPortNumber == null) {
170 return;
171 }
172
173 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
174 .matchTunnelId(Long.parseLong(network.segmentId()));
175
176 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
177 .setOutput(patchPortNumber);
178
179 flowRuleService.setRule(
180 appId,
181 localNode.tunBridge(),
182 sBuilder.build(),
183 tBuilder.build(),
184 PRIORITY_TUNNEL_RULE,
185 TUNNEL_DEFAULT_TABLE,
186 install);
187
188 log.debug("Install ingress rules for instance {}, segment ID {}",
189 port.ipAddress(), network.segmentId());
190 }
191
192 private void setEgressRules(Pod pod, boolean install) {
193 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
194
195 if (localNode == null) {
196 return;
197 }
198
199 if (localNode.type() == MASTER) {
200 return;
201 }
202
203 KubevirtPort port = getPortByPod(pod);
204
205 if (port == null) {
206 return;
207 }
208
209 if (port.ipAddress() == null) {
210 return;
211 }
212
213 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
214
215 if (network == null) {
216 return;
217 }
218
Jian Li81b1aab2021-02-17 20:42:15 +0900219 if (network.type() == FLAT || network.type() == VLAN) {
Jian Li543fe852021-02-04 17:25:01 +0900220 return;
221 }
222
223 if (network.segmentId() == null) {
224 return;
225 }
226
227 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
228 if (remoteNode.hostname().equals(localNode.hostname())) {
229 continue;
230 }
231
232 PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
233 if (patchPortNumber == null) {
234 return;
235 }
236
237 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
238 if (tunnelPortNumber == null) {
239 return;
240 }
241
242 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
243 .matchInPort(patchPortNumber)
244 .matchEthType(Ethernet.TYPE_IPV4)
245 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
246
247 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
248 .matchInPort(patchPortNumber)
249 .matchEthType(Ethernet.TYPE_ARP)
250 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
251
252 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
253 .setTunnelId(Long.parseLong(network.segmentId()))
254 .extension(buildExtension(
255 deviceService,
256 remoteNode.tunBridge(),
257 localNode.dataIp().getIp4Address()),
258 remoteNode.tunBridge())
259 .setOutput(tunnelPortNumber);
260
261 flowRuleService.setRule(
262 appId,
263 remoteNode.tunBridge(),
264 sIpBuilder.build(),
265 tBuilder.build(),
266 PRIORITY_TUNNEL_RULE,
267 TUNNEL_DEFAULT_TABLE,
268 install);
269
270 flowRuleService.setRule(
271 appId,
272 remoteNode.tunBridge(),
273 sArpBuilder.build(),
274 tBuilder.build(),
275 PRIORITY_TUNNEL_RULE,
276 TUNNEL_DEFAULT_TABLE,
277 install);
278 }
279
280 log.debug("Install egress rules for instance {}, segment ID {}",
281 port.ipAddress(), network.segmentId());
282 }
283
284 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
285
286 private boolean isRelevantHelper() {
287 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
288 }
289
290 @Override
291 public void event(KubevirtNodeEvent event) {
292
293 switch (event.type()) {
294 case KUBEVIRT_NODE_COMPLETE:
295 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
296 break;
297 case KUBEVIRT_NODE_INCOMPLETE:
298 default:
299 // do nothing
300 break;
301 }
302 }
303
304 private void processNodeCompletion(KubevirtNode node) {
305 if (!isRelevantHelper()) {
306 return;
307 }
308
309 kubevirtPodService.pods().stream()
310 .filter(pod -> node.hostname().equals(pod.getSpec().getNodeName()))
311 .forEach(pod -> {
312 setIngressRules(pod, true);
313 setEgressRules(pod, true);
314 });
315 }
316 }
317
318 private class InternalKubevirtPodListener implements KubevirtPodListener {
319
320 private boolean isRelevantHelper() {
321 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
322 }
323
324 @Override
325 public void event(KubevirtPodEvent event) {
326
327 switch (event.type()) {
328 case KUBEVIRT_POD_UPDATED:
329 eventExecutor.execute(() -> processPodUpdate(event.subject()));
330 break;
331 case KUBEVIRT_POD_REMOVED:
332 eventExecutor.execute(() -> processPodRemoval(event.subject()));
333 break;
334 default:
335 // do nothing
336 break;
337 }
338 }
339
340 private void processPodUpdate(Pod pod) {
341 if (!isRelevantHelper()) {
342 return;
343 }
344
345 setIngressRules(pod, true);
346 setEgressRules(pod, true);
347 }
348
349 private void processPodRemoval(Pod pod) {
350 if (!isRelevantHelper()) {
351 return;
352 }
353
354 setIngressRules(pod, false);
355 setEgressRules(pod, false);
356 }
357 }
358}