blob: 349043f331689e766262044a7b18b72455929052 [file] [log] [blame]
Jian Lieb488ea2019-04-16 01:50:02 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpPrefix;
22import org.onlab.packet.TpPort;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.LeadershipService;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.core.ApplicationId;
27import org.onosproject.core.CoreService;
28import org.onosproject.k8snetworking.api.K8sFlowRuleService;
29import org.onosproject.k8snetworking.api.K8sNetwork;
30import org.onosproject.k8snetworking.api.K8sNetworkEvent;
31import org.onosproject.k8snetworking.api.K8sNetworkListener;
32import org.onosproject.k8snetworking.api.K8sNetworkService;
33import org.onosproject.k8snetworking.api.K8sPort;
34import org.onosproject.k8snetworking.util.RulePopulatorUtil;
35import org.onosproject.k8snode.api.K8sNode;
36import org.onosproject.k8snode.api.K8sNodeEvent;
37import org.onosproject.k8snode.api.K8sNodeListener;
38import org.onosproject.k8snode.api.K8sNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.driver.DriverService;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.flow.instructions.ExtensionTreatment;
50import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
55import org.slf4j.Logger;
56
57import java.util.Objects;
58import java.util.concurrent.ExecutorService;
59
60import static java.util.concurrent.Executors.newSingleThreadExecutor;
61import static org.onlab.util.Tools.groupedThreads;
62import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC;
63import static org.onosproject.k8snetworking.api.Constants.GW_COMMON_TABLE;
64import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
65import static org.onosproject.k8snetworking.api.Constants.PRIORITY_EXTERNAL_ROUTING_RULE;
66import static org.onosproject.k8snetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
67import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
68import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
69import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildMoveArpShaToThaExtension;
70import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildMoveArpSpaToTpaExtension;
71import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildMoveEthSrcToDstExtension;
72import static org.slf4j.LoggerFactory.getLogger;
73
74/**
75 * Provides POD's internal to external connectivity using source NAT (SNAT).
76 */
77@Component(immediate = true)
78public class K8sRoutingSnatHandler {
79
80 private final Logger log = getLogger(getClass());
81
82 private static final int POD_PREFIX = 32;
83
84 private static final int TP_PORT_MINIMUM_NUM = 1025;
85 private static final int TP_PORT_MAXIMUM_NUM = 65535;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected CoreService coreService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected DeviceService deviceService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected DriverService driverService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected LeadershipService leadershipService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected MastershipService mastershipService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected K8sNetworkService k8sNetworkService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected K8sNodeService k8sNodeService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected K8sFlowRuleService k8sFlowRuleService;
113
114 private final InternalK8sNetworkListener k8sNetworkListener =
115 new InternalK8sNetworkListener();
116 private final InternalK8sNodeListener k8sNodeListener =
117 new InternalK8sNodeListener();
118 private final ExecutorService eventExecutor = newSingleThreadExecutor(
119 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
120
121 private ApplicationId appId;
122 private NodeId localNodeId;
123
124 @Activate
125 protected void activate() {
126 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
127
128 localNodeId = clusterService.getLocalNode().id();
129 leadershipService.runForLeadership(appId.name());
130 k8sNetworkService.addListener(k8sNetworkListener);
131 k8sNodeService.addListener(k8sNodeListener);
132
133 log.info("Started");
134 }
135
136 @Deactivate
137 protected void deactivate() {
138 k8sNodeService.removeListener(k8sNodeListener);
139 k8sNetworkService.removeListener(k8sNetworkListener);
140 leadershipService.withdraw(appId.name());
141 eventExecutor.shutdown();
142
143 log.info("Stopped");
144 }
145
146 private void setContainerToExtRule(K8sNode k8sNode, boolean install) {
147
148 K8sNetwork net = k8sNetworkService.network(k8sNode.hostname());
149
150 if (net == null) {
151 return;
152 }
153
154 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
155 .matchEthType(Ethernet.TYPE_IPV4)
156 .matchTunnelId(Long.valueOf(net.segmentId()))
157 .matchEthDst(DEFAULT_GATEWAY_MAC);
158
159 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
160 .setOutput(k8sNode.intgToExtPatchPortNum());
161
162 k8sFlowRuleService.setRule(
163 appId,
164 k8sNode.intgBridge(),
165 sBuilder.build(),
166 tBuilder.build(),
167 PRIORITY_EXTERNAL_ROUTING_RULE,
168 ROUTING_TABLE,
169 install);
170 }
171
172 private void setExtToContainerRule(K8sNode k8sNode,
173 K8sPort k8sPort, boolean install) {
174
175 K8sNetwork net = k8sNetworkService.network(k8sPort.networkId());
176
177 if (net == null) {
178 return;
179 }
180
181 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
182 .matchEthType(Ethernet.TYPE_IPV4)
183 .matchIPDst(IpPrefix.valueOf(k8sPort.ipAddress(), POD_PREFIX));
184
185 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
186 .setOutput(k8sNode.extToIntgPatchPortNum());
187
188 k8sFlowRuleService.setRule(
189 appId,
190 k8sNode.extBridge(),
191 sBuilder.build(),
192 tBuilder.build(),
193 PRIORITY_STATEFUL_SNAT_RULE,
194 GW_COMMON_TABLE,
195 install);
196 }
197
198 private void setSnatDownstreamRule(K8sNode k8sNode,
199 boolean install) {
200 DeviceId deviceId = k8sNode.extBridge();
201
202 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
203 sBuilder.matchEthType(Ethernet.TYPE_IPV4)
204 .matchIPDst(IpPrefix.valueOf(k8sNode.extBridgeIp(), POD_PREFIX));
205
206 ExtensionTreatment natTreatment = RulePopulatorUtil
207 .niciraConnTrackTreatmentBuilder(driverService, deviceId)
208 .commit(false)
209 .natAction(true)
210 .table((short) 0)
211 .build();
212
213 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
214 .setEthSrc(DEFAULT_GATEWAY_MAC)
215 .extension(natTreatment, deviceId)
216 .build();
217
218 k8sFlowRuleService.setRule(
219 appId,
220 deviceId,
221 sBuilder.build(),
222 treatment,
223 PRIORITY_STATEFUL_SNAT_RULE,
224 GW_COMMON_TABLE,
225 install);
226 }
227
228 private void setSnatUpstreamRule(K8sNode k8sNode,
229 boolean install) {
230
231 K8sNetwork net = k8sNetworkService.network(k8sNode.hostname());
232
233 if (net == null) {
234 return;
235 }
236
237 TrafficSelector selector = DefaultTrafficSelector.builder()
238 .matchEthType(Ethernet.TYPE_IPV4)
239 .matchEthDst(DEFAULT_GATEWAY_MAC)
240 .build();
241
242 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
243
244 if (install) {
245 ExtensionTreatment natTreatment = RulePopulatorUtil
246 .niciraConnTrackTreatmentBuilder(driverService, k8sNode.extBridge())
247 .commit(true)
248 .natFlag(CT_NAT_SRC_FLAG)
249 .natAction(true)
250 .natIp(k8sNode.extBridgeIp())
251 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
252 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
253 .build();
254
255 tBuilder.extension(natTreatment, k8sNode.extBridge())
256 .setEthSrc(k8sNode.extBridgeMac())
257 .setEthDst(k8sNode.extGatewayMac())
258 .setOutput(k8sNode.extBridgePortNum());
259 }
260
261 k8sFlowRuleService.setRule(
262 appId,
263 k8sNode.extBridge(),
264 selector,
265 tBuilder.build(),
266 PRIORITY_STATEFUL_SNAT_RULE,
267 GW_COMMON_TABLE,
268 install);
269 }
270
271 private void setExtIntfArpRule(K8sNode k8sNode, boolean install) {
272
273 Device device = deviceService.getDevice(k8sNode.extBridge());
274
275 TrafficSelector selector = DefaultTrafficSelector.builder()
276 .matchEthType(Ethernet.TYPE_ARP)
277 .matchArpOp(ARP.OP_REQUEST)
278 .matchArpTpa(Ip4Address.valueOf(k8sNode.extBridgeIp().toString()))
279 .build();
280
281 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
282 .setArpOp(ARP.OP_REPLY)
283 .extension(buildMoveEthSrcToDstExtension(device), device.id())
284 .extension(buildMoveArpShaToThaExtension(device), device.id())
285 .extension(buildMoveArpSpaToTpaExtension(device), device.id())
286 .setArpSpa(Ip4Address.valueOf(k8sNode.extBridgeIp().toString()))
287 .setArpSha(k8sNode.extBridgeMac())
288 .setOutput(PortNumber.IN_PORT)
289 .build();
290
291 k8sFlowRuleService.setRule(
292 appId,
293 k8sNode.extBridge(),
294 selector,
295 treatment,
296 PRIORITY_STATEFUL_SNAT_RULE,
297 GW_COMMON_TABLE,
298 install);
299 }
300
301 private class InternalK8sNodeListener implements K8sNodeListener {
302
303 private boolean isRelevantHelper() {
304 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
305 }
306
307 @Override
308 public void event(K8sNodeEvent event) {
309 switch (event.type()) {
310 case K8S_NODE_COMPLETE:
311 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
312 break;
313 case K8S_NODE_INCOMPLETE:
314 default:
315 break;
316 }
317 }
318
319 private void processNodeCompletion(K8sNode k8sNode) {
320 if (!isRelevantHelper()) {
321 return;
322 }
323
324 setExtIntfArpRule(k8sNode, true);
325 setSnatUpstreamRule(k8sNode, true);
326 setSnatDownstreamRule(k8sNode, true);
327 setContainerToExtRule(k8sNode, true);
328 }
329 }
330
331 private class InternalK8sNetworkListener implements K8sNetworkListener {
332
333 private boolean isRelevantHelper() {
334 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
335 }
336
337 @Override
338 public void event(K8sNetworkEvent event) {
339 switch (event.type()) {
340 case K8S_PORT_ACTIVATED:
341 eventExecutor.execute(() -> processPortActivation(event.port()));
342 break;
343 case K8S_PORT_REMOVED:
344 eventExecutor.execute(() -> processPortRemoval(event.port()));
345 break;
346 default:
347 break;
348 }
349 }
350
351 private void processPortActivation(K8sPort port) {
352 if (!isRelevantHelper()) {
353 return;
354 }
355
356 k8sNodeService.completeNodes().forEach(n ->
357 setExtToContainerRule(n, port, true));
358 }
359
360 private void processPortRemoval(K8sPort port) {
361 if (!isRelevantHelper()) {
362 return;
363 }
364
365 k8sNodeService.completeNodes().forEach(n ->
366 setExtToContainerRule(n, port, false));
367 }
368 }
369}