blob: 18b9a11bef56e975f4954a1344b9e8e23a262691 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
Jian Lif5da78a2019-04-15 01:52:23 +090031import org.onlab.packet.MacAddress;
Jian Li2cc2b632019-02-18 00:56:40 +090032import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.GroupId;
Jian Li7b63fe62019-04-14 21:49:27 +090041import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
42import org.onosproject.k8snetworking.api.K8sEndpointsListener;
Jian Li2cc2b632019-02-18 00:56:40 +090043import org.onosproject.k8snetworking.api.K8sEndpointsService;
44import org.onosproject.k8snetworking.api.K8sFlowRuleService;
45import org.onosproject.k8snetworking.api.K8sGroupRuleService;
Jian Lif5da78a2019-04-15 01:52:23 +090046import org.onosproject.k8snetworking.api.K8sNetwork;
47import org.onosproject.k8snetworking.api.K8sNetworkEvent;
48import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Li2cc2b632019-02-18 00:56:40 +090049import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090050import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
54import org.onosproject.k8snetworking.util.RulePopulatorUtil;
55import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
56import org.onosproject.k8snode.api.K8sNode;
57import org.onosproject.k8snode.api.K8sNodeEvent;
58import org.onosproject.k8snode.api.K8sNodeListener;
59import org.onosproject.k8snode.api.K8sNodeService;
60import org.onosproject.net.DeviceId;
Jian Lif5da78a2019-04-15 01:52:23 +090061import org.onosproject.net.PortNumber;
Jian Li2cc2b632019-02-18 00:56:40 +090062import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.driver.DriverService;
64import org.onosproject.net.flow.DefaultTrafficSelector;
65import org.onosproject.net.flow.DefaultTrafficTreatment;
66import org.onosproject.net.flow.TrafficSelector;
67import org.onosproject.net.flow.TrafficTreatment;
68import org.onosproject.net.flow.criteria.ExtensionSelector;
69import org.onosproject.net.flow.instructions.ExtensionTreatment;
70import org.onosproject.net.group.GroupBucket;
Jian Li2cc2b632019-02-18 00:56:40 +090071import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Activate;
74import org.osgi.service.component.annotations.Component;
75import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090076import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090077import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
79import org.slf4j.Logger;
80
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.List;
83import java.util.Map;
84import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090085import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090086import java.util.concurrent.ExecutorService;
87import java.util.stream.Collectors;
88
89import static java.util.concurrent.Executors.newSingleThreadExecutor;
90import static org.onlab.util.Tools.groupedThreads;
Jian Li73d3b6a2019-07-08 18:07:53 +090091import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090092import static org.onosproject.k8snetworking.api.Constants.A_CLASS;
93import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
Jian Li5e8a22a2019-02-27 11:48:42 +090094import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li73d3b6a2019-07-08 18:07:53 +090095import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090096import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090097import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090098import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
99import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +0900100import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +0900101import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900103import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900104import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +0900105import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +0900106import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900107import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900108import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Li004526d2019-02-25 16:26:27 +0900109import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
110import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
111import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900112import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900113import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900114import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
115import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
Jian Li004526d2019-02-25 16:26:27 +0900116import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
117import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li140d8a22019-04-24 23:41:44 +0900118import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
Jian Li2cc2b632019-02-18 00:56:40 +0900119import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900120import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
121import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
Jian Lif5da78a2019-04-15 01:52:23 +0900122import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
Jian Li2cc2b632019-02-18 00:56:40 +0900123import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
Jian Lif5da78a2019-04-15 01:52:23 +0900124import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900125import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900126import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900127import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
128import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
129import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
130import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
131import static org.onosproject.net.group.GroupDescription.Type.SELECT;
132import static org.slf4j.LoggerFactory.getLogger;
133
134/**
135 * Handles the service IP to pod IP related translation traffic.
136 */
Jian Li004526d2019-02-25 16:26:27 +0900137@Component(
138 immediate = true,
139 property = {
Jian Lif5da78a2019-04-15 01:52:23 +0900140 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT,
141 SERVICE_CIDR + "=" + SERVICE_IP_CIDR_DEFAULT
Jian Li004526d2019-02-25 16:26:27 +0900142 }
143)
Jian Li2cc2b632019-02-18 00:56:40 +0900144public class K8sServiceHandler {
145
146 private final Logger log = getLogger(getClass());
147
Jian Li2cc2b632019-02-18 00:56:40 +0900148 private static final int HOST_CIDR_NUM = 32;
149
Jian Li2cc2b632019-02-18 00:56:40 +0900150 private static final String CLUSTER_IP = "ClusterIP";
151 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900152 private static final String UDP = "UDP";
Jian Lif5da78a2019-04-15 01:52:23 +0900153 private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
Jian Li2cc2b632019-02-18 00:56:40 +0900154
Jian Li140d8a22019-04-24 23:41:44 +0900155 private static final String SERVICE_CIDR = "serviceCidr";
Jian Li44c2b122019-05-03 14:46:34 +0900156 private static final String NONE = "None";
Jian Li140d8a22019-04-24 23:41:44 +0900157 private static final String B_CLASS_SUFFIX = ".0.0/16";
158 private static final String A_CLASS_SUFFIX = ".0.0.0/8";
159
Jian Li2cc2b632019-02-18 00:56:40 +0900160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
161 protected CoreService coreService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
164 protected LeadershipService leadershipService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
167 protected ClusterService clusterService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
170 protected DriverService driverService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
173 protected DeviceService deviceService;
174
175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900176 protected ComponentConfigService configService;
177
178 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900179 protected StorageService storageService;
180
181 @Reference(cardinality = ReferenceCardinality.MANDATORY)
182 protected K8sNetworkService k8sNetworkService;
183
184 @Reference(cardinality = ReferenceCardinality.MANDATORY)
185 protected K8sFlowRuleService k8sFlowRuleService;
186
187 @Reference(cardinality = ReferenceCardinality.MANDATORY)
188 protected K8sGroupRuleService k8sGroupRuleService;
189
190 @Reference(cardinality = ReferenceCardinality.MANDATORY)
191 protected K8sNodeService k8sNodeService;
192
193 @Reference(cardinality = ReferenceCardinality.MANDATORY)
194 protected K8sEndpointsService k8sEndpointsService;
195
196 @Reference(cardinality = ReferenceCardinality.MANDATORY)
197 protected K8sServiceService k8sServiceService;
198
Jian Lie1a5b8f2019-07-23 17:13:19 +0900199 @Reference(cardinality = ReferenceCardinality.MANDATORY)
200 protected K8sPodService k8sPodService;
Jian Li4a7ce672019-04-09 15:20:25 +0900201
Jian Li004526d2019-02-25 16:26:27 +0900202 /** Service IP address translation mode. */
203 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
204
Jian Lif5da78a2019-04-15 01:52:23 +0900205 /** Ranges of IP address of service VIP. */
206 private String serviceCidr = SERVICE_IP_CIDR_DEFAULT;
207
Jian Li2cc2b632019-02-18 00:56:40 +0900208 private final ExecutorService eventExecutor = newSingleThreadExecutor(
209 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
210 private final InternalNodeEventListener internalNodeEventListener =
211 new InternalNodeEventListener();
212 private final InternalK8sServiceListener internalK8sServiceListener =
213 new InternalK8sServiceListener();
Jian Li7b63fe62019-04-14 21:49:27 +0900214 private final InternalK8sEndpointsListener internalK8sEndpointsListener =
215 new InternalK8sEndpointsListener();
Jian Lif5da78a2019-04-15 01:52:23 +0900216 private final InternalK8sNetworkListener internalK8sNetworkListener =
217 new InternalK8sNetworkListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900218
Jian Li2cc2b632019-02-18 00:56:40 +0900219 private ApplicationId appId;
220 private NodeId localNodeId;
221
222 @Activate
223 protected void activate() {
224 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900225 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900226 localNodeId = clusterService.getLocalNode().id();
227 leadershipService.runForLeadership(appId.name());
228 k8sNodeService.addListener(internalNodeEventListener);
229 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900230 k8sEndpointsService.addListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900231 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900232
Jian Li2cc2b632019-02-18 00:56:40 +0900233 log.info("Started");
234 }
235
236 @Deactivate
237 protected void deactivate() {
238 leadershipService.withdraw(appId.name());
239 k8sNodeService.removeListener(internalNodeEventListener);
240 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900241 k8sEndpointsService.removeListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900242 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Li004526d2019-02-25 16:26:27 +0900243 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900244 eventExecutor.shutdown();
245
246 log.info("Stopped");
247 }
248
Jian Li004526d2019-02-25 16:26:27 +0900249 @Modified
250 void modified(ComponentContext context) {
251 readComponentConfiguration(context);
252
253 log.info("Modified");
254 }
255
256 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900257 // -trk CT rules
258 long ctUntrack = computeCtStateFlag(false, false, false);
259 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
260
261 k8sNetworkService.networks().forEach(n -> {
262 // TODO: need to provide a way to add multiple service IP CIDR ranges
Jian Lif5da78a2019-04-15 01:52:23 +0900263 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
Jian Li73d3b6a2019-07-08 18:07:53 +0900264 GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900265 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900266 GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900267 });
268
269 // +trk-new CT rules
270 long ctTrackUnnew = computeCtStateFlag(true, false, false);
271 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
272
273 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
274 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
275
276 // +trk+new CT rules
277 long ctTrackNew = computeCtStateFlag(true, true, false);
278 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
279
280 k8sServiceService.services().stream()
281 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900282 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
283 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900284 }
285
Jian Li004526d2019-02-25 16:26:27 +0900286 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
287
Jian Li140d8a22019-04-24 23:41:44 +0900288 String srcPodCidr = k8sNetworkService.network(
Jian Li7d111d72019-04-12 13:58:44 +0900289 k8sNodeService.node(deviceId).hostname()).cidr();
Jian Li140d8a22019-04-24 23:41:44 +0900290 String srcPodPrefix = getBclassIpPrefixFromCidr(srcPodCidr);
291 String fullSrcPodCidr = srcPodPrefix + B_CLASS_SUFFIX;
292 String fullSrcNodeCidr = NODE_IP_PREFIX + A_CLASS_SUFFIX;
293
294 // src: POD -> dst: service (unNAT POD) grouping
295 setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900296 SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900297 PRIORITY_CT_RULE, install);
298 // src: POD (unNAT service) -> dst: shifted POD grouping
299 setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900300 srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li140d8a22019-04-24 23:41:44 +0900301
302 // src: node -> dst: service (unNAT POD) grouping
303 setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900304 null, null, null, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900305 PRIORITY_CT_RULE, install);
306 // src: POD (unNAT service) -> dst: node grouping
307 setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900308 null, null, null, GROUPING_TABLE, POD_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900309 PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900310
Jian Li004526d2019-02-25 16:26:27 +0900311 k8sNetworkService.networks().forEach(n -> {
Jian Li140d8a22019-04-24 23:41:44 +0900312 setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS,
313 n.segmentId(), null, null, ROUTING_TABLE,
Jian Li73d3b6a2019-07-08 18:07:53 +0900314 STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900315 });
316
317 // setup load balancing rules using group table
318 k8sServiceService.services().stream()
319 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
320 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
321 }
322
323 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li140d8a22019-04-24 23:41:44 +0900324 String dstCidr, String cidrClass,
325 String segId, String shiftPrefix,
326 String shiftType, int installTable,
327 int transitTable, int priority,
328 boolean install) {
Jian Li004526d2019-02-25 16:26:27 +0900329 TrafficSelector selector = DefaultTrafficSelector.builder()
330 .matchEthType(Ethernet.TYPE_IPV4)
331 .matchIPSrc(IpPrefix.valueOf(srcCidr))
332 .matchIPDst(IpPrefix.valueOf(dstCidr))
333 .build();
334
Jian Li7d111d72019-04-12 13:58:44 +0900335 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
336
337 if (segId != null) {
338 tBuilder.setTunnelId(Long.valueOf(segId));
339 }
Jian Li140d8a22019-04-24 23:41:44 +0900340
341 if (shiftPrefix != null && shiftType != null) {
342 ExtensionTreatment loadTreatment = buildLoadExtension(
343 deviceService.getDevice(deviceId), cidrClass, shiftType, shiftPrefix);
344 tBuilder.extension(loadTreatment, deviceId);
345 }
346
Jian Li7d111d72019-04-12 13:58:44 +0900347 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900348
349 k8sFlowRuleService.setRule(
350 appId,
351 deviceId,
352 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900353 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900354 priority,
355 installTable,
356 install);
357 }
358
Jian Li5e8a22a2019-02-27 11:48:42 +0900359 /**
360 * Obtains the service port to endpoint address paired map.
361 *
362 * @param service kubernetes service
363 * @return a map where key is kubernetes service port, and value is the
364 * endpoint addresses that are associated with the service port
365 */
366 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
367
368 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900369
370 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900371 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
372 .stream()
373 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
374 .collect(Collectors.toList());
375
Jian Li5e8a22a2019-02-27 11:48:42 +0900376 service.getSpec().getPorts().stream()
377 .filter(Objects::nonNull)
378 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900379 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900380 sp.getTargetPort().getStrVal() != null)
Jian Li5e8a22a2019-02-27 11:48:42 +0900381 .forEach(sp -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900382 Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
383 sp.getTargetPort().getIntVal() : 0;
384 String targetPortName = sp.getTargetPort().getStrVal() != null ?
385 sp.getTargetPort().getStrVal() : "";
386 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900387
Jian Lie1a5b8f2019-07-23 17:13:19 +0900388 for (Endpoints endpoints : endpointses) {
389 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
390
391 // in case service port name is specified but not port number
392 // we will lookup the container port number and use it
393 // as the target port number
394 if (!targetPortName.equals("") && targetPortInt == 0) {
395 for (EndpointAddress addr : endpointSubset.getAddresses()) {
396 Pod pod = podByIp(k8sPodService, addr.getIp());
397 targetPortInt = portNumberByName(pod, targetPortName);
398 }
399 }
400
Jian Li5cf3b002019-08-30 17:57:53 +0900401 if (targetPortInt == 0) {
402 continue;
403 }
404
Jian Lie1a5b8f2019-07-23 17:13:19 +0900405 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
406 if (targetProtocol.equals(endpointPort.getProtocol()) &&
407 (targetPortInt.equals(endpointPort.getPort()) ||
408 targetPortName.equals(endpointPort.getName()))) {
409 Set<String> addresses = endpointSubset.getAddresses()
410 .stream().map(EndpointAddress::getIp)
411 .collect(Collectors.toSet());
412 map.put(sp, addresses);
413 }
414 }
Jian Li004526d2019-02-25 16:26:27 +0900415 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900416 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900417 });
Jian Li004526d2019-02-25 16:26:27 +0900418
Jian Li5e8a22a2019-02-27 11:48:42 +0900419 return map;
420 }
421
Jian Libf562c22019-04-15 18:07:14 +0900422 private void setGroupBuckets(Service service, boolean install) {
Jian Li4a7ce672019-04-09 15:20:25 +0900423 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
424 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Jian Li7b63fe62019-04-14 21:49:27 +0900425 Map<String, String> nodeIpGatewayIpMap =
426 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
Jian Li4a7ce672019-04-09 15:20:25 +0900427
Jian Libf562c22019-04-15 18:07:14 +0900428 for (K8sNode node : k8sNodeService.completeNodes()) {
429 spEpasMap.forEach((sp, epas) -> {
430 List<GroupBucket> bkts = Lists.newArrayList();
Jian Li4a7ce672019-04-09 15:20:25 +0900431
Jian Libf562c22019-04-15 18:07:14 +0900432 for (String ip : epas) {
Jian Li5cf3b002019-08-30 17:57:53 +0900433 GroupBucket bkt = buildBuckets(node.intgBridge(),
434 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp);
435
436 if (bkt == null) {
437 continue;
438 }
439
Jian Libf562c22019-04-15 18:07:14 +0900440 if (install) {
Jian Li5cf3b002019-08-30 17:57:53 +0900441 bkts.add(bkt);
Jian Libf562c22019-04-15 18:07:14 +0900442 } else {
Jian Li5cf3b002019-08-30 17:57:53 +0900443 bkts.remove(bkt);
Jian Li7b63fe62019-04-14 21:49:27 +0900444 }
Jian Libf562c22019-04-15 18:07:14 +0900445 }
446
447 spGrpBkts.put(sp, bkts);
448 });
449
450 String serviceIp = service.getSpec().getClusterIP();
451 spGrpBkts.forEach((sp, bkts) -> {
452 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
453 int groupId = svcStr.hashCode();
454
455 if (bkts.size() > 0) {
456 k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
457 }
458 });
459
460 spEpasMap.forEach((sp, epas) ->
Jian Lie1a5b8f2019-07-23 17:13:19 +0900461 // add flow rules for unshifting IP domain
462 epas.forEach(epa -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900463
Jian Lie1a5b8f2019-07-23 17:13:19 +0900464 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
465
466 int targetPort;
467 if (sp.getTargetPort().getIntVal() == null) {
468 Pod pod = podByIp(k8sPodService, podIp);
469 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
470 } else {
471 targetPort = sp.getTargetPort().getIntVal();
472 }
473
Jian Li5cf3b002019-08-30 17:57:53 +0900474 if (targetPort != 0) {
475 setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
476 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
477 sp.getProtocol(), podIp,
478 targetPort, install);
479 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900480 })
Jian Libf562c22019-04-15 18:07:14 +0900481 );
482 }
Jian Li4a7ce672019-04-09 15:20:25 +0900483 }
484
Jian Lib7dfb5b2019-07-15 17:37:12 +0900485 private GroupBucket buildBuckets(DeviceId deviceId, String podIpStr, ServicePort sp) {
Jian Li4a7ce672019-04-09 15:20:25 +0900486 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900487 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900488
Jian Lie1a5b8f2019-07-23 17:13:19 +0900489 int targetPort;
490 if (sp.getTargetPort().getIntVal() == null) {
491 Pod pod = podByIp(k8sPodService, podIpStr);
492 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
493 } else {
494 targetPort = sp.getTargetPort().getIntVal();
495 }
Jian Lib7dfb5b2019-07-15 17:37:12 +0900496
Jian Li5cf3b002019-08-30 17:57:53 +0900497 if (targetPort == 0) {
498 return null;
499 }
500
Jian Li4a7ce672019-04-09 15:20:25 +0900501 if (TCP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900502 tBuilder.setTcpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900503 } else if (UDP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900504 tBuilder.setUdpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900505 }
506
Jian Li7d111d72019-04-12 13:58:44 +0900507 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900508 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li7d111d72019-04-12 13:58:44 +0900509 tBuilder.extension(resubmitTreatment, deviceId);
510
Jian Li5cf3b002019-08-30 17:57:53 +0900511 // TODO: need to adjust group bucket weight by considering POD locality
Jian Libf562c22019-04-15 18:07:14 +0900512 return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
Jian Li4a7ce672019-04-09 15:20:25 +0900513 }
514
515 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
516 Service service,
517 boolean install) {
Jian Li7b63fe62019-04-14 21:49:27 +0900518 Set<ServicePort> sps = service.getSpec().getPorts().stream()
519 .filter(Objects::nonNull)
520 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900521 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900522 sp.getTargetPort().getStrVal() != null)
Jian Li7b63fe62019-04-14 21:49:27 +0900523 .collect(Collectors.toSet());
Jian Li5e8a22a2019-02-27 11:48:42 +0900524
525 String serviceIp = service.getSpec().getClusterIP();
Jian Li7b63fe62019-04-14 21:49:27 +0900526 sps.forEach(sp -> {
Jian Li5e8a22a2019-02-27 11:48:42 +0900527 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900528 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900529
Jian Li4a7ce672019-04-09 15:20:25 +0900530 if (install) {
531
532 // add group table rules
533 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900534 SELECT, Lists.newArrayList(), true);
Jian Li4a7ce672019-04-09 15:20:25 +0900535
536 log.info("Adding group rule {}", groupId);
537
538 // if we failed to add group rule, we will not install flow rules
539 // as this might cause rule inconsistency
540 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
541 // add flow rules for shifting IP domain
542 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
543 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
544 sp.getProtocol(), true);
545 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900546 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900547 // remove flow rules for shifting IP domain
548 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
549 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
550 sp.getProtocol(), false);
551
552 // remove group table rules
553 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900554 SELECT, Lists.newArrayList(), false);
Jian Li4a7ce672019-04-09 15:20:25 +0900555
556 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900557 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900558 });
Jian Li004526d2019-02-25 16:26:27 +0900559 }
560
561 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900562 int groupId, int priority, String serviceIp,
563 int servicePort, String protocol, boolean install) {
Jian Li44c2b122019-05-03 14:46:34 +0900564
565 if (serviceIp == null || NONE.equals(serviceIp)) {
566 return;
567 }
568
Jian Li5e8a22a2019-02-27 11:48:42 +0900569 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900570 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900571 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
572
573 if (TCP.equals(protocol)) {
574 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
575 .matchTcpDst(TpPort.tpPort(servicePort));
576 } else if (UDP.equals(protocol)) {
577 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
578 .matchUdpDst(TpPort.tpPort(servicePort));
579 }
Jian Li004526d2019-02-25 16:26:27 +0900580
Jian Li004526d2019-02-25 16:26:27 +0900581 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900582 .group(GroupId.valueOf(groupId))
583 .build();
584
585 k8sFlowRuleService.setRule(
586 appId,
587 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900588 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900589 treatment,
590 priority,
591 installTable,
592 install);
593 }
594
595 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900596 int priority, String serviceIp,
597 int servicePort, String protocol,
598 String podIp, int podPort, boolean install) {
599 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900600 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900601 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
602
603 if (TCP.equals(protocol)) {
604 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
605 .matchTcpSrc(TpPort.tpPort(podPort));
606 } else if (UDP.equals(protocol)) {
607 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
608 .matchUdpSrc(TpPort.tpPort(podPort));
609 }
610
Jian Li5e8a22a2019-02-27 11:48:42 +0900611 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900612 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li73d3b6a2019-07-08 18:07:53 +0900613 .transition(ACL_TABLE);
Jian Li5e8a22a2019-02-27 11:48:42 +0900614
615 if (TCP.equals(protocol)) {
616 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
617 } else if (UDP.equals(protocol)) {
618 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
619 }
Jian Li004526d2019-02-25 16:26:27 +0900620
621 k8sFlowRuleService.setRule(
622 appId,
623 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900624 sBuilder.build(),
625 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900626 priority,
627 installTable,
628 install);
629 }
630
Jian Lif5da78a2019-04-15 01:52:23 +0900631 private void setCidrRoutingRule(IpPrefix prefix, MacAddress mac,
632 K8sNetwork network, boolean install) {
633 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
634 .matchEthType(Ethernet.TYPE_IPV4)
635 .matchIPSrc(prefix)
636 .matchIPDst(IpPrefix.valueOf(network.cidr()));
637
638 k8sNodeService.completeNodes().forEach(n -> {
639 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
640 .setTunnelId(Long.valueOf(network.segmentId()));
641
642 if (n.hostname().equals(network.name())) {
643 if (mac != null) {
644 tBuilder.setEthSrc(mac);
645 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900646 tBuilder.transition(STAT_EGRESS_TABLE);
Jian Lif5da78a2019-04-15 01:52:23 +0900647 } else {
648 PortNumber portNum = tunnelPortNumByNetId(network.networkId(),
649 k8sNetworkService, n);
650 K8sNode localNode = k8sNodeService.node(network.name());
651
652 tBuilder.extension(buildExtension(
653 deviceService,
654 n.intgBridge(),
655 localNode.dataIp().getIp4Address()),
656 n.intgBridge())
657 .setOutput(portNum);
658 }
659
660 k8sFlowRuleService.setRule(
661 appId,
662 n.intgBridge(),
663 sBuilder.build(),
664 tBuilder.build(),
665 PRIORITY_CIDR_RULE,
666 ROUTING_TABLE,
667 install
668 );
669 });
670 }
671
672 private void setupServiceDefaultRule(K8sNetwork k8sNetwork, boolean install) {
673 setCidrRoutingRule(IpPrefix.valueOf(serviceCidr),
674 MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork, install);
675 }
676
Jian Li004526d2019-02-25 16:26:27 +0900677 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
678 long ctMask, Service service,
679 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900680 List<GroupBucket> buckets = Lists.newArrayList();
681
682 String serviceName = service.getMetadata().getName();
683 String serviceIp = service.getSpec().getClusterIP();
684
685 // TODO: multi-ports case should be addressed
686 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
Jian Libf562c22019-04-15 18:07:14 +0900687 String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
688
689 String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
690 int groupId = svcStr.hashCode();
Jian Li2cc2b632019-02-18 00:56:40 +0900691
692 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
693 .stream()
694 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
695 .collect(Collectors.toList());
696
697 Map<String, String> nodeIpGatewayIpMap =
698 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
699
700 for (Endpoints endpoints : endpointses) {
701 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
702 List<EndpointPort> ports = endpointSubset.getPorts()
703 .stream()
704 .filter(p -> p.getProtocol().equals(TCP))
705 .collect(Collectors.toList());
706
707 for (EndpointAddress address : endpointSubset.getAddresses()) {
708 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
709 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
710
711 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
712 niciraConnTrackTreatmentBuilder(driverService, deviceId)
713 .commit(true)
714 .natAction(true)
715 .natIp(IpAddress.valueOf(podIp))
716 .natFlag(CT_NAT_DST_FLAG);
717
718 ports.forEach(p -> {
719 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
Jian Lieb488ea2019-04-16 01:50:02 +0900720 .natPortMin(TpPort.tpPort(p.getPort()))
721 .natPortMax(TpPort.tpPort(p.getPort()))
722 .build();
Jian Li2cc2b632019-02-18 00:56:40 +0900723 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900724 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li2cc2b632019-02-18 00:56:40 +0900725 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
726 .extension(ctNatTreatment, deviceId)
727 .extension(resubmitTreatment, deviceId)
728 .build();
729 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
730 });
731 }
732 }
733 }
734
735 if (!buckets.isEmpty()) {
736 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
737
738 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
739 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
740 PRIORITY_CT_RULE, install);
741 }
742 }
743
744 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
745 String srcCidr, String dstCidr, int installTable,
746 int transitTable, int priority, boolean install) {
747 ExtensionSelector esCtSate = RulePopulatorUtil
748 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
749 TrafficSelector selector = DefaultTrafficSelector.builder()
750 .matchEthType(Ethernet.TYPE_IPV4)
751 .matchIPSrc(IpPrefix.valueOf(srcCidr))
752 .matchIPDst(IpPrefix.valueOf(dstCidr))
753 .extension(esCtSate, deviceId)
754 .build();
755
756 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
757 niciraConnTrackTreatmentBuilder(driverService, deviceId)
758 .natAction(false)
759 .commit(false)
760 .table((short) transitTable);
761
762 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
763 .extension(connTreatmentBuilder.build(), deviceId)
764 .build();
765
766 k8sFlowRuleService.setRule(
767 appId,
768 deviceId,
769 selector,
770 treatment,
771 priority,
772 installTable,
773 install);
774 }
775
776 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
777 IpAddress dstIp, TpPort dstPort, int installTable,
778 int groupId, int priority, boolean install) {
779 ExtensionSelector esCtSate = RulePopulatorUtil
780 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
781 TrafficSelector selector = DefaultTrafficSelector.builder()
782 .matchEthType(Ethernet.TYPE_IPV4)
783 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
784 .matchIPProtocol(IPv4.PROTOCOL_TCP)
785 .matchTcpDst(dstPort)
786 .extension(esCtSate, deviceId)
787 .build();
788 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
789 .group(GroupId.valueOf(groupId))
790 .build();
791
792 k8sFlowRuleService.setRule(
793 appId,
794 deviceId,
795 selector,
796 treatment,
797 priority,
798 installTable,
799 install);
800 }
801
802 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
803 int installTable, int transitTable,
804 int priority, boolean install) {
805 ExtensionSelector esCtSate = RulePopulatorUtil
806 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
807 TrafficSelector selector = DefaultTrafficSelector.builder()
808 .extension(esCtSate, deviceId)
809 .build();
810
811 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
812 .transition(transitTable)
813 .build();
814
815 k8sFlowRuleService.setRule(
816 appId,
817 deviceId,
818 selector,
819 treatment,
820 priority,
821 installTable,
822 install);
823 }
824
Jian Libf562c22019-04-15 18:07:14 +0900825 private void setEndpointsRules(Endpoints endpoints, boolean install) {
826 String appName = endpoints.getMetadata().getName();
827 Service service = k8sServiceService.services().stream().filter(s ->
828 appName.equals(s.getMetadata().getName()))
829 .findFirst().orElse(null);
Jian Li7b63fe62019-04-14 21:49:27 +0900830
Jian Libf562c22019-04-15 18:07:14 +0900831 if (service == null) {
832 return;
Jian Li7b63fe62019-04-14 21:49:27 +0900833 }
Jian Libf562c22019-04-15 18:07:14 +0900834
835 setGroupBuckets(service, install);
Jian Li7b63fe62019-04-14 21:49:27 +0900836 }
837
Jian Lif5da78a2019-04-15 01:52:23 +0900838 private String servicePortStr(String ip, int port, String protocol) {
839 return ip + "_" + port + "_" + protocol;
840 }
841
Jian Li004526d2019-02-25 16:26:27 +0900842 /**
843 * Extracts properties from the component configuration context.
844 *
845 * @param context the component context
846 */
847 private void readComponentConfiguration(ComponentContext context) {
848 Dictionary<?, ?> properties = context.getProperties();
849
850 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
851 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
852 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
Jian Lif5da78a2019-04-15 01:52:23 +0900853
854 String updatedServiceCidr = Tools.get(properties, SERVICE_CIDR);
855 serviceCidr = updatedServiceCidr != null ?
856 updatedServiceCidr : SERVICE_IP_CIDR_DEFAULT;
857 log.info("Configured. Service VIP range is {}", serviceCidr);
Jian Li004526d2019-02-25 16:26:27 +0900858 }
859
Jian Li4a7ce672019-04-09 15:20:25 +0900860 private void setServiceNatRules(DeviceId deviceId, boolean install) {
861 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
862 setStatefulServiceNatRules(deviceId, install);
863 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
864 setStatelessServiceNatRules(deviceId, install);
865 } else {
866 log.warn("Service IP NAT mode was not configured!");
867 }
868 }
869
Jian Li2cc2b632019-02-18 00:56:40 +0900870 private class InternalK8sServiceListener implements K8sServiceListener {
871
872 private boolean isRelevantHelper() {
873 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
874 }
875
876 @Override
877 public void event(K8sServiceEvent event) {
878 switch (event.type()) {
879 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900880 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900881 eventExecutor.execute(() -> processServiceCreation(event.subject()));
882 break;
883 case K8S_SERVICE_REMOVED:
884 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
885 break;
886 default:
887 // do nothing
888 break;
889 }
890 }
891
892 private void processServiceCreation(Service service) {
893 if (!isRelevantHelper()) {
894 return;
895 }
896
Jian Li5e8a22a2019-02-27 11:48:42 +0900897 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
898 long ctTrackNew = computeCtStateFlag(true, true, false);
899 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900900
Jian Li5e8a22a2019-02-27 11:48:42 +0900901 k8sNodeService.completeNodes().forEach(n ->
902 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
903 ctMaskTrackNew, service, true));
904 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
905 k8sNodeService.completeNodes().forEach(n ->
906 setStatelessGroupFlowRules(n.intgBridge(), service, true));
907 }
Jian Li2cc2b632019-02-18 00:56:40 +0900908 }
909
910 private void processServiceRemoval(Service service) {
911 if (!isRelevantHelper()) {
912 return;
913 }
914
Jian Li5e8a22a2019-02-27 11:48:42 +0900915 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
916 long ctTrackNew = computeCtStateFlag(true, true, false);
917 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900918
Jian Li5e8a22a2019-02-27 11:48:42 +0900919 k8sNodeService.completeNodes().forEach(n ->
920 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
921 ctMaskTrackNew, service, false));
922 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
923 k8sNodeService.completeNodes().forEach(n ->
924 setStatelessGroupFlowRules(n.intgBridge(), service, false));
925 }
Jian Li004526d2019-02-25 16:26:27 +0900926 }
Jian Li2cc2b632019-02-18 00:56:40 +0900927 }
928
Jian Li7b63fe62019-04-14 21:49:27 +0900929 private class InternalK8sEndpointsListener implements K8sEndpointsListener {
930
931 private boolean isRelevantHelper() {
932 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
933 }
934
935 @Override
936 public void event(K8sEndpointsEvent event) {
937 Endpoints endpoints = event.subject();
938
939 switch (event.type()) {
940 case K8S_ENDPOINTS_CREATED:
Jian Libf562c22019-04-15 18:07:14 +0900941 case K8S_ENDPOINTS_UPDATED:
Jian Li7b63fe62019-04-14 21:49:27 +0900942 eventExecutor.execute(() -> processEndpointsCreation(endpoints));
943 break;
944 case K8S_ENDPOINTS_REMOVED:
945 eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
946 break;
947 default:
948 break;
949 }
950 }
951
952 private void processEndpointsCreation(Endpoints endpoints) {
953 if (!isRelevantHelper()) {
954 return;
955 }
956
Jian Libf562c22019-04-15 18:07:14 +0900957 setEndpointsRules(endpoints, true);
Jian Li7b63fe62019-04-14 21:49:27 +0900958 }
959
960 private void processEndpointsRemoval(Endpoints endpoints) {
961 if (!isRelevantHelper()) {
962 return;
963 }
964
Jian Libf562c22019-04-15 18:07:14 +0900965 setEndpointsRules(endpoints, false);
Jian Li4a7ce672019-04-09 15:20:25 +0900966 }
967 }
968
Jian Li2cc2b632019-02-18 00:56:40 +0900969 private class InternalNodeEventListener implements K8sNodeListener {
970
971 private boolean isRelevantHelper() {
972 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
973 }
974
975 @Override
976 public void event(K8sNodeEvent event) {
977 K8sNode k8sNode = event.subject();
978 switch (event.type()) {
979 case K8S_NODE_COMPLETE:
980 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
981 break;
982 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900983 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +0900984 default:
985 break;
986 }
987 }
988
989 private void processNodeCompletion(K8sNode node) {
990 if (!isRelevantHelper()) {
991 return;
992 }
993
Jian Li4a7ce672019-04-09 15:20:25 +0900994 setServiceNatRules(node.intgBridge(), true);
Jian Libf562c22019-04-15 18:07:14 +0900995 k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
Jian Lif5da78a2019-04-15 01:52:23 +0900996 k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
997 }
998 }
999
1000 private class InternalK8sNetworkListener implements K8sNetworkListener {
1001
1002 private boolean isRelevantHelper() {
1003 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
1004 }
1005
1006 @Override
1007 public void event(K8sNetworkEvent event) {
1008 switch (event.type()) {
1009 case K8S_NETWORK_CREATED:
1010 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
1011 break;
1012 case K8S_NETWORK_UPDATED:
1013 case K8S_NETWORK_REMOVED:
1014 default:
1015 break;
1016 }
1017 }
1018
1019 private void processNetworkCreation(K8sNetwork network) {
1020 if (!isRelevantHelper()) {
1021 return;
1022 }
1023
1024 setupServiceDefaultRule(network, true);
Jian Li2cc2b632019-02-18 00:56:40 +09001025 }
1026 }
1027}