blob: 066ce4994b755f5f0708ff2cf4b8b4ecd79fd34c [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Li543fe852021-02-04 17:25:01 +090018import org.onlab.packet.Ethernet;
19import org.onlab.packet.Ip4Address;
20import org.onlab.packet.IpPrefix;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
27import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
28import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
Jian Li543fe852021-02-04 17:25:01 +090029import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPort;
Jian Li7d3a0c82021-03-24 15:24:18 +090031import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
32import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
Jian Li543fe852021-02-04 17:25:01 +090033import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
34import org.onosproject.kubevirtnode.api.KubevirtNode;
35import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
36import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
37import org.onosproject.kubevirtnode.api.KubevirtNodeService;
38import org.onosproject.mastership.MastershipService;
39import org.onosproject.net.PortNumber;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.driver.DriverService;
42import org.onosproject.net.flow.DefaultTrafficSelector;
43import org.onosproject.net.flow.DefaultTrafficTreatment;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.osgi.service.component.annotations.Activate;
47import org.osgi.service.component.annotations.Component;
48import org.osgi.service.component.annotations.Deactivate;
49import org.osgi.service.component.annotations.Reference;
50import org.osgi.service.component.annotations.ReferenceCardinality;
51import org.slf4j.Logger;
52
53import java.util.Objects;
54import java.util.concurrent.ExecutorService;
55
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
59import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
60import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
61import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
Jian Li81b1aab2021-02-17 20:42:15 +090062import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
Jian Li543fe852021-02-04 17:25:01 +090063import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
64import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
65import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
66import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
67import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
68import static org.slf4j.LoggerFactory.getLogger;
69
70/**
71 * Populates switching flow rules on OVS for the tenant network (overlay).
72 */
73@Component(immediate = true)
74public class KubevirtSwitchingTenantHandler {
75 private final Logger log = getLogger(getClass());
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected CoreService coreService;
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected MastershipService mastershipService;
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected DeviceService deviceService;
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected DriverService driverService;
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected ClusterService clusterService;
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected LeadershipService leadershipService;
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected KubevirtFlowRuleService flowRuleService;
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected KubevirtNodeService kubevirtNodeService;
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected KubevirtNetworkService kubevirtNetworkService;
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected KubevirtPortService kubevirtPortService;
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected KubevirtPodService kubevirtPodService;
99
100 private final ExecutorService eventExecutor = newSingleThreadExecutor(
101 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
102
Jian Li7d3a0c82021-03-24 15:24:18 +0900103 private final InternalKubevirtPortListener kubevirtPortListener =
104 new InternalKubevirtPortListener();
Jian Li543fe852021-02-04 17:25:01 +0900105 private final InternalKubevirtNodeListener kubevirtNodeListener =
106 new InternalKubevirtNodeListener();
107
108 private ApplicationId appId;
109 private NodeId localNodeId;
110
111 @Activate
112 protected void activate() {
113 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
114 localNodeId = clusterService.getLocalNode().id();
115 leadershipService.runForLeadership(appId.name());
Jian Li7d3a0c82021-03-24 15:24:18 +0900116 kubevirtPortService.addListener(kubevirtPortListener);
Jian Li543fe852021-02-04 17:25:01 +0900117 kubevirtNodeService.addListener(kubevirtNodeListener);
118
119 log.info("Started");
120 }
121
122 @Deactivate
123 protected void deactivate() {
Jian Li7d3a0c82021-03-24 15:24:18 +0900124 kubevirtPortService.removeListener(kubevirtPortListener);
Jian Li543fe852021-02-04 17:25:01 +0900125 kubevirtNodeService.removeListener(kubevirtNodeListener);
126 leadershipService.withdraw(appId.name());
127 eventExecutor.shutdown();
128
129 log.info("Stopped");
130 }
131
Jian Li7d3a0c82021-03-24 15:24:18 +0900132 private void setIngressRules(KubevirtPort port, boolean install) {
133 if (port.ipAddress() == null) {
Jian Li543fe852021-02-04 17:25:01 +0900134 return;
135 }
136
Jian Li7d3a0c82021-03-24 15:24:18 +0900137 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
138
139 if (network == null) {
140 return;
141 }
142
143 if (network.type() == FLAT || network.type() == VLAN) {
144 return;
145 }
146
147 if (network.segmentId() == null) {
148 return;
149 }
150
151 KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
152 if (localNode == null || localNode.type() == MASTER) {
153 return;
154 }
155
156 PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
157 if (patchPortNumber == null) {
158 return;
159 }
160
161 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
162 .matchTunnelId(Long.parseLong(network.segmentId()));
163
164 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
165 .setOutput(patchPortNumber);
166
167 flowRuleService.setRule(
168 appId,
169 localNode.tunBridge(),
170 sBuilder.build(),
171 tBuilder.build(),
172 PRIORITY_TUNNEL_RULE,
173 TUNNEL_DEFAULT_TABLE,
174 install);
175
176 log.debug("Install ingress rules for instance {}, segment ID {}",
177 port.ipAddress(), network.segmentId());
178 }
179
180 private void setEgressRules(KubevirtPort port, boolean install) {
181 if (port.ipAddress() == null) {
182 return;
183 }
184
185 KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
186
187 if (network == null) {
188 return;
189 }
190
191 if (network.type() == FLAT || network.type() == VLAN) {
192 return;
193 }
194
195 if (network.segmentId() == null) {
196 return;
197 }
198
199 KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
200
201 if (localNode == null || localNode.type() == MASTER) {
202 return;
203 }
204
205 for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
206 if (remoteNode.hostname().equals(localNode.hostname())) {
207 continue;
Jian Li3831f0c2021-03-12 18:03:58 +0900208 }
209
Jian Li7d3a0c82021-03-24 15:24:18 +0900210 PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
Jian Li3831f0c2021-03-12 18:03:58 +0900211 if (patchPortNumber == null) {
212 return;
213 }
214
Jian Li7d3a0c82021-03-24 15:24:18 +0900215 PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
216 if (tunnelPortNumber == null) {
217 return;
218 }
219
220 TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
221 .matchInPort(patchPortNumber)
222 .matchEthType(Ethernet.TYPE_IPV4)
223 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
224
225 TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
226 .matchInPort(patchPortNumber)
227 .matchEthType(Ethernet.TYPE_ARP)
228 .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
Jian Li3831f0c2021-03-12 18:03:58 +0900229
230 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d3a0c82021-03-24 15:24:18 +0900231 .setTunnelId(Long.parseLong(network.segmentId()))
232 .extension(buildExtension(
233 deviceService,
234 remoteNode.tunBridge(),
235 localNode.dataIp().getIp4Address()),
236 remoteNode.tunBridge())
237 .setOutput(tunnelPortNumber);
Jian Li3831f0c2021-03-12 18:03:58 +0900238
239 flowRuleService.setRule(
240 appId,
Jian Li7d3a0c82021-03-24 15:24:18 +0900241 remoteNode.tunBridge(),
242 sIpBuilder.build(),
Jian Li3831f0c2021-03-12 18:03:58 +0900243 tBuilder.build(),
244 PRIORITY_TUNNEL_RULE,
245 TUNNEL_DEFAULT_TABLE,
246 install);
247
Jian Li7d3a0c82021-03-24 15:24:18 +0900248 flowRuleService.setRule(
249 appId,
250 remoteNode.tunBridge(),
251 sArpBuilder.build(),
252 tBuilder.build(),
253 PRIORITY_TUNNEL_RULE,
254 TUNNEL_DEFAULT_TABLE,
255 install);
Jian Li543fe852021-02-04 17:25:01 +0900256 }
257
Jian Li7d3a0c82021-03-24 15:24:18 +0900258 log.debug("Install egress rules for instance {}, segment ID {}",
259 port.ipAddress(), network.segmentId());
Jian Li543fe852021-02-04 17:25:01 +0900260 }
261
262 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
263
264 private boolean isRelevantHelper() {
265 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
266 }
267
268 @Override
269 public void event(KubevirtNodeEvent event) {
270
271 switch (event.type()) {
272 case KUBEVIRT_NODE_COMPLETE:
273 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
274 break;
275 case KUBEVIRT_NODE_INCOMPLETE:
276 default:
277 // do nothing
278 break;
279 }
280 }
281
282 private void processNodeCompletion(KubevirtNode node) {
283 if (!isRelevantHelper()) {
284 return;
285 }
286
Jian Li7d3a0c82021-03-24 15:24:18 +0900287 kubevirtPortService.ports().stream()
288 .filter(port -> node.equals(kubevirtNodeService.node(port.deviceId())))
289 .forEach(port -> {
290 setIngressRules(port, true);
291 setEgressRules(port, true);
Jian Li543fe852021-02-04 17:25:01 +0900292 });
293 }
294 }
295
Jian Li7d3a0c82021-03-24 15:24:18 +0900296 private class InternalKubevirtPortListener implements KubevirtPortListener {
297
298 @Override
299 public boolean isRelevant(KubevirtPortEvent event) {
300 return event.subject().deviceId() != null;
301 }
Jian Li543fe852021-02-04 17:25:01 +0900302
303 private boolean isRelevantHelper() {
304 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
305 }
306
307 @Override
Jian Li7d3a0c82021-03-24 15:24:18 +0900308 public void event(KubevirtPortEvent event) {
Jian Li543fe852021-02-04 17:25:01 +0900309
310 switch (event.type()) {
Jian Li7d3a0c82021-03-24 15:24:18 +0900311 case KUBEVIRT_PORT_UPDATED:
312 eventExecutor.execute(() -> processPortUpdate(event.subject()));
Jian Li543fe852021-02-04 17:25:01 +0900313 break;
Jian Li7d3a0c82021-03-24 15:24:18 +0900314 case KUBEVIRT_PORT_REMOVED:
315 eventExecutor.execute(() -> processPortRemoval(event.subject()));
Jian Li543fe852021-02-04 17:25:01 +0900316 break;
317 default:
318 // do nothing
319 break;
320 }
321 }
322
Jian Li7d3a0c82021-03-24 15:24:18 +0900323 private void processPortUpdate(KubevirtPort port) {
Jian Li543fe852021-02-04 17:25:01 +0900324 if (!isRelevantHelper()) {
325 return;
326 }
327
Jian Li7d3a0c82021-03-24 15:24:18 +0900328 setIngressRules(port, true);
329 setEgressRules(port, true);
Jian Li543fe852021-02-04 17:25:01 +0900330 }
331
Jian Li7d3a0c82021-03-24 15:24:18 +0900332 private void processPortRemoval(KubevirtPort port) {
Jian Li543fe852021-02-04 17:25:01 +0900333 if (!isRelevantHelper()) {
334 return;
335 }
336
Jian Li7d3a0c82021-03-24 15:24:18 +0900337 setIngressRules(port, false);
338 setEgressRules(port, false);
Jian Li543fe852021-02-04 17:25:01 +0900339 }
340 }
341}