blob: 0dca52428a2988e6ac87e9c1b0d30e90b62b317f [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
Jian Lif5da78a2019-04-15 01:52:23 +090031import org.onlab.packet.MacAddress;
Jian Li2cc2b632019-02-18 00:56:40 +090032import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.GroupId;
Jian Li7b63fe62019-04-14 21:49:27 +090041import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
42import org.onosproject.k8snetworking.api.K8sEndpointsListener;
Jian Li2cc2b632019-02-18 00:56:40 +090043import org.onosproject.k8snetworking.api.K8sEndpointsService;
44import org.onosproject.k8snetworking.api.K8sFlowRuleService;
45import org.onosproject.k8snetworking.api.K8sGroupRuleService;
Jian Lif5da78a2019-04-15 01:52:23 +090046import org.onosproject.k8snetworking.api.K8sNetwork;
47import org.onosproject.k8snetworking.api.K8sNetworkEvent;
48import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Li2cc2b632019-02-18 00:56:40 +090049import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090050import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
54import org.onosproject.k8snetworking.util.RulePopulatorUtil;
55import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
56import org.onosproject.k8snode.api.K8sNode;
57import org.onosproject.k8snode.api.K8sNodeEvent;
58import org.onosproject.k8snode.api.K8sNodeListener;
59import org.onosproject.k8snode.api.K8sNodeService;
60import org.onosproject.net.DeviceId;
61import org.onosproject.net.device.DeviceService;
62import org.onosproject.net.driver.DriverService;
63import org.onosproject.net.flow.DefaultTrafficSelector;
64import org.onosproject.net.flow.DefaultTrafficTreatment;
65import org.onosproject.net.flow.TrafficSelector;
66import org.onosproject.net.flow.TrafficTreatment;
67import org.onosproject.net.flow.criteria.ExtensionSelector;
68import org.onosproject.net.flow.instructions.ExtensionTreatment;
69import org.onosproject.net.group.GroupBucket;
Jian Li2cc2b632019-02-18 00:56:40 +090070import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090071import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090072import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090075import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090076import org.osgi.service.component.annotations.Reference;
77import org.osgi.service.component.annotations.ReferenceCardinality;
78import org.slf4j.Logger;
79
Jian Li004526d2019-02-25 16:26:27 +090080import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090081import java.util.List;
82import java.util.Map;
83import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090084import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090085import java.util.concurrent.ExecutorService;
86import java.util.stream.Collectors;
87
88import static java.util.concurrent.Executors.newSingleThreadExecutor;
89import static org.onlab.util.Tools.groupedThreads;
Jian Li73d3b6a2019-07-08 18:07:53 +090090import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090091import static org.onosproject.k8snetworking.api.Constants.A_CLASS;
92import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
Jian Li5e8a22a2019-02-27 11:48:42 +090093import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li73d3b6a2019-07-08 18:07:53 +090094import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090095import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090096import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090097import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
98import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +090099import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +0900100import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li004526d2019-02-25 16:26:27 +0900101import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900102import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900103import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +0900104import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +0900105import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900106import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900107import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Li004526d2019-02-25 16:26:27 +0900108import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
109import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
110import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900111import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900112import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
Jian Li619fa282020-09-02 14:45:35 +0900113import static org.onosproject.k8snetworking.api.Constants.TUN_ENTRY_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900114import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
115import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
Jian Li004526d2019-02-25 16:26:27 +0900116import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
117import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li140d8a22019-04-24 23:41:44 +0900118import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
Jian Li2cc2b632019-02-18 00:56:40 +0900119import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900120import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
121import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
Jian Li2cc2b632019-02-18 00:56:40 +0900122import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
123import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900124import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900125import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
126import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
127import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
128import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
129import static org.onosproject.net.group.GroupDescription.Type.SELECT;
130import static org.slf4j.LoggerFactory.getLogger;
131
132/**
133 * Handles the service IP to pod IP related translation traffic.
134 */
Jian Li004526d2019-02-25 16:26:27 +0900135@Component(
136 immediate = true,
137 property = {
Jian Lif5da78a2019-04-15 01:52:23 +0900138 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT,
139 SERVICE_CIDR + "=" + SERVICE_IP_CIDR_DEFAULT
Jian Li004526d2019-02-25 16:26:27 +0900140 }
141)
Jian Li2cc2b632019-02-18 00:56:40 +0900142public class K8sServiceHandler {
143
144 private final Logger log = getLogger(getClass());
145
Jian Li2cc2b632019-02-18 00:56:40 +0900146 private static final int HOST_CIDR_NUM = 32;
147
Jian Li2cc2b632019-02-18 00:56:40 +0900148 private static final String CLUSTER_IP = "ClusterIP";
149 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900150 private static final String UDP = "UDP";
Jian Lif5da78a2019-04-15 01:52:23 +0900151 private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
Jian Li2cc2b632019-02-18 00:56:40 +0900152
Jian Li140d8a22019-04-24 23:41:44 +0900153 private static final String SERVICE_CIDR = "serviceCidr";
Jian Li44c2b122019-05-03 14:46:34 +0900154 private static final String NONE = "None";
Jian Li140d8a22019-04-24 23:41:44 +0900155 private static final String B_CLASS_SUFFIX = ".0.0/16";
156 private static final String A_CLASS_SUFFIX = ".0.0.0/8";
157
Jian Li2cc2b632019-02-18 00:56:40 +0900158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected CoreService coreService;
160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 protected LeadershipService leadershipService;
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
165 protected ClusterService clusterService;
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
168 protected DriverService driverService;
169
170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
171 protected DeviceService deviceService;
172
173 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900174 protected ComponentConfigService configService;
175
176 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900177 protected StorageService storageService;
178
179 @Reference(cardinality = ReferenceCardinality.MANDATORY)
180 protected K8sNetworkService k8sNetworkService;
181
182 @Reference(cardinality = ReferenceCardinality.MANDATORY)
183 protected K8sFlowRuleService k8sFlowRuleService;
184
185 @Reference(cardinality = ReferenceCardinality.MANDATORY)
186 protected K8sGroupRuleService k8sGroupRuleService;
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY)
189 protected K8sNodeService k8sNodeService;
190
191 @Reference(cardinality = ReferenceCardinality.MANDATORY)
192 protected K8sEndpointsService k8sEndpointsService;
193
194 @Reference(cardinality = ReferenceCardinality.MANDATORY)
195 protected K8sServiceService k8sServiceService;
196
Jian Lie1a5b8f2019-07-23 17:13:19 +0900197 @Reference(cardinality = ReferenceCardinality.MANDATORY)
198 protected K8sPodService k8sPodService;
Jian Li4a7ce672019-04-09 15:20:25 +0900199
Jian Li004526d2019-02-25 16:26:27 +0900200 /** Service IP address translation mode. */
201 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
202
Jian Lif5da78a2019-04-15 01:52:23 +0900203 /** Ranges of IP address of service VIP. */
204 private String serviceCidr = SERVICE_IP_CIDR_DEFAULT;
205
Jian Li2cc2b632019-02-18 00:56:40 +0900206 private final ExecutorService eventExecutor = newSingleThreadExecutor(
207 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
208 private final InternalNodeEventListener internalNodeEventListener =
209 new InternalNodeEventListener();
210 private final InternalK8sServiceListener internalK8sServiceListener =
211 new InternalK8sServiceListener();
Jian Li7b63fe62019-04-14 21:49:27 +0900212 private final InternalK8sEndpointsListener internalK8sEndpointsListener =
213 new InternalK8sEndpointsListener();
Jian Lif5da78a2019-04-15 01:52:23 +0900214 private final InternalK8sNetworkListener internalK8sNetworkListener =
215 new InternalK8sNetworkListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900216
Jian Li2cc2b632019-02-18 00:56:40 +0900217 private ApplicationId appId;
218 private NodeId localNodeId;
219
220 @Activate
221 protected void activate() {
222 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900223 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900224 localNodeId = clusterService.getLocalNode().id();
225 leadershipService.runForLeadership(appId.name());
226 k8sNodeService.addListener(internalNodeEventListener);
227 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900228 k8sEndpointsService.addListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900229 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900230
Jian Li2cc2b632019-02-18 00:56:40 +0900231 log.info("Started");
232 }
233
234 @Deactivate
235 protected void deactivate() {
236 leadershipService.withdraw(appId.name());
237 k8sNodeService.removeListener(internalNodeEventListener);
238 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900239 k8sEndpointsService.removeListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900240 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Li004526d2019-02-25 16:26:27 +0900241 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900242 eventExecutor.shutdown();
243
244 log.info("Stopped");
245 }
246
Jian Li004526d2019-02-25 16:26:27 +0900247 @Modified
248 void modified(ComponentContext context) {
249 readComponentConfiguration(context);
250
251 log.info("Modified");
252 }
253
254 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900255 // -trk CT rules
256 long ctUntrack = computeCtStateFlag(false, false, false);
257 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
258
259 k8sNetworkService.networks().forEach(n -> {
260 // TODO: need to provide a way to add multiple service IP CIDR ranges
Jian Lif5da78a2019-04-15 01:52:23 +0900261 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
Jian Li73d3b6a2019-07-08 18:07:53 +0900262 GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900263 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900264 GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900265 });
266
267 // +trk-new CT rules
268 long ctTrackUnnew = computeCtStateFlag(true, false, false);
269 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
270
271 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
272 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
273
274 // +trk+new CT rules
275 long ctTrackNew = computeCtStateFlag(true, true, false);
276 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
277
278 k8sServiceService.services().stream()
279 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900280 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
281 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900282 }
283
Jian Li004526d2019-02-25 16:26:27 +0900284 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
285
Jian Li140d8a22019-04-24 23:41:44 +0900286 String srcPodCidr = k8sNetworkService.network(
Jian Li7d111d72019-04-12 13:58:44 +0900287 k8sNodeService.node(deviceId).hostname()).cidr();
Jian Li140d8a22019-04-24 23:41:44 +0900288 String srcPodPrefix = getBclassIpPrefixFromCidr(srcPodCidr);
289 String fullSrcPodCidr = srcPodPrefix + B_CLASS_SUFFIX;
290 String fullSrcNodeCidr = NODE_IP_PREFIX + A_CLASS_SUFFIX;
291
292 // src: POD -> dst: service (unNAT POD) grouping
293 setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900294 SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900295 PRIORITY_CT_RULE, install);
296 // src: POD (unNAT service) -> dst: shifted POD grouping
297 setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900298 srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li140d8a22019-04-24 23:41:44 +0900299
300 // src: node -> dst: service (unNAT POD) grouping
301 setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900302 null, null, null, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900303 PRIORITY_CT_RULE, install);
304 // src: POD (unNAT service) -> dst: node grouping
305 setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900306 null, null, null, GROUPING_TABLE, POD_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900307 PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900308
Jian Li004526d2019-02-25 16:26:27 +0900309 k8sNetworkService.networks().forEach(n -> {
Jian Li140d8a22019-04-24 23:41:44 +0900310 setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS,
311 n.segmentId(), null, null, ROUTING_TABLE,
Jian Li73d3b6a2019-07-08 18:07:53 +0900312 STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900313 });
314
315 // setup load balancing rules using group table
Jian Li5abc9f02020-09-04 19:38:37 +0900316 k8sServiceService.services().forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
Jian Li004526d2019-02-25 16:26:27 +0900317 }
318
319 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li140d8a22019-04-24 23:41:44 +0900320 String dstCidr, String cidrClass,
321 String segId, String shiftPrefix,
322 String shiftType, int installTable,
323 int transitTable, int priority,
324 boolean install) {
Jian Li004526d2019-02-25 16:26:27 +0900325 TrafficSelector selector = DefaultTrafficSelector.builder()
326 .matchEthType(Ethernet.TYPE_IPV4)
327 .matchIPSrc(IpPrefix.valueOf(srcCidr))
328 .matchIPDst(IpPrefix.valueOf(dstCidr))
329 .build();
330
Jian Li7d111d72019-04-12 13:58:44 +0900331 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
332
333 if (segId != null) {
334 tBuilder.setTunnelId(Long.valueOf(segId));
335 }
Jian Li140d8a22019-04-24 23:41:44 +0900336
337 if (shiftPrefix != null && shiftType != null) {
338 ExtensionTreatment loadTreatment = buildLoadExtension(
339 deviceService.getDevice(deviceId), cidrClass, shiftType, shiftPrefix);
340 tBuilder.extension(loadTreatment, deviceId);
341 }
342
Jian Li7d111d72019-04-12 13:58:44 +0900343 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900344
345 k8sFlowRuleService.setRule(
346 appId,
347 deviceId,
348 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900349 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900350 priority,
351 installTable,
352 install);
353 }
354
Jian Li5e8a22a2019-02-27 11:48:42 +0900355 /**
356 * Obtains the service port to endpoint address paired map.
357 *
358 * @param service kubernetes service
359 * @return a map where key is kubernetes service port, and value is the
360 * endpoint addresses that are associated with the service port
361 */
362 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
363
364 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900365
366 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900367 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
368 .stream()
369 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
370 .collect(Collectors.toList());
371
Jian Li5e8a22a2019-02-27 11:48:42 +0900372 service.getSpec().getPorts().stream()
373 .filter(Objects::nonNull)
374 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900375 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900376 sp.getTargetPort().getStrVal() != null)
Jian Li5e8a22a2019-02-27 11:48:42 +0900377 .forEach(sp -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900378 Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
379 sp.getTargetPort().getIntVal() : 0;
380 String targetPortName = sp.getTargetPort().getStrVal() != null ?
381 sp.getTargetPort().getStrVal() : "";
382 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900383
Jian Lie1a5b8f2019-07-23 17:13:19 +0900384 for (Endpoints endpoints : endpointses) {
385 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
386
387 // in case service port name is specified but not port number
388 // we will lookup the container port number and use it
389 // as the target port number
390 if (!targetPortName.equals("") && targetPortInt == 0) {
391 for (EndpointAddress addr : endpointSubset.getAddresses()) {
392 Pod pod = podByIp(k8sPodService, addr.getIp());
393 targetPortInt = portNumberByName(pod, targetPortName);
394 }
395 }
396
Jian Li5cf3b002019-08-30 17:57:53 +0900397 if (targetPortInt == 0) {
398 continue;
399 }
400
Jian Lie1a5b8f2019-07-23 17:13:19 +0900401 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
402 if (targetProtocol.equals(endpointPort.getProtocol()) &&
403 (targetPortInt.equals(endpointPort.getPort()) ||
404 targetPortName.equals(endpointPort.getName()))) {
405 Set<String> addresses = endpointSubset.getAddresses()
406 .stream().map(EndpointAddress::getIp)
407 .collect(Collectors.toSet());
408 map.put(sp, addresses);
409 }
410 }
Jian Li004526d2019-02-25 16:26:27 +0900411 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900412 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900413 });
Jian Li004526d2019-02-25 16:26:27 +0900414
Jian Li5e8a22a2019-02-27 11:48:42 +0900415 return map;
416 }
417
Jian Libf562c22019-04-15 18:07:14 +0900418 private void setGroupBuckets(Service service, boolean install) {
Jian Li4a7ce672019-04-09 15:20:25 +0900419 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
420 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Jian Li7b63fe62019-04-14 21:49:27 +0900421 Map<String, String> nodeIpGatewayIpMap =
422 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
Jian Li4a7ce672019-04-09 15:20:25 +0900423
Jian Libf562c22019-04-15 18:07:14 +0900424 for (K8sNode node : k8sNodeService.completeNodes()) {
425 spEpasMap.forEach((sp, epas) -> {
426 List<GroupBucket> bkts = Lists.newArrayList();
Jian Li4a7ce672019-04-09 15:20:25 +0900427
Jian Libf562c22019-04-15 18:07:14 +0900428 for (String ip : epas) {
Jian Li5cf3b002019-08-30 17:57:53 +0900429 GroupBucket bkt = buildBuckets(node.intgBridge(),
430 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp);
431
432 if (bkt == null) {
433 continue;
434 }
435
Jian Libf562c22019-04-15 18:07:14 +0900436 if (install) {
Jian Li5cf3b002019-08-30 17:57:53 +0900437 bkts.add(bkt);
Jian Libf562c22019-04-15 18:07:14 +0900438 } else {
Jian Li5cf3b002019-08-30 17:57:53 +0900439 bkts.remove(bkt);
Jian Li7b63fe62019-04-14 21:49:27 +0900440 }
Jian Libf562c22019-04-15 18:07:14 +0900441 }
442
443 spGrpBkts.put(sp, bkts);
444 });
445
446 String serviceIp = service.getSpec().getClusterIP();
447 spGrpBkts.forEach((sp, bkts) -> {
448 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
449 int groupId = svcStr.hashCode();
450
451 if (bkts.size() > 0) {
452 k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
453 }
454 });
455
456 spEpasMap.forEach((sp, epas) ->
Jian Lie1a5b8f2019-07-23 17:13:19 +0900457 // add flow rules for unshifting IP domain
458 epas.forEach(epa -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900459
Jian Lie1a5b8f2019-07-23 17:13:19 +0900460 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
461
462 int targetPort;
463 if (sp.getTargetPort().getIntVal() == null) {
464 Pod pod = podByIp(k8sPodService, podIp);
465 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
466 } else {
467 targetPort = sp.getTargetPort().getIntVal();
468 }
469
Jian Li5cf3b002019-08-30 17:57:53 +0900470 if (targetPort != 0) {
471 setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
472 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
473 sp.getProtocol(), podIp,
474 targetPort, install);
475 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900476 })
Jian Libf562c22019-04-15 18:07:14 +0900477 );
478 }
Jian Li4a7ce672019-04-09 15:20:25 +0900479 }
480
Jian Lib7dfb5b2019-07-15 17:37:12 +0900481 private GroupBucket buildBuckets(DeviceId deviceId, String podIpStr, ServicePort sp) {
Jian Li4a7ce672019-04-09 15:20:25 +0900482 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900483 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900484
Jian Lie1a5b8f2019-07-23 17:13:19 +0900485 int targetPort;
486 if (sp.getTargetPort().getIntVal() == null) {
487 Pod pod = podByIp(k8sPodService, podIpStr);
488 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
489 } else {
490 targetPort = sp.getTargetPort().getIntVal();
491 }
Jian Lib7dfb5b2019-07-15 17:37:12 +0900492
Jian Li5cf3b002019-08-30 17:57:53 +0900493 if (targetPort == 0) {
494 return null;
495 }
496
Jian Li4a7ce672019-04-09 15:20:25 +0900497 if (TCP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900498 tBuilder.setTcpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900499 } else if (UDP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900500 tBuilder.setUdpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900501 }
502
Jian Li7d111d72019-04-12 13:58:44 +0900503 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900504 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li7d111d72019-04-12 13:58:44 +0900505 tBuilder.extension(resubmitTreatment, deviceId);
506
Jian Li5cf3b002019-08-30 17:57:53 +0900507 // TODO: need to adjust group bucket weight by considering POD locality
Jian Libf562c22019-04-15 18:07:14 +0900508 return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
Jian Li4a7ce672019-04-09 15:20:25 +0900509 }
510
511 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
512 Service service,
513 boolean install) {
Jian Li7b63fe62019-04-14 21:49:27 +0900514 Set<ServicePort> sps = service.getSpec().getPorts().stream()
515 .filter(Objects::nonNull)
516 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900517 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900518 sp.getTargetPort().getStrVal() != null)
Jian Li7b63fe62019-04-14 21:49:27 +0900519 .collect(Collectors.toSet());
Jian Li5e8a22a2019-02-27 11:48:42 +0900520
521 String serviceIp = service.getSpec().getClusterIP();
Jian Li7b63fe62019-04-14 21:49:27 +0900522 sps.forEach(sp -> {
Jian Li5e8a22a2019-02-27 11:48:42 +0900523 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900524 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900525
Jian Li4a7ce672019-04-09 15:20:25 +0900526 if (install) {
527
528 // add group table rules
529 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900530 SELECT, Lists.newArrayList(), true);
Jian Li4a7ce672019-04-09 15:20:25 +0900531
532 log.info("Adding group rule {}", groupId);
533
534 // if we failed to add group rule, we will not install flow rules
535 // as this might cause rule inconsistency
536 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
537 // add flow rules for shifting IP domain
538 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
539 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
540 sp.getProtocol(), true);
541 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900542 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900543 // remove flow rules for shifting IP domain
544 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
545 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
546 sp.getProtocol(), false);
547
548 // remove group table rules
549 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900550 SELECT, Lists.newArrayList(), false);
Jian Li4a7ce672019-04-09 15:20:25 +0900551
552 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900553 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900554 });
Jian Li004526d2019-02-25 16:26:27 +0900555 }
556
557 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900558 int groupId, int priority, String serviceIp,
559 int servicePort, String protocol, boolean install) {
Jian Li44c2b122019-05-03 14:46:34 +0900560
561 if (serviceIp == null || NONE.equals(serviceIp)) {
562 return;
563 }
564
Jian Li5e8a22a2019-02-27 11:48:42 +0900565 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900566 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900567 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
568
569 if (TCP.equals(protocol)) {
570 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
571 .matchTcpDst(TpPort.tpPort(servicePort));
572 } else if (UDP.equals(protocol)) {
573 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
574 .matchUdpDst(TpPort.tpPort(servicePort));
575 }
Jian Li004526d2019-02-25 16:26:27 +0900576
Jian Li004526d2019-02-25 16:26:27 +0900577 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900578 .group(GroupId.valueOf(groupId))
579 .build();
580
581 k8sFlowRuleService.setRule(
582 appId,
583 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900584 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900585 treatment,
586 priority,
587 installTable,
588 install);
589 }
590
591 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900592 int priority, String serviceIp,
593 int servicePort, String protocol,
594 String podIp, int podPort, boolean install) {
595 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900596 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900597 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
598
599 if (TCP.equals(protocol)) {
600 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
601 .matchTcpSrc(TpPort.tpPort(podPort));
602 } else if (UDP.equals(protocol)) {
603 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
604 .matchUdpSrc(TpPort.tpPort(podPort));
605 }
606
Jian Li5e8a22a2019-02-27 11:48:42 +0900607 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900608 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li73d3b6a2019-07-08 18:07:53 +0900609 .transition(ACL_TABLE);
Jian Li5e8a22a2019-02-27 11:48:42 +0900610
611 if (TCP.equals(protocol)) {
612 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
613 } else if (UDP.equals(protocol)) {
614 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
615 }
Jian Li004526d2019-02-25 16:26:27 +0900616
617 k8sFlowRuleService.setRule(
618 appId,
619 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900620 sBuilder.build(),
621 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900622 priority,
623 installTable,
624 install);
625 }
626
Jian Lif5da78a2019-04-15 01:52:23 +0900627 private void setCidrRoutingRule(IpPrefix prefix, MacAddress mac,
628 K8sNetwork network, boolean install) {
629 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
630 .matchEthType(Ethernet.TYPE_IPV4)
631 .matchIPSrc(prefix)
632 .matchIPDst(IpPrefix.valueOf(network.cidr()));
633
Jian Lidc1df642020-11-25 16:49:34 +0900634 Set<K8sNode> nodes = install ? k8sNodeService.completeNodes() : k8sNodeService.nodes();
635
636 nodes.forEach(n -> {
Jian Lif5da78a2019-04-15 01:52:23 +0900637 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
638 .setTunnelId(Long.valueOf(network.segmentId()));
639
640 if (n.hostname().equals(network.name())) {
641 if (mac != null) {
642 tBuilder.setEthSrc(mac);
643 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900644 tBuilder.transition(STAT_EGRESS_TABLE);
Jian Li619fa282020-09-02 14:45:35 +0900645
646 // install rules into tunnel bridge
Jian Li4b5048a2020-10-08 02:57:45 +0900647 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
648 .setOutput(n.tunToIntgPortNum())
Jian Li619fa282020-09-02 14:45:35 +0900649 .build();
650
651 k8sFlowRuleService.setRule(
652 appId,
653 n.tunBridge(),
654 sBuilder.build(),
Jian Li4b5048a2020-10-08 02:57:45 +0900655 treatment,
Jian Li619fa282020-09-02 14:45:35 +0900656 PRIORITY_CIDR_RULE,
657 TUN_ENTRY_TABLE,
658 install
659 );
Jian Li4b5048a2020-10-08 02:57:45 +0900660 } else {
661 tBuilder.setOutput(n.intgToTunPortNum());
Jian Lif5da78a2019-04-15 01:52:23 +0900662 }
663
664 k8sFlowRuleService.setRule(
665 appId,
666 n.intgBridge(),
667 sBuilder.build(),
668 tBuilder.build(),
669 PRIORITY_CIDR_RULE,
670 ROUTING_TABLE,
671 install
672 );
673 });
674 }
675
676 private void setupServiceDefaultRule(K8sNetwork k8sNetwork, boolean install) {
677 setCidrRoutingRule(IpPrefix.valueOf(serviceCidr),
678 MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork, install);
679 }
680
Jian Li004526d2019-02-25 16:26:27 +0900681 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
682 long ctMask, Service service,
683 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900684 List<GroupBucket> buckets = Lists.newArrayList();
685
686 String serviceName = service.getMetadata().getName();
687 String serviceIp = service.getSpec().getClusterIP();
688
689 // TODO: multi-ports case should be addressed
690 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
Jian Libf562c22019-04-15 18:07:14 +0900691 String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
692
693 String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
694 int groupId = svcStr.hashCode();
Jian Li2cc2b632019-02-18 00:56:40 +0900695
696 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
697 .stream()
698 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
699 .collect(Collectors.toList());
700
701 Map<String, String> nodeIpGatewayIpMap =
702 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
703
704 for (Endpoints endpoints : endpointses) {
705 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
706 List<EndpointPort> ports = endpointSubset.getPorts()
707 .stream()
708 .filter(p -> p.getProtocol().equals(TCP))
709 .collect(Collectors.toList());
710
711 for (EndpointAddress address : endpointSubset.getAddresses()) {
712 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
713 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
714
715 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
716 niciraConnTrackTreatmentBuilder(driverService, deviceId)
717 .commit(true)
718 .natAction(true)
719 .natIp(IpAddress.valueOf(podIp))
720 .natFlag(CT_NAT_DST_FLAG);
721
722 ports.forEach(p -> {
723 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
Jian Lieb488ea2019-04-16 01:50:02 +0900724 .natPortMin(TpPort.tpPort(p.getPort()))
725 .natPortMax(TpPort.tpPort(p.getPort()))
726 .build();
Jian Li2cc2b632019-02-18 00:56:40 +0900727 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900728 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li2cc2b632019-02-18 00:56:40 +0900729 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
730 .extension(ctNatTreatment, deviceId)
731 .extension(resubmitTreatment, deviceId)
732 .build();
733 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
734 });
735 }
736 }
737 }
738
739 if (!buckets.isEmpty()) {
740 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
741
742 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
743 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
744 PRIORITY_CT_RULE, install);
745 }
746 }
747
748 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
749 String srcCidr, String dstCidr, int installTable,
750 int transitTable, int priority, boolean install) {
751 ExtensionSelector esCtSate = RulePopulatorUtil
752 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
753 TrafficSelector selector = DefaultTrafficSelector.builder()
754 .matchEthType(Ethernet.TYPE_IPV4)
755 .matchIPSrc(IpPrefix.valueOf(srcCidr))
756 .matchIPDst(IpPrefix.valueOf(dstCidr))
757 .extension(esCtSate, deviceId)
758 .build();
759
760 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
761 niciraConnTrackTreatmentBuilder(driverService, deviceId)
762 .natAction(false)
763 .commit(false)
764 .table((short) transitTable);
765
766 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
767 .extension(connTreatmentBuilder.build(), deviceId)
768 .build();
769
770 k8sFlowRuleService.setRule(
771 appId,
772 deviceId,
773 selector,
774 treatment,
775 priority,
776 installTable,
777 install);
778 }
779
780 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
781 IpAddress dstIp, TpPort dstPort, int installTable,
782 int groupId, int priority, boolean install) {
783 ExtensionSelector esCtSate = RulePopulatorUtil
784 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
785 TrafficSelector selector = DefaultTrafficSelector.builder()
786 .matchEthType(Ethernet.TYPE_IPV4)
787 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
788 .matchIPProtocol(IPv4.PROTOCOL_TCP)
789 .matchTcpDst(dstPort)
790 .extension(esCtSate, deviceId)
791 .build();
792 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
793 .group(GroupId.valueOf(groupId))
794 .build();
795
796 k8sFlowRuleService.setRule(
797 appId,
798 deviceId,
799 selector,
800 treatment,
801 priority,
802 installTable,
803 install);
804 }
805
806 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
807 int installTable, int transitTable,
808 int priority, boolean install) {
809 ExtensionSelector esCtSate = RulePopulatorUtil
810 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
811 TrafficSelector selector = DefaultTrafficSelector.builder()
812 .extension(esCtSate, deviceId)
813 .build();
814
815 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
816 .transition(transitTable)
817 .build();
818
819 k8sFlowRuleService.setRule(
820 appId,
821 deviceId,
822 selector,
823 treatment,
824 priority,
825 installTable,
826 install);
827 }
828
Jian Libf562c22019-04-15 18:07:14 +0900829 private void setEndpointsRules(Endpoints endpoints, boolean install) {
830 String appName = endpoints.getMetadata().getName();
831 Service service = k8sServiceService.services().stream().filter(s ->
832 appName.equals(s.getMetadata().getName()))
833 .findFirst().orElse(null);
Jian Li7b63fe62019-04-14 21:49:27 +0900834
Jian Libf562c22019-04-15 18:07:14 +0900835 if (service == null) {
836 return;
Jian Li7b63fe62019-04-14 21:49:27 +0900837 }
Jian Libf562c22019-04-15 18:07:14 +0900838
839 setGroupBuckets(service, install);
Jian Li7b63fe62019-04-14 21:49:27 +0900840 }
841
Jian Lif5da78a2019-04-15 01:52:23 +0900842 private String servicePortStr(String ip, int port, String protocol) {
843 return ip + "_" + port + "_" + protocol;
844 }
845
Jian Li004526d2019-02-25 16:26:27 +0900846 /**
847 * Extracts properties from the component configuration context.
848 *
849 * @param context the component context
850 */
851 private void readComponentConfiguration(ComponentContext context) {
852 Dictionary<?, ?> properties = context.getProperties();
853
854 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
855 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
856 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
Jian Lif5da78a2019-04-15 01:52:23 +0900857
858 String updatedServiceCidr = Tools.get(properties, SERVICE_CIDR);
859 serviceCidr = updatedServiceCidr != null ?
860 updatedServiceCidr : SERVICE_IP_CIDR_DEFAULT;
861 log.info("Configured. Service VIP range is {}", serviceCidr);
Jian Li004526d2019-02-25 16:26:27 +0900862 }
863
Jian Li4a7ce672019-04-09 15:20:25 +0900864 private void setServiceNatRules(DeviceId deviceId, boolean install) {
865 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
866 setStatefulServiceNatRules(deviceId, install);
867 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
868 setStatelessServiceNatRules(deviceId, install);
869 } else {
870 log.warn("Service IP NAT mode was not configured!");
871 }
872 }
873
Jian Li2cc2b632019-02-18 00:56:40 +0900874 private class InternalK8sServiceListener implements K8sServiceListener {
875
876 private boolean isRelevantHelper() {
877 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
878 }
879
880 @Override
881 public void event(K8sServiceEvent event) {
882 switch (event.type()) {
883 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900884 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900885 eventExecutor.execute(() -> processServiceCreation(event.subject()));
886 break;
887 case K8S_SERVICE_REMOVED:
888 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
889 break;
890 default:
891 // do nothing
892 break;
893 }
894 }
895
896 private void processServiceCreation(Service service) {
897 if (!isRelevantHelper()) {
898 return;
899 }
900
Jian Li5e8a22a2019-02-27 11:48:42 +0900901 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
902 long ctTrackNew = computeCtStateFlag(true, true, false);
903 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900904
Jian Li5e8a22a2019-02-27 11:48:42 +0900905 k8sNodeService.completeNodes().forEach(n ->
906 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
907 ctMaskTrackNew, service, true));
908 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
909 k8sNodeService.completeNodes().forEach(n ->
910 setStatelessGroupFlowRules(n.intgBridge(), service, true));
911 }
Jian Li2cc2b632019-02-18 00:56:40 +0900912 }
913
914 private void processServiceRemoval(Service service) {
915 if (!isRelevantHelper()) {
916 return;
917 }
918
Jian Li5e8a22a2019-02-27 11:48:42 +0900919 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
920 long ctTrackNew = computeCtStateFlag(true, true, false);
921 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900922
Jian Li5e8a22a2019-02-27 11:48:42 +0900923 k8sNodeService.completeNodes().forEach(n ->
924 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
925 ctMaskTrackNew, service, false));
926 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
927 k8sNodeService.completeNodes().forEach(n ->
928 setStatelessGroupFlowRules(n.intgBridge(), service, false));
929 }
Jian Li004526d2019-02-25 16:26:27 +0900930 }
Jian Li2cc2b632019-02-18 00:56:40 +0900931 }
932
Jian Li7b63fe62019-04-14 21:49:27 +0900933 private class InternalK8sEndpointsListener implements K8sEndpointsListener {
934
935 private boolean isRelevantHelper() {
936 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
937 }
938
939 @Override
940 public void event(K8sEndpointsEvent event) {
941 Endpoints endpoints = event.subject();
942
943 switch (event.type()) {
944 case K8S_ENDPOINTS_CREATED:
Jian Libf562c22019-04-15 18:07:14 +0900945 case K8S_ENDPOINTS_UPDATED:
Jian Li7b63fe62019-04-14 21:49:27 +0900946 eventExecutor.execute(() -> processEndpointsCreation(endpoints));
947 break;
948 case K8S_ENDPOINTS_REMOVED:
949 eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
950 break;
951 default:
952 break;
953 }
954 }
955
956 private void processEndpointsCreation(Endpoints endpoints) {
957 if (!isRelevantHelper()) {
958 return;
959 }
960
Jian Libf562c22019-04-15 18:07:14 +0900961 setEndpointsRules(endpoints, true);
Jian Li7b63fe62019-04-14 21:49:27 +0900962 }
963
964 private void processEndpointsRemoval(Endpoints endpoints) {
965 if (!isRelevantHelper()) {
966 return;
967 }
968
Jian Libf562c22019-04-15 18:07:14 +0900969 setEndpointsRules(endpoints, false);
Jian Li4a7ce672019-04-09 15:20:25 +0900970 }
971 }
972
Jian Li2cc2b632019-02-18 00:56:40 +0900973 private class InternalNodeEventListener implements K8sNodeListener {
974
975 private boolean isRelevantHelper() {
976 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
977 }
978
979 @Override
980 public void event(K8sNodeEvent event) {
981 K8sNode k8sNode = event.subject();
982 switch (event.type()) {
983 case K8S_NODE_COMPLETE:
984 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
985 break;
Jian Lidc1df642020-11-25 16:49:34 +0900986 case K8S_NODE_OFF_BOARDED:
987 eventExecutor.execute(() -> processNodeOffboard(k8sNode));
988 break;
Jian Li2cc2b632019-02-18 00:56:40 +0900989 case K8S_NODE_INCOMPLETE:
Jian Li2cc2b632019-02-18 00:56:40 +0900990 default:
991 break;
992 }
993 }
994
995 private void processNodeCompletion(K8sNode node) {
996 if (!isRelevantHelper()) {
997 return;
998 }
999
Jian Li4a7ce672019-04-09 15:20:25 +09001000 setServiceNatRules(node.intgBridge(), true);
Jian Libf562c22019-04-15 18:07:14 +09001001 k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
Jian Lif5da78a2019-04-15 01:52:23 +09001002 k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
1003 }
Jian Lidc1df642020-11-25 16:49:34 +09001004
1005 private void processNodeOffboard(K8sNode node) {
1006 if (!isRelevantHelper()) {
1007 return;
1008 }
1009
1010 K8sNetwork network = k8sNetworkService.network(node.hostname());
1011 setupServiceDefaultRule(network, false);
1012 }
Jian Lif5da78a2019-04-15 01:52:23 +09001013 }
1014
1015 private class InternalK8sNetworkListener implements K8sNetworkListener {
1016
1017 private boolean isRelevantHelper() {
1018 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
1019 }
1020
1021 @Override
1022 public void event(K8sNetworkEvent event) {
1023 switch (event.type()) {
1024 case K8S_NETWORK_CREATED:
1025 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
1026 break;
1027 case K8S_NETWORK_UPDATED:
1028 case K8S_NETWORK_REMOVED:
1029 default:
1030 break;
1031 }
1032 }
1033
1034 private void processNetworkCreation(K8sNetwork network) {
1035 if (!isRelevantHelper()) {
1036 return;
1037 }
1038
1039 setupServiceDefaultRule(network, true);
Jian Li2cc2b632019-02-18 00:56:40 +09001040 }
1041 }
1042}