blob: 6a5f94244975b5ac4bbc546b81df10989151f564 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Li4a7ce672019-04-09 15:20:25 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090032import org.onlab.util.Tools;
33import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.LeadershipService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.GroupId;
40import org.onosproject.k8snetworking.api.K8sEndpointsService;
41import org.onosproject.k8snetworking.api.K8sFlowRuleService;
42import org.onosproject.k8snetworking.api.K8sGroupRuleService;
43import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li4a7ce672019-04-09 15:20:25 +090044import org.onosproject.k8snetworking.api.K8sPodEvent;
45import org.onosproject.k8snetworking.api.K8sPodListener;
46import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090047import org.onosproject.k8snetworking.api.K8sServiceEvent;
48import org.onosproject.k8snetworking.api.K8sServiceListener;
49import org.onosproject.k8snetworking.api.K8sServiceService;
50import org.onosproject.k8snetworking.util.RulePopulatorUtil;
51import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
52import org.onosproject.k8snode.api.K8sNode;
53import org.onosproject.k8snode.api.K8sNodeEvent;
54import org.onosproject.k8snode.api.K8sNodeListener;
55import org.onosproject.k8snode.api.K8sNodeService;
56import org.onosproject.net.DeviceId;
57import org.onosproject.net.device.DeviceService;
58import org.onosproject.net.driver.DriverService;
59import org.onosproject.net.flow.DefaultTrafficSelector;
60import org.onosproject.net.flow.DefaultTrafficTreatment;
61import org.onosproject.net.flow.TrafficSelector;
62import org.onosproject.net.flow.TrafficTreatment;
63import org.onosproject.net.flow.criteria.ExtensionSelector;
64import org.onosproject.net.flow.instructions.ExtensionTreatment;
65import org.onosproject.net.group.GroupBucket;
66import org.onosproject.store.service.AtomicCounter;
67import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090068import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Reference;
74import org.osgi.service.component.annotations.ReferenceCardinality;
75import org.slf4j.Logger;
76
Jian Li004526d2019-02-25 16:26:27 +090077import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090078import java.util.List;
79import java.util.Map;
80import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.concurrent.ExecutorService;
83import java.util.stream.Collectors;
84
85import static java.util.concurrent.Executors.newSingleThreadExecutor;
86import static org.onlab.util.Tools.groupedThreads;
Jian Li5e8a22a2019-02-27 11:48:42 +090087import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li2cc2b632019-02-18 00:56:40 +090088import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
89import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li004526d2019-02-25 16:26:27 +090090import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
91import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +090092import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090093import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090094import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li004526d2019-02-25 16:26:27 +090095import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +090096import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090097import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
98import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
99import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
100import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900101import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
103import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li2cc2b632019-02-18 00:56:40 +0900104import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
105import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
106import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900107import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900108import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
109import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
110import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
111import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
112import static org.onosproject.net.group.GroupDescription.Type.SELECT;
113import static org.slf4j.LoggerFactory.getLogger;
114
115/**
116 * Handles the service IP to pod IP related translation traffic.
117 */
Jian Li004526d2019-02-25 16:26:27 +0900118@Component(
119 immediate = true,
120 property = {
121 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT
122 }
123)
Jian Li2cc2b632019-02-18 00:56:40 +0900124public class K8sServiceHandler {
125
126 private final Logger log = getLogger(getClass());
127
Jian Li2cc2b632019-02-18 00:56:40 +0900128 private static final int HOST_CIDR_NUM = 32;
129
130 private static final String NONE = "None";
131 private static final String CLUSTER_IP = "ClusterIP";
132 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900133 private static final String UDP = "UDP";
Jian Li2cc2b632019-02-18 00:56:40 +0900134
135 private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
136
Jian Li4a7ce672019-04-09 15:20:25 +0900137 private static final String IP_ADDRESS = "ipAddress";
138
Jian Li2cc2b632019-02-18 00:56:40 +0900139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
140 protected CoreService coreService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
143 protected LeadershipService leadershipService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
146 protected ClusterService clusterService;
147
148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
149 protected DriverService driverService;
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY)
152 protected DeviceService deviceService;
153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900155 protected ComponentConfigService configService;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900158 protected StorageService storageService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
161 protected K8sNetworkService k8sNetworkService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
164 protected K8sFlowRuleService k8sFlowRuleService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
167 protected K8sGroupRuleService k8sGroupRuleService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
170 protected K8sNodeService k8sNodeService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
173 protected K8sEndpointsService k8sEndpointsService;
174
175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
176 protected K8sServiceService k8sServiceService;
177
Jian Li4a7ce672019-04-09 15:20:25 +0900178 @Reference(cardinality = ReferenceCardinality.MANDATORY)
179 protected K8sPodService k8sPodService;
180
Jian Li004526d2019-02-25 16:26:27 +0900181 /** Service IP address translation mode. */
182 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
183
Jian Li2cc2b632019-02-18 00:56:40 +0900184 private final ExecutorService eventExecutor = newSingleThreadExecutor(
185 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
186 private final InternalNodeEventListener internalNodeEventListener =
187 new InternalNodeEventListener();
188 private final InternalK8sServiceListener internalK8sServiceListener =
189 new InternalK8sServiceListener();
Jian Li4a7ce672019-04-09 15:20:25 +0900190 private final InternalK8sPodListener internalK8sPodListener =
191 new InternalK8sPodListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900192
193 private AtomicCounter groupIdCounter;
194
195 private ApplicationId appId;
196 private NodeId localNodeId;
197
198 @Activate
199 protected void activate() {
200 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900201 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900202 localNodeId = clusterService.getLocalNode().id();
203 leadershipService.runForLeadership(appId.name());
204 k8sNodeService.addListener(internalNodeEventListener);
205 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li4a7ce672019-04-09 15:20:25 +0900206 k8sPodService.addListener(internalK8sPodListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900207
208 groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
209
210 log.info("Started");
211 }
212
213 @Deactivate
214 protected void deactivate() {
215 leadershipService.withdraw(appId.name());
Jian Li4a7ce672019-04-09 15:20:25 +0900216 k8sPodService.removeListener(internalK8sPodListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900217 k8sNodeService.removeListener(internalNodeEventListener);
218 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li004526d2019-02-25 16:26:27 +0900219 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900220 eventExecutor.shutdown();
221
222 log.info("Stopped");
223 }
224
Jian Li004526d2019-02-25 16:26:27 +0900225 @Modified
226 void modified(ComponentContext context) {
227 readComponentConfiguration(context);
228
229 log.info("Modified");
230 }
231
232 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900233 // -trk CT rules
234 long ctUntrack = computeCtStateFlag(false, false, false);
235 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
236
237 k8sNetworkService.networks().forEach(n -> {
238 // TODO: need to provide a way to add multiple service IP CIDR ranges
239 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), SERVICE_IP_CIDR,
240 JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
241 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
242 JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
243 });
244
245 // +trk-new CT rules
246 long ctTrackUnnew = computeCtStateFlag(true, false, false);
247 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
248
249 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
250 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
251
252 // +trk+new CT rules
253 long ctTrackNew = computeCtStateFlag(true, true, false);
254 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
255
256 k8sServiceService.services().stream()
257 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900258 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
259 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900260 }
261
Jian Li004526d2019-02-25 16:26:27 +0900262 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
263
264 k8sNetworkService.networks().forEach(n -> {
265 setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, JUMP_TABLE,
266 SERVICE_TABLE, PRIORITY_CT_RULE, install);
267 setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, JUMP_TABLE,
268 POD_TABLE, PRIORITY_CT_RULE, install);
269 setSrcDstCidrRules(deviceId, n.cidr(), n.cidr(), JUMP_TABLE,
270 ROUTING_TABLE, PRIORITY_CT_RULE, install);
271 });
272
273 // setup load balancing rules using group table
274 k8sServiceService.services().stream()
275 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
276 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
277 }
278
279 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
280 String dstCidr, int installTable,
281 int transitTable, int priority, boolean install) {
282 TrafficSelector selector = DefaultTrafficSelector.builder()
283 .matchEthType(Ethernet.TYPE_IPV4)
284 .matchIPSrc(IpPrefix.valueOf(srcCidr))
285 .matchIPDst(IpPrefix.valueOf(dstCidr))
286 .build();
287
288 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
289 .transition(transitTable)
290 .build();
291
292 k8sFlowRuleService.setRule(
293 appId,
294 deviceId,
295 selector,
296 treatment,
297 priority,
298 installTable,
299 install);
300 }
301
Jian Li5e8a22a2019-02-27 11:48:42 +0900302 private String servicePortStr(String ip, int port, String protocol) {
303 return ip + "_" + port + "_" + protocol;
304 }
Jian Li004526d2019-02-25 16:26:27 +0900305
Jian Li5e8a22a2019-02-27 11:48:42 +0900306 /**
307 * Obtains the service port to endpoint address paired map.
308 *
309 * @param service kubernetes service
310 * @return a map where key is kubernetes service port, and value is the
311 * endpoint addresses that are associated with the service port
312 */
313 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
314
315 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900316
317 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900318 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
319 .stream()
320 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
321 .collect(Collectors.toList());
322
Jian Li5e8a22a2019-02-27 11:48:42 +0900323 service.getSpec().getPorts().stream()
324 .filter(Objects::nonNull)
325 .filter(sp -> sp.getTargetPort() != null)
326 .filter(sp -> sp.getTargetPort().getIntVal() != null)
327 .forEach(sp -> {
328 int targetPort = sp.getTargetPort().getIntVal();
329 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900330
Jian Li5e8a22a2019-02-27 11:48:42 +0900331 for (Endpoints endpoints : endpointses) {
332 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
333 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
334 if (targetProtocol.equals(endpointPort.getProtocol()) &&
335 targetPort == endpointPort.getPort()) {
336 Set<String> addresses = endpointSubset.getAddresses()
337 .stream().map(EndpointAddress::getIp)
338 .collect(Collectors.toSet());
339 map.put(sp, addresses);
Jian Li004526d2019-02-25 16:26:27 +0900340 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900341 }
Jian Li004526d2019-02-25 16:26:27 +0900342 }
343 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900344 });
Jian Li004526d2019-02-25 16:26:27 +0900345
Jian Li5e8a22a2019-02-27 11:48:42 +0900346 return map;
347 }
348
Jian Li4a7ce672019-04-09 15:20:25 +0900349 private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
350
351 if (pod.getMetadata().getAnnotations() == null) {
352 return;
353 }
354
355 String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
356
357 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
358 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
359
360 spEpasMap.forEach((sp, epas) -> {
361 List<GroupBucket> bkts = Lists.newArrayList();
362
363 if (install) {
364 if (epas.contains(podIpStr)) {
365 bkts = buildBuckets(deviceId, podIpStr, sp);
366 }
367 } else {
368 bkts = buildBuckets(deviceId, podIpStr, sp);
369 }
370
371 spGrpBkts.put(sp, bkts);
372 });
373
374 String serviceIp = service.getSpec().getClusterIP();
375 spGrpBkts.forEach((sp, bkts) -> {
376 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
377 int groupId = svcStr.hashCode();
378
379 k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
380 });
381 }
382
383 private List<GroupBucket> buildBuckets(DeviceId deviceId,
384 String podIpStr,
385 ServicePort sp) {
386 List<GroupBucket> bkts = Lists.newArrayList();
387
388 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
389 deviceService.getDevice(deviceId), ROUTING_TABLE);
390 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
391 .setIpDst(IpAddress.valueOf(podIpStr))
392 .extension(resubmitTreatment, deviceId);
393
394 if (TCP.equals(sp.getProtocol())) {
395 tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
396 } else if (UDP.equals(sp.getProtocol())) {
397 tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
398 }
399
400 bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
401
402 return bkts;
403 }
404
405 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
406 Service service,
407 boolean install) {
Jian Li5e8a22a2019-02-27 11:48:42 +0900408 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
409 Map<String, String> nodeIpGatewayIpMap =
410 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
411 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
412
413 spEpasMap.forEach((sp, epas) -> {
414 List<GroupBucket> bkts = Lists.newArrayList();
415 epas.forEach(epa -> {
416 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
417 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
418 deviceService.getDevice(deviceId), ROUTING_TABLE);
419 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
420 .setIpDst(IpAddress.valueOf(podIp))
421 .extension(resubmitTreatment, deviceId);
422
423 if (TCP.equals(sp.getProtocol())) {
424 tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
425 } else if (UDP.equals(sp.getProtocol())) {
426 tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
427 }
428
429 bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
430 });
431 spGrpBkts.put(sp, bkts);
432 });
433
434 String serviceIp = service.getSpec().getClusterIP();
435 spGrpBkts.forEach((sp, bkts) -> {
436 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900437 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900438
Jian Li4a7ce672019-04-09 15:20:25 +0900439 if (install) {
440
441 // add group table rules
442 k8sGroupRuleService.setRule(appId, deviceId, groupId,
443 SELECT, bkts, true);
444
445 log.info("Adding group rule {}", groupId);
446
447 // if we failed to add group rule, we will not install flow rules
448 // as this might cause rule inconsistency
449 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
450 // add flow rules for shifting IP domain
451 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
452 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
453 sp.getProtocol(), true);
454 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900455 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900456 // remove flow rules for shifting IP domain
457 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
458 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
459 sp.getProtocol(), false);
460
461 // remove group table rules
462 k8sGroupRuleService.setRule(appId, deviceId, groupId,
463 SELECT, bkts, false);
464
465 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900466 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900467 });
Jian Li004526d2019-02-25 16:26:27 +0900468
Jian Li5e8a22a2019-02-27 11:48:42 +0900469 spEpasMap.forEach((sp, epas) ->
470 // add flow rules for unshifting IP domain
471 epas.forEach(epa -> {
472 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
473 setUnshiftDomainRules(deviceId, POD_TABLE,
474 PRIORITY_NAT_RULE, serviceIp, sp.getPort(), sp.getProtocol(),
475 podIp, sp.getTargetPort().getIntVal(), install);
476 }
477 ));
Jian Li004526d2019-02-25 16:26:27 +0900478 }
479
480 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900481 int groupId, int priority, String serviceIp,
482 int servicePort, String protocol, boolean install) {
483 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900484 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900485 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
486
487 if (TCP.equals(protocol)) {
488 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
489 .matchTcpDst(TpPort.tpPort(servicePort));
490 } else if (UDP.equals(protocol)) {
491 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
492 .matchUdpDst(TpPort.tpPort(servicePort));
493 }
Jian Li004526d2019-02-25 16:26:27 +0900494
495 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li5e8a22a2019-02-27 11:48:42 +0900496 deviceService.getDevice(deviceId), SRC, SHIFTED_IP_PREFIX);
Jian Li004526d2019-02-25 16:26:27 +0900497
498 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
499 .extension(loadTreatment, deviceId)
500 .group(GroupId.valueOf(groupId))
501 .build();
502
503 k8sFlowRuleService.setRule(
504 appId,
505 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900506 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900507 treatment,
508 priority,
509 installTable,
510 install);
511 }
512
513 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900514 int priority, String serviceIp,
515 int servicePort, String protocol,
516 String podIp, int podPort, boolean install) {
517 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900518 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900519 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
520
521 if (TCP.equals(protocol)) {
522 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
523 .matchTcpSrc(TpPort.tpPort(podPort));
524 } else if (UDP.equals(protocol)) {
525 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
526 .matchUdpSrc(TpPort.tpPort(podPort));
527 }
528
529 String podIpPrefix = podIp.split("\\.")[0] +
530 "." + podIp.split("\\.")[1];
Jian Li004526d2019-02-25 16:26:27 +0900531
532 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li5e8a22a2019-02-27 11:48:42 +0900533 deviceService.getDevice(deviceId), DST, podIpPrefix);
Jian Li004526d2019-02-25 16:26:27 +0900534
Jian Li5e8a22a2019-02-27 11:48:42 +0900535 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900536 .extension(loadTreatment, deviceId)
537 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li5e8a22a2019-02-27 11:48:42 +0900538 .transition(ROUTING_TABLE);
539
540 if (TCP.equals(protocol)) {
541 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
542 } else if (UDP.equals(protocol)) {
543 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
544 }
Jian Li004526d2019-02-25 16:26:27 +0900545
546 k8sFlowRuleService.setRule(
547 appId,
548 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900549 sBuilder.build(),
550 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900551 priority,
552 installTable,
553 install);
554 }
555
556 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
557 long ctMask, Service service,
558 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900559 int groupId = (int) groupIdCounter.incrementAndGet();
560
561 List<GroupBucket> buckets = Lists.newArrayList();
562
563 String serviceName = service.getMetadata().getName();
564 String serviceIp = service.getSpec().getClusterIP();
565
566 // TODO: multi-ports case should be addressed
567 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
568
569 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
570 .stream()
571 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
572 .collect(Collectors.toList());
573
574 Map<String, String> nodeIpGatewayIpMap =
575 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
576
577 for (Endpoints endpoints : endpointses) {
578 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
579 List<EndpointPort> ports = endpointSubset.getPorts()
580 .stream()
581 .filter(p -> p.getProtocol().equals(TCP))
582 .collect(Collectors.toList());
583
584 for (EndpointAddress address : endpointSubset.getAddresses()) {
585 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
586 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
587
588 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
589 niciraConnTrackTreatmentBuilder(driverService, deviceId)
590 .commit(true)
591 .natAction(true)
592 .natIp(IpAddress.valueOf(podIp))
593 .natFlag(CT_NAT_DST_FLAG);
594
595 ports.forEach(p -> {
596 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
597 .natPort(TpPort.tpPort(p.getPort())).build();
598 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
599 deviceService.getDevice(deviceId), ROUTING_TABLE);
600 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
601 .extension(ctNatTreatment, deviceId)
602 .extension(resubmitTreatment, deviceId)
603 .build();
604 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
605 });
606 }
607 }
608 }
609
610 if (!buckets.isEmpty()) {
611 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
612
613 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
614 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
615 PRIORITY_CT_RULE, install);
616 }
617 }
618
619 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
620 String srcCidr, String dstCidr, int installTable,
621 int transitTable, int priority, boolean install) {
622 ExtensionSelector esCtSate = RulePopulatorUtil
623 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
624 TrafficSelector selector = DefaultTrafficSelector.builder()
625 .matchEthType(Ethernet.TYPE_IPV4)
626 .matchIPSrc(IpPrefix.valueOf(srcCidr))
627 .matchIPDst(IpPrefix.valueOf(dstCidr))
628 .extension(esCtSate, deviceId)
629 .build();
630
631 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
632 niciraConnTrackTreatmentBuilder(driverService, deviceId)
633 .natAction(false)
634 .commit(false)
635 .table((short) transitTable);
636
637 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
638 .extension(connTreatmentBuilder.build(), deviceId)
639 .build();
640
641 k8sFlowRuleService.setRule(
642 appId,
643 deviceId,
644 selector,
645 treatment,
646 priority,
647 installTable,
648 install);
649 }
650
651 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
652 IpAddress dstIp, TpPort dstPort, int installTable,
653 int groupId, int priority, boolean install) {
654 ExtensionSelector esCtSate = RulePopulatorUtil
655 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
656 TrafficSelector selector = DefaultTrafficSelector.builder()
657 .matchEthType(Ethernet.TYPE_IPV4)
658 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
659 .matchIPProtocol(IPv4.PROTOCOL_TCP)
660 .matchTcpDst(dstPort)
661 .extension(esCtSate, deviceId)
662 .build();
663 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
664 .group(GroupId.valueOf(groupId))
665 .build();
666
667 k8sFlowRuleService.setRule(
668 appId,
669 deviceId,
670 selector,
671 treatment,
672 priority,
673 installTable,
674 install);
675 }
676
677 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
678 int installTable, int transitTable,
679 int priority, boolean install) {
680 ExtensionSelector esCtSate = RulePopulatorUtil
681 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
682 TrafficSelector selector = DefaultTrafficSelector.builder()
683 .extension(esCtSate, deviceId)
684 .build();
685
686 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
687 .transition(transitTable)
688 .build();
689
690 k8sFlowRuleService.setRule(
691 appId,
692 deviceId,
693 selector,
694 treatment,
695 priority,
696 installTable,
697 install);
698 }
699
Jian Li004526d2019-02-25 16:26:27 +0900700 /**
701 * Extracts properties from the component configuration context.
702 *
703 * @param context the component context
704 */
705 private void readComponentConfiguration(ComponentContext context) {
706 Dictionary<?, ?> properties = context.getProperties();
707
708 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
709 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
710 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
711 }
712
Jian Li4a7ce672019-04-09 15:20:25 +0900713 private void setServiceNatRules(DeviceId deviceId, boolean install) {
714 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
715 setStatefulServiceNatRules(deviceId, install);
716 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
717 setStatelessServiceNatRules(deviceId, install);
718 } else {
719 log.warn("Service IP NAT mode was not configured!");
720 }
721 }
722
Jian Li2cc2b632019-02-18 00:56:40 +0900723 private class InternalK8sServiceListener implements K8sServiceListener {
724
725 private boolean isRelevantHelper() {
726 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
727 }
728
729 @Override
730 public void event(K8sServiceEvent event) {
731 switch (event.type()) {
732 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900733 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900734 eventExecutor.execute(() -> processServiceCreation(event.subject()));
735 break;
736 case K8S_SERVICE_REMOVED:
737 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
738 break;
739 default:
740 // do nothing
741 break;
742 }
743 }
744
745 private void processServiceCreation(Service service) {
746 if (!isRelevantHelper()) {
747 return;
748 }
749
Jian Li5e8a22a2019-02-27 11:48:42 +0900750 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
751 long ctTrackNew = computeCtStateFlag(true, true, false);
752 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900753
Jian Li5e8a22a2019-02-27 11:48:42 +0900754 k8sNodeService.completeNodes().forEach(n ->
755 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
756 ctMaskTrackNew, service, true));
757 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
758 k8sNodeService.completeNodes().forEach(n ->
759 setStatelessGroupFlowRules(n.intgBridge(), service, true));
760 }
Jian Li2cc2b632019-02-18 00:56:40 +0900761 }
762
763 private void processServiceRemoval(Service service) {
764 if (!isRelevantHelper()) {
765 return;
766 }
767
Jian Li5e8a22a2019-02-27 11:48:42 +0900768 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
769 long ctTrackNew = computeCtStateFlag(true, true, false);
770 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900771
Jian Li5e8a22a2019-02-27 11:48:42 +0900772 k8sNodeService.completeNodes().forEach(n ->
773 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
774 ctMaskTrackNew, service, false));
775 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
776 k8sNodeService.completeNodes().forEach(n ->
777 setStatelessGroupFlowRules(n.intgBridge(), service, false));
778 }
Jian Li004526d2019-02-25 16:26:27 +0900779 }
Jian Li2cc2b632019-02-18 00:56:40 +0900780 }
781
Jian Li4a7ce672019-04-09 15:20:25 +0900782 private class InternalK8sPodListener implements K8sPodListener {
783
784 private boolean isRelevantHelper() {
785 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
786 }
787
788 @Override
789 public void event(K8sPodEvent event) {
790 switch (event.type()) {
Jian Li5c755832019-04-11 23:24:28 +0900791 case K8S_POD_ANNOTATION_ADDED:
792 eventExecutor.execute(() -> processPodAnnotAddition(event.subject()));
Jian Li4a7ce672019-04-09 15:20:25 +0900793 break;
794 case K8S_POD_REMOVED:
795 eventExecutor.execute(() -> processPodRemoval(event.subject()));
796 break;
797 default:
798 break;
799 }
800 }
801
Jian Li5c755832019-04-11 23:24:28 +0900802 private void processPodAnnotAddition(Pod pod) {
Jian Li4a7ce672019-04-09 15:20:25 +0900803 if (!isRelevantHelper()) {
804 return;
805 }
806
807 setServiceRuleFromPod(pod, true);
808 }
809
810 private void processPodRemoval(Pod pod) {
811 if (!isRelevantHelper()) {
812 return;
813 }
814
815 setServiceRuleFromPod(pod, false);
816 }
817
818 private void setServiceRuleFromPod(Pod pod, boolean install) {
819 k8sServiceService.services().forEach(s -> {
820 pod.getMetadata().getLabels().forEach((pk, pv) -> {
821 Map<String, String> selectors = s.getSpec().getSelector();
822 if (selectors != null && selectors.containsKey(pk)) {
823 if (pv.equals(selectors.get(pk))) {
824 k8sNodeService.completeNodes().forEach(n ->
825 setGroupBuckets(n.intgBridge(), s, pod, install));
826 }
827 }
828 });
829 });
830 }
831 }
832
Jian Li2cc2b632019-02-18 00:56:40 +0900833 private class InternalNodeEventListener implements K8sNodeListener {
834
835 private boolean isRelevantHelper() {
836 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
837 }
838
839 @Override
840 public void event(K8sNodeEvent event) {
841 K8sNode k8sNode = event.subject();
842 switch (event.type()) {
843 case K8S_NODE_COMPLETE:
844 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
845 break;
846 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900847 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +0900848 default:
849 break;
850 }
851 }
852
853 private void processNodeCompletion(K8sNode node) {
854 if (!isRelevantHelper()) {
855 return;
856 }
857
Jian Li4a7ce672019-04-09 15:20:25 +0900858 setServiceNatRules(node.intgBridge(), true);
Jian Li2cc2b632019-02-18 00:56:40 +0900859 }
860 }
861}