/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpPrefix;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
28import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
33import org.onosproject.kubevirtnetworking.api.KubevirtPort;
34import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
35import org.onosproject.kubevirtnode.api.KubevirtNode;
36import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
37import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
38import org.onosproject.kubevirtnode.api.KubevirtNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.driver.DriverService;
43import org.onosproject.net.flow.DefaultTrafficSelector;
44import org.onosproject.net.flow.DefaultTrafficTreatment;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Reference;
51import org.osgi.service.component.annotations.ReferenceCardinality;
52import org.slf4j.Logger;
53
54import java.util.Objects;
55import java.util.concurrent.ExecutorService;
56
57import static java.util.concurrent.Executors.newSingleThreadExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
60import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
61import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
62import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
63import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPort;
64import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
65import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
66import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
67import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
68import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
69import static org.slf4j.LoggerFactory.getLogger;
70
71/**
72 * Populates switching flow rules on OVS for the tenant network (overlay).
73 */
74@Component(immediate = true)
75public class KubevirtSwitchingTenantHandler {
76 private final Logger log = getLogger(getClass());
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected CoreService coreService;
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected MastershipService mastershipService;
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected DeviceService deviceService;
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DriverService driverService;
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected ClusterService clusterService;
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected LeadershipService leadershipService;
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected KubevirtFlowRuleService flowRuleService;
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtNodeService kubevirtNodeService;
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNetworkService kubevirtNetworkService;
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtPortService kubevirtPortService;
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtPodService kubevirtPodService;
100
101 private final ExecutorService eventExecutor = newSingleThreadExecutor(
102 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
103
104 private final InternalKubevirtPodListener kubevirtPodListener =
105 new InternalKubevirtPodListener();
106 private final InternalKubevirtNodeListener kubevirtNodeListener =
107 new InternalKubevirtNodeListener();
108
109 private ApplicationId appId;
110 private NodeId localNodeId;
111
112 @Activate
113 protected void activate() {
114 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
115 localNodeId = clusterService.getLocalNode().id();
116 leadershipService.runForLeadership(appId.name());
117 kubevirtPodService.addListener(kubevirtPodListener);
118 kubevirtNodeService.addListener(kubevirtNodeListener);
119
120 log.info("Started");
121 }
122
123 @Deactivate
124 protected void deactivate() {
125 kubevirtPodService.removeListener(kubevirtPodListener);
126 kubevirtNodeService.removeListener(kubevirtNodeListener);
127 leadershipService.withdraw(appId.name());
128 eventExecutor.shutdown();
129
130 log.info("Stopped");
131 }
132
133 private KubevirtPort getPortByPod(Pod pod) {
134 return getPort(kubevirtNetworkService.networks(), pod);
135 }
136
137 private void setIngressRules(Pod pod, boolean install) {
138 KubevirtPort port = getPortByPod(pod);
139
140 if (port == null) {
141 return;
142 }
143
144 if (port.ipAddress() == null) {
145 return;
146 }
147
148 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
149
150 if (network == null) {
151 return;
152 }
153
154 // TODO: need to handle VLAN case
155 if (network.type() == FLAT) {
156 return;
157 }
158
159 if (network.segmentId() == null) {
160 return;
161 }
162
163 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
164 if (localNode == null || localNode.type() == MASTER) {
165 return;
166 }
167
168 PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
169 if (patchPortNumber == null) {
170 return;
171 }
172
173 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
174 .matchTunnelId(Long.parseLong(network.segmentId()));
175
176 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
177 .setOutput(patchPortNumber);
178
179 flowRuleService.setRule(
180 appId,
181 localNode.tunBridge(),
182 sBuilder.build(),
183 tBuilder.build(),
184 PRIORITY_TUNNEL_RULE,
185 TUNNEL_DEFAULT_TABLE,
186 install);
187
188 log.debug("Install ingress rules for instance {}, segment ID {}",
189 port.ipAddress(), network.segmentId());
190 }
191
192 private void setEgressRules(Pod pod, boolean install) {
193 KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
194
195 if (localNode == null) {
196 return;
197 }
198
199 if (localNode.type() == MASTER) {
200 return;
201 }
202
203 KubevirtPort port = getPortByPod(pod);
204
205 if (port == null) {
206 return;
207 }
208
209 if (port.ipAddress() == null) {
210 return;
211 }
212
213 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
214
215 if (network == null) {
216 return;
217 }
218
219 // TODO: need to handle VLAN case
220 if (network.type() == FLAT) {
221 return;
222 }
223
224 if (network.segmentId() == null) {
225 return;
226 }
227
228 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
229 if (remoteNode.hostname().equals(localNode.hostname())) {
230 continue;
231 }
232
233 PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
234 if (patchPortNumber == null) {
235 return;
236 }
237
238 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
239 if (tunnelPortNumber == null) {
240 return;
241 }
242
243 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
244 .matchInPort(patchPortNumber)
245 .matchEthType(Ethernet.TYPE_IPV4)
246 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
247
248 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
249 .matchInPort(patchPortNumber)
250 .matchEthType(Ethernet.TYPE_ARP)
251 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
252
253 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
254 .setTunnelId(Long.parseLong(network.segmentId()))
255 .extension(buildExtension(
256 deviceService,
257 remoteNode.tunBridge(),
258 localNode.dataIp().getIp4Address()),
259 remoteNode.tunBridge())
260 .setOutput(tunnelPortNumber);
261
262 flowRuleService.setRule(
263 appId,
264 remoteNode.tunBridge(),
265 sIpBuilder.build(),
266 tBuilder.build(),
267 PRIORITY_TUNNEL_RULE,
268 TUNNEL_DEFAULT_TABLE,
269 install);
270
271 flowRuleService.setRule(
272 appId,
273 remoteNode.tunBridge(),
274 sArpBuilder.build(),
275 tBuilder.build(),
276 PRIORITY_TUNNEL_RULE,
277 TUNNEL_DEFAULT_TABLE,
278 install);
279 }
280
281 log.debug("Install egress rules for instance {}, segment ID {}",
282 port.ipAddress(), network.segmentId());
283 }
284
285 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
286
287 private boolean isRelevantHelper() {
288 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
289 }
290
291 @Override
292 public void event(KubevirtNodeEvent event) {
293
294 switch (event.type()) {
295 case KUBEVIRT_NODE_COMPLETE:
296 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
297 break;
298 case KUBEVIRT_NODE_INCOMPLETE:
299 default:
300 // do nothing
301 break;
302 }
303 }
304
305 private void processNodeCompletion(KubevirtNode node) {
306 if (!isRelevantHelper()) {
307 return;
308 }
309
310 kubevirtPodService.pods().stream()
311 .filter(pod -> node.hostname().equals(pod.getSpec().getNodeName()))
312 .forEach(pod -> {
313 setIngressRules(pod, true);
314 setEgressRules(pod, true);
315 });
316 }
317 }
318
319 private class InternalKubevirtPodListener implements KubevirtPodListener {
320
321 private boolean isRelevantHelper() {
322 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
323 }
324
325 @Override
326 public void event(KubevirtPodEvent event) {
327
328 switch (event.type()) {
329 case KUBEVIRT_POD_UPDATED:
330 eventExecutor.execute(() -> processPodUpdate(event.subject()));
331 break;
332 case KUBEVIRT_POD_REMOVED:
333 eventExecutor.execute(() -> processPodRemoval(event.subject()));
334 break;
335 default:
336 // do nothing
337 break;
338 }
339 }
340
341 private void processPodUpdate(Pod pod) {
342 if (!isRelevantHelper()) {
343 return;
344 }
345
346 setIngressRules(pod, true);
347 setEgressRules(pod, true);
348 }
349
350 private void processPodRemoval(Pod pod) {
351 if (!isRelevantHelper()) {
352 return;
353 }
354
355 setIngressRules(pod, false);
356 setEgressRules(pod, false);
357 }
358 }
359}