blob: 2640508a6f08869ccdacc2f0b4a4cb1e04799675 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
Jian Li2cc2b632019-02-18 00:56:40 +090020import io.fabric8.kubernetes.api.model.EndpointAddress;
21import io.fabric8.kubernetes.api.model.EndpointPort;
22import io.fabric8.kubernetes.api.model.EndpointSubset;
23import io.fabric8.kubernetes.api.model.Endpoints;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Pod;
Jian Li2cc2b632019-02-18 00:56:40 +090025import io.fabric8.kubernetes.api.model.Service;
Jian Li5e8a22a2019-02-27 11:48:42 +090026import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li2cc2b632019-02-18 00:56:40 +090027import org.onlab.packet.Ethernet;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.IpPrefix;
Jian Lif5da78a2019-04-15 01:52:23 +090031import org.onlab.packet.MacAddress;
Jian Li2cc2b632019-02-18 00:56:40 +090032import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.core.GroupId;
Jian Li7b63fe62019-04-14 21:49:27 +090041import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
42import org.onosproject.k8snetworking.api.K8sEndpointsListener;
Jian Li2cc2b632019-02-18 00:56:40 +090043import org.onosproject.k8snetworking.api.K8sEndpointsService;
44import org.onosproject.k8snetworking.api.K8sFlowRuleService;
45import org.onosproject.k8snetworking.api.K8sGroupRuleService;
Jian Lif5da78a2019-04-15 01:52:23 +090046import org.onosproject.k8snetworking.api.K8sNetwork;
47import org.onosproject.k8snetworking.api.K8sNetworkEvent;
48import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Li2cc2b632019-02-18 00:56:40 +090049import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090050import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
54import org.onosproject.k8snetworking.util.RulePopulatorUtil;
55import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
56import org.onosproject.k8snode.api.K8sNode;
57import org.onosproject.k8snode.api.K8sNodeEvent;
58import org.onosproject.k8snode.api.K8sNodeListener;
59import org.onosproject.k8snode.api.K8sNodeService;
60import org.onosproject.net.DeviceId;
Jian Lif5da78a2019-04-15 01:52:23 +090061import org.onosproject.net.PortNumber;
Jian Li2cc2b632019-02-18 00:56:40 +090062import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.driver.DriverService;
64import org.onosproject.net.flow.DefaultTrafficSelector;
65import org.onosproject.net.flow.DefaultTrafficTreatment;
66import org.onosproject.net.flow.TrafficSelector;
67import org.onosproject.net.flow.TrafficTreatment;
68import org.onosproject.net.flow.criteria.ExtensionSelector;
69import org.onosproject.net.flow.instructions.ExtensionTreatment;
70import org.onosproject.net.group.GroupBucket;
Jian Li2cc2b632019-02-18 00:56:40 +090071import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090072import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090073import org.osgi.service.component.annotations.Activate;
74import org.osgi.service.component.annotations.Component;
75import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090076import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090077import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
79import org.slf4j.Logger;
80
Jian Li004526d2019-02-25 16:26:27 +090081import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090082import java.util.List;
83import java.util.Map;
84import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090085import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090086import java.util.concurrent.ExecutorService;
87import java.util.stream.Collectors;
88
89import static java.util.concurrent.Executors.newSingleThreadExecutor;
90import static org.onlab.util.Tools.groupedThreads;
Jian Li73d3b6a2019-07-08 18:07:53 +090091import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090092import static org.onosproject.k8snetworking.api.Constants.A_CLASS;
93import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
Jian Li5e8a22a2019-02-27 11:48:42 +090094import static org.onosproject.k8snetworking.api.Constants.DST;
Jian Li73d3b6a2019-07-08 18:07:53 +090095import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090096import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090097import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090098import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
99import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +0900100import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +0900101import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900103import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900104import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li7d111d72019-04-12 13:58:44 +0900105import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li004526d2019-02-25 16:26:27 +0900106import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +0900107import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900108import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Li004526d2019-02-25 16:26:27 +0900109import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
110import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
111import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li5e8a22a2019-02-27 11:48:42 +0900112import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900113import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
Jian Lif5da78a2019-04-15 01:52:23 +0900114import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
115import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
Jian Li004526d2019-02-25 16:26:27 +0900116import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
117import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li140d8a22019-04-24 23:41:44 +0900118import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
Jian Li2cc2b632019-02-18 00:56:40 +0900119import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
Jian Lie1a5b8f2019-07-23 17:13:19 +0900120import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
121import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
Jian Lif5da78a2019-04-15 01:52:23 +0900122import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
Jian Li2cc2b632019-02-18 00:56:40 +0900123import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
Jian Lif5da78a2019-04-15 01:52:23 +0900124import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900125import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900126import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900127import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
128import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
129import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
130import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
131import static org.onosproject.net.group.GroupDescription.Type.SELECT;
132import static org.slf4j.LoggerFactory.getLogger;
133
134/**
135 * Handles the service IP to pod IP related translation traffic.
136 */
Jian Li004526d2019-02-25 16:26:27 +0900137@Component(
138 immediate = true,
139 property = {
Jian Lif5da78a2019-04-15 01:52:23 +0900140 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT,
141 SERVICE_CIDR + "=" + SERVICE_IP_CIDR_DEFAULT
Jian Li004526d2019-02-25 16:26:27 +0900142 }
143)
Jian Li2cc2b632019-02-18 00:56:40 +0900144public class K8sServiceHandler {
145
146 private final Logger log = getLogger(getClass());
147
Jian Li2cc2b632019-02-18 00:56:40 +0900148 private static final int HOST_CIDR_NUM = 32;
149
Jian Li2cc2b632019-02-18 00:56:40 +0900150 private static final String CLUSTER_IP = "ClusterIP";
151 private static final String TCP = "TCP";
Jian Li5e8a22a2019-02-27 11:48:42 +0900152 private static final String UDP = "UDP";
Jian Lif5da78a2019-04-15 01:52:23 +0900153 private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
Jian Li2cc2b632019-02-18 00:56:40 +0900154
Jian Li140d8a22019-04-24 23:41:44 +0900155 private static final String SERVICE_CIDR = "serviceCidr";
Jian Li44c2b122019-05-03 14:46:34 +0900156 private static final String NONE = "None";
Jian Li140d8a22019-04-24 23:41:44 +0900157 private static final String B_CLASS_SUFFIX = ".0.0/16";
158 private static final String A_CLASS_SUFFIX = ".0.0.0/8";
159
Jian Li2cc2b632019-02-18 00:56:40 +0900160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
161 protected CoreService coreService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
164 protected LeadershipService leadershipService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
167 protected ClusterService clusterService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
170 protected DriverService driverService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
173 protected DeviceService deviceService;
174
175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900176 protected ComponentConfigService configService;
177
178 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900179 protected StorageService storageService;
180
181 @Reference(cardinality = ReferenceCardinality.MANDATORY)
182 protected K8sNetworkService k8sNetworkService;
183
184 @Reference(cardinality = ReferenceCardinality.MANDATORY)
185 protected K8sFlowRuleService k8sFlowRuleService;
186
187 @Reference(cardinality = ReferenceCardinality.MANDATORY)
188 protected K8sGroupRuleService k8sGroupRuleService;
189
190 @Reference(cardinality = ReferenceCardinality.MANDATORY)
191 protected K8sNodeService k8sNodeService;
192
193 @Reference(cardinality = ReferenceCardinality.MANDATORY)
194 protected K8sEndpointsService k8sEndpointsService;
195
196 @Reference(cardinality = ReferenceCardinality.MANDATORY)
197 protected K8sServiceService k8sServiceService;
198
Jian Lie1a5b8f2019-07-23 17:13:19 +0900199 @Reference(cardinality = ReferenceCardinality.MANDATORY)
200 protected K8sPodService k8sPodService;
Jian Li4a7ce672019-04-09 15:20:25 +0900201
Jian Li004526d2019-02-25 16:26:27 +0900202 /** Service IP address translation mode. */
203 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
204
Jian Lif5da78a2019-04-15 01:52:23 +0900205 /** Ranges of IP address of service VIP. */
206 private String serviceCidr = SERVICE_IP_CIDR_DEFAULT;
207
Jian Li2cc2b632019-02-18 00:56:40 +0900208 private final ExecutorService eventExecutor = newSingleThreadExecutor(
209 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
210 private final InternalNodeEventListener internalNodeEventListener =
211 new InternalNodeEventListener();
212 private final InternalK8sServiceListener internalK8sServiceListener =
213 new InternalK8sServiceListener();
Jian Li7b63fe62019-04-14 21:49:27 +0900214 private final InternalK8sEndpointsListener internalK8sEndpointsListener =
215 new InternalK8sEndpointsListener();
Jian Lif5da78a2019-04-15 01:52:23 +0900216 private final InternalK8sNetworkListener internalK8sNetworkListener =
217 new InternalK8sNetworkListener();
Jian Li2cc2b632019-02-18 00:56:40 +0900218
Jian Li2cc2b632019-02-18 00:56:40 +0900219 private ApplicationId appId;
220 private NodeId localNodeId;
221
222 @Activate
223 protected void activate() {
224 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900225 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900226 localNodeId = clusterService.getLocalNode().id();
227 leadershipService.runForLeadership(appId.name());
228 k8sNodeService.addListener(internalNodeEventListener);
229 k8sServiceService.addListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900230 k8sEndpointsService.addListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900231 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Li2cc2b632019-02-18 00:56:40 +0900232
Jian Li2cc2b632019-02-18 00:56:40 +0900233 log.info("Started");
234 }
235
236 @Deactivate
237 protected void deactivate() {
238 leadershipService.withdraw(appId.name());
239 k8sNodeService.removeListener(internalNodeEventListener);
240 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li7b63fe62019-04-14 21:49:27 +0900241 k8sEndpointsService.removeListener(internalK8sEndpointsListener);
Jian Lif5da78a2019-04-15 01:52:23 +0900242 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Li004526d2019-02-25 16:26:27 +0900243 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900244 eventExecutor.shutdown();
245
246 log.info("Stopped");
247 }
248
Jian Li004526d2019-02-25 16:26:27 +0900249 @Modified
250 void modified(ComponentContext context) {
251 readComponentConfiguration(context);
252
253 log.info("Modified");
254 }
255
256 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900257 // -trk CT rules
258 long ctUntrack = computeCtStateFlag(false, false, false);
259 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
260
261 k8sNetworkService.networks().forEach(n -> {
262 // TODO: need to provide a way to add multiple service IP CIDR ranges
Jian Lif5da78a2019-04-15 01:52:23 +0900263 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
Jian Li73d3b6a2019-07-08 18:07:53 +0900264 GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900265 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900266 GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
Jian Li2cc2b632019-02-18 00:56:40 +0900267 });
268
269 // +trk-new CT rules
270 long ctTrackUnnew = computeCtStateFlag(true, false, false);
271 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
272
273 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
274 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
275
276 // +trk+new CT rules
277 long ctTrackNew = computeCtStateFlag(true, true, false);
278 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
279
280 k8sServiceService.services().stream()
281 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900282 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
283 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900284 }
285
Jian Li004526d2019-02-25 16:26:27 +0900286 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
287
Jian Li140d8a22019-04-24 23:41:44 +0900288 String srcPodCidr = k8sNetworkService.network(
Jian Li7d111d72019-04-12 13:58:44 +0900289 k8sNodeService.node(deviceId).hostname()).cidr();
Jian Li140d8a22019-04-24 23:41:44 +0900290 String srcPodPrefix = getBclassIpPrefixFromCidr(srcPodCidr);
291 String fullSrcPodCidr = srcPodPrefix + B_CLASS_SUFFIX;
292 String fullSrcNodeCidr = NODE_IP_PREFIX + A_CLASS_SUFFIX;
293
294 // src: POD -> dst: service (unNAT POD) grouping
295 setSrcDstCidrRules(deviceId, fullSrcPodCidr, serviceCidr, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900296 SHIFTED_IP_PREFIX, SRC, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900297 PRIORITY_CT_RULE, install);
298 // src: POD (unNAT service) -> dst: shifted POD grouping
299 setSrcDstCidrRules(deviceId, fullSrcPodCidr, SHIFTED_IP_CIDR, B_CLASS, null,
Jian Li73d3b6a2019-07-08 18:07:53 +0900300 srcPodPrefix, DST, GROUPING_TABLE, POD_TABLE, PRIORITY_CT_RULE, install);
Jian Li140d8a22019-04-24 23:41:44 +0900301
302 // src: node -> dst: service (unNAT POD) grouping
303 setSrcDstCidrRules(deviceId, fullSrcNodeCidr, serviceCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900304 null, null, null, GROUPING_TABLE, SERVICE_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900305 PRIORITY_CT_RULE, install);
306 // src: POD (unNAT service) -> dst: node grouping
307 setSrcDstCidrRules(deviceId, fullSrcPodCidr, fullSrcNodeCidr, A_CLASS,
Jian Li73d3b6a2019-07-08 18:07:53 +0900308 null, null, null, GROUPING_TABLE, POD_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900309 PRIORITY_CT_RULE, install);
Jian Li7d111d72019-04-12 13:58:44 +0900310
Jian Li004526d2019-02-25 16:26:27 +0900311 k8sNetworkService.networks().forEach(n -> {
Jian Li140d8a22019-04-24 23:41:44 +0900312 setSrcDstCidrRules(deviceId, fullSrcPodCidr, n.cidr(), B_CLASS,
313 n.segmentId(), null, null, ROUTING_TABLE,
Jian Li73d3b6a2019-07-08 18:07:53 +0900314 STAT_EGRESS_TABLE, PRIORITY_INTER_ROUTING_RULE, install);
Jian Li004526d2019-02-25 16:26:27 +0900315 });
316
317 // setup load balancing rules using group table
318 k8sServiceService.services().stream()
319 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
320 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
321 }
322
323 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
Jian Li140d8a22019-04-24 23:41:44 +0900324 String dstCidr, String cidrClass,
325 String segId, String shiftPrefix,
326 String shiftType, int installTable,
327 int transitTable, int priority,
328 boolean install) {
Jian Li004526d2019-02-25 16:26:27 +0900329 TrafficSelector selector = DefaultTrafficSelector.builder()
330 .matchEthType(Ethernet.TYPE_IPV4)
331 .matchIPSrc(IpPrefix.valueOf(srcCidr))
332 .matchIPDst(IpPrefix.valueOf(dstCidr))
333 .build();
334
Jian Li7d111d72019-04-12 13:58:44 +0900335 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
336
337 if (segId != null) {
338 tBuilder.setTunnelId(Long.valueOf(segId));
339 }
Jian Li140d8a22019-04-24 23:41:44 +0900340
341 if (shiftPrefix != null && shiftType != null) {
342 ExtensionTreatment loadTreatment = buildLoadExtension(
343 deviceService.getDevice(deviceId), cidrClass, shiftType, shiftPrefix);
344 tBuilder.extension(loadTreatment, deviceId);
345 }
346
Jian Li7d111d72019-04-12 13:58:44 +0900347 tBuilder.transition(transitTable);
Jian Li004526d2019-02-25 16:26:27 +0900348
349 k8sFlowRuleService.setRule(
350 appId,
351 deviceId,
352 selector,
Jian Li7d111d72019-04-12 13:58:44 +0900353 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900354 priority,
355 installTable,
356 install);
357 }
358
Jian Li5e8a22a2019-02-27 11:48:42 +0900359 /**
360 * Obtains the service port to endpoint address paired map.
361 *
362 * @param service kubernetes service
363 * @return a map where key is kubernetes service port, and value is the
364 * endpoint addresses that are associated with the service port
365 */
366 private Map<ServicePort, Set<String>> getSportEpAddressMap(Service service) {
367
368 Map<ServicePort, Set<String>> map = Maps.newConcurrentMap();
Jian Li004526d2019-02-25 16:26:27 +0900369
370 String serviceName = service.getMetadata().getName();
Jian Li004526d2019-02-25 16:26:27 +0900371 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
372 .stream()
373 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
374 .collect(Collectors.toList());
375
Jian Li5e8a22a2019-02-27 11:48:42 +0900376 service.getSpec().getPorts().stream()
377 .filter(Objects::nonNull)
378 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900379 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900380 sp.getTargetPort().getStrVal() != null)
Jian Li5e8a22a2019-02-27 11:48:42 +0900381 .forEach(sp -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900382 Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
383 sp.getTargetPort().getIntVal() : 0;
384 String targetPortName = sp.getTargetPort().getStrVal() != null ?
385 sp.getTargetPort().getStrVal() : "";
386 String targetProtocol = sp.getProtocol();
Jian Li004526d2019-02-25 16:26:27 +0900387
Jian Lie1a5b8f2019-07-23 17:13:19 +0900388 for (Endpoints endpoints : endpointses) {
389 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
390
391 // in case service port name is specified but not port number
392 // we will lookup the container port number and use it
393 // as the target port number
394 if (!targetPortName.equals("") && targetPortInt == 0) {
395 for (EndpointAddress addr : endpointSubset.getAddresses()) {
396 Pod pod = podByIp(k8sPodService, addr.getIp());
397 targetPortInt = portNumberByName(pod, targetPortName);
398 }
399 }
400
401 for (EndpointPort endpointPort : endpointSubset.getPorts()) {
402 if (targetProtocol.equals(endpointPort.getProtocol()) &&
403 (targetPortInt.equals(endpointPort.getPort()) ||
404 targetPortName.equals(endpointPort.getName()))) {
405 Set<String> addresses = endpointSubset.getAddresses()
406 .stream().map(EndpointAddress::getIp)
407 .collect(Collectors.toSet());
408 map.put(sp, addresses);
409 }
410 }
Jian Li004526d2019-02-25 16:26:27 +0900411 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900412 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900413 });
Jian Li004526d2019-02-25 16:26:27 +0900414
Jian Li5e8a22a2019-02-27 11:48:42 +0900415 return map;
416 }
417
Jian Libf562c22019-04-15 18:07:14 +0900418 private void setGroupBuckets(Service service, boolean install) {
Jian Li4a7ce672019-04-09 15:20:25 +0900419 Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
420 Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Jian Li7b63fe62019-04-14 21:49:27 +0900421 Map<String, String> nodeIpGatewayIpMap =
422 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
Jian Li4a7ce672019-04-09 15:20:25 +0900423
Jian Libf562c22019-04-15 18:07:14 +0900424 for (K8sNode node : k8sNodeService.completeNodes()) {
425 spEpasMap.forEach((sp, epas) -> {
426 List<GroupBucket> bkts = Lists.newArrayList();
Jian Li4a7ce672019-04-09 15:20:25 +0900427
Jian Libf562c22019-04-15 18:07:14 +0900428 for (String ip : epas) {
429 if (install) {
430 bkts.add(buildBuckets(node.intgBridge(),
431 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
432 } else {
433 bkts.add(buildBuckets(node.intgBridge(),
434 nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
Jian Li7b63fe62019-04-14 21:49:27 +0900435 }
Jian Libf562c22019-04-15 18:07:14 +0900436 }
437
438 spGrpBkts.put(sp, bkts);
439 });
440
441 String serviceIp = service.getSpec().getClusterIP();
442 spGrpBkts.forEach((sp, bkts) -> {
443 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
444 int groupId = svcStr.hashCode();
445
446 if (bkts.size() > 0) {
447 k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
448 }
449 });
450
451 spEpasMap.forEach((sp, epas) ->
Jian Lie1a5b8f2019-07-23 17:13:19 +0900452 // add flow rules for unshifting IP domain
453 epas.forEach(epa -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900454
Jian Lie1a5b8f2019-07-23 17:13:19 +0900455 String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
456
457 int targetPort;
458 if (sp.getTargetPort().getIntVal() == null) {
459 Pod pod = podByIp(k8sPodService, podIp);
460 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
461 } else {
462 targetPort = sp.getTargetPort().getIntVal();
463 }
464
465 setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
466 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
467 sp.getProtocol(), podIp,
468 targetPort, install);
469 })
Jian Libf562c22019-04-15 18:07:14 +0900470 );
471 }
Jian Li4a7ce672019-04-09 15:20:25 +0900472 }
473
Jian Lib7dfb5b2019-07-15 17:37:12 +0900474 private GroupBucket buildBuckets(DeviceId deviceId, String podIpStr, ServicePort sp) {
Jian Li4a7ce672019-04-09 15:20:25 +0900475 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li7d111d72019-04-12 13:58:44 +0900476 .setIpDst(IpAddress.valueOf(podIpStr));
Jian Li4a7ce672019-04-09 15:20:25 +0900477
Jian Lie1a5b8f2019-07-23 17:13:19 +0900478 int targetPort;
479 if (sp.getTargetPort().getIntVal() == null) {
480 Pod pod = podByIp(k8sPodService, podIpStr);
481 targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
482 } else {
483 targetPort = sp.getTargetPort().getIntVal();
484 }
Jian Lib7dfb5b2019-07-15 17:37:12 +0900485
Jian Li4a7ce672019-04-09 15:20:25 +0900486 if (TCP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900487 tBuilder.setTcpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900488 } else if (UDP.equals(sp.getProtocol())) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900489 tBuilder.setUdpDst(TpPort.tpPort(targetPort));
Jian Li4a7ce672019-04-09 15:20:25 +0900490 }
491
Jian Li7d111d72019-04-12 13:58:44 +0900492 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900493 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li7d111d72019-04-12 13:58:44 +0900494 tBuilder.extension(resubmitTreatment, deviceId);
495
Jian Libf562c22019-04-15 18:07:14 +0900496 return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
Jian Li4a7ce672019-04-09 15:20:25 +0900497 }
498
499 private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
500 Service service,
501 boolean install) {
Jian Li7b63fe62019-04-14 21:49:27 +0900502 Set<ServicePort> sps = service.getSpec().getPorts().stream()
503 .filter(Objects::nonNull)
504 .filter(sp -> sp.getTargetPort() != null)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900505 .filter(sp -> sp.getTargetPort().getIntVal() != null ||
Jian Lie1a5b8f2019-07-23 17:13:19 +0900506 sp.getTargetPort().getStrVal() != null)
Jian Li7b63fe62019-04-14 21:49:27 +0900507 .collect(Collectors.toSet());
Jian Li5e8a22a2019-02-27 11:48:42 +0900508
509 String serviceIp = service.getSpec().getClusterIP();
Jian Li7b63fe62019-04-14 21:49:27 +0900510 sps.forEach(sp -> {
Jian Li5e8a22a2019-02-27 11:48:42 +0900511 String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
Jian Li4a7ce672019-04-09 15:20:25 +0900512 int groupId = svcStr.hashCode();
Jian Li5e8a22a2019-02-27 11:48:42 +0900513
Jian Li4a7ce672019-04-09 15:20:25 +0900514 if (install) {
515
516 // add group table rules
517 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900518 SELECT, Lists.newArrayList(), true);
Jian Li4a7ce672019-04-09 15:20:25 +0900519
520 log.info("Adding group rule {}", groupId);
521
522 // if we failed to add group rule, we will not install flow rules
523 // as this might cause rule inconsistency
524 if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
525 // add flow rules for shifting IP domain
526 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
527 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
528 sp.getProtocol(), true);
529 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900530 } else {
Jian Li4a7ce672019-04-09 15:20:25 +0900531 // remove flow rules for shifting IP domain
532 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
533 PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
534 sp.getProtocol(), false);
535
536 // remove group table rules
537 k8sGroupRuleService.setRule(appId, deviceId, groupId,
Jian Li7b63fe62019-04-14 21:49:27 +0900538 SELECT, Lists.newArrayList(), false);
Jian Li4a7ce672019-04-09 15:20:25 +0900539
540 log.info("Removing group rule {}", groupId);
Jian Li5e8a22a2019-02-27 11:48:42 +0900541 }
Jian Li5e8a22a2019-02-27 11:48:42 +0900542 });
Jian Li004526d2019-02-25 16:26:27 +0900543 }
544
545 private void setShiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900546 int groupId, int priority, String serviceIp,
547 int servicePort, String protocol, boolean install) {
Jian Li44c2b122019-05-03 14:46:34 +0900548
549 if (serviceIp == null || NONE.equals(serviceIp)) {
550 return;
551 }
552
Jian Li5e8a22a2019-02-27 11:48:42 +0900553 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900554 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900555 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
556
557 if (TCP.equals(protocol)) {
558 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
559 .matchTcpDst(TpPort.tpPort(servicePort));
560 } else if (UDP.equals(protocol)) {
561 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
562 .matchUdpDst(TpPort.tpPort(servicePort));
563 }
Jian Li004526d2019-02-25 16:26:27 +0900564
Jian Li004526d2019-02-25 16:26:27 +0900565 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900566 .group(GroupId.valueOf(groupId))
567 .build();
568
569 k8sFlowRuleService.setRule(
570 appId,
571 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900572 sBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900573 treatment,
574 priority,
575 installTable,
576 install);
577 }
578
579 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
Jian Li5e8a22a2019-02-27 11:48:42 +0900580 int priority, String serviceIp,
581 int servicePort, String protocol,
582 String podIp, int podPort, boolean install) {
583 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Li004526d2019-02-25 16:26:27 +0900584 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li5e8a22a2019-02-27 11:48:42 +0900585 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), HOST_CIDR_NUM));
586
587 if (TCP.equals(protocol)) {
588 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
589 .matchTcpSrc(TpPort.tpPort(podPort));
590 } else if (UDP.equals(protocol)) {
591 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
592 .matchUdpSrc(TpPort.tpPort(podPort));
593 }
594
Jian Li5e8a22a2019-02-27 11:48:42 +0900595 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li004526d2019-02-25 16:26:27 +0900596 .setIpSrc(IpAddress.valueOf(serviceIp))
Jian Li73d3b6a2019-07-08 18:07:53 +0900597 .transition(ACL_TABLE);
Jian Li5e8a22a2019-02-27 11:48:42 +0900598
599 if (TCP.equals(protocol)) {
600 tBuilder.setTcpSrc(TpPort.tpPort(servicePort));
601 } else if (UDP.equals(protocol)) {
602 tBuilder.setUdpSrc(TpPort.tpPort(servicePort));
603 }
Jian Li004526d2019-02-25 16:26:27 +0900604
605 k8sFlowRuleService.setRule(
606 appId,
607 deviceId,
Jian Li5e8a22a2019-02-27 11:48:42 +0900608 sBuilder.build(),
609 tBuilder.build(),
Jian Li004526d2019-02-25 16:26:27 +0900610 priority,
611 installTable,
612 install);
613 }
614
Jian Lif5da78a2019-04-15 01:52:23 +0900615 private void setCidrRoutingRule(IpPrefix prefix, MacAddress mac,
616 K8sNetwork network, boolean install) {
617 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
618 .matchEthType(Ethernet.TYPE_IPV4)
619 .matchIPSrc(prefix)
620 .matchIPDst(IpPrefix.valueOf(network.cidr()));
621
622 k8sNodeService.completeNodes().forEach(n -> {
623 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
624 .setTunnelId(Long.valueOf(network.segmentId()));
625
626 if (n.hostname().equals(network.name())) {
627 if (mac != null) {
628 tBuilder.setEthSrc(mac);
629 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900630 tBuilder.transition(STAT_EGRESS_TABLE);
Jian Lif5da78a2019-04-15 01:52:23 +0900631 } else {
632 PortNumber portNum = tunnelPortNumByNetId(network.networkId(),
633 k8sNetworkService, n);
634 K8sNode localNode = k8sNodeService.node(network.name());
635
636 tBuilder.extension(buildExtension(
637 deviceService,
638 n.intgBridge(),
639 localNode.dataIp().getIp4Address()),
640 n.intgBridge())
641 .setOutput(portNum);
642 }
643
644 k8sFlowRuleService.setRule(
645 appId,
646 n.intgBridge(),
647 sBuilder.build(),
648 tBuilder.build(),
649 PRIORITY_CIDR_RULE,
650 ROUTING_TABLE,
651 install
652 );
653 });
654 }
655
656 private void setupServiceDefaultRule(K8sNetwork k8sNetwork, boolean install) {
657 setCidrRoutingRule(IpPrefix.valueOf(serviceCidr),
658 MacAddress.valueOf(SERVICE_FAKE_MAC_STR), k8sNetwork, install);
659 }
660
Jian Li004526d2019-02-25 16:26:27 +0900661 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
662 long ctMask, Service service,
663 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900664 List<GroupBucket> buckets = Lists.newArrayList();
665
666 String serviceName = service.getMetadata().getName();
667 String serviceIp = service.getSpec().getClusterIP();
668
669 // TODO: multi-ports case should be addressed
670 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
Jian Libf562c22019-04-15 18:07:14 +0900671 String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
672
673 String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
674 int groupId = svcStr.hashCode();
Jian Li2cc2b632019-02-18 00:56:40 +0900675
676 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
677 .stream()
678 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
679 .collect(Collectors.toList());
680
681 Map<String, String> nodeIpGatewayIpMap =
682 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
683
684 for (Endpoints endpoints : endpointses) {
685 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
686 List<EndpointPort> ports = endpointSubset.getPorts()
687 .stream()
688 .filter(p -> p.getProtocol().equals(TCP))
689 .collect(Collectors.toList());
690
691 for (EndpointAddress address : endpointSubset.getAddresses()) {
692 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
693 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
694
695 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
696 niciraConnTrackTreatmentBuilder(driverService, deviceId)
697 .commit(true)
698 .natAction(true)
699 .natIp(IpAddress.valueOf(podIp))
700 .natFlag(CT_NAT_DST_FLAG);
701
702 ports.forEach(p -> {
703 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
Jian Lieb488ea2019-04-16 01:50:02 +0900704 .natPortMin(TpPort.tpPort(p.getPort()))
705 .natPortMax(TpPort.tpPort(p.getPort()))
706 .build();
Jian Li2cc2b632019-02-18 00:56:40 +0900707 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
Jian Li73d3b6a2019-07-08 18:07:53 +0900708 deviceService.getDevice(deviceId), ACL_TABLE);
Jian Li2cc2b632019-02-18 00:56:40 +0900709 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
710 .extension(ctNatTreatment, deviceId)
711 .extension(resubmitTreatment, deviceId)
712 .build();
713 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
714 });
715 }
716 }
717 }
718
719 if (!buckets.isEmpty()) {
720 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
721
722 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
723 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
724 PRIORITY_CT_RULE, install);
725 }
726 }
727
728 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
729 String srcCidr, String dstCidr, int installTable,
730 int transitTable, int priority, boolean install) {
731 ExtensionSelector esCtSate = RulePopulatorUtil
732 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
733 TrafficSelector selector = DefaultTrafficSelector.builder()
734 .matchEthType(Ethernet.TYPE_IPV4)
735 .matchIPSrc(IpPrefix.valueOf(srcCidr))
736 .matchIPDst(IpPrefix.valueOf(dstCidr))
737 .extension(esCtSate, deviceId)
738 .build();
739
740 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
741 niciraConnTrackTreatmentBuilder(driverService, deviceId)
742 .natAction(false)
743 .commit(false)
744 .table((short) transitTable);
745
746 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
747 .extension(connTreatmentBuilder.build(), deviceId)
748 .build();
749
750 k8sFlowRuleService.setRule(
751 appId,
752 deviceId,
753 selector,
754 treatment,
755 priority,
756 installTable,
757 install);
758 }
759
760 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
761 IpAddress dstIp, TpPort dstPort, int installTable,
762 int groupId, int priority, boolean install) {
763 ExtensionSelector esCtSate = RulePopulatorUtil
764 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
765 TrafficSelector selector = DefaultTrafficSelector.builder()
766 .matchEthType(Ethernet.TYPE_IPV4)
767 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
768 .matchIPProtocol(IPv4.PROTOCOL_TCP)
769 .matchTcpDst(dstPort)
770 .extension(esCtSate, deviceId)
771 .build();
772 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
773 .group(GroupId.valueOf(groupId))
774 .build();
775
776 k8sFlowRuleService.setRule(
777 appId,
778 deviceId,
779 selector,
780 treatment,
781 priority,
782 installTable,
783 install);
784 }
785
786 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
787 int installTable, int transitTable,
788 int priority, boolean install) {
789 ExtensionSelector esCtSate = RulePopulatorUtil
790 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
791 TrafficSelector selector = DefaultTrafficSelector.builder()
792 .extension(esCtSate, deviceId)
793 .build();
794
795 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
796 .transition(transitTable)
797 .build();
798
799 k8sFlowRuleService.setRule(
800 appId,
801 deviceId,
802 selector,
803 treatment,
804 priority,
805 installTable,
806 install);
807 }
808
Jian Libf562c22019-04-15 18:07:14 +0900809 private void setEndpointsRules(Endpoints endpoints, boolean install) {
810 String appName = endpoints.getMetadata().getName();
811 Service service = k8sServiceService.services().stream().filter(s ->
812 appName.equals(s.getMetadata().getName()))
813 .findFirst().orElse(null);
Jian Li7b63fe62019-04-14 21:49:27 +0900814
Jian Libf562c22019-04-15 18:07:14 +0900815 if (service == null) {
816 return;
Jian Li7b63fe62019-04-14 21:49:27 +0900817 }
Jian Libf562c22019-04-15 18:07:14 +0900818
819 setGroupBuckets(service, install);
Jian Li7b63fe62019-04-14 21:49:27 +0900820 }
821
Jian Lif5da78a2019-04-15 01:52:23 +0900822 private String servicePortStr(String ip, int port, String protocol) {
823 return ip + "_" + port + "_" + protocol;
824 }
825
Jian Li004526d2019-02-25 16:26:27 +0900826 /**
827 * Extracts properties from the component configuration context.
828 *
829 * @param context the component context
830 */
831 private void readComponentConfiguration(ComponentContext context) {
832 Dictionary<?, ?> properties = context.getProperties();
833
834 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
835 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
836 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
Jian Lif5da78a2019-04-15 01:52:23 +0900837
838 String updatedServiceCidr = Tools.get(properties, SERVICE_CIDR);
839 serviceCidr = updatedServiceCidr != null ?
840 updatedServiceCidr : SERVICE_IP_CIDR_DEFAULT;
841 log.info("Configured. Service VIP range is {}", serviceCidr);
Jian Li004526d2019-02-25 16:26:27 +0900842 }
843
Jian Li4a7ce672019-04-09 15:20:25 +0900844 private void setServiceNatRules(DeviceId deviceId, boolean install) {
845 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
846 setStatefulServiceNatRules(deviceId, install);
847 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
848 setStatelessServiceNatRules(deviceId, install);
849 } else {
850 log.warn("Service IP NAT mode was not configured!");
851 }
852 }
853
Jian Li2cc2b632019-02-18 00:56:40 +0900854 private class InternalK8sServiceListener implements K8sServiceListener {
855
856 private boolean isRelevantHelper() {
857 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
858 }
859
860 @Override
861 public void event(K8sServiceEvent event) {
862 switch (event.type()) {
863 case K8S_SERVICE_CREATED:
Jian Li4a7ce672019-04-09 15:20:25 +0900864 case K8S_SERVICE_UPDATED:
Jian Li2cc2b632019-02-18 00:56:40 +0900865 eventExecutor.execute(() -> processServiceCreation(event.subject()));
866 break;
867 case K8S_SERVICE_REMOVED:
868 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
869 break;
870 default:
871 // do nothing
872 break;
873 }
874 }
875
876 private void processServiceCreation(Service service) {
877 if (!isRelevantHelper()) {
878 return;
879 }
880
Jian Li5e8a22a2019-02-27 11:48:42 +0900881 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
882 long ctTrackNew = computeCtStateFlag(true, true, false);
883 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900884
Jian Li5e8a22a2019-02-27 11:48:42 +0900885 k8sNodeService.completeNodes().forEach(n ->
886 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
887 ctMaskTrackNew, service, true));
888 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
889 k8sNodeService.completeNodes().forEach(n ->
890 setStatelessGroupFlowRules(n.intgBridge(), service, true));
891 }
Jian Li2cc2b632019-02-18 00:56:40 +0900892 }
893
894 private void processServiceRemoval(Service service) {
895 if (!isRelevantHelper()) {
896 return;
897 }
898
Jian Li5e8a22a2019-02-27 11:48:42 +0900899 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
900 long ctTrackNew = computeCtStateFlag(true, true, false);
901 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
Jian Li2cc2b632019-02-18 00:56:40 +0900902
Jian Li5e8a22a2019-02-27 11:48:42 +0900903 k8sNodeService.completeNodes().forEach(n ->
904 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
905 ctMaskTrackNew, service, false));
906 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
907 k8sNodeService.completeNodes().forEach(n ->
908 setStatelessGroupFlowRules(n.intgBridge(), service, false));
909 }
Jian Li004526d2019-02-25 16:26:27 +0900910 }
Jian Li2cc2b632019-02-18 00:56:40 +0900911 }
912
Jian Li7b63fe62019-04-14 21:49:27 +0900913 private class InternalK8sEndpointsListener implements K8sEndpointsListener {
914
915 private boolean isRelevantHelper() {
916 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
917 }
918
919 @Override
920 public void event(K8sEndpointsEvent event) {
921 Endpoints endpoints = event.subject();
922
923 switch (event.type()) {
924 case K8S_ENDPOINTS_CREATED:
Jian Libf562c22019-04-15 18:07:14 +0900925 case K8S_ENDPOINTS_UPDATED:
Jian Li7b63fe62019-04-14 21:49:27 +0900926 eventExecutor.execute(() -> processEndpointsCreation(endpoints));
927 break;
928 case K8S_ENDPOINTS_REMOVED:
929 eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
930 break;
931 default:
932 break;
933 }
934 }
935
936 private void processEndpointsCreation(Endpoints endpoints) {
937 if (!isRelevantHelper()) {
938 return;
939 }
940
Jian Libf562c22019-04-15 18:07:14 +0900941 setEndpointsRules(endpoints, true);
Jian Li7b63fe62019-04-14 21:49:27 +0900942 }
943
944 private void processEndpointsRemoval(Endpoints endpoints) {
945 if (!isRelevantHelper()) {
946 return;
947 }
948
Jian Libf562c22019-04-15 18:07:14 +0900949 setEndpointsRules(endpoints, false);
Jian Li4a7ce672019-04-09 15:20:25 +0900950 }
951 }
952
Jian Li2cc2b632019-02-18 00:56:40 +0900953 private class InternalNodeEventListener implements K8sNodeListener {
954
955 private boolean isRelevantHelper() {
956 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
957 }
958
959 @Override
960 public void event(K8sNodeEvent event) {
961 K8sNode k8sNode = event.subject();
962 switch (event.type()) {
963 case K8S_NODE_COMPLETE:
964 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
965 break;
966 case K8S_NODE_INCOMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900967 case K8S_NODE_REMOVED:
Jian Li2cc2b632019-02-18 00:56:40 +0900968 default:
969 break;
970 }
971 }
972
973 private void processNodeCompletion(K8sNode node) {
974 if (!isRelevantHelper()) {
975 return;
976 }
977
Jian Li4a7ce672019-04-09 15:20:25 +0900978 setServiceNatRules(node.intgBridge(), true);
Jian Libf562c22019-04-15 18:07:14 +0900979 k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
Jian Lif5da78a2019-04-15 01:52:23 +0900980 k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
981 }
982 }
983
984 private class InternalK8sNetworkListener implements K8sNetworkListener {
985
986 private boolean isRelevantHelper() {
987 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
988 }
989
990 @Override
991 public void event(K8sNetworkEvent event) {
992 switch (event.type()) {
993 case K8S_NETWORK_CREATED:
994 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
995 break;
996 case K8S_NETWORK_UPDATED:
997 case K8S_NETWORK_REMOVED:
998 default:
999 break;
1000 }
1001 }
1002
1003 private void processNetworkCreation(K8sNetwork network) {
1004 if (!isRelevantHelper()) {
1005 return;
1006 }
1007
1008 setupServiceDefaultRule(network, true);
Jian Li2cc2b632019-02-18 00:56:40 +09001009 }
1010 }
1011}