blob: 5809aabd228c3abdca587521b23aa6727e932b4e [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Li4a7ce672019-04-09 15:20:25 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090032import org.onlab.util.Tools;
33import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090034import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.LeadershipService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.core.ApplicationId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.GroupId;
40import org.onosproject.k8snetworking.api.K8sEndpointsService;
41import org.onosproject.k8snetworking.api.K8sFlowRuleService;
42import org.onosproject.k8snetworking.api.K8sGroupRuleService;
43import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li4a7ce672019-04-09 15:20:25 +090044import org.onosproject.k8snetworking.api.K8sPodEvent;
45import org.onosproject.k8snetworking.api.K8sPodListener;
46import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090047import org.onosproject.k8snetworking.api.K8sServiceEvent;
48import org.onosproject.k8snetworking.api.K8sServiceListener;
49import org.onosproject.k8snetworking.api.K8sServiceService;
50import org.onosproject.k8snetworking.util.RulePopulatorUtil;
51import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
52import org.onosproject.k8snode.api.K8sNode;
53import org.onosproject.k8snode.api.K8sNodeEvent;
54import org.onosproject.k8snode.api.K8sNodeListener;
55import org.onosproject.k8snode.api.K8sNodeService;
56import org.onosproject.net.DeviceId;
57import org.onosproject.net.device.DeviceService;
58import org.onosproject.net.driver.DriverService;
59import org.onosproject.net.flow.DefaultTrafficSelector;
60import org.onosproject.net.flow.DefaultTrafficTreatment;
61import org.onosproject.net.flow.TrafficSelector;
62import org.onosproject.net.flow.TrafficTreatment;
63import org.onosproject.net.flow.criteria.ExtensionSelector;
64import org.onosproject.net.flow.instructions.ExtensionTreatment;
65import org.onosproject.net.group.GroupBucket;
66import org.onosproject.store.service.AtomicCounter;
67import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090068import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Reference;
74import org.osgi.service.component.annotations.ReferenceCardinality;
75import org.slf4j.Logger;
76
Jian Li004526d2019-02-25 16:26:27 +090077import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090078import java.util.List;
79import java.util.Map;
80import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.concurrent.ExecutorService;
83import java.util.stream.Collectors;
84
85import static java.util.concurrent.Executors.newSingleThreadExecutor;
86import static org.onlab.util.Tools.groupedThreads;
Jian Li5e8a22a2019-02-27 11:48:42 +090087import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li2cc2b632019-02-18 00:56:40 +090088import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
89import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li004526d2019-02-25 16:26:27 +090090import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
91import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +090092import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090093import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090094import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +090095import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +090096import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +090097import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090098import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
99import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
100import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
101import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900102import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Li004526d2019-02-25 16:26:27 +0900103import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
104import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li7d111d72019-04-12 13:58:44 +0900105import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +0900106import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
107import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
108import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900109import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900110import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
111import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
112import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
113import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
114import static org.onosproject.net.group.GroupDescription.Type.SELECT;
115import static org.slf4j.LoggerFactory.getLogger;
116
117/**
118 * Handles the service IP to pod IP related translation traffic.
119 */
Jian Li004526d2019-02-25 16:26:27 +0900120@Component(
121 immediate = true,
122 property = {
123 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT
124 }
125)
Jian Li2cc2b632019-02-18 00:56:40 +0900126public class K8sServiceHandler {
127
128 private final Logger log = getLogger(getClass());
129
Jian Li2cc2b632019-02-18 00:56:40 +0900130 private static final int HOST_CIDR_NUM = 32;
131
132 private static final String NONE = "None";
133 private static final String CLUSTER_IP = "ClusterIP";
134 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900135 private static final String UDP = "UDP";
Jian Li2cc2b632019-02-18 00:56:40 +0900136
137 private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
138
Jian Li4a7ce672019-04-09 15:20:25 +0900139 private static final String IP_ADDRESS = "ipAddress";
140
Jian Li2cc2b632019-02-18 00:56:40 +0900141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
142 protected CoreService coreService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
145 protected LeadershipService leadershipService;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
148 protected ClusterService clusterService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
151 protected DriverService driverService;
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
154 protected DeviceService deviceService;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900157 protected ComponentConfigService configService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900160 protected StorageService storageService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 protected K8sNetworkService k8sNetworkService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
166 protected K8sFlowRuleService k8sFlowRuleService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
169 protected K8sGroupRuleService k8sGroupRuleService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
172 protected K8sNodeService k8sNodeService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
175 protected K8sEndpointsService k8sEndpointsService;
176
177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
178 protected K8sServiceService k8sServiceService;
179
Jian Li4a7ce672019-04-09 15:20:25 +0900180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
181 protected K8sPodService k8sPodService;
182
Jian Li004526d2019-02-25 16:26:27 +0900183 /** Service IP address translation mode. */
184 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
185
Jian Li2cc2b632019-02-18 00:56:40 +0900186 private final ExecutorService eventExecutor = newSingleThreadExecutor(
187 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
188 private final InternalNodeEventListener internalNodeEventListener =
189 new InternalNodeEventListener();
190 private final InternalK8sServiceListener internalK8sServiceListener =
191 new InternalK8sServiceListener();
Jian Li4a7ce672019-04-09 15:20:25 +0900192 private final InternalK8sPodListener internalK8sPodListener =
193 new InternalK8sPodListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900194
195 private AtomicCounter groupIdCounter;
196
197 private ApplicationId appId;
198 private NodeId localNodeId;
199
200 @Activate
201 protected void activate() {
202 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900203 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900204 localNodeId = clusterService.getLocalNode().id();
205 leadershipService.runForLeadership(appId.name());
206 k8sNodeService.addListener(internalNodeEventListener);
207 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li4a7ce672019-04-09 15:20:25 +0900208 k8sPodService.addListener(internalK8sPodListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900209
210 groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
211
212 log.info("Started");
213 }
214
215 @Deactivate
216 protected void deactivate() {
217 leadershipService.withdraw(appId.name());
Jian Li4a7ce672019-04-09 15:20:25 +0900218 k8sPodService.removeListener(internalK8sPodListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900219 k8sNodeService.removeListener(internalNodeEventListener);
220 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li004526d2019-02-25 16:26:27 +0900221 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900222 eventExecutor.shutdown();
223
224 log.info("Stopped");
225 }
226
Jian Li004526d2019-02-25 16:26:27 +0900227 @Modified
228 void modified(ComponentContext context) {
229 readComponentConfiguration(context);
230
231 log.info("Modified");
232 }
233
234 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900235 // -trk CT rules
236 long ctUntrack = computeCtStateFlag(false, false, false);
237 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
238
239 k8sNetworkService.networks().forEach(n -> {
240 // TODO: need to provide a way to add multiple service IP CIDR ranges
241 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), SERVICE_IP_CIDR,
242 JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
243 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
244 JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
245 });
246
247 // +trk-new CT rules
248 long ctTrackUnnew = computeCtStateFlag(true, false, false);
249 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
250
251 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
252 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
253
254 // +trk+new CT rules
255 long ctTrackNew = computeCtStateFlag(true, true, false);
256 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
257
258 k8sServiceService.services().stream()
259 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900260 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
261 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900262 }
263
Jian Li004526d2019-02-25 16:26:27 +0900264 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
265
Jian Li7d111d72019-04-12 13:58:44 +0900266 String srcCidr = k8sNetworkService.network(
267 k8sNodeService.node(deviceId).hostname()).cidr();
268
Jian Li004526d2019-02-25 16:26:27 +0900269 k8sNetworkService.networks().forEach(n -> {
Jian Li7d111d72019-04-12 13:58:44 +0900270 setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, null, JUMP_TABLE,
Jian Li004526d2019-02-25 16:26:27 +0900271 SERVICE_TABLE, PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900272 setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, null, JUMP_TABLE,
Jian Li004526d2019-02-25 16:26:27 +0900273 POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900274 setSrcDstCidrRules(deviceId, srcCidr, n.cidr(), n.segmentId(), ROUTING_TABLE,
275 STAT_OUTBOUND_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900276 });
277
278 // setup load balancing rules using group table
279 k8sServiceService.services().stream()
280 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
281 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
282 }
283
284 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li7d111d72019-04-12 13:58:44 +0900285 String dstCidr, String segId, int installTable,
Jian Li004526d2019-02-25 16:26:27 +0900286 int transitTable, int priority, boolean install) {
287 TrafficSelector selector = DefaultTrafficSelector.builder()
288 .matchEthType(Ethernet.TYPE_IPV4)
289 .matchIPSrc(IpPrefix.valueOf(srcCidr))
290 .matchIPDst(IpPrefix.valueOf(dstCidr))
291 .build();
292
Jian Li7d111d72019-04-12 13:58:44 +0900293 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
294
295 if (segId != null) {
296 tBuilder.setTunnelId(Long.valueOf(segId));
297 }
298 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900299
300 k8sFlowRuleService.setRule(
301 appId,
302 deviceId,
303 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900304 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900305 priority,
306 installTable,
307 install);
308 }
309
Jian Li5e8a22a2019-02-27 11:48:42 +0900310 private String servicePortStr(String ip, int port, String protocol) {
311 return ip + "_" + port + "_" + protocol;
312 }
Jian Li004526d2019-02-25 16:26:27 +0900313
Jian Li5e8a22a2019-02-27 11:48:42 +0900314 /**
315 * Obtains the service port to endpoint address paired map.
316 *
317 * @param service kubernetes service
318 * @return a map where key is kubernetes service port, and value is the
319 * endpoint addresses that are associated with the service port
320 */
321 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
322
323 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900324
325 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900326 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
327 .stream()
328 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
329 .collect(Collectors.toList());
330
Jian Li5e8a22a2019-02-27 11:48:42 +0900331 service.getSpec().getPorts().stream()
332 .filter(Objects::nonNull)
333 .filter(sp -> sp.getTargetPort() != null)
334 .filter(sp -> sp.getTargetPort().getIntVal() != null)
335 .forEach(sp -> {
336 int targetPort = sp.getTargetPort().getIntVal();
337 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900338
Jian Li5e8a22a2019-02-27 11:48:42 +0900339 for (Endpoints endpoints : endpointses) {
340 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
341 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
342 if (targetProtocol.equals(endpointPort.getProtocol()) &&
343 targetPort == endpointPort.getPort()) {
344 Set<String> addresses = endpointSubset.getAddresses()
345 .stream().map(EndpointAddress::getIp)
346 .collect(Collectors.toSet());
347 map.put(sp, addresses);
Jian Li004526d2019-02-25 16:26:27 +0900348 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900349 }
Jian Li004526d2019-02-25 16:26:27 +0900350 }
351 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900352 });
Jian Li004526d2019-02-25 16:26:27 +0900353
Jian Li5e8a22a2019-02-27 11:48:42 +0900354 return map;
355 }
356
Jian Li4a7ce672019-04-09 15:20:25 +0900357 private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
358
359 if (pod.getMetadata().getAnnotations() == null) {
360 return;
361 }
362
363 String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
364
365 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
366 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
367
368 spEpasMap.forEach((sp, epas) -> {
369 List<GroupBucket> bkts = Lists.newArrayList();
370
371 if (install) {
372 if (epas.contains(podIpStr)) {
373 bkts = buildBuckets(deviceId, podIpStr, sp);
374 }
375 } else {
376 bkts = buildBuckets(deviceId, podIpStr, sp);
377 }
378
379 spGrpBkts.put(sp, bkts);
380 });
381
382 String serviceIp = service.getSpec().getClusterIP();
383 spGrpBkts.forEach((sp, bkts) -> {
384 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
385 int groupId = svcStr.hashCode();
386
387 k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
388 });
389 }
390
391 private List<GroupBucket> buildBuckets(DeviceId deviceId,
392 String podIpStr,
393 ServicePort sp) {
394 List<GroupBucket> bkts = Lists.newArrayList();
395
Jian Li4a7ce672019-04-09 15:20:25 +0900396 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900397 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900398
399 if (TCP.equals(sp.getProtocol())) {
400 tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
401 } else if (UDP.equals(sp.getProtocol())) {
402 tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
403 }
404
Jian Li7d111d72019-04-12 13:58:44 +0900405 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
406 deviceService.getDevice(deviceId), ROUTING_TABLE);
407 tBuilder.extension(resubmitTreatment, deviceId);
408
Jian Li4a7ce672019-04-09 15:20:25 +0900409 bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
410
411 return bkts;
412 }
413
414 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
415 Service service,
416 boolean install) {
Jian Li5e8a22a2019-02-27 11:48:42 +0900417 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
418 Map<String, String> nodeIpGatewayIpMap =
419 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
420 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
421
422 spEpasMap.forEach((sp, epas) -> {
423 List<GroupBucket> bkts = Lists.newArrayList();
424 epas.forEach(epa -> {
425 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
Jian Li7d111d72019-04-12 13:58:44 +0900426
Jian Li5e8a22a2019-02-27 11:48:42 +0900427 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900428 .setIpDst(IpAddress.valueOf(podIp));
Jian Li5e8a22a2019-02-27 11:48:42 +0900429
430 if (TCP.equals(sp.getProtocol())) {
431 tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
432 } else if (UDP.equals(sp.getProtocol())) {
433 tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
434 }
435
Jian Li7d111d72019-04-12 13:58:44 +0900436 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
437 deviceService.getDevice(deviceId), ROUTING_TABLE);
438 tBuilder.extension(resubmitTreatment, deviceId);
439
Jian Li5e8a22a2019-02-27 11:48:42 +0900440 bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
441 });
442 spGrpBkts.put(sp, bkts);
443 });
444
445 String serviceIp = service.getSpec().getClusterIP();
446 spGrpBkts.forEach((sp, bkts) -> {
447 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900448 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900449
Jian Li4a7ce672019-04-09 15:20:25 +0900450 if (install) {
451
452 // add group table rules
453 k8sGroupRuleService.setRule(appId, deviceId, groupId,
454 SELECT, bkts, true);
455
456 log.info("Adding group rule {}", groupId);
457
458 // if we failed to add group rule, we will not install flow rules
459 // as this might cause rule inconsistency
460 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
461 // add flow rules for shifting IP domain
462 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
463 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
464 sp.getProtocol(), true);
465 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900466 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900467 // remove flow rules for shifting IP domain
468 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
469 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
470 sp.getProtocol(), false);
471
472 // remove group table rules
473 k8sGroupRuleService.setRule(appId, deviceId, groupId,
474 SELECT, bkts, false);
475
476 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900477 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900478 });
Jian Li004526d2019-02-25 16:26:27 +0900479
Jian Li5e8a22a2019-02-27 11:48:42 +0900480 spEpasMap.forEach((sp, epas) ->
481 // add flow rules for unshifting IP domain
482 epas.forEach(epa -> {
483 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
484 setUnshiftDomainRules(deviceId, POD_TABLE,
485 PRIORITY_NAT_RULE, serviceIp, sp.getPort(), sp.getProtocol(),
486 podIp, sp.getTargetPort().getIntVal(), install);
487 }
488 ));
Jian Li004526d2019-02-25 16:26:27 +0900489 }
490
491 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900492 int groupId, int priority, String serviceIp,
493 int servicePort, String protocol, boolean install) {
494 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900495 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900496 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
497
498 if (TCP.equals(protocol)) {
499 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
500 .matchTcpDst(TpPort.tpPort(servicePort));
501 } else if (UDP.equals(protocol)) {
502 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
503 .matchUdpDst(TpPort.tpPort(servicePort));
504 }
Jian Li004526d2019-02-25 16:26:27 +0900505
506 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li5e8a22a2019-02-27 11:48:42 +0900507 deviceService.getDevice(deviceId), SRC, SHIFTED_IP_PREFIX);
Jian Li004526d2019-02-25 16:26:27 +0900508
509 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
510 .extension(loadTreatment, deviceId)
511 .group(GroupId.valueOf(groupId))
512 .build();
513
514 k8sFlowRuleService.setRule(
515 appId,
516 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900517 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900518 treatment,
519 priority,
520 installTable,
521 install);
522 }
523
524 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900525 int priority, String serviceIp,
526 int servicePort, String protocol,
527 String podIp, int podPort, boolean install) {
528 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900529 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900530 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
531
532 if (TCP.equals(protocol)) {
533 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
534 .matchTcpSrc(TpPort.tpPort(podPort));
535 } else if (UDP.equals(protocol)) {
536 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
537 .matchUdpSrc(TpPort.tpPort(podPort));
538 }
539
540 String podIpPrefix = podIp.split("\\.")[0] +
541 "." + podIp.split("\\.")[1];
Jian Li004526d2019-02-25 16:26:27 +0900542
543 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li5e8a22a2019-02-27 11:48:42 +0900544 deviceService.getDevice(deviceId), DST, podIpPrefix);
Jian Li004526d2019-02-25 16:26:27 +0900545
Jian Li5e8a22a2019-02-27 11:48:42 +0900546 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900547 .extension(loadTreatment, deviceId)
548 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li5e8a22a2019-02-27 11:48:42 +0900549 .transition(ROUTING_TABLE);
550
551 if (TCP.equals(protocol)) {
552 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
553 } else if (UDP.equals(protocol)) {
554 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
555 }
Jian Li004526d2019-02-25 16:26:27 +0900556
557 k8sFlowRuleService.setRule(
558 appId,
559 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900560 sBuilder.build(),
561 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900562 priority,
563 installTable,
564 install);
565 }
566
567 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
568 long ctMask, Service service,
569 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900570 int groupId = (int) groupIdCounter.incrementAndGet();
571
572 List<GroupBucket> buckets = Lists.newArrayList();
573
574 String serviceName = service.getMetadata().getName();
575 String serviceIp = service.getSpec().getClusterIP();
576
577 // TODO: multi-ports case should be addressed
578 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
579
580 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
581 .stream()
582 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
583 .collect(Collectors.toList());
584
585 Map<String, String> nodeIpGatewayIpMap =
586 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
587
588 for (Endpoints endpoints : endpointses) {
589 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
590 List<EndpointPort> ports = endpointSubset.getPorts()
591 .stream()
592 .filter(p -> p.getProtocol().equals(TCP))
593 .collect(Collectors.toList());
594
595 for (EndpointAddress address : endpointSubset.getAddresses()) {
596 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
597 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
598
599 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
600 niciraConnTrackTreatmentBuilder(driverService, deviceId)
601 .commit(true)
602 .natAction(true)
603 .natIp(IpAddress.valueOf(podIp))
604 .natFlag(CT_NAT_DST_FLAG);
605
606 ports.forEach(p -> {
607 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
608 .natPort(TpPort.tpPort(p.getPort())).build();
609 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
610 deviceService.getDevice(deviceId), ROUTING_TABLE);
611 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
612 .extension(ctNatTreatment, deviceId)
613 .extension(resubmitTreatment, deviceId)
614 .build();
615 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
616 });
617 }
618 }
619 }
620
621 if (!buckets.isEmpty()) {
622 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
623
624 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
625 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
626 PRIORITY_CT_RULE, install);
627 }
628 }
629
630 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
631 String srcCidr, String dstCidr, int installTable,
632 int transitTable, int priority, boolean install) {
633 ExtensionSelector esCtSate = RulePopulatorUtil
634 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
635 TrafficSelector selector = DefaultTrafficSelector.builder()
636 .matchEthType(Ethernet.TYPE_IPV4)
637 .matchIPSrc(IpPrefix.valueOf(srcCidr))
638 .matchIPDst(IpPrefix.valueOf(dstCidr))
639 .extension(esCtSate, deviceId)
640 .build();
641
642 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
643 niciraConnTrackTreatmentBuilder(driverService, deviceId)
644 .natAction(false)
645 .commit(false)
646 .table((short) transitTable);
647
648 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
649 .extension(connTreatmentBuilder.build(), deviceId)
650 .build();
651
652 k8sFlowRuleService.setRule(
653 appId,
654 deviceId,
655 selector,
656 treatment,
657 priority,
658 installTable,
659 install);
660 }
661
662 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
663 IpAddress dstIp, TpPort dstPort, int installTable,
664 int groupId, int priority, boolean install) {
665 ExtensionSelector esCtSate = RulePopulatorUtil
666 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
667 TrafficSelector selector = DefaultTrafficSelector.builder()
668 .matchEthType(Ethernet.TYPE_IPV4)
669 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
670 .matchIPProtocol(IPv4.PROTOCOL_TCP)
671 .matchTcpDst(dstPort)
672 .extension(esCtSate, deviceId)
673 .build();
674 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
675 .group(GroupId.valueOf(groupId))
676 .build();
677
678 k8sFlowRuleService.setRule(
679 appId,
680 deviceId,
681 selector,
682 treatment,
683 priority,
684 installTable,
685 install);
686 }
687
688 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
689 int installTable, int transitTable,
690 int priority, boolean install) {
691 ExtensionSelector esCtSate = RulePopulatorUtil
692 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
693 TrafficSelector selector = DefaultTrafficSelector.builder()
694 .extension(esCtSate, deviceId)
695 .build();
696
697 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
698 .transition(transitTable)
699 .build();
700
701 k8sFlowRuleService.setRule(
702 appId,
703 deviceId,
704 selector,
705 treatment,
706 priority,
707 installTable,
708 install);
709 }
710
Jian Li004526d2019-02-25 16:26:27 +0900711 /**
712 * Extracts properties from the component configuration context.
713 *
714 * @param context the component context
715 */
716 private void readComponentConfiguration(ComponentContext context) {
717 Dictionary<?, ?> properties = context.getProperties();
718
719 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
720 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
721 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
722 }
723
Jian Li4a7ce672019-04-09 15:20:25 +0900724 private void setServiceNatRules(DeviceId deviceId, boolean install) {
725 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
726 setStatefulServiceNatRules(deviceId, install);
727 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
728 setStatelessServiceNatRules(deviceId, install);
729 } else {
730 log.warn("Service IP NAT mode was not configured!");
731 }
732 }
733
Jian Li2cc2b632019-02-18 00:56:40 +0900734 private class InternalK8sServiceListener implements K8sServiceListener {
735
736 private boolean isRelevantHelper() {
737 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
738 }
739
740 @Override
741 public void event(K8sServiceEvent event) {
742 switch (event.type()) {
743 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900744 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900745 eventExecutor.execute(() -> processServiceCreation(event.subject()));
746 break;
747 case K8S_SERVICE_REMOVED:
748 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
749 break;
750 default:
751 // do nothing
752 break;
753 }
754 }
755
756 private void processServiceCreation(Service service) {
757 if (!isRelevantHelper()) {
758 return;
759 }
760
Jian Li5e8a22a2019-02-27 11:48:42 +0900761 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
762 long ctTrackNew = computeCtStateFlag(true, true, false);
763 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900764
Jian Li5e8a22a2019-02-27 11:48:42 +0900765 k8sNodeService.completeNodes().forEach(n ->
766 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
767 ctMaskTrackNew, service, true));
768 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
769 k8sNodeService.completeNodes().forEach(n ->
770 setStatelessGroupFlowRules(n.intgBridge(), service, true));
771 }
Jian Li2cc2b632019-02-18 00:56:40 +0900772 }
773
774 private void processServiceRemoval(Service service) {
775 if (!isRelevantHelper()) {
776 return;
777 }
778
Jian Li5e8a22a2019-02-27 11:48:42 +0900779 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
780 long ctTrackNew = computeCtStateFlag(true, true, false);
781 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900782
Jian Li5e8a22a2019-02-27 11:48:42 +0900783 k8sNodeService.completeNodes().forEach(n ->
784 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
785 ctMaskTrackNew, service, false));
786 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
787 k8sNodeService.completeNodes().forEach(n ->
788 setStatelessGroupFlowRules(n.intgBridge(), service, false));
789 }
Jian Li004526d2019-02-25 16:26:27 +0900790 }
Jian Li2cc2b632019-02-18 00:56:40 +0900791 }
792
Jian Li4a7ce672019-04-09 15:20:25 +0900793 private class InternalK8sPodListener implements K8sPodListener {
794
795 private boolean isRelevantHelper() {
796 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
797 }
798
799 @Override
800 public void event(K8sPodEvent event) {
801 switch (event.type()) {
Jian Li5c755832019-04-11 23:24:28 +0900802 case K8S_POD_ANNOTATION_ADDED:
803 eventExecutor.execute(() -> processPodAnnotAddition(event.subject()));
Jian Li4a7ce672019-04-09 15:20:25 +0900804 break;
805 case K8S_POD_REMOVED:
806 eventExecutor.execute(() -> processPodRemoval(event.subject()));
807 break;
808 default:
809 break;
810 }
811 }
812
Jian Li5c755832019-04-11 23:24:28 +0900813 private void processPodAnnotAddition(Pod pod) {
Jian Li4a7ce672019-04-09 15:20:25 +0900814 if (!isRelevantHelper()) {
815 return;
816 }
817
818 setServiceRuleFromPod(pod, true);
819 }
820
821 private void processPodRemoval(Pod pod) {
822 if (!isRelevantHelper()) {
823 return;
824 }
825
826 setServiceRuleFromPod(pod, false);
827 }
828
829 private void setServiceRuleFromPod(Pod pod, boolean install) {
830 k8sServiceService.services().forEach(s -> {
831 pod.getMetadata().getLabels().forEach((pk, pv) -> {
832 Map<String, String> selectors = s.getSpec().getSelector();
833 if (selectors != null && selectors.containsKey(pk)) {
834 if (pv.equals(selectors.get(pk))) {
835 k8sNodeService.completeNodes().forEach(n ->
836 setGroupBuckets(n.intgBridge(), s, pod, install));
837 }
838 }
839 });
840 });
841 }
842 }
843
Jian Li2cc2b632019-02-18 00:56:40 +0900844 private class InternalNodeEventListener implements K8sNodeListener {
845
846 private boolean isRelevantHelper() {
847 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
848 }
849
850 @Override
851 public void event(K8sNodeEvent event) {
852 K8sNode k8sNode = event.subject();
853 switch (event.type()) {
854 case K8S_NODE_COMPLETE:
855 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
856 break;
857 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900858 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +0900859 default:
860 break;
861 }
862 }
863
864 private void processNodeCompletion(K8sNode node) {
865 if (!isRelevantHelper()) {
866 return;
867 }
868
Jian Li4a7ce672019-04-09 15:20:25 +0900869 setServiceNatRules(node.intgBridge(), true);
Jian Li2cc2b632019-02-18 00:56:40 +0900870 }
871 }
872}