blob: 555a144924ab8b72d6eb3a6f94fc789ab9ad8107 [file] [log] [blame]
Jian Li2cc2b632019-02-18 00:56:40 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Lists;
Jian Li004526d2019-02-25 16:26:27 +090019import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Jian Li2cc2b632019-02-18 00:56:40 +090021import io.fabric8.kubernetes.api.model.EndpointAddress;
22import io.fabric8.kubernetes.api.model.EndpointPort;
23import io.fabric8.kubernetes.api.model.EndpointSubset;
24import io.fabric8.kubernetes.api.model.Endpoints;
25import io.fabric8.kubernetes.api.model.Service;
26import org.onlab.packet.Ethernet;
27import org.onlab.packet.IPv4;
28import org.onlab.packet.IpAddress;
29import org.onlab.packet.IpPrefix;
30import org.onlab.packet.TpPort;
Jian Li004526d2019-02-25 16:26:27 +090031import org.onlab.util.Tools;
32import org.onosproject.cfg.ComponentConfigService;
Jian Li2cc2b632019-02-18 00:56:40 +090033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
36import org.onosproject.core.ApplicationId;
37import org.onosproject.core.CoreService;
38import org.onosproject.core.GroupId;
39import org.onosproject.k8snetworking.api.K8sEndpointsService;
40import org.onosproject.k8snetworking.api.K8sFlowRuleService;
41import org.onosproject.k8snetworking.api.K8sGroupRuleService;
42import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li004526d2019-02-25 16:26:27 +090043import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li2cc2b632019-02-18 00:56:40 +090044import org.onosproject.k8snetworking.api.K8sServiceEvent;
45import org.onosproject.k8snetworking.api.K8sServiceListener;
46import org.onosproject.k8snetworking.api.K8sServiceService;
47import org.onosproject.k8snetworking.util.RulePopulatorUtil;
48import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
49import org.onosproject.k8snode.api.K8sNode;
50import org.onosproject.k8snode.api.K8sNodeEvent;
51import org.onosproject.k8snode.api.K8sNodeListener;
52import org.onosproject.k8snode.api.K8sNodeService;
53import org.onosproject.net.DeviceId;
54import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.driver.DriverService;
56import org.onosproject.net.flow.DefaultTrafficSelector;
57import org.onosproject.net.flow.DefaultTrafficTreatment;
58import org.onosproject.net.flow.TrafficSelector;
59import org.onosproject.net.flow.TrafficTreatment;
60import org.onosproject.net.flow.criteria.ExtensionSelector;
61import org.onosproject.net.flow.instructions.ExtensionTreatment;
62import org.onosproject.net.group.GroupBucket;
63import org.onosproject.store.service.AtomicCounter;
64import org.onosproject.store.service.StorageService;
Jian Li004526d2019-02-25 16:26:27 +090065import org.osgi.service.component.ComponentContext;
Jian Li2cc2b632019-02-18 00:56:40 +090066import org.osgi.service.component.annotations.Activate;
67import org.osgi.service.component.annotations.Component;
68import org.osgi.service.component.annotations.Deactivate;
Jian Li004526d2019-02-25 16:26:27 +090069import org.osgi.service.component.annotations.Modified;
Jian Li2cc2b632019-02-18 00:56:40 +090070import org.osgi.service.component.annotations.Reference;
71import org.osgi.service.component.annotations.ReferenceCardinality;
72import org.slf4j.Logger;
73
Jian Li004526d2019-02-25 16:26:27 +090074import java.util.Dictionary;
Jian Li2cc2b632019-02-18 00:56:40 +090075import java.util.List;
76import java.util.Map;
77import java.util.Objects;
Jian Li004526d2019-02-25 16:26:27 +090078import java.util.Set;
Jian Li2cc2b632019-02-18 00:56:40 +090079import java.util.concurrent.ExecutorService;
80import java.util.stream.Collectors;
81
82import static java.util.concurrent.Executors.newSingleThreadExecutor;
83import static org.onlab.util.Tools.groupedThreads;
84import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
85import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li004526d2019-02-25 16:26:27 +090086import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
87import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
Jian Li2cc2b632019-02-18 00:56:40 +090088import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090089import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
Jian Li2cc2b632019-02-18 00:56:40 +090090import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
Jian Li004526d2019-02-25 16:26:27 +090091import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
Jian Li2cc2b632019-02-18 00:56:40 +090092import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
Jian Li004526d2019-02-25 16:26:27 +090093import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
94import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
95import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
96import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
97import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
98import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
Jian Li2cc2b632019-02-18 00:56:40 +090099import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
100import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
101import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
Jian Li004526d2019-02-25 16:26:27 +0900102import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li2cc2b632019-02-18 00:56:40 +0900103import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
104import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
105import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
106import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
107import static org.onosproject.net.group.GroupDescription.Type.SELECT;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Handles the service IP to pod IP related translation traffic.
112 */
Jian Li004526d2019-02-25 16:26:27 +0900113@Component(
114 immediate = true,
115 property = {
116 SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT
117 }
118)
Jian Li2cc2b632019-02-18 00:56:40 +0900119public class K8sServiceHandler {
120
121 private final Logger log = getLogger(getClass());
122
Jian Li2cc2b632019-02-18 00:56:40 +0900123 private static final int HOST_CIDR_NUM = 32;
124
125 private static final String NONE = "None";
126 private static final String CLUSTER_IP = "ClusterIP";
127 private static final String TCP = "TCP";
128
129 private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected CoreService coreService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected LeadershipService leadershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected ClusterService clusterService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected DriverService driverService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected DeviceService deviceService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li004526d2019-02-25 16:26:27 +0900147 protected ComponentConfigService configService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li2cc2b632019-02-18 00:56:40 +0900150 protected StorageService storageService;
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
153 protected K8sNetworkService k8sNetworkService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
156 protected K8sFlowRuleService k8sFlowRuleService;
157
158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected K8sGroupRuleService k8sGroupRuleService;
160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 protected K8sNodeService k8sNodeService;
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
165 protected K8sEndpointsService k8sEndpointsService;
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
168 protected K8sServiceService k8sServiceService;
169
Jian Li004526d2019-02-25 16:26:27 +0900170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
171 protected K8sPodService k8sPodService;
172
173 /** Service IP address translation mode. */
174 private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
175
Jian Li2cc2b632019-02-18 00:56:40 +0900176 private final ExecutorService eventExecutor = newSingleThreadExecutor(
177 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
178 private final InternalNodeEventListener internalNodeEventListener =
179 new InternalNodeEventListener();
180 private final InternalK8sServiceListener internalK8sServiceListener =
181 new InternalK8sServiceListener();
182
183 private AtomicCounter groupIdCounter;
184
185 private ApplicationId appId;
186 private NodeId localNodeId;
187
188 @Activate
189 protected void activate() {
190 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
Jian Li004526d2019-02-25 16:26:27 +0900191 configService.registerProperties(getClass());
Jian Li2cc2b632019-02-18 00:56:40 +0900192 localNodeId = clusterService.getLocalNode().id();
193 leadershipService.runForLeadership(appId.name());
194 k8sNodeService.addListener(internalNodeEventListener);
195 k8sServiceService.addListener(internalK8sServiceListener);
196
197 groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
198
199 log.info("Started");
200 }
201
202 @Deactivate
203 protected void deactivate() {
204 leadershipService.withdraw(appId.name());
205 k8sNodeService.removeListener(internalNodeEventListener);
206 k8sServiceService.removeListener(internalK8sServiceListener);
Jian Li004526d2019-02-25 16:26:27 +0900207 configService.unregisterProperties(getClass(), false);
Jian Li2cc2b632019-02-18 00:56:40 +0900208 eventExecutor.shutdown();
209
210 log.info("Stopped");
211 }
212
Jian Li004526d2019-02-25 16:26:27 +0900213 @Modified
214 void modified(ComponentContext context) {
215 readComponentConfiguration(context);
216
217 log.info("Modified");
218 }
219
220 private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900221 // -trk CT rules
222 long ctUntrack = computeCtStateFlag(false, false, false);
223 long ctMaskUntrack = computeCtMaskFlag(true, false, false);
224
225 k8sNetworkService.networks().forEach(n -> {
226 // TODO: need to provide a way to add multiple service IP CIDR ranges
227 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), SERVICE_IP_CIDR,
228 JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
229 setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
230 JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
231 });
232
233 // +trk-new CT rules
234 long ctTrackUnnew = computeCtStateFlag(true, false, false);
235 long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
236
237 setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
238 NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
239
240 // +trk+new CT rules
241 long ctTrackNew = computeCtStateFlag(true, true, false);
242 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
243
244 k8sServiceService.services().stream()
245 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
Jian Li004526d2019-02-25 16:26:27 +0900246 .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
247 ctMaskTrackNew, s, install));
Jian Li2cc2b632019-02-18 00:56:40 +0900248 }
249
Jian Li004526d2019-02-25 16:26:27 +0900250 private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
251
252 k8sNetworkService.networks().forEach(n -> {
253 setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, JUMP_TABLE,
254 SERVICE_TABLE, PRIORITY_CT_RULE, install);
255 setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, JUMP_TABLE,
256 POD_TABLE, PRIORITY_CT_RULE, install);
257 setSrcDstCidrRules(deviceId, n.cidr(), n.cidr(), JUMP_TABLE,
258 ROUTING_TABLE, PRIORITY_CT_RULE, install);
259 });
260
261 // setup load balancing rules using group table
262 k8sServiceService.services().stream()
263 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
264 .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
265 }
266
267 private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
268 String dstCidr, int installTable,
269 int transitTable, int priority, boolean install) {
270 TrafficSelector selector = DefaultTrafficSelector.builder()
271 .matchEthType(Ethernet.TYPE_IPV4)
272 .matchIPSrc(IpPrefix.valueOf(srcCidr))
273 .matchIPDst(IpPrefix.valueOf(dstCidr))
274 .build();
275
276 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
277 .transition(transitTable)
278 .build();
279
280 k8sFlowRuleService.setRule(
281 appId,
282 deviceId,
283 selector,
284 treatment,
285 priority,
286 installTable,
287 install);
288 }
289
290 private void setStatelessGroupFlowRules(DeviceId deviceId,
291 Service service, boolean install) {
292 int groupId = (int) groupIdCounter.incrementAndGet();
293
294 List<GroupBucket> buckets = Lists.newArrayList();
295
296 String serviceName = service.getMetadata().getName();
297
298 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
299 .stream()
300 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
301 .collect(Collectors.toList());
302
303 Map<String, String> nodeIpGatewayIpMap =
304 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
305
306 Map<String, Set<Integer>> podIpPorts = Maps.newConcurrentMap();
307
308 for (Endpoints endpoints : endpointses) {
309 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
310 List<EndpointPort> ports = endpointSubset.getPorts()
311 .stream()
312 .filter(p -> p.getProtocol().equals(TCP))
313 .collect(Collectors.toList());
314
315 for (EndpointAddress address : endpointSubset.getAddresses()) {
316 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
317 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
318
319 ports.forEach(p -> {
320 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
321 deviceService.getDevice(deviceId), ROUTING_TABLE);
322 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
323 .setIpDst(IpAddress.valueOf(podIp))
324 .setTcpDst(TpPort.tpPort(p.getPort()))
325 .extension(resubmitTreatment, deviceId)
326 .build();
327 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
328
329 Set<Integer> existPorts = podIpPorts.get(podIp);
330 if (existPorts == null || existPorts.isEmpty()) {
331 existPorts = Sets.newConcurrentHashSet();
332 }
333 existPorts.add(p.getPort());
334 podIpPorts.put(podIp, existPorts);
335 });
336 }
337 }
338 }
339
340 if (!buckets.isEmpty()) {
341 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
342 setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
343 PRIORITY_NAT_RULE, service, install);
344
345 podIpPorts.forEach((k, v) ->
346 v.forEach(p -> setUnshiftDomainRules(deviceId, POD_TABLE,
347 PRIORITY_NAT_RULE, service, k, p, install)));
348 }
349 }
350
351 private void setShiftDomainRules(DeviceId deviceId, int installTable,
352 int groupId, int priority,
353 Service service, boolean install) {
354 String serviceIp = service.getSpec().getClusterIP();
355 // TODO: multi-ports case should be addressed
356 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
357
358 TrafficSelector selector = DefaultTrafficSelector.builder()
359 .matchEthType(Ethernet.TYPE_IPV4)
360 .matchIPProtocol(IPv4.PROTOCOL_TCP)
361 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), 32))
362 .matchTcpDst(TpPort.tpPort(servicePort))
363 .build();
364
365 ExtensionTreatment loadTreatment = buildLoadExtension(
366 deviceService.getDevice(deviceId), "src", SHIFTED_IP_PREFIX);
367
368 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
369 .extension(loadTreatment, deviceId)
370 .group(GroupId.valueOf(groupId))
371 .build();
372
373 k8sFlowRuleService.setRule(
374 appId,
375 deviceId,
376 selector,
377 treatment,
378 priority,
379 installTable,
380 install);
381 }
382
383 private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
384 int priority, Service service, String podIp,
385 int podPort, boolean install) {
386 String serviceIp = service.getSpec().getClusterIP();
387 // TODO: multi-ports case should be addressed
388 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
389
390 TrafficSelector selector = DefaultTrafficSelector.builder()
391 .matchEthType(Ethernet.TYPE_IPV4)
392 .matchIPProtocol(IPv4.PROTOCOL_TCP)
393 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), 32))
394 .matchTcpSrc(TpPort.tpPort(podPort))
395 .build();
396
397 ExtensionTreatment loadTreatment = buildLoadExtension(
398 deviceService.getDevice(deviceId), "dst", "10.10");
399
400 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
401 .extension(loadTreatment, deviceId)
402 .setIpSrc(IpAddress.valueOf(serviceIp))
403 .setTcpSrc(TpPort.tpPort(servicePort))
404 .transition(ROUTING_TABLE)
405 .build();
406
407 k8sFlowRuleService.setRule(
408 appId,
409 deviceId,
410 selector,
411 treatment,
412 priority,
413 installTable,
414 install);
415 }
416
417 private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
418 long ctMask, Service service,
419 boolean install) {
Jian Li2cc2b632019-02-18 00:56:40 +0900420 int groupId = (int) groupIdCounter.incrementAndGet();
421
422 List<GroupBucket> buckets = Lists.newArrayList();
423
424 String serviceName = service.getMetadata().getName();
425 String serviceIp = service.getSpec().getClusterIP();
426
427 // TODO: multi-ports case should be addressed
428 Integer servicePort = service.getSpec().getPorts().get(0).getPort();
429
430 List<Endpoints> endpointses = k8sEndpointsService.endpointses()
431 .stream()
432 .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
433 .collect(Collectors.toList());
434
435 Map<String, String> nodeIpGatewayIpMap =
436 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
437
438 for (Endpoints endpoints : endpointses) {
439 for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
440 List<EndpointPort> ports = endpointSubset.getPorts()
441 .stream()
442 .filter(p -> p.getProtocol().equals(TCP))
443 .collect(Collectors.toList());
444
445 for (EndpointAddress address : endpointSubset.getAddresses()) {
446 String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
447 nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
448
449 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
450 niciraConnTrackTreatmentBuilder(driverService, deviceId)
451 .commit(true)
452 .natAction(true)
453 .natIp(IpAddress.valueOf(podIp))
454 .natFlag(CT_NAT_DST_FLAG);
455
456 ports.forEach(p -> {
457 ExtensionTreatment ctNatTreatment = connTreatmentBuilder
458 .natPort(TpPort.tpPort(p.getPort())).build();
459 ExtensionTreatment resubmitTreatment = buildResubmitExtension(
460 deviceService.getDevice(deviceId), ROUTING_TABLE);
461 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
462 .extension(ctNatTreatment, deviceId)
463 .extension(resubmitTreatment, deviceId)
464 .build();
465 buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
466 });
467 }
468 }
469 }
470
471 if (!buckets.isEmpty()) {
472 k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
473
474 setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
475 TpPort.tpPort(servicePort), NAT_TABLE, groupId,
476 PRIORITY_CT_RULE, install);
477 }
478 }
479
480 private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
481 String srcCidr, String dstCidr, int installTable,
482 int transitTable, int priority, boolean install) {
483 ExtensionSelector esCtSate = RulePopulatorUtil
484 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
485 TrafficSelector selector = DefaultTrafficSelector.builder()
486 .matchEthType(Ethernet.TYPE_IPV4)
487 .matchIPSrc(IpPrefix.valueOf(srcCidr))
488 .matchIPDst(IpPrefix.valueOf(dstCidr))
489 .extension(esCtSate, deviceId)
490 .build();
491
492 NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
493 niciraConnTrackTreatmentBuilder(driverService, deviceId)
494 .natAction(false)
495 .commit(false)
496 .table((short) transitTable);
497
498 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
499 .extension(connTreatmentBuilder.build(), deviceId)
500 .build();
501
502 k8sFlowRuleService.setRule(
503 appId,
504 deviceId,
505 selector,
506 treatment,
507 priority,
508 installTable,
509 install);
510 }
511
512 private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
513 IpAddress dstIp, TpPort dstPort, int installTable,
514 int groupId, int priority, boolean install) {
515 ExtensionSelector esCtSate = RulePopulatorUtil
516 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
517 TrafficSelector selector = DefaultTrafficSelector.builder()
518 .matchEthType(Ethernet.TYPE_IPV4)
519 .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
520 .matchIPProtocol(IPv4.PROTOCOL_TCP)
521 .matchTcpDst(dstPort)
522 .extension(esCtSate, deviceId)
523 .build();
524 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
525 .group(GroupId.valueOf(groupId))
526 .build();
527
528 k8sFlowRuleService.setRule(
529 appId,
530 deviceId,
531 selector,
532 treatment,
533 priority,
534 installTable,
535 install);
536 }
537
538 private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
539 int installTable, int transitTable,
540 int priority, boolean install) {
541 ExtensionSelector esCtSate = RulePopulatorUtil
542 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
543 TrafficSelector selector = DefaultTrafficSelector.builder()
544 .extension(esCtSate, deviceId)
545 .build();
546
547 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
548 .transition(transitTable)
549 .build();
550
551 k8sFlowRuleService.setRule(
552 appId,
553 deviceId,
554 selector,
555 treatment,
556 priority,
557 installTable,
558 install);
559 }
560
Jian Li004526d2019-02-25 16:26:27 +0900561 /**
562 * Extracts properties from the component configuration context.
563 *
564 * @param context the component context
565 */
566 private void readComponentConfiguration(ComponentContext context) {
567 Dictionary<?, ?> properties = context.getProperties();
568
569 String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
570 serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
571 log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
572 }
573
Jian Li2cc2b632019-02-18 00:56:40 +0900574 private class InternalK8sServiceListener implements K8sServiceListener {
575
576 private boolean isRelevantHelper() {
577 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
578 }
579
580 @Override
581 public void event(K8sServiceEvent event) {
582 switch (event.type()) {
583 case K8S_SERVICE_CREATED:
584 eventExecutor.execute(() -> processServiceCreation(event.subject()));
585 break;
586 case K8S_SERVICE_REMOVED:
587 eventExecutor.execute(() -> processServiceRemoval(event.subject()));
588 break;
589 default:
590 // do nothing
591 break;
592 }
593 }
594
595 private void processServiceCreation(Service service) {
596 if (!isRelevantHelper()) {
597 return;
598 }
599
600 long ctTrackNew = computeCtStateFlag(true, true, false);
601 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
602
Jian Li004526d2019-02-25 16:26:27 +0900603 k8sNodeService.completeNodes().forEach(n -> {
604 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
605 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
606 ctMaskTrackNew, service, true);
607 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
608 setStatelessGroupFlowRules(n.intgBridge(), service, true);
609 }
610 });
Jian Li2cc2b632019-02-18 00:56:40 +0900611 }
612
613 private void processServiceRemoval(Service service) {
614 if (!isRelevantHelper()) {
615 return;
616 }
617
618 long ctTrackNew = computeCtStateFlag(true, true, false);
619 long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
620
Jian Li004526d2019-02-25 16:26:27 +0900621 k8sNodeService.completeNodes().forEach(n -> {
622 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
623 setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
624 ctMaskTrackNew, service, false);
625 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
626 setStatelessGroupFlowRules(n.intgBridge(), service, false);
627 }
628 });
629 }
Jian Li2cc2b632019-02-18 00:56:40 +0900630 }
631
632 private class InternalNodeEventListener implements K8sNodeListener {
633
634 private boolean isRelevantHelper() {
635 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
636 }
637
638 @Override
639 public void event(K8sNodeEvent event) {
640 K8sNode k8sNode = event.subject();
641 switch (event.type()) {
642 case K8S_NODE_COMPLETE:
643 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
644 break;
645 case K8S_NODE_INCOMPLETE:
646 eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
647 break;
648 default:
649 break;
650 }
651 }
652
653 private void processNodeCompletion(K8sNode node) {
654 if (!isRelevantHelper()) {
655 return;
656 }
657
Jian Li004526d2019-02-25 16:26:27 +0900658 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
659 setStatefulServiceNatRules(node.intgBridge(), true);
660 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
661 setStatelessServiceNatRules(node.intgBridge(), true);
662 } else {
663 log.warn("Service IP NAT mode was not configured!");
664 }
665
Jian Li2cc2b632019-02-18 00:56:40 +0900666 }
667
668 private void processNodeIncompletion(K8sNode node) {
669 if (!isRelevantHelper()) {
670 return;
671 }
672
Jian Li004526d2019-02-25 16:26:27 +0900673 if (NAT_STATEFUL.equals(serviceIpNatMode)) {
674 setStatefulServiceNatRules(node.intgBridge(), false);
675 } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
676 setStatelessServiceNatRules(node.intgBridge(), false);
677 } else {
678 log.warn("Service IP NAT mode was not configured!");
679 }
680
Jian Li2cc2b632019-02-18 00:56:40 +0900681 }
682 }
683}