blob: 93d913db4706fff6bdda804790e2e6ffbefefeb2 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
Jian Lif5da78a2019-04-15 01:52:23 +090031import org.onlab.packet.MacAddress;
Jian Li2cc2b632019-02-18 00:56:40 +090032import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.GroupId;
Jian Li7b63fe62019-04-14 21:49:27 +090041import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
42import org.onosproject.k8snetworking.api.K8sEndpointsListener;
Jian Li2cc2b632019-02-18 00:56:40 +090043import org.onosproject.k8snetworking.api.K8sEndpointsService;
44import org.onosproject.k8snetworking.api.K8sFlowRuleService;
45import org.onosproject.k8snetworking.api.K8sGroupRuleService;
Jian Lif5da78a2019-04-15 01:52:23 +090046import org.onosproject.k8snetworking.api.K8sNetwork;
47import org.onosproject.k8snetworking.api.K8sNetworkEvent;
48import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Li2cc2b632019-02-18 00:56:40 +090049import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090050import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
54import org.onosproject.k8snetworking.util.RulePopulatorUtil;
55import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
56import org.onosproject.k8snode.api.K8sNode;
57import org.onosproject.k8snode.api.K8sNodeEvent;
58import org.onosproject.k8snode.api.K8sNodeListener;
59import org.onosproject.k8snode.api.K8sNodeService;
60import org.onosproject.net.DeviceId;
Jian Lif5da78a2019-04-15 01:52:23 +090061import org.onosproject.net.PortNumber;
Jian Li2cc2b632019-02-18 00:56:40 +090062import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.driver.DriverService;
64import org.onosproject.net.flow.DefaultTrafficSelector;
65import org.onosproject.net.flow.DefaultTrafficTreatment;
66import org.onosproject.net.flow.TrafficSelector;
67import org.onosproject.net.flow.TrafficTreatment;
68import org.onosproject.net.flow.criteria.ExtensionSelector;
69import org.onosproject.net.flow.instructions.ExtensionTreatment;
70import org.onosproject.net.group.GroupBucket;
Jian Li2cc2b632019-02-18 00:56:40 +090071import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Activate;
74import org.osgi.service.component.annotations.Component;
75import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090076import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090077import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
79import org.slf4j.Logger;
80
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.List;
83import java.util.Map;
84import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090085import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090086import java.util.concurrent.ExecutorService;
87import java.util.stream.Collectors;
88
89import static java.util.concurrent.Executors.newSingleThreadExecutor;
90import static org.onlab.util.Tools.groupedThreads;
Jian Li73d3b6a2019-07-08 18:07:53 +090091import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090092import static org.onosproject.k8snetworking.api.Constants.A_CLASS;
93import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
Jian Li5e8a22a2019-02-27 11:48:42 +090094import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li73d3b6a2019-07-08 18:07:53 +090095import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090096import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090097import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090098import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
99import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +0900100import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +0900101import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900103import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900104import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +0900105import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +0900106import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900107import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900108import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Li004526d2019-02-25 16:26:27 +0900109import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
110import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
111import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900112import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900113import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
Jian Li619fa282020-09-02 14:45:35 +0900114import static org.onosproject.k8snetworking.api.Constants.TUN_ENTRY_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900115import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
116import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
Jian Li004526d2019-02-25 16:26:27 +0900117import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
118import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li140d8a22019-04-24 23:41:44 +0900119import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
Jian Li2cc2b632019-02-18 00:56:40 +0900120import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900121import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
122import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
Jian Lif5da78a2019-04-15 01:52:23 +0900123import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
Jian Li2cc2b632019-02-18 00:56:40 +0900124import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
Jian Lif5da78a2019-04-15 01:52:23 +0900125import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900126import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900127import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900128import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
129import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
130import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
131import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
132import static org.onosproject.net.group.GroupDescription.Type.SELECT;
133import static org.slf4j.LoggerFactory.getLogger;
134
135/**
136 * Handles the service IP to pod IP related translation traffic.
137 */
Jian Li004526d2019-02-25 16:26:27 +0900138@Component(
139 immediate = true,
140 property = {
Jian Lif5da78a2019-04-15 01:52:23 +0900141 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT,
142 SERVICE_CIDR + "=" + SERVICE_IP_CIDR_DEFAULT
Jian Li004526d2019-02-25 16:26:27 +0900143 }
144)
Jian Li2cc2b632019-02-18 00:56:40 +0900145public class K8sServiceHandler {
146
147 private final Logger log = getLogger(getClass());
148
Jian Li2cc2b632019-02-18 00:56:40 +0900149 private static final int HOST_CIDR_NUM = 32;
150
Jian Li2cc2b632019-02-18 00:56:40 +0900151 private static final String CLUSTER_IP = "ClusterIP";
152 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900153 private static final String UDP = "UDP";
Jian Lif5da78a2019-04-15 01:52:23 +0900154 private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
Jian Li2cc2b632019-02-18 00:56:40 +0900155
Jian Li140d8a22019-04-24 23:41:44 +0900156 private static final String SERVICE_CIDR = "serviceCidr";
Jian Li44c2b122019-05-03 14:46:34 +0900157 private static final String NONE = "None";
Jian Li140d8a22019-04-24 23:41:44 +0900158 private static final String B_CLASS_SUFFIX = ".0.0/16";
159 private static final String A_CLASS_SUFFIX = ".0.0.0/8";
160
Jian Li2cc2b632019-02-18 00:56:40 +0900161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 protected CoreService coreService;
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
165 protected LeadershipService leadershipService;
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
168 protected ClusterService clusterService;
169
170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
171 protected DriverService driverService;
172
173 @Reference(cardinality = ReferenceCardinality.MANDATORY)
174 protected DeviceService deviceService;
175
176 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900177 protected ComponentConfigService configService;
178
179 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900180 protected StorageService storageService;
181
182 @Reference(cardinality = ReferenceCardinality.MANDATORY)
183 protected K8sNetworkService k8sNetworkService;
184
185 @Reference(cardinality = ReferenceCardinality.MANDATORY)
186 protected K8sFlowRuleService k8sFlowRuleService;
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY)
189 protected K8sGroupRuleService k8sGroupRuleService;
190
191 @Reference(cardinality = ReferenceCardinality.MANDATORY)
192 protected K8sNodeService k8sNodeService;
193
194 @Reference(cardinality = ReferenceCardinality.MANDATORY)
195 protected K8sEndpointsService k8sEndpointsService;
196
197 @Reference(cardinality = ReferenceCardinality.MANDATORY)
198 protected K8sServiceService k8sServiceService;
199
Jian Lie1a5b8f2019-07-23 17:13:19 +0900200 @Reference(cardinality = ReferenceCardinality.MANDATORY)
201 protected K8sPodService k8sPodService;
Jian Li4a7ce672019-04-09 15:20:25 +0900202
Jian Li004526d2019-02-25 16:26:27 +0900203 /** Service IP address translation mode. */
204 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
205
Jian Lif5da78a2019-04-15 01:52:23 +0900206 /** Ranges of IP address of service VIP. */
207 private String serviceCidr = SERVICE_IP_CIDR_DEFAULT;
208
Jian Li2cc2b632019-02-18 00:56:40 +0900209 private final ExecutorService eventExecutor = newSingleThreadExecutor(
210 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
211 private final InternalNodeEventListener internalNodeEventListener =
212 new InternalNodeEventListener();
213 private final InternalK8sServiceListener internalK8sServiceListener =
214 new InternalK8sServiceListener();
Jian Li7b63fe62019-04-14 21:49:27 +0900215 private final InternalK8sEndpointsListener internalK8sEndpointsListener =
216 new InternalK8sEndpointsListener();
Jian Lif5da78a2019-04-15 01:52:23 +0900217 private final InternalK8sNetworkListener internalK8sNetworkListener =
218 new InternalK8sNetworkListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900219
Jian Li2cc2b632019-02-18 00:56:40 +0900220 private ApplicationId appId;
221 private NodeId localNodeId;
222
223 @Activate
224 protected void activate() {
225 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900226 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900227 localNodeId = clusterService.getLocalNode().id();
228 leadershipService.runForLeadership(appId.name());
229 k8sNodeService.addListener(internalNodeEventListener);
230 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900231 k8sEndpointsService.addListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900232 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900233
Jian Li2cc2b632019-02-18 00:56:40 +0900234 log.info("Started");
235 }
236
237 @Deactivate
238 protected void deactivate() {
239 leadershipService.withdraw(appId.name());
240 k8sNodeService.removeListener(internalNodeEventListener);
241 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900242 k8sEndpointsService.removeListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900243 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Li004526d2019-02-25 16:26:27 +0900244 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900245 eventExecutor.shutdown();
246
247 log.info("Stopped");
248 }
249
Jian Li004526d2019-02-25 16:26:27 +0900250 @Modified
251 void modified(ComponentContext context) {
252 readComponentConfiguration(context);
253
254 log.info("Modified");
255 }
256
257 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900258 // -trk CT rules
259 long ctUntrack = computeCtStateFlag(false, false, false);
260 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
261
262 k8sNetworkService.networks().forEach(n -> {
263 // TODO: need to provide a way to add multiple service IP CIDR ranges
Jian Lif5da78a2019-04-15 01:52:23 +0900264 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
Jian Li73d3b6a2019-07-08 18:07:53 +0900265 GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900266 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900267 GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900268 });
269
270 // +trk-new CT rules
271 long ctTrackUnnew = computeCtStateFlag(true, false, false);
272 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
273
274 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
275 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
276
277 // +trk+new CT rules
278 long ctTrackNew = computeCtStateFlag(true, true, false);
279 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
280
281 k8sServiceService.services().stream()
282 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900283 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
284 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900285 }
286
Jian Li004526d2019-02-25 16:26:27 +0900287 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
288
Jian Li140d8a22019-04-24 23:41:44 +0900289 String srcPodCidr = k8sNetworkService.network(
Jian Li7d111d72019-04-12 13:58:44 +0900290 k8sNodeService.node(deviceId).hostname()).cidr();
Jian Li140d8a22019-04-24 23:41:44 +0900291 String srcPodPrefix = getBclassIpPrefixFromCidr(srcPodCidr);
292 String fullSrcPodCidr = srcPodPrefix + B_CLASS_SUFFIX;
293 String fullSrcNodeCidr = NODE_IP_PREFIX + A_CLASS_SUFFIX;
294
295 // src: POD -> dst: service (unNAT POD) grouping
296 setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900297 SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900298 PRIORITY_CT_RULE, install);
299 // src: POD (unNAT service) -> dst: shifted POD grouping
300 setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900301 srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li140d8a22019-04-24 23:41:44 +0900302
303 // src: node -> dst: service (unNAT POD) grouping
304 setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900305 null, null, null, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900306 PRIORITY_CT_RULE, install);
307 // src: POD (unNAT service) -> dst: node grouping
308 setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900309 null, null, null, GROUPING_TABLE, POD_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900310 PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900311
Jian Li004526d2019-02-25 16:26:27 +0900312 k8sNetworkService.networks().forEach(n -> {
Jian Li140d8a22019-04-24 23:41:44 +0900313 setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS,
314 n.segmentId(), null, null, ROUTING_TABLE,
Jian Li73d3b6a2019-07-08 18:07:53 +0900315 STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900316 });
317
318 // setup load balancing rules using group table
Jian Li5abc9f02020-09-04 19:38:37 +0900319 k8sServiceService.services().forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
Jian Li004526d2019-02-25 16:26:27 +0900320 }
321
322 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li140d8a22019-04-24 23:41:44 +0900323 String dstCidr, String cidrClass,
324 String segId, String shiftPrefix,
325 String shiftType, int installTable,
326 int transitTable, int priority,
327 boolean install) {
Jian Li004526d2019-02-25 16:26:27 +0900328 TrafficSelector selector = DefaultTrafficSelector.builder()
329 .matchEthType(Ethernet.TYPE_IPV4)
330 .matchIPSrc(IpPrefix.valueOf(srcCidr))
331 .matchIPDst(IpPrefix.valueOf(dstCidr))
332 .build();
333
Jian Li7d111d72019-04-12 13:58:44 +0900334 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
335
336 if (segId != null) {
337 tBuilder.setTunnelId(Long.valueOf(segId));
338 }
Jian Li140d8a22019-04-24 23:41:44 +0900339
340 if (shiftPrefix != null && shiftType != null) {
341 ExtensionTreatment loadTreatment = buildLoadExtension(
342 deviceService.getDevice(deviceId), cidrClass, shiftType, shiftPrefix);
343 tBuilder.extension(loadTreatment, deviceId);
344 }
345
Jian Li7d111d72019-04-12 13:58:44 +0900346 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900347
348 k8sFlowRuleService.setRule(
349 appId,
350 deviceId,
351 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900352 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900353 priority,
354 installTable,
355 install);
356 }
357
Jian Li5e8a22a2019-02-27 11:48:42 +0900358 /**
359 * Obtains the service port to endpoint address paired map.
360 *
361 * @param service kubernetes service
362 * @return a map where key is kubernetes service port, and value is the
363 * endpoint addresses that are associated with the service port
364 */
365 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
366
367 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900368
369 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900370 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
371 .stream()
372 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
373 .collect(Collectors.toList());
374
Jian Li5e8a22a2019-02-27 11:48:42 +0900375 service.getSpec().getPorts().stream()
376 .filter(Objects::nonNull)
377 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900378 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900379 sp.getTargetPort().getStrVal() != null)
Jian Li5e8a22a2019-02-27 11:48:42 +0900380 .forEach(sp -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900381 Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
382 sp.getTargetPort().getIntVal() : 0;
383 String targetPortName = sp.getTargetPort().getStrVal() != null ?
384 sp.getTargetPort().getStrVal() : "";
385 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900386
Jian Lie1a5b8f2019-07-23 17:13:19 +0900387 for (Endpoints endpoints : endpointses) {
388 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
389
390 // in case service port name is specified but not port number
391 // we will lookup the container port number and use it
392 // as the target port number
393 if (!targetPortName.equals("") && targetPortInt == 0) {
394 for (EndpointAddress addr : endpointSubset.getAddresses()) {
395 Pod pod = podByIp(k8sPodService, addr.getIp());
396 targetPortInt = portNumberByName(pod, targetPortName);
397 }
398 }
399
Jian Li5cf3b002019-08-30 17:57:53 +0900400 if (targetPortInt == 0) {
401 continue;
402 }
403
Jian Lie1a5b8f2019-07-23 17:13:19 +0900404 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
405 if (targetProtocol.equals(endpointPort.getProtocol()) &&
406 (targetPortInt.equals(endpointPort.getPort()) ||
407 targetPortName.equals(endpointPort.getName()))) {
408 Set<String> addresses = endpointSubset.getAddresses()
409 .stream().map(EndpointAddress::getIp)
410 .collect(Collectors.toSet());
411 map.put(sp, addresses);
412 }
413 }
Jian Li004526d2019-02-25 16:26:27 +0900414 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900415 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900416 });
Jian Li004526d2019-02-25 16:26:27 +0900417
Jian Li5e8a22a2019-02-27 11:48:42 +0900418 return map;
419 }
420
Jian Libf562c22019-04-15 18:07:14 +0900421 private void setGroupBuckets(Service service, boolean install) {
Jian Li4a7ce672019-04-09 15:20:25 +0900422 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
423 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Jian Li7b63fe62019-04-14 21:49:27 +0900424 Map<String, String> nodeIpGatewayIpMap =
425 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
Jian Li4a7ce672019-04-09 15:20:25 +0900426
Jian Libf562c22019-04-15 18:07:14 +0900427 for (K8sNode node : k8sNodeService.completeNodes()) {
428 spEpasMap.forEach((sp, epas) -> {
429 List<GroupBucket> bkts = Lists.newArrayList();
Jian Li4a7ce672019-04-09 15:20:25 +0900430
Jian Libf562c22019-04-15 18:07:14 +0900431 for (String ip : epas) {
Jian Li5cf3b002019-08-30 17:57:53 +0900432 GroupBucket bkt = buildBuckets(node.intgBridge(),
433 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp);
434
435 if (bkt == null) {
436 continue;
437 }
438
Jian Libf562c22019-04-15 18:07:14 +0900439 if (install) {
Jian Li5cf3b002019-08-30 17:57:53 +0900440 bkts.add(bkt);
Jian Libf562c22019-04-15 18:07:14 +0900441 } else {
Jian Li5cf3b002019-08-30 17:57:53 +0900442 bkts.remove(bkt);
Jian Li7b63fe62019-04-14 21:49:27 +0900443 }
Jian Libf562c22019-04-15 18:07:14 +0900444 }
445
446 spGrpBkts.put(sp, bkts);
447 });
448
449 String serviceIp = service.getSpec().getClusterIP();
450 spGrpBkts.forEach((sp, bkts) -> {
451 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
452 int groupId = svcStr.hashCode();
453
454 if (bkts.size() > 0) {
455 k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
456 }
457 });
458
459 spEpasMap.forEach((sp, epas) ->
Jian Lie1a5b8f2019-07-23 17:13:19 +0900460 // add flow rules for unshifting IP domain
461 epas.forEach(epa -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900462
Jian Lie1a5b8f2019-07-23 17:13:19 +0900463 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
464
465 int targetPort;
466 if (sp.getTargetPort().getIntVal() == null) {
467 Pod pod = podByIp(k8sPodService, podIp);
468 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
469 } else {
470 targetPort = sp.getTargetPort().getIntVal();
471 }
472
Jian Li5cf3b002019-08-30 17:57:53 +0900473 if (targetPort != 0) {
474 setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
475 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
476 sp.getProtocol(), podIp,
477 targetPort, install);
478 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900479 })
Jian Libf562c22019-04-15 18:07:14 +0900480 );
481 }
Jian Li4a7ce672019-04-09 15:20:25 +0900482 }
483
Jian Lib7dfb5b2019-07-15 17:37:12 +0900484 private GroupBucket buildBuckets(DeviceId deviceId, String podIpStr, ServicePort sp) {
Jian Li4a7ce672019-04-09 15:20:25 +0900485 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900486 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900487
Jian Lie1a5b8f2019-07-23 17:13:19 +0900488 int targetPort;
489 if (sp.getTargetPort().getIntVal() == null) {
490 Pod pod = podByIp(k8sPodService, podIpStr);
491 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
492 } else {
493 targetPort = sp.getTargetPort().getIntVal();
494 }
Jian Lib7dfb5b2019-07-15 17:37:12 +0900495
Jian Li5cf3b002019-08-30 17:57:53 +0900496 if (targetPort == 0) {
497 return null;
498 }
499
Jian Li4a7ce672019-04-09 15:20:25 +0900500 if (TCP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900501 tBuilder.setTcpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900502 } else if (UDP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900503 tBuilder.setUdpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900504 }
505
Jian Li7d111d72019-04-12 13:58:44 +0900506 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900507 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li7d111d72019-04-12 13:58:44 +0900508 tBuilder.extension(resubmitTreatment, deviceId);
509
Jian Li5cf3b002019-08-30 17:57:53 +0900510 // TODO: need to adjust group bucket weight by considering POD locality
Jian Libf562c22019-04-15 18:07:14 +0900511 return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
Jian Li4a7ce672019-04-09 15:20:25 +0900512 }
513
514 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
515 Service service,
516 boolean install) {
Jian Li7b63fe62019-04-14 21:49:27 +0900517 Set<ServicePort> sps = service.getSpec().getPorts().stream()
518 .filter(Objects::nonNull)
519 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900520 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900521 sp.getTargetPort().getStrVal() != null)
Jian Li7b63fe62019-04-14 21:49:27 +0900522 .collect(Collectors.toSet());
Jian Li5e8a22a2019-02-27 11:48:42 +0900523
524 String serviceIp = service.getSpec().getClusterIP();
Jian Li7b63fe62019-04-14 21:49:27 +0900525 sps.forEach(sp -> {
Jian Li5e8a22a2019-02-27 11:48:42 +0900526 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900527 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900528
Jian Li4a7ce672019-04-09 15:20:25 +0900529 if (install) {
530
531 // add group table rules
532 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900533 SELECT, Lists.newArrayList(), true);
Jian Li4a7ce672019-04-09 15:20:25 +0900534
535 log.info("Adding group rule {}", groupId);
536
537 // if we failed to add group rule, we will not install flow rules
538 // as this might cause rule inconsistency
539 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
540 // add flow rules for shifting IP domain
541 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
542 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
543 sp.getProtocol(), true);
544 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900545 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900546 // remove flow rules for shifting IP domain
547 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
548 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
549 sp.getProtocol(), false);
550
551 // remove group table rules
552 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900553 SELECT, Lists.newArrayList(), false);
Jian Li4a7ce672019-04-09 15:20:25 +0900554
555 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900556 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900557 });
Jian Li004526d2019-02-25 16:26:27 +0900558 }
559
560 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900561 int groupId, int priority, String serviceIp,
562 int servicePort, String protocol, boolean install) {
Jian Li44c2b122019-05-03 14:46:34 +0900563
564 if (serviceIp == null || NONE.equals(serviceIp)) {
565 return;
566 }
567
Jian Li5e8a22a2019-02-27 11:48:42 +0900568 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900569 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900570 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
571
572 if (TCP.equals(protocol)) {
573 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
574 .matchTcpDst(TpPort.tpPort(servicePort));
575 } else if (UDP.equals(protocol)) {
576 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
577 .matchUdpDst(TpPort.tpPort(servicePort));
578 }
Jian Li004526d2019-02-25 16:26:27 +0900579
Jian Li004526d2019-02-25 16:26:27 +0900580 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900581 .group(GroupId.valueOf(groupId))
582 .build();
583
584 k8sFlowRuleService.setRule(
585 appId,
586 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900587 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900588 treatment,
589 priority,
590 installTable,
591 install);
592 }
593
594 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900595 int priority, String serviceIp,
596 int servicePort, String protocol,
597 String podIp, int podPort, boolean install) {
598 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900599 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900600 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
601
602 if (TCP.equals(protocol)) {
603 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
604 .matchTcpSrc(TpPort.tpPort(podPort));
605 } else if (UDP.equals(protocol)) {
606 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
607 .matchUdpSrc(TpPort.tpPort(podPort));
608 }
609
Jian Li5e8a22a2019-02-27 11:48:42 +0900610 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900611 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li73d3b6a2019-07-08 18:07:53 +0900612 .transition(ACL_TABLE);
Jian Li5e8a22a2019-02-27 11:48:42 +0900613
614 if (TCP.equals(protocol)) {
615 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
616 } else if (UDP.equals(protocol)) {
617 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
618 }
Jian Li004526d2019-02-25 16:26:27 +0900619
620 k8sFlowRuleService.setRule(
621 appId,
622 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900623 sBuilder.build(),
624 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900625 priority,
626 installTable,
627 install);
628 }
629
Jian Lif5da78a2019-04-15 01:52:23 +0900630 private void setCidrRoutingRule(IpPrefix prefix, MacAddress mac,
631 K8sNetwork network, boolean install) {
632 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
633 .matchEthType(Ethernet.TYPE_IPV4)
634 .matchIPSrc(prefix)
635 .matchIPDst(IpPrefix.valueOf(network.cidr()));
636
637 k8sNodeService.completeNodes().forEach(n -> {
638 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
639 .setTunnelId(Long.valueOf(network.segmentId()));
640
641 if (n.hostname().equals(network.name())) {
642 if (mac != null) {
643 tBuilder.setEthSrc(mac);
644 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900645 tBuilder.transition(STAT_EGRESS_TABLE);
Jian Lif5da78a2019-04-15 01:52:23 +0900646 } else {
Jian Lif5da78a2019-04-15 01:52:23 +0900647 K8sNode localNode = k8sNodeService.node(network.name());
648
Jian Li619fa282020-09-02 14:45:35 +0900649 tBuilder.setOutput(n.intgToTunPortNum());
650
651 PortNumber portNum = tunnelPortNumByNetId(network.networkId(),
652 k8sNetworkService, n);
653
654 // install rules into tunnel bridge
655 TrafficTreatment treatmentToRemote = DefaultTrafficTreatment.builder()
656 .extension(buildExtension(
657 deviceService,
658 n.tunBridge(),
659 localNode.dataIp().getIp4Address()),
660 n.tunBridge())
661 .setTunnelId(Long.valueOf(network.segmentId()))
662 .setOutput(portNum)
663 .build();
664
665 k8sFlowRuleService.setRule(
666 appId,
667 n.tunBridge(),
668 sBuilder.build(),
669 treatmentToRemote,
670 PRIORITY_CIDR_RULE,
671 TUN_ENTRY_TABLE,
672 install
673 );
Jian Lif5da78a2019-04-15 01:52:23 +0900674 }
675
676 k8sFlowRuleService.setRule(
677 appId,
678 n.intgBridge(),
679 sBuilder.build(),
680 tBuilder.build(),
681 PRIORITY_CIDR_RULE,
682 ROUTING_TABLE,
683 install
684 );
685 });
686 }
687
688 private void setupServiceDefaultRule(K8sNetwork k8sNetwork, boolean install) {
689 setCidrRoutingRule(IpPrefix.valueOf(serviceCidr),
690 MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork, install);
691 }
692
Jian Li004526d2019-02-25 16:26:27 +0900693 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
694 long ctMask, Service service,
695 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900696 List<GroupBucket> buckets = Lists.newArrayList();
697
698 String serviceName = service.getMetadata().getName();
699 String serviceIp = service.getSpec().getClusterIP();
700
701 // TODO: multi-ports case should be addressed
702 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
Jian Libf562c22019-04-15 18:07:14 +0900703 String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
704
705 String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
706 int groupId = svcStr.hashCode();
Jian Li2cc2b632019-02-18 00:56:40 +0900707
708 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
709 .stream()
710 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
711 .collect(Collectors.toList());
712
713 Map<String, String> nodeIpGatewayIpMap =
714 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
715
716 for (Endpoints endpoints : endpointses) {
717 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
718 List<EndpointPort> ports = endpointSubset.getPorts()
719 .stream()
720 .filter(p -> p.getProtocol().equals(TCP))
721 .collect(Collectors.toList());
722
723 for (EndpointAddress address : endpointSubset.getAddresses()) {
724 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
725 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
726
727 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
728 niciraConnTrackTreatmentBuilder(driverService, deviceId)
729 .commit(true)
730 .natAction(true)
731 .natIp(IpAddress.valueOf(podIp))
732 .natFlag(CT_NAT_DST_FLAG);
733
734 ports.forEach(p -> {
735 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
Jian Lieb488ea2019-04-16 01:50:02 +0900736 .natPortMin(TpPort.tpPort(p.getPort()))
737 .natPortMax(TpPort.tpPort(p.getPort()))
738 .build();
Jian Li2cc2b632019-02-18 00:56:40 +0900739 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900740 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li2cc2b632019-02-18 00:56:40 +0900741 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
742 .extension(ctNatTreatment, deviceId)
743 .extension(resubmitTreatment, deviceId)
744 .build();
745 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
746 });
747 }
748 }
749 }
750
751 if (!buckets.isEmpty()) {
752 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
753
754 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
755 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
756 PRIORITY_CT_RULE, install);
757 }
758 }
759
760 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
761 String srcCidr, String dstCidr, int installTable,
762 int transitTable, int priority, boolean install) {
763 ExtensionSelector esCtSate = RulePopulatorUtil
764 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
765 TrafficSelector selector = DefaultTrafficSelector.builder()
766 .matchEthType(Ethernet.TYPE_IPV4)
767 .matchIPSrc(IpPrefix.valueOf(srcCidr))
768 .matchIPDst(IpPrefix.valueOf(dstCidr))
769 .extension(esCtSate, deviceId)
770 .build();
771
772 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
773 niciraConnTrackTreatmentBuilder(driverService, deviceId)
774 .natAction(false)
775 .commit(false)
776 .table((short) transitTable);
777
778 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
779 .extension(connTreatmentBuilder.build(), deviceId)
780 .build();
781
782 k8sFlowRuleService.setRule(
783 appId,
784 deviceId,
785 selector,
786 treatment,
787 priority,
788 installTable,
789 install);
790 }
791
792 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
793 IpAddress dstIp, TpPort dstPort, int installTable,
794 int groupId, int priority, boolean install) {
795 ExtensionSelector esCtSate = RulePopulatorUtil
796 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
797 TrafficSelector selector = DefaultTrafficSelector.builder()
798 .matchEthType(Ethernet.TYPE_IPV4)
799 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
800 .matchIPProtocol(IPv4.PROTOCOL_TCP)
801 .matchTcpDst(dstPort)
802 .extension(esCtSate, deviceId)
803 .build();
804 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
805 .group(GroupId.valueOf(groupId))
806 .build();
807
808 k8sFlowRuleService.setRule(
809 appId,
810 deviceId,
811 selector,
812 treatment,
813 priority,
814 installTable,
815 install);
816 }
817
818 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
819 int installTable, int transitTable,
820 int priority, boolean install) {
821 ExtensionSelector esCtSate = RulePopulatorUtil
822 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
823 TrafficSelector selector = DefaultTrafficSelector.builder()
824 .extension(esCtSate, deviceId)
825 .build();
826
827 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
828 .transition(transitTable)
829 .build();
830
831 k8sFlowRuleService.setRule(
832 appId,
833 deviceId,
834 selector,
835 treatment,
836 priority,
837 installTable,
838 install);
839 }
840
Jian Libf562c22019-04-15 18:07:14 +0900841 private void setEndpointsRules(Endpoints endpoints, boolean install) {
842 String appName = endpoints.getMetadata().getName();
843 Service service = k8sServiceService.services().stream().filter(s ->
844 appName.equals(s.getMetadata().getName()))
845 .findFirst().orElse(null);
Jian Li7b63fe62019-04-14 21:49:27 +0900846
Jian Libf562c22019-04-15 18:07:14 +0900847 if (service == null) {
848 return;
Jian Li7b63fe62019-04-14 21:49:27 +0900849 }
Jian Libf562c22019-04-15 18:07:14 +0900850
851 setGroupBuckets(service, install);
Jian Li7b63fe62019-04-14 21:49:27 +0900852 }
853
Jian Lif5da78a2019-04-15 01:52:23 +0900854 private String servicePortStr(String ip, int port, String protocol) {
855 return ip + "_" + port + "_" + protocol;
856 }
857
Jian Li004526d2019-02-25 16:26:27 +0900858 /**
859 * Extracts properties from the component configuration context.
860 *
861 * @param context the component context
862 */
863 private void readComponentConfiguration(ComponentContext context) {
864 Dictionary<?, ?> properties = context.getProperties();
865
866 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
867 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
868 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
Jian Lif5da78a2019-04-15 01:52:23 +0900869
870 String updatedServiceCidr = Tools.get(properties, SERVICE_CIDR);
871 serviceCidr = updatedServiceCidr != null ?
872 updatedServiceCidr : SERVICE_IP_CIDR_DEFAULT;
873 log.info("Configured. Service VIP range is {}", serviceCidr);
Jian Li004526d2019-02-25 16:26:27 +0900874 }
875
Jian Li4a7ce672019-04-09 15:20:25 +0900876 private void setServiceNatRules(DeviceId deviceId, boolean install) {
877 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
878 setStatefulServiceNatRules(deviceId, install);
879 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
880 setStatelessServiceNatRules(deviceId, install);
881 } else {
882 log.warn("Service IP NAT mode was not configured!");
883 }
884 }
885
Jian Li2cc2b632019-02-18 00:56:40 +0900886 private class InternalK8sServiceListener implements K8sServiceListener {
887
888 private boolean isRelevantHelper() {
889 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
890 }
891
892 @Override
893 public void event(K8sServiceEvent event) {
894 switch (event.type()) {
895 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900896 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900897 eventExecutor.execute(() -> processServiceCreation(event.subject()));
898 break;
899 case K8S_SERVICE_REMOVED:
900 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
901 break;
902 default:
903 // do nothing
904 break;
905 }
906 }
907
908 private void processServiceCreation(Service service) {
909 if (!isRelevantHelper()) {
910 return;
911 }
912
Jian Li5e8a22a2019-02-27 11:48:42 +0900913 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
914 long ctTrackNew = computeCtStateFlag(true, true, false);
915 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900916
Jian Li5e8a22a2019-02-27 11:48:42 +0900917 k8sNodeService.completeNodes().forEach(n ->
918 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
919 ctMaskTrackNew, service, true));
920 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
921 k8sNodeService.completeNodes().forEach(n ->
922 setStatelessGroupFlowRules(n.intgBridge(), service, true));
923 }
Jian Li2cc2b632019-02-18 00:56:40 +0900924 }
925
926 private void processServiceRemoval(Service service) {
927 if (!isRelevantHelper()) {
928 return;
929 }
930
Jian Li5e8a22a2019-02-27 11:48:42 +0900931 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
932 long ctTrackNew = computeCtStateFlag(true, true, false);
933 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900934
Jian Li5e8a22a2019-02-27 11:48:42 +0900935 k8sNodeService.completeNodes().forEach(n ->
936 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
937 ctMaskTrackNew, service, false));
938 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
939 k8sNodeService.completeNodes().forEach(n ->
940 setStatelessGroupFlowRules(n.intgBridge(), service, false));
941 }
Jian Li004526d2019-02-25 16:26:27 +0900942 }
Jian Li2cc2b632019-02-18 00:56:40 +0900943 }
944
Jian Li7b63fe62019-04-14 21:49:27 +0900945 private class InternalK8sEndpointsListener implements K8sEndpointsListener {
946
947 private boolean isRelevantHelper() {
948 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
949 }
950
951 @Override
952 public void event(K8sEndpointsEvent event) {
953 Endpoints endpoints = event.subject();
954
955 switch (event.type()) {
956 case K8S_ENDPOINTS_CREATED:
Jian Libf562c22019-04-15 18:07:14 +0900957 case K8S_ENDPOINTS_UPDATED:
Jian Li7b63fe62019-04-14 21:49:27 +0900958 eventExecutor.execute(() -> processEndpointsCreation(endpoints));
959 break;
960 case K8S_ENDPOINTS_REMOVED:
961 eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
962 break;
963 default:
964 break;
965 }
966 }
967
968 private void processEndpointsCreation(Endpoints endpoints) {
969 if (!isRelevantHelper()) {
970 return;
971 }
972
Jian Libf562c22019-04-15 18:07:14 +0900973 setEndpointsRules(endpoints, true);
Jian Li7b63fe62019-04-14 21:49:27 +0900974 }
975
976 private void processEndpointsRemoval(Endpoints endpoints) {
977 if (!isRelevantHelper()) {
978 return;
979 }
980
Jian Libf562c22019-04-15 18:07:14 +0900981 setEndpointsRules(endpoints, false);
Jian Li4a7ce672019-04-09 15:20:25 +0900982 }
983 }
984
Jian Li2cc2b632019-02-18 00:56:40 +0900985 private class InternalNodeEventListener implements K8sNodeListener {
986
987 private boolean isRelevantHelper() {
988 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
989 }
990
991 @Override
992 public void event(K8sNodeEvent event) {
993 K8sNode k8sNode = event.subject();
994 switch (event.type()) {
995 case K8S_NODE_COMPLETE:
996 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
997 break;
998 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900999 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +09001000 default:
1001 break;
1002 }
1003 }
1004
1005 private void processNodeCompletion(K8sNode node) {
1006 if (!isRelevantHelper()) {
1007 return;
1008 }
1009
Jian Li4a7ce672019-04-09 15:20:25 +09001010 setServiceNatRules(node.intgBridge(), true);
Jian Libf562c22019-04-15 18:07:14 +09001011 k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
Jian Lif5da78a2019-04-15 01:52:23 +09001012 k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
1013 }
1014 }
1015
1016 private class InternalK8sNetworkListener implements K8sNetworkListener {
1017
1018 private boolean isRelevantHelper() {
1019 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
1020 }
1021
1022 @Override
1023 public void event(K8sNetworkEvent event) {
1024 switch (event.type()) {
1025 case K8S_NETWORK_CREATED:
1026 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
1027 break;
1028 case K8S_NETWORK_UPDATED:
1029 case K8S_NETWORK_REMOVED:
1030 default:
1031 break;
1032 }
1033 }
1034
1035 private void processNetworkCreation(K8sNetwork network) {
1036 if (!isRelevantHelper()) {
1037 return;
1038 }
1039
1040 setupServiceDefaultRule(network, true);
Jian Li2cc2b632019-02-18 00:56:40 +09001041 }
1042 }
1043}