blob: fd0dff061e950daa2acd5a7eb258d8f48579ead8 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
Jian Lif5da78a2019-04-15 01:52:23 +090031import org.onlab.packet.MacAddress;
Jian Li2cc2b632019-02-18 00:56:40 +090032import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.GroupId;
Jian Li7b63fe62019-04-14 21:49:27 +090041import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
42import org.onosproject.k8snetworking.api.K8sEndpointsListener;
Jian Li2cc2b632019-02-18 00:56:40 +090043import org.onosproject.k8snetworking.api.K8sEndpointsService;
44import org.onosproject.k8snetworking.api.K8sFlowRuleService;
45import org.onosproject.k8snetworking.api.K8sGroupRuleService;
Jian Lif5da78a2019-04-15 01:52:23 +090046import org.onosproject.k8snetworking.api.K8sNetwork;
47import org.onosproject.k8snetworking.api.K8sNetworkEvent;
48import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Li2cc2b632019-02-18 00:56:40 +090049import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090050import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
54import org.onosproject.k8snetworking.util.RulePopulatorUtil;
55import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
56import org.onosproject.k8snode.api.K8sNode;
57import org.onosproject.k8snode.api.K8sNodeEvent;
58import org.onosproject.k8snode.api.K8sNodeListener;
59import org.onosproject.k8snode.api.K8sNodeService;
60import org.onosproject.net.DeviceId;
Jian Lif5da78a2019-04-15 01:52:23 +090061import org.onosproject.net.PortNumber;
Jian Li2cc2b632019-02-18 00:56:40 +090062import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.driver.DriverService;
64import org.onosproject.net.flow.DefaultTrafficSelector;
65import org.onosproject.net.flow.DefaultTrafficTreatment;
66import org.onosproject.net.flow.TrafficSelector;
67import org.onosproject.net.flow.TrafficTreatment;
68import org.onosproject.net.flow.criteria.ExtensionSelector;
69import org.onosproject.net.flow.instructions.ExtensionTreatment;
70import org.onosproject.net.group.GroupBucket;
Jian Li2cc2b632019-02-18 00:56:40 +090071import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Activate;
74import org.osgi.service.component.annotations.Component;
75import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090076import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090077import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
79import org.slf4j.Logger;
80
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.List;
83import java.util.Map;
84import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090085import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090086import java.util.concurrent.ExecutorService;
87import java.util.stream.Collectors;
88
89import static java.util.concurrent.Executors.newSingleThreadExecutor;
90import static org.onlab.util.Tools.groupedThreads;
Jian Li73d3b6a2019-07-08 18:07:53 +090091import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090092import static org.onosproject.k8snetworking.api.Constants.A_CLASS;
93import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
Jian Li5e8a22a2019-02-27 11:48:42 +090094import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li73d3b6a2019-07-08 18:07:53 +090095import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090096import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090097import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090098import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
99import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +0900100import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +0900101import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900103import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900104import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +0900105import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +0900106import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900107import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900108import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Li004526d2019-02-25 16:26:27 +0900109import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
110import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
111import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900112import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900113import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
Jian Li619fa282020-09-02 14:45:35 +0900114import static org.onosproject.k8snetworking.api.Constants.TUN_ENTRY_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900115import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
116import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
Jian Li004526d2019-02-25 16:26:27 +0900117import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
118import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li140d8a22019-04-24 23:41:44 +0900119import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
Jian Li2cc2b632019-02-18 00:56:40 +0900120import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900121import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
122import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
Jian Lif5da78a2019-04-15 01:52:23 +0900123import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
Jian Li2cc2b632019-02-18 00:56:40 +0900124import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
Jian Lif5da78a2019-04-15 01:52:23 +0900125import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900126import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900127import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900128import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
129import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
130import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
131import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
132import static org.onosproject.net.group.GroupDescription.Type.SELECT;
133import static org.slf4j.LoggerFactory.getLogger;
134
135/**
136 * Handles the service IP to pod IP related translation traffic.
137 */
Jian Li004526d2019-02-25 16:26:27 +0900138@Component(
139 immediate = true,
140 property = {
Jian Lif5da78a2019-04-15 01:52:23 +0900141 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT,
142 SERVICE_CIDR + "=" + SERVICE_IP_CIDR_DEFAULT
Jian Li004526d2019-02-25 16:26:27 +0900143 }
144)
Jian Li2cc2b632019-02-18 00:56:40 +0900145public class K8sServiceHandler {
146
147 private final Logger log = getLogger(getClass());
148
Jian Li2cc2b632019-02-18 00:56:40 +0900149 private static final int HOST_CIDR_NUM = 32;
150
Jian Li2cc2b632019-02-18 00:56:40 +0900151 private static final String CLUSTER_IP = "ClusterIP";
152 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900153 private static final String UDP = "UDP";
Jian Lif5da78a2019-04-15 01:52:23 +0900154 private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
Jian Li2cc2b632019-02-18 00:56:40 +0900155
Jian Li140d8a22019-04-24 23:41:44 +0900156 private static final String SERVICE_CIDR = "serviceCidr";
Jian Li44c2b122019-05-03 14:46:34 +0900157 private static final String NONE = "None";
Jian Li140d8a22019-04-24 23:41:44 +0900158 private static final String B_CLASS_SUFFIX = ".0.0/16";
159 private static final String A_CLASS_SUFFIX = ".0.0.0/8";
160
Jian Li2cc2b632019-02-18 00:56:40 +0900161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 protected CoreService coreService;
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
165 protected LeadershipService leadershipService;
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
168 protected ClusterService clusterService;
169
170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
171 protected DriverService driverService;
172
173 @Reference(cardinality = ReferenceCardinality.MANDATORY)
174 protected DeviceService deviceService;
175
176 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900177 protected ComponentConfigService configService;
178
179 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900180 protected StorageService storageService;
181
182 @Reference(cardinality = ReferenceCardinality.MANDATORY)
183 protected K8sNetworkService k8sNetworkService;
184
185 @Reference(cardinality = ReferenceCardinality.MANDATORY)
186 protected K8sFlowRuleService k8sFlowRuleService;
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY)
189 protected K8sGroupRuleService k8sGroupRuleService;
190
191 @Reference(cardinality = ReferenceCardinality.MANDATORY)
192 protected K8sNodeService k8sNodeService;
193
194 @Reference(cardinality = ReferenceCardinality.MANDATORY)
195 protected K8sEndpointsService k8sEndpointsService;
196
197 @Reference(cardinality = ReferenceCardinality.MANDATORY)
198 protected K8sServiceService k8sServiceService;
199
Jian Lie1a5b8f2019-07-23 17:13:19 +0900200 @Reference(cardinality = ReferenceCardinality.MANDATORY)
201 protected K8sPodService k8sPodService;
Jian Li4a7ce672019-04-09 15:20:25 +0900202
Jian Li004526d2019-02-25 16:26:27 +0900203 /** Service IP address translation mode. */
204 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
205
Jian Lif5da78a2019-04-15 01:52:23 +0900206 /** Ranges of IP address of service VIP. */
207 private String serviceCidr = SERVICE_IP_CIDR_DEFAULT;
208
Jian Li2cc2b632019-02-18 00:56:40 +0900209 private final ExecutorService eventExecutor = newSingleThreadExecutor(
210 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
211 private final InternalNodeEventListener internalNodeEventListener =
212 new InternalNodeEventListener();
213 private final InternalK8sServiceListener internalK8sServiceListener =
214 new InternalK8sServiceListener();
Jian Li7b63fe62019-04-14 21:49:27 +0900215 private final InternalK8sEndpointsListener internalK8sEndpointsListener =
216 new InternalK8sEndpointsListener();
Jian Lif5da78a2019-04-15 01:52:23 +0900217 private final InternalK8sNetworkListener internalK8sNetworkListener =
218 new InternalK8sNetworkListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900219
Jian Li2cc2b632019-02-18 00:56:40 +0900220 private ApplicationId appId;
221 private NodeId localNodeId;
222
223 @Activate
224 protected void activate() {
225 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900226 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900227 localNodeId = clusterService.getLocalNode().id();
228 leadershipService.runForLeadership(appId.name());
229 k8sNodeService.addListener(internalNodeEventListener);
230 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900231 k8sEndpointsService.addListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900232 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900233
Jian Li2cc2b632019-02-18 00:56:40 +0900234 log.info("Started");
235 }
236
237 @Deactivate
238 protected void deactivate() {
239 leadershipService.withdraw(appId.name());
240 k8sNodeService.removeListener(internalNodeEventListener);
241 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900242 k8sEndpointsService.removeListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900243 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Li004526d2019-02-25 16:26:27 +0900244 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900245 eventExecutor.shutdown();
246
247 log.info("Stopped");
248 }
249
Jian Li004526d2019-02-25 16:26:27 +0900250 @Modified
251 void modified(ComponentContext context) {
252 readComponentConfiguration(context);
253
254 log.info("Modified");
255 }
256
257 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900258 // -trk CT rules
259 long ctUntrack = computeCtStateFlag(false, false, false);
260 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
261
262 k8sNetworkService.networks().forEach(n -> {
263 // TODO: need to provide a way to add multiple service IP CIDR ranges
Jian Lif5da78a2019-04-15 01:52:23 +0900264 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
Jian Li73d3b6a2019-07-08 18:07:53 +0900265 GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900266 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900267 GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900268 });
269
270 // +trk-new CT rules
271 long ctTrackUnnew = computeCtStateFlag(true, false, false);
272 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
273
274 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
275 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
276
277 // +trk+new CT rules
278 long ctTrackNew = computeCtStateFlag(true, true, false);
279 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
280
281 k8sServiceService.services().stream()
282 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900283 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
284 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900285 }
286
Jian Li004526d2019-02-25 16:26:27 +0900287 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
288
Jian Li140d8a22019-04-24 23:41:44 +0900289 String srcPodCidr = k8sNetworkService.network(
Jian Li7d111d72019-04-12 13:58:44 +0900290 k8sNodeService.node(deviceId).hostname()).cidr();
Jian Li140d8a22019-04-24 23:41:44 +0900291 String srcPodPrefix = getBclassIpPrefixFromCidr(srcPodCidr);
292 String fullSrcPodCidr = srcPodPrefix + B_CLASS_SUFFIX;
293 String fullSrcNodeCidr = NODE_IP_PREFIX + A_CLASS_SUFFIX;
294
295 // src: POD -> dst: service (unNAT POD) grouping
296 setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900297 SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900298 PRIORITY_CT_RULE, install);
299 // src: POD (unNAT service) -> dst: shifted POD grouping
300 setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900301 srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li140d8a22019-04-24 23:41:44 +0900302
303 // src: node -> dst: service (unNAT POD) grouping
304 setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900305 null, null, null, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900306 PRIORITY_CT_RULE, install);
307 // src: POD (unNAT service) -> dst: node grouping
308 setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900309 null, null, null, GROUPING_TABLE, POD_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900310 PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900311
Jian Li004526d2019-02-25 16:26:27 +0900312 k8sNetworkService.networks().forEach(n -> {
Jian Li140d8a22019-04-24 23:41:44 +0900313 setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS,
314 n.segmentId(), null, null, ROUTING_TABLE,
Jian Li73d3b6a2019-07-08 18:07:53 +0900315 STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900316 });
317
318 // setup load balancing rules using group table
319 k8sServiceService.services().stream()
320 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
321 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
322 }
323
324 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li140d8a22019-04-24 23:41:44 +0900325 String dstCidr, String cidrClass,
326 String segId, String shiftPrefix,
327 String shiftType, int installTable,
328 int transitTable, int priority,
329 boolean install) {
Jian Li004526d2019-02-25 16:26:27 +0900330 TrafficSelector selector = DefaultTrafficSelector.builder()
331 .matchEthType(Ethernet.TYPE_IPV4)
332 .matchIPSrc(IpPrefix.valueOf(srcCidr))
333 .matchIPDst(IpPrefix.valueOf(dstCidr))
334 .build();
335
Jian Li7d111d72019-04-12 13:58:44 +0900336 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
337
338 if (segId != null) {
339 tBuilder.setTunnelId(Long.valueOf(segId));
340 }
Jian Li140d8a22019-04-24 23:41:44 +0900341
342 if (shiftPrefix != null && shiftType != null) {
343 ExtensionTreatment loadTreatment = buildLoadExtension(
344 deviceService.getDevice(deviceId), cidrClass, shiftType, shiftPrefix);
345 tBuilder.extension(loadTreatment, deviceId);
346 }
347
Jian Li7d111d72019-04-12 13:58:44 +0900348 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900349
350 k8sFlowRuleService.setRule(
351 appId,
352 deviceId,
353 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900354 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900355 priority,
356 installTable,
357 install);
358 }
359
Jian Li5e8a22a2019-02-27 11:48:42 +0900360 /**
361 * Obtains the service port to endpoint address paired map.
362 *
363 * @param service kubernetes service
364 * @return a map where key is kubernetes service port, and value is the
365 * endpoint addresses that are associated with the service port
366 */
367 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
368
369 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900370
371 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900372 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
373 .stream()
374 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
375 .collect(Collectors.toList());
376
Jian Li5e8a22a2019-02-27 11:48:42 +0900377 service.getSpec().getPorts().stream()
378 .filter(Objects::nonNull)
379 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900380 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900381 sp.getTargetPort().getStrVal() != null)
Jian Li5e8a22a2019-02-27 11:48:42 +0900382 .forEach(sp -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900383 Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
384 sp.getTargetPort().getIntVal() : 0;
385 String targetPortName = sp.getTargetPort().getStrVal() != null ?
386 sp.getTargetPort().getStrVal() : "";
387 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900388
Jian Lie1a5b8f2019-07-23 17:13:19 +0900389 for (Endpoints endpoints : endpointses) {
390 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
391
392 // in case service port name is specified but not port number
393 // we will lookup the container port number and use it
394 // as the target port number
395 if (!targetPortName.equals("") && targetPortInt == 0) {
396 for (EndpointAddress addr : endpointSubset.getAddresses()) {
397 Pod pod = podByIp(k8sPodService, addr.getIp());
398 targetPortInt = portNumberByName(pod, targetPortName);
399 }
400 }
401
Jian Li5cf3b002019-08-30 17:57:53 +0900402 if (targetPortInt == 0) {
403 continue;
404 }
405
Jian Lie1a5b8f2019-07-23 17:13:19 +0900406 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
407 if (targetProtocol.equals(endpointPort.getProtocol()) &&
408 (targetPortInt.equals(endpointPort.getPort()) ||
409 targetPortName.equals(endpointPort.getName()))) {
410 Set<String> addresses = endpointSubset.getAddresses()
411 .stream().map(EndpointAddress::getIp)
412 .collect(Collectors.toSet());
413 map.put(sp, addresses);
414 }
415 }
Jian Li004526d2019-02-25 16:26:27 +0900416 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900417 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900418 });
Jian Li004526d2019-02-25 16:26:27 +0900419
Jian Li5e8a22a2019-02-27 11:48:42 +0900420 return map;
421 }
422
Jian Libf562c22019-04-15 18:07:14 +0900423 private void setGroupBuckets(Service service, boolean install) {
Jian Li4a7ce672019-04-09 15:20:25 +0900424 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
425 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Jian Li7b63fe62019-04-14 21:49:27 +0900426 Map<String, String> nodeIpGatewayIpMap =
427 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
Jian Li4a7ce672019-04-09 15:20:25 +0900428
Jian Libf562c22019-04-15 18:07:14 +0900429 for (K8sNode node : k8sNodeService.completeNodes()) {
430 spEpasMap.forEach((sp, epas) -> {
431 List<GroupBucket> bkts = Lists.newArrayList();
Jian Li4a7ce672019-04-09 15:20:25 +0900432
Jian Libf562c22019-04-15 18:07:14 +0900433 for (String ip : epas) {
Jian Li5cf3b002019-08-30 17:57:53 +0900434 GroupBucket bkt = buildBuckets(node.intgBridge(),
435 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp);
436
437 if (bkt == null) {
438 continue;
439 }
440
Jian Libf562c22019-04-15 18:07:14 +0900441 if (install) {
Jian Li5cf3b002019-08-30 17:57:53 +0900442 bkts.add(bkt);
Jian Libf562c22019-04-15 18:07:14 +0900443 } else {
Jian Li5cf3b002019-08-30 17:57:53 +0900444 bkts.remove(bkt);
Jian Li7b63fe62019-04-14 21:49:27 +0900445 }
Jian Libf562c22019-04-15 18:07:14 +0900446 }
447
448 spGrpBkts.put(sp, bkts);
449 });
450
451 String serviceIp = service.getSpec().getClusterIP();
452 spGrpBkts.forEach((sp, bkts) -> {
453 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
454 int groupId = svcStr.hashCode();
455
456 if (bkts.size() > 0) {
457 k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
458 }
459 });
460
461 spEpasMap.forEach((sp, epas) ->
Jian Lie1a5b8f2019-07-23 17:13:19 +0900462 // add flow rules for unshifting IP domain
463 epas.forEach(epa -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900464
Jian Lie1a5b8f2019-07-23 17:13:19 +0900465 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
466
467 int targetPort;
468 if (sp.getTargetPort().getIntVal() == null) {
469 Pod pod = podByIp(k8sPodService, podIp);
470 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
471 } else {
472 targetPort = sp.getTargetPort().getIntVal();
473 }
474
Jian Li5cf3b002019-08-30 17:57:53 +0900475 if (targetPort != 0) {
476 setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
477 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
478 sp.getProtocol(), podIp,
479 targetPort, install);
480 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900481 })
Jian Libf562c22019-04-15 18:07:14 +0900482 );
483 }
Jian Li4a7ce672019-04-09 15:20:25 +0900484 }
485
Jian Lib7dfb5b2019-07-15 17:37:12 +0900486 private GroupBucket buildBuckets(DeviceId deviceId, String podIpStr, ServicePort sp) {
Jian Li4a7ce672019-04-09 15:20:25 +0900487 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900488 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900489
Jian Lie1a5b8f2019-07-23 17:13:19 +0900490 int targetPort;
491 if (sp.getTargetPort().getIntVal() == null) {
492 Pod pod = podByIp(k8sPodService, podIpStr);
493 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
494 } else {
495 targetPort = sp.getTargetPort().getIntVal();
496 }
Jian Lib7dfb5b2019-07-15 17:37:12 +0900497
Jian Li5cf3b002019-08-30 17:57:53 +0900498 if (targetPort == 0) {
499 return null;
500 }
501
Jian Li4a7ce672019-04-09 15:20:25 +0900502 if (TCP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900503 tBuilder.setTcpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900504 } else if (UDP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900505 tBuilder.setUdpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900506 }
507
Jian Li7d111d72019-04-12 13:58:44 +0900508 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900509 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li7d111d72019-04-12 13:58:44 +0900510 tBuilder.extension(resubmitTreatment, deviceId);
511
Jian Li5cf3b002019-08-30 17:57:53 +0900512 // TODO: need to adjust group bucket weight by considering POD locality
Jian Libf562c22019-04-15 18:07:14 +0900513 return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
Jian Li4a7ce672019-04-09 15:20:25 +0900514 }
515
516 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
517 Service service,
518 boolean install) {
Jian Li7b63fe62019-04-14 21:49:27 +0900519 Set<ServicePort> sps = service.getSpec().getPorts().stream()
520 .filter(Objects::nonNull)
521 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900522 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900523 sp.getTargetPort().getStrVal() != null)
Jian Li7b63fe62019-04-14 21:49:27 +0900524 .collect(Collectors.toSet());
Jian Li5e8a22a2019-02-27 11:48:42 +0900525
526 String serviceIp = service.getSpec().getClusterIP();
Jian Li7b63fe62019-04-14 21:49:27 +0900527 sps.forEach(sp -> {
Jian Li5e8a22a2019-02-27 11:48:42 +0900528 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900529 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900530
Jian Li4a7ce672019-04-09 15:20:25 +0900531 if (install) {
532
533 // add group table rules
534 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900535 SELECT, Lists.newArrayList(), true);
Jian Li4a7ce672019-04-09 15:20:25 +0900536
537 log.info("Adding group rule {}", groupId);
538
539 // if we failed to add group rule, we will not install flow rules
540 // as this might cause rule inconsistency
541 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
542 // add flow rules for shifting IP domain
543 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
544 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
545 sp.getProtocol(), true);
546 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900547 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900548 // remove flow rules for shifting IP domain
549 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
550 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
551 sp.getProtocol(), false);
552
553 // remove group table rules
554 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900555 SELECT, Lists.newArrayList(), false);
Jian Li4a7ce672019-04-09 15:20:25 +0900556
557 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900558 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900559 });
Jian Li004526d2019-02-25 16:26:27 +0900560 }
561
562 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900563 int groupId, int priority, String serviceIp,
564 int servicePort, String protocol, boolean install) {
Jian Li44c2b122019-05-03 14:46:34 +0900565
566 if (serviceIp == null || NONE.equals(serviceIp)) {
567 return;
568 }
569
Jian Li5e8a22a2019-02-27 11:48:42 +0900570 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900571 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900572 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
573
574 if (TCP.equals(protocol)) {
575 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
576 .matchTcpDst(TpPort.tpPort(servicePort));
577 } else if (UDP.equals(protocol)) {
578 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
579 .matchUdpDst(TpPort.tpPort(servicePort));
580 }
Jian Li004526d2019-02-25 16:26:27 +0900581
Jian Li004526d2019-02-25 16:26:27 +0900582 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900583 .group(GroupId.valueOf(groupId))
584 .build();
585
586 k8sFlowRuleService.setRule(
587 appId,
588 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900589 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900590 treatment,
591 priority,
592 installTable,
593 install);
594 }
595
596 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900597 int priority, String serviceIp,
598 int servicePort, String protocol,
599 String podIp, int podPort, boolean install) {
600 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900601 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900602 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
603
604 if (TCP.equals(protocol)) {
605 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
606 .matchTcpSrc(TpPort.tpPort(podPort));
607 } else if (UDP.equals(protocol)) {
608 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
609 .matchUdpSrc(TpPort.tpPort(podPort));
610 }
611
Jian Li5e8a22a2019-02-27 11:48:42 +0900612 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900613 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li73d3b6a2019-07-08 18:07:53 +0900614 .transition(ACL_TABLE);
Jian Li5e8a22a2019-02-27 11:48:42 +0900615
616 if (TCP.equals(protocol)) {
617 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
618 } else if (UDP.equals(protocol)) {
619 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
620 }
Jian Li004526d2019-02-25 16:26:27 +0900621
622 k8sFlowRuleService.setRule(
623 appId,
624 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900625 sBuilder.build(),
626 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900627 priority,
628 installTable,
629 install);
630 }
631
Jian Lif5da78a2019-04-15 01:52:23 +0900632 private void setCidrRoutingRule(IpPrefix prefix, MacAddress mac,
633 K8sNetwork network, boolean install) {
634 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
635 .matchEthType(Ethernet.TYPE_IPV4)
636 .matchIPSrc(prefix)
637 .matchIPDst(IpPrefix.valueOf(network.cidr()));
638
639 k8sNodeService.completeNodes().forEach(n -> {
640 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
641 .setTunnelId(Long.valueOf(network.segmentId()));
642
643 if (n.hostname().equals(network.name())) {
644 if (mac != null) {
645 tBuilder.setEthSrc(mac);
646 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900647 tBuilder.transition(STAT_EGRESS_TABLE);
Jian Lif5da78a2019-04-15 01:52:23 +0900648 } else {
Jian Lif5da78a2019-04-15 01:52:23 +0900649 K8sNode localNode = k8sNodeService.node(network.name());
650
Jian Li619fa282020-09-02 14:45:35 +0900651 tBuilder.setOutput(n.intgToTunPortNum());
652
653 PortNumber portNum = tunnelPortNumByNetId(network.networkId(),
654 k8sNetworkService, n);
655
656 // install rules into tunnel bridge
657 TrafficTreatment treatmentToRemote = DefaultTrafficTreatment.builder()
658 .extension(buildExtension(
659 deviceService,
660 n.tunBridge(),
661 localNode.dataIp().getIp4Address()),
662 n.tunBridge())
663 .setTunnelId(Long.valueOf(network.segmentId()))
664 .setOutput(portNum)
665 .build();
666
667 k8sFlowRuleService.setRule(
668 appId,
669 n.tunBridge(),
670 sBuilder.build(),
671 treatmentToRemote,
672 PRIORITY_CIDR_RULE,
673 TUN_ENTRY_TABLE,
674 install
675 );
Jian Lif5da78a2019-04-15 01:52:23 +0900676 }
677
678 k8sFlowRuleService.setRule(
679 appId,
680 n.intgBridge(),
681 sBuilder.build(),
682 tBuilder.build(),
683 PRIORITY_CIDR_RULE,
684 ROUTING_TABLE,
685 install
686 );
687 });
688 }
689
690 private void setupServiceDefaultRule(K8sNetwork k8sNetwork, boolean install) {
691 setCidrRoutingRule(IpPrefix.valueOf(serviceCidr),
692 MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork, install);
693 }
694
Jian Li004526d2019-02-25 16:26:27 +0900695 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
696 long ctMask, Service service,
697 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900698 List<GroupBucket> buckets = Lists.newArrayList();
699
700 String serviceName = service.getMetadata().getName();
701 String serviceIp = service.getSpec().getClusterIP();
702
703 // TODO: multi-ports case should be addressed
704 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
Jian Libf562c22019-04-15 18:07:14 +0900705 String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
706
707 String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
708 int groupId = svcStr.hashCode();
Jian Li2cc2b632019-02-18 00:56:40 +0900709
710 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
711 .stream()
712 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
713 .collect(Collectors.toList());
714
715 Map<String, String> nodeIpGatewayIpMap =
716 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
717
718 for (Endpoints endpoints : endpointses) {
719 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
720 List<EndpointPort> ports = endpointSubset.getPorts()
721 .stream()
722 .filter(p -> p.getProtocol().equals(TCP))
723 .collect(Collectors.toList());
724
725 for (EndpointAddress address : endpointSubset.getAddresses()) {
726 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
727 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
728
729 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
730 niciraConnTrackTreatmentBuilder(driverService, deviceId)
731 .commit(true)
732 .natAction(true)
733 .natIp(IpAddress.valueOf(podIp))
734 .natFlag(CT_NAT_DST_FLAG);
735
736 ports.forEach(p -> {
737 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
Jian Lieb488ea2019-04-16 01:50:02 +0900738 .natPortMin(TpPort.tpPort(p.getPort()))
739 .natPortMax(TpPort.tpPort(p.getPort()))
740 .build();
Jian Li2cc2b632019-02-18 00:56:40 +0900741 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900742 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li2cc2b632019-02-18 00:56:40 +0900743 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
744 .extension(ctNatTreatment, deviceId)
745 .extension(resubmitTreatment, deviceId)
746 .build();
747 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
748 });
749 }
750 }
751 }
752
753 if (!buckets.isEmpty()) {
754 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
755
756 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
757 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
758 PRIORITY_CT_RULE, install);
759 }
760 }
761
762 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
763 String srcCidr, String dstCidr, int installTable,
764 int transitTable, int priority, boolean install) {
765 ExtensionSelector esCtSate = RulePopulatorUtil
766 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
767 TrafficSelector selector = DefaultTrafficSelector.builder()
768 .matchEthType(Ethernet.TYPE_IPV4)
769 .matchIPSrc(IpPrefix.valueOf(srcCidr))
770 .matchIPDst(IpPrefix.valueOf(dstCidr))
771 .extension(esCtSate, deviceId)
772 .build();
773
774 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
775 niciraConnTrackTreatmentBuilder(driverService, deviceId)
776 .natAction(false)
777 .commit(false)
778 .table((short) transitTable);
779
780 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
781 .extension(connTreatmentBuilder.build(), deviceId)
782 .build();
783
784 k8sFlowRuleService.setRule(
785 appId,
786 deviceId,
787 selector,
788 treatment,
789 priority,
790 installTable,
791 install);
792 }
793
794 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
795 IpAddress dstIp, TpPort dstPort, int installTable,
796 int groupId, int priority, boolean install) {
797 ExtensionSelector esCtSate = RulePopulatorUtil
798 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
799 TrafficSelector selector = DefaultTrafficSelector.builder()
800 .matchEthType(Ethernet.TYPE_IPV4)
801 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
802 .matchIPProtocol(IPv4.PROTOCOL_TCP)
803 .matchTcpDst(dstPort)
804 .extension(esCtSate, deviceId)
805 .build();
806 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
807 .group(GroupId.valueOf(groupId))
808 .build();
809
810 k8sFlowRuleService.setRule(
811 appId,
812 deviceId,
813 selector,
814 treatment,
815 priority,
816 installTable,
817 install);
818 }
819
820 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
821 int installTable, int transitTable,
822 int priority, boolean install) {
823 ExtensionSelector esCtSate = RulePopulatorUtil
824 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
825 TrafficSelector selector = DefaultTrafficSelector.builder()
826 .extension(esCtSate, deviceId)
827 .build();
828
829 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
830 .transition(transitTable)
831 .build();
832
833 k8sFlowRuleService.setRule(
834 appId,
835 deviceId,
836 selector,
837 treatment,
838 priority,
839 installTable,
840 install);
841 }
842
Jian Libf562c22019-04-15 18:07:14 +0900843 private void setEndpointsRules(Endpoints endpoints, boolean install) {
844 String appName = endpoints.getMetadata().getName();
845 Service service = k8sServiceService.services().stream().filter(s ->
846 appName.equals(s.getMetadata().getName()))
847 .findFirst().orElse(null);
Jian Li7b63fe62019-04-14 21:49:27 +0900848
Jian Libf562c22019-04-15 18:07:14 +0900849 if (service == null) {
850 return;
Jian Li7b63fe62019-04-14 21:49:27 +0900851 }
Jian Libf562c22019-04-15 18:07:14 +0900852
853 setGroupBuckets(service, install);
Jian Li7b63fe62019-04-14 21:49:27 +0900854 }
855
Jian Lif5da78a2019-04-15 01:52:23 +0900856 private String servicePortStr(String ip, int port, String protocol) {
857 return ip + "_" + port + "_" + protocol;
858 }
859
Jian Li004526d2019-02-25 16:26:27 +0900860 /**
861 * Extracts properties from the component configuration context.
862 *
863 * @param context the component context
864 */
865 private void readComponentConfiguration(ComponentContext context) {
866 Dictionary<?, ?> properties = context.getProperties();
867
868 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
869 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
870 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
Jian Lif5da78a2019-04-15 01:52:23 +0900871
872 String updatedServiceCidr = Tools.get(properties, SERVICE_CIDR);
873 serviceCidr = updatedServiceCidr != null ?
874 updatedServiceCidr : SERVICE_IP_CIDR_DEFAULT;
875 log.info("Configured. Service VIP range is {}", serviceCidr);
Jian Li004526d2019-02-25 16:26:27 +0900876 }
877
Jian Li4a7ce672019-04-09 15:20:25 +0900878 private void setServiceNatRules(DeviceId deviceId, boolean install) {
879 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
880 setStatefulServiceNatRules(deviceId, install);
881 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
882 setStatelessServiceNatRules(deviceId, install);
883 } else {
884 log.warn("Service IP NAT mode was not configured!");
885 }
886 }
887
Jian Li2cc2b632019-02-18 00:56:40 +0900888 private class InternalK8sServiceListener implements K8sServiceListener {
889
890 private boolean isRelevantHelper() {
891 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
892 }
893
894 @Override
895 public void event(K8sServiceEvent event) {
896 switch (event.type()) {
897 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900898 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900899 eventExecutor.execute(() -> processServiceCreation(event.subject()));
900 break;
901 case K8S_SERVICE_REMOVED:
902 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
903 break;
904 default:
905 // do nothing
906 break;
907 }
908 }
909
910 private void processServiceCreation(Service service) {
911 if (!isRelevantHelper()) {
912 return;
913 }
914
Jian Li5e8a22a2019-02-27 11:48:42 +0900915 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
916 long ctTrackNew = computeCtStateFlag(true, true, false);
917 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900918
Jian Li5e8a22a2019-02-27 11:48:42 +0900919 k8sNodeService.completeNodes().forEach(n ->
920 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
921 ctMaskTrackNew, service, true));
922 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
923 k8sNodeService.completeNodes().forEach(n ->
924 setStatelessGroupFlowRules(n.intgBridge(), service, true));
925 }
Jian Li2cc2b632019-02-18 00:56:40 +0900926 }
927
928 private void processServiceRemoval(Service service) {
929 if (!isRelevantHelper()) {
930 return;
931 }
932
Jian Li5e8a22a2019-02-27 11:48:42 +0900933 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
934 long ctTrackNew = computeCtStateFlag(true, true, false);
935 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900936
Jian Li5e8a22a2019-02-27 11:48:42 +0900937 k8sNodeService.completeNodes().forEach(n ->
938 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
939 ctMaskTrackNew, service, false));
940 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
941 k8sNodeService.completeNodes().forEach(n ->
942 setStatelessGroupFlowRules(n.intgBridge(), service, false));
943 }
Jian Li004526d2019-02-25 16:26:27 +0900944 }
Jian Li2cc2b632019-02-18 00:56:40 +0900945 }
946
Jian Li7b63fe62019-04-14 21:49:27 +0900947 private class InternalK8sEndpointsListener implements K8sEndpointsListener {
948
949 private boolean isRelevantHelper() {
950 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
951 }
952
953 @Override
954 public void event(K8sEndpointsEvent event) {
955 Endpoints endpoints = event.subject();
956
957 switch (event.type()) {
958 case K8S_ENDPOINTS_CREATED:
Jian Libf562c22019-04-15 18:07:14 +0900959 case K8S_ENDPOINTS_UPDATED:
Jian Li7b63fe62019-04-14 21:49:27 +0900960 eventExecutor.execute(() -> processEndpointsCreation(endpoints));
961 break;
962 case K8S_ENDPOINTS_REMOVED:
963 eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
964 break;
965 default:
966 break;
967 }
968 }
969
970 private void processEndpointsCreation(Endpoints endpoints) {
971 if (!isRelevantHelper()) {
972 return;
973 }
974
Jian Libf562c22019-04-15 18:07:14 +0900975 setEndpointsRules(endpoints, true);
Jian Li7b63fe62019-04-14 21:49:27 +0900976 }
977
978 private void processEndpointsRemoval(Endpoints endpoints) {
979 if (!isRelevantHelper()) {
980 return;
981 }
982
Jian Libf562c22019-04-15 18:07:14 +0900983 setEndpointsRules(endpoints, false);
Jian Li4a7ce672019-04-09 15:20:25 +0900984 }
985 }
986
Jian Li2cc2b632019-02-18 00:56:40 +0900987 private class InternalNodeEventListener implements K8sNodeListener {
988
989 private boolean isRelevantHelper() {
990 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
991 }
992
993 @Override
994 public void event(K8sNodeEvent event) {
995 K8sNode k8sNode = event.subject();
996 switch (event.type()) {
997 case K8S_NODE_COMPLETE:
998 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
999 break;
1000 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +09001001 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +09001002 default:
1003 break;
1004 }
1005 }
1006
1007 private void processNodeCompletion(K8sNode node) {
1008 if (!isRelevantHelper()) {
1009 return;
1010 }
1011
Jian Li4a7ce672019-04-09 15:20:25 +09001012 setServiceNatRules(node.intgBridge(), true);
Jian Libf562c22019-04-15 18:07:14 +09001013 k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
Jian Lif5da78a2019-04-15 01:52:23 +09001014 k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
1015 }
1016 }
1017
1018 private class InternalK8sNetworkListener implements K8sNetworkListener {
1019
1020 private boolean isRelevantHelper() {
1021 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
1022 }
1023
1024 @Override
1025 public void event(K8sNetworkEvent event) {
1026 switch (event.type()) {
1027 case K8S_NETWORK_CREATED:
1028 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
1029 break;
1030 case K8S_NETWORK_UPDATED:
1031 case K8S_NETWORK_REMOVED:
1032 default:
1033 break;
1034 }
1035 }
1036
1037 private void processNetworkCreation(K8sNetwork network) {
1038 if (!isRelevantHelper()) {
1039 return;
1040 }
1041
1042 setupServiceDefaultRule(network, true);
Jian Li2cc2b632019-02-18 00:56:40 +09001043 }
1044 }
1045}