blob: c7754fbf16e9200a1b40026911d222ae23032e87 [file] [log] [blame]
Jian Li73d3b6a2019-07-08 18:07:53 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
Jian Li0f459612019-07-11 10:38:02 +090018import com.google.common.collect.ImmutableSet;
Jian Li73d3b6a2019-07-08 18:07:53 +090019import com.google.common.collect.Maps;
Jian Li0f459612019-07-11 10:38:02 +090020import com.google.common.collect.Sets;
Jian Lie1a5b8f2019-07-23 17:13:19 +090021import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
22import io.fabric8.kubernetes.api.model.Namespace;
Jian Li73d3b6a2019-07-08 18:07:53 +090023import io.fabric8.kubernetes.api.model.Pod;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Service;
Jian Li73d3b6a2019-07-08 18:07:53 +090025import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
Jian Lib7dfb5b2019-07-15 17:37:12 +090026import io.fabric8.kubernetes.api.model.networking.NetworkPolicyEgressRule;
27import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRule;
Jian Lie1a5b8f2019-07-23 17:13:19 +090028import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeer;
Jian Li73d3b6a2019-07-08 18:07:53 +090029import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
Jian Li73d3b6a2019-07-08 18:07:53 +090030import org.onlab.packet.IPv4;
31import org.onlab.packet.IpAddress;
32import org.onlab.packet.IpPrefix;
33import org.onlab.packet.TpPort;
34import org.onosproject.cfg.ComponentConfigService;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.k8snetworking.api.K8sFlowRuleService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090041import org.onosproject.k8snetworking.api.K8sNamespaceEvent;
42import org.onosproject.k8snetworking.api.K8sNamespaceListener;
Jian Li0f459612019-07-11 10:38:02 +090043import org.onosproject.k8snetworking.api.K8sNamespaceService;
Jian Li73d3b6a2019-07-08 18:07:53 +090044import org.onosproject.k8snetworking.api.K8sNetworkPolicyEvent;
45import org.onosproject.k8snetworking.api.K8sNetworkPolicyListener;
46import org.onosproject.k8snetworking.api.K8sNetworkPolicyService;
47import org.onosproject.k8snetworking.api.K8sNetworkService;
48import org.onosproject.k8snetworking.api.K8sPodEvent;
49import org.onosproject.k8snetworking.api.K8sPodListener;
50import org.onosproject.k8snetworking.api.K8sPodService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
Jian Li73d3b6a2019-07-08 18:07:53 +090054import org.onosproject.k8snode.api.K8sNodeService;
55import org.onosproject.net.device.DeviceService;
56import org.onosproject.net.driver.DriverService;
57import org.onosproject.net.flow.DefaultTrafficSelector;
58import org.onosproject.net.flow.DefaultTrafficTreatment;
59import org.onosproject.net.flow.TrafficSelector;
60import org.onosproject.net.flow.TrafficTreatment;
61import org.onosproject.store.service.StorageService;
62import org.osgi.service.component.annotations.Activate;
63import org.osgi.service.component.annotations.Component;
64import org.osgi.service.component.annotations.Deactivate;
65import org.osgi.service.component.annotations.Reference;
66import org.osgi.service.component.annotations.ReferenceCardinality;
67import org.slf4j.Logger;
68
69import java.util.List;
70import java.util.Map;
71import java.util.Objects;
Jian Li0f459612019-07-11 10:38:02 +090072import java.util.Set;
Jian Li73d3b6a2019-07-08 18:07:53 +090073import java.util.concurrent.ExecutorService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090074import java.util.concurrent.atomic.AtomicReference;
Jian Li73d3b6a2019-07-08 18:07:53 +090075
76import static java.util.concurrent.Executors.newSingleThreadExecutor;
Jian Lie1a5b8f2019-07-23 17:13:19 +090077import static org.onlab.packet.Ethernet.TYPE_IPV4;
Jian Li73d3b6a2019-07-08 18:07:53 +090078import static org.onlab.util.Tools.groupedThreads;
Jian Li0f459612019-07-11 10:38:02 +090079import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_BLACK_TABLE;
80import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_WHITE_TABLE;
81import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_BLACK_TABLE;
82import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_WHITE_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090083import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Lie1a5b8f2019-07-23 17:13:19 +090084import static org.onosproject.k8snetworking.api.Constants.DEFAULT_METADATA_MASK;
85import static org.onosproject.k8snetworking.api.Constants.DEFAULT_NAMESPACE_HASH;
86import static org.onosproject.k8snetworking.api.Constants.DEFAULT_SEGMENT_ID;
87import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090088import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090089import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090090import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Lie1a5b8f2019-07-23 17:13:19 +090091import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAMESPACE_RULE;
Jian Li73d3b6a2019-07-08 18:07:53 +090092import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
93import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Lie1a5b8f2019-07-23 17:13:19 +090094import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
95import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByNamespace;
96import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByPodIp;
97import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByServiceIp;
Jian Li73d3b6a2019-07-08 18:07:53 +090098import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.shiftIpDomain;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Handles the ACL by referring to the network policy defined through kubernetes.
103 */
104@Component(immediate = true)
105public class K8sNetworkPolicyHandler {
106
107 private final Logger log = getLogger(getClass());
108
109 private static final String DIRECTION_INGRESS = "ingress";
110 private static final String DIRECTION_EGRESS = "egress";
111
112 private static final String PROTOCOL_TCP = "tcp";
113 private static final String PROTOCOL_UDP = "udp";
114
Jian Lie1a5b8f2019-07-23 17:13:19 +0900115 private static final String KUBE_SYSTEM = "kube-system";
116
Jian Li73d3b6a2019-07-08 18:07:53 +0900117 private static final int HOST_PREFIX = 32;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
120 protected CoreService coreService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected LeadershipService leadershipService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected DriverService driverService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected DeviceService deviceService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected ComponentConfigService configService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected StorageService storageService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected K8sNetworkService k8sNetworkService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected K8sFlowRuleService k8sFlowRuleService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
147 protected K8sNodeService k8sNodeService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
150 protected K8sPodService k8sPodService;
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lie1a5b8f2019-07-23 17:13:19 +0900153 protected K8sServiceService k8sServiceService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li73d3b6a2019-07-08 18:07:53 +0900156 protected K8sNetworkPolicyService k8sNetworkPolicyService;
157
Jian Li0f459612019-07-11 10:38:02 +0900158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected K8sNamespaceService k8sNamespaceService;
160
Jian Li73d3b6a2019-07-08 18:07:53 +0900161 private final ExecutorService eventExecutor = newSingleThreadExecutor(
162 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
163 private final InternalPodListener internalPodListener =
164 new InternalPodListener();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900165 private final InternalServiceListener internalServiceListener =
166 new InternalServiceListener();
Jian Li73d3b6a2019-07-08 18:07:53 +0900167 private final InternalNetworkPolicyListener internalNetworkPolicyListener =
168 new InternalNetworkPolicyListener();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900169 private final InternalNamespaceListener internalNamespaceListener =
170 new InternalNamespaceListener();
Jian Li73d3b6a2019-07-08 18:07:53 +0900171
172 private ApplicationId appId;
173 private NodeId localNodeId;
174
175 @Activate
176 protected void activate() {
177 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
178
179 localNodeId = clusterService.getLocalNode().id();
180 leadershipService.runForLeadership(appId.name());
181 k8sPodService.addListener(internalPodListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900182 k8sServiceService.addListener(internalServiceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900183 k8sNetworkPolicyService.addListener(internalNetworkPolicyListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900184 k8sNamespaceService.addListener(internalNamespaceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900185
186 log.info("Started");
187 }
188
189 @Deactivate
190 protected void deactivate() {
191 leadershipService.withdraw(appId.name());
192 k8sPodService.removeListener(internalPodListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900193 k8sServiceService.removeListener(internalServiceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900194 k8sNetworkPolicyService.removeListener(internalNetworkPolicyListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900195 k8sNamespaceService.removeListener(internalNamespaceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900196 eventExecutor.shutdown();
197
198 log.info("Stopped");
199 }
200
201 private void setBlockRulesByPolicy(NetworkPolicy policy, boolean install) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900202 final Map<String, List<String>> filter = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900203
Jian Lib7dfb5b2019-07-15 17:37:12 +0900204 k8sPodService.pods().forEach(pod ->
205 filter.putAll(getBlockRuleFilter(pod, policy)));
Jian Li73d3b6a2019-07-08 18:07:53 +0900206
207 setBlockRules(filter, install);
208 }
209
210 private void setBlockRulesByPod(Pod pod, boolean install) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900211 final Map<String, List<String>> filter = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900212
Jian Lib7dfb5b2019-07-15 17:37:12 +0900213 k8sNetworkPolicyService.networkPolicies().forEach(policy ->
214 filter.putAll(getBlockRuleFilter(pod, policy)));
Jian Li73d3b6a2019-07-08 18:07:53 +0900215
216 setBlockRules(filter, install);
217 }
218
Jian Lib7dfb5b2019-07-15 17:37:12 +0900219 private Map<String, List<String>> getBlockRuleFilter(Pod pod, NetworkPolicy policy) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900220
221 // if the POD is not included in the namespace of the given policy,
222 // we do not block the POD
223 if (!pod.getMetadata().getNamespace().equals(policy.getMetadata().getNamespace())) {
224 return Maps.newConcurrentMap();
225 }
226
Jian Lib7dfb5b2019-07-15 17:37:12 +0900227 Map<String, String> labels = policy.getSpec().getPodSelector().getMatchLabels();
228 Map<String, List<String>> filter = Maps.newConcurrentMap();
229 String podIp = pod.getStatus().getPodIP();
230 List<String> policyTypes = policy.getSpec().getPolicyTypes();
231
232 if (podIp != null && policyTypes != null) {
233 if (labels == null) {
234 filter.put(podIp, policyTypes);
235 } else {
236 pod.getMetadata().getLabels().forEach((k, v) -> {
237 if (labels.get(k) != null && labels.get(k).equals(v)) {
238 filter.put(podIp, policyTypes);
239 }
240 });
241 }
242 }
243
244 return filter;
245 }
246
Jian Li73d3b6a2019-07-08 18:07:53 +0900247 private void setBlockRules(Map<String, List<String>> filter, boolean install) {
248 filter.forEach((k, v) -> {
249 v.forEach(d -> {
250 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900251 .matchEthType(TYPE_IPV4);
Jian Li73d3b6a2019-07-08 18:07:53 +0900252 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900253
254 Integer nsHash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, k);
255 if (nsHash != null) {
256 tBuilder.setTunnelId(nsHash);
257 }
258
Jian Li73d3b6a2019-07-08 18:07:53 +0900259 if (d.equalsIgnoreCase(DIRECTION_INGRESS)) {
260 sBuilder.matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX));
Jian Li0f459612019-07-11 10:38:02 +0900261 tBuilder.transition(ACL_INGRESS_WHITE_TABLE);
Jian Li6b9bf562019-07-30 17:10:39 +0900262 setPolicyRulesBase(sBuilder, tBuilder, ACL_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900263 } else if (d.equalsIgnoreCase(DIRECTION_EGRESS)) {
Jian Li6b9bf562019-07-30 17:10:39 +0900264 // original IP
Jian Li73d3b6a2019-07-08 18:07:53 +0900265 sBuilder.matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX));
Jian Li0f459612019-07-11 10:38:02 +0900266 tBuilder.transition(ACL_EGRESS_WHITE_TABLE);
Jian Li6b9bf562019-07-30 17:10:39 +0900267 setPolicyRulesBase(sBuilder, tBuilder, ACL_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900268
Jian Li6b9bf562019-07-30 17:10:39 +0900269
270 // shifted IP
271 sBuilder.matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(
272 shiftIpDomain(k, SHIFTED_IP_PREFIX)), HOST_PREFIX));
273 setPolicyRulesBase(sBuilder, tBuilder, ACL_TABLE, install);
274 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900275 });
276 });
277 }
278
Jian Lie1a5b8f2019-07-23 17:13:19 +0900279 private void setDefaultAllowNamespaceRules(Namespace namespace, boolean install) {
280
281 String ns = namespace.getMetadata().getName();
282 if (KUBE_SYSTEM.equalsIgnoreCase(ns)) {
283 setAllowNamespaceRulesBase(0, namespace.hashCode(),
284 DIRECTION_INGRESS, install);
285 }
286 }
287
288 private void setDefaultAllowServiceRules(boolean install) {
289 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
290 .matchEthType(TYPE_IPV4)
291 .matchIPSrc(IpPrefix.valueOf(SERVICE_IP_CIDR_DEFAULT));
292 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
293 .setTunnelId(DEFAULT_SEGMENT_ID)
294 .transition(ROUTING_TABLE);
295
296 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
297 }
298
299 private void setAllowNamespaceRulesBase(int tunnelId, int metadata,
300 String direction, boolean install) {
301 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
302
303 if (tunnelId != 0) {
304 sBuilder.matchTunnelId(tunnelId);
305 }
306
307 sBuilder.matchMetadata(metadata);
308 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
309 .setTunnelId(DEFAULT_SEGMENT_ID)
310 .transition(ROUTING_TABLE);
311
312 int table = 0;
313 if (DIRECTION_INGRESS.equals(direction)) {
314 table = ACL_INGRESS_WHITE_TABLE;
315 }
316
317 setPolicyRulesBase(sBuilder, tBuilder, table, install);
318 }
319
Jian Li73d3b6a2019-07-08 18:07:53 +0900320 private void setAllowRulesByPolicy(NetworkPolicy policy, boolean install) {
Jian Li0f459612019-07-11 10:38:02 +0900321 Map<String, Map<String, List<NetworkPolicyPort>>>
322 white = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900323
Jian Lie1a5b8f2019-07-23 17:13:19 +0900324 int nsHash = namespaceHashByNamespace(k8sNamespaceService,
325 policy.getMetadata().getNamespace());
326
Jian Lib7dfb5b2019-07-15 17:37:12 +0900327 List<NetworkPolicyIngressRule> ingress = policy.getSpec().getIngress();
328 if (ingress != null && ingress.size() == 1) {
329 NetworkPolicyIngressRule rule = ingress.get(0);
330 if (rule.getFrom().size() == 0 && rule.getPorts().size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900331 setAllowAllRule(nsHash, DIRECTION_INGRESS, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900332 }
333 }
334
Jian Li73d3b6a2019-07-08 18:07:53 +0900335 policy.getSpec().getIngress().forEach(i -> {
336 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
337 direction.put(DIRECTION_INGRESS, i.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900338 i.getFrom().forEach(peer -> {
339
340 // IP block
341 if (peer.getIpBlock() != null) {
342 if (peer.getIpBlock().getExcept() != null &&
343 peer.getIpBlock().getExcept().size() > 0) {
Jian Li0f459612019-07-11 10:38:02 +0900344 Map<String, List<NetworkPolicyPort>>
345 blkDirection = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900346
Jian Li0f459612019-07-11 10:38:02 +0900347 blkDirection.put(DIRECTION_INGRESS, i.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900348 white.compute(peer.getIpBlock().getCidr(), (k, v) -> blkDirection);
Jian Li73d3b6a2019-07-08 18:07:53 +0900349
Jian Lie1a5b8f2019-07-23 17:13:19 +0900350 setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_INGRESS,
351 peer.getIpBlock().getExcept(), install);
Jian Li0f459612019-07-11 10:38:02 +0900352 } else {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900353 white.compute(peer.getIpBlock().getCidr(), (k, v) -> direction);
Jian Li0f459612019-07-11 10:38:02 +0900354 }
355 }
356
Jian Lie1a5b8f2019-07-23 17:13:19 +0900357 // POD selector
Jian Licd934152019-07-28 21:35:49 +0900358 Set<Pod> pods = podsFromPolicyPeer(peer, policy.getMetadata().getNamespace());
Jian Li0f459612019-07-11 10:38:02 +0900359
360 pods.forEach(pod -> {
361 white.compute(shiftIpDomain(pod.getStatus().getPodIP(),
362 SHIFTED_IP_PREFIX) + "/" + HOST_PREFIX, (m, n) -> direction);
363 white.compute(pod.getStatus().getPodIP() + "/" + HOST_PREFIX, (m, n) -> direction);
Jian Li73d3b6a2019-07-08 18:07:53 +0900364 });
Jian Lie1a5b8f2019-07-23 17:13:19 +0900365
366 // Namespace selector
367 setAllowNamespaceRules(nsHash,
368 namespacesByPolicyPeer(peer), DIRECTION_INGRESS, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900369 });
370 });
371
Jian Lib7dfb5b2019-07-15 17:37:12 +0900372 List<NetworkPolicyEgressRule> egress = policy.getSpec().getEgress();
373 if (egress != null && egress.size() == 1) {
374 NetworkPolicyEgressRule rule = egress.get(0);
375 if (rule.getTo().size() == 0 && rule.getPorts().size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900376 setAllowAllRule(nsHash, DIRECTION_EGRESS, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900377 }
378 }
379
Jian Li73d3b6a2019-07-08 18:07:53 +0900380 policy.getSpec().getEgress().forEach(e -> {
381 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
382 direction.put(DIRECTION_EGRESS, e.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900383 e.getTo().forEach(peer -> {
384
385 // IP block
386 if (peer.getIpBlock() != null) {
387 if (peer.getIpBlock().getExcept() != null &&
388 peer.getIpBlock().getExcept().size() > 0) {
Jian Li0f459612019-07-11 10:38:02 +0900389
390 Map<String, List<NetworkPolicyPort>>
391 blkDirection = Maps.newConcurrentMap();
392 blkDirection.put(DIRECTION_EGRESS, e.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900393 white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
Jian Li0f459612019-07-11 10:38:02 +0900394 if (v != null) {
395 v.put(DIRECTION_EGRESS, e.getPorts());
396 return v;
397 } else {
398 return blkDirection;
399 }
400 });
401
Jian Lie1a5b8f2019-07-23 17:13:19 +0900402 setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_EGRESS,
403 peer.getIpBlock().getExcept(), install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900404 } else {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900405 white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
Jian Li0f459612019-07-11 10:38:02 +0900406 if (v != null) {
407 v.put(DIRECTION_EGRESS, e.getPorts());
408 return v;
409 } else {
410 return direction;
411 }
412 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900413 }
Jian Li0f459612019-07-11 10:38:02 +0900414 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900415
Jian Lie1a5b8f2019-07-23 17:13:19 +0900416 // POD selector
Jian Licd934152019-07-28 21:35:49 +0900417 Set<Pod> pods = podsFromPolicyPeer(peer, policy.getMetadata().getNamespace());
Jian Li0f459612019-07-11 10:38:02 +0900418
419 pods.forEach(pod -> {
420 white.compute(shiftIpDomain(pod.getStatus().getPodIP(),
421 SHIFTED_IP_PREFIX) + "/" + HOST_PREFIX, (m, n) -> {
422 if (n != null) {
423 n.put(DIRECTION_EGRESS, e.getPorts());
424 return n;
425 } else {
426 return direction;
427 }
428 });
429
430 white.compute(pod.getStatus().getPodIP() + "/" + HOST_PREFIX,
431 (m, n) -> {
432 if (n != null) {
433 n.put(DIRECTION_EGRESS, e.getPorts());
434 return n;
435 } else {
436 return direction;
Jian Li73d3b6a2019-07-08 18:07:53 +0900437 }
438 });
439 });
Jian Lie1a5b8f2019-07-23 17:13:19 +0900440
441 // Namespace selector
442 setAllowNamespaceRules(nsHash,
443 namespacesByPolicyPeer(peer), DIRECTION_EGRESS, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900444 });
445 });
446
Jian Lie1a5b8f2019-07-23 17:13:19 +0900447 setAllowRules(namespaceHashByNamespace(k8sNamespaceService,
448 policy.getMetadata().getNamespace()), white, install);
Jian Li0f459612019-07-11 10:38:02 +0900449 setBlackToRouteRules(true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900450 }
451
Jian Licd934152019-07-28 21:35:49 +0900452 private Set<Pod> podsFromPolicyPeer(NetworkPolicyPeer peer, String namespace) {
453 Set<Pod> pods = Sets.newConcurrentHashSet();
454 if (peer.getPodSelector() != null) {
455 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
456 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
457
458 if (podLabels == null && matchExps.size() == 0) {
459 k8sPodService.pods().stream()
460 .filter(pod -> pod.getMetadata().getNamespace().equals(
461 namespace))
462 .forEach(pods::add);
463 } else {
464 k8sPodService.pods().stream()
465 .filter(pod -> pod.getMetadata().getNamespace().equals(
466 namespace))
467 .forEach(pod -> {
468 pod.getMetadata().getLabels().forEach((k, v) -> {
469 if (podLabels != null && podLabels.get(k) != null &&
470 podLabels.get(k).equals(v)) {
471 pods.add(pod);
472 }
473 });
474 });
475 }
476 }
477 return pods;
478 }
479
Jian Li73d3b6a2019-07-08 18:07:53 +0900480 private void setAllowRulesByPod(Pod pod, boolean install) {
Jian Li0f459612019-07-11 10:38:02 +0900481 Map<String, Map<String, List<NetworkPolicyPort>>>
482 white = Maps.newConcurrentMap();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900483 AtomicReference<NetworkPolicy> selectedPolicy = new AtomicReference<>();
484 k8sNetworkPolicyService.networkPolicies().stream()
485 .filter(policy -> policy.getMetadata().getNamespace().equals(
486 pod.getMetadata().getNamespace()))
487 .forEach(policy -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900488 String podIp = pod.getStatus().getPodIP();
489
Jian Li73d3b6a2019-07-08 18:07:53 +0900490 policy.getSpec().getIngress().forEach(i -> {
Jian Li0f459612019-07-11 10:38:02 +0900491 Map<String, List<NetworkPolicyPort>>
492 direction = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900493 direction.put(DIRECTION_INGRESS, i.getPorts());
494 i.getFrom().forEach(peer -> {
Jian Li0f459612019-07-11 10:38:02 +0900495 if (peer.getPodSelector() != null) {
496 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900497 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
Jian Li73d3b6a2019-07-08 18:07:53 +0900498
Jian Lie1a5b8f2019-07-23 17:13:19 +0900499 if (podLabels == null && matchExps.size() == 0 && podIp != null) {
500 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
501 "/" + HOST_PREFIX, (m, n) -> direction);
502 white.compute(podIp + "/" +
503 HOST_PREFIX, (m, n) -> direction);
504
505 selectedPolicy.set(policy);
506 } else {
507 pod.getMetadata().getLabels().forEach((k, v) -> {
508 if (podLabels != null && podLabels.get(k) != null &&
509 podLabels.get(k).equals(v) && podIp != null) {
510 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
511 "/" + HOST_PREFIX, (m, n) -> direction);
512 white.compute(podIp + "/" +
513 HOST_PREFIX, (m, n) -> direction);
514
515 selectedPolicy.set(policy);
516 }
517 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900518 }
Jian Li0f459612019-07-11 10:38:02 +0900519 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900520 });
521 });
522 });
523
Jian Lie1a5b8f2019-07-23 17:13:19 +0900524 k8sNetworkPolicyService.networkPolicies().stream()
525 .filter(policy -> policy.getMetadata().getNamespace().equals(
526 pod.getMetadata().getNamespace()))
527 .forEach(policy -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900528 String podIp = pod.getStatus().getPodIP();
529
Jian Li73d3b6a2019-07-08 18:07:53 +0900530 policy.getSpec().getEgress().forEach(e -> {
531 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
532 direction.put(DIRECTION_EGRESS, e.getPorts());
Jian Li0f459612019-07-11 10:38:02 +0900533 e.getTo().forEach(peer -> {
534 if (peer.getPodSelector() != null) {
535 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900536 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
Jian Li73d3b6a2019-07-08 18:07:53 +0900537
Jian Lie1a5b8f2019-07-23 17:13:19 +0900538 if (podLabels == null && matchExps.size() == 0 && podIp != null) {
539 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
540 "/" + HOST_PREFIX, (m, n) -> {
541 if (n != null) {
542 n.put(DIRECTION_EGRESS, e.getPorts());
543 return n;
544 } else {
545 return direction;
546 }
547 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900548
Jian Lie1a5b8f2019-07-23 17:13:19 +0900549 white.compute(podIp + "/" +
550 HOST_PREFIX, (m, n) -> {
551 if (n != null) {
552 n.put(DIRECTION_EGRESS, e.getPorts());
553 return n;
554 } else {
555 return direction;
556 }
557 });
Jian Li0f459612019-07-11 10:38:02 +0900558
Jian Lie1a5b8f2019-07-23 17:13:19 +0900559 selectedPolicy.set(policy);
560 } else {
561 pod.getMetadata().getLabels().forEach((k, v) -> {
562 if (podLabels != null && podLabels.get(k) != null &&
563 podLabels.get(k).equals(v) && podIp != null) {
564 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
565 "/" + HOST_PREFIX, (m, n) -> {
566 if (n != null) {
567 n.put(DIRECTION_EGRESS, e.getPorts());
568 return n;
569 } else {
570 return direction;
571 }
572 });
573
574 white.compute(podIp + "/" +
575 HOST_PREFIX, (m, n) -> {
576 if (n != null) {
577 n.put(DIRECTION_EGRESS, e.getPorts());
578 return n;
579 } else {
580 return direction;
581 }
582 });
583
584 selectedPolicy.set(policy);
585 }
586 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900587 }
Jian Li0f459612019-07-11 10:38:02 +0900588 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900589 });
590 });
591 });
592
Jian Lie1a5b8f2019-07-23 17:13:19 +0900593 int nsHash = selectedPolicy.get() != null ? namespaceHashByNamespace(k8sNamespaceService,
594 selectedPolicy.get().getMetadata().getNamespace()) : DEFAULT_NAMESPACE_HASH;
595
596 setAllowRules(nsHash, white, install);
Jian Li0f459612019-07-11 10:38:02 +0900597 setBlackToRouteRules(true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900598 }
599
Jian Lie1a5b8f2019-07-23 17:13:19 +0900600 private Set<Namespace> namespacesByLabels(Map<String, String> labels) {
601 Set<Namespace> nsSet = Sets.newConcurrentHashSet();
Jian Li0f459612019-07-11 10:38:02 +0900602 k8sNamespaceService.namespaces().forEach(ns -> {
603 if (ns != null && ns.getMetadata() != null &&
Jian Lie1a5b8f2019-07-23 17:13:19 +0900604 ns.getMetadata().getLabels() != null && labels != null) {
Jian Li0f459612019-07-11 10:38:02 +0900605 ns.getMetadata().getLabels().forEach((k, v) -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900606 if (labels.get(k) != null && labels.get(k).equals(v)) {
607 nsSet.add(ns);
Jian Li0f459612019-07-11 10:38:02 +0900608 }
609 });
610 }
611 });
612
Jian Lie1a5b8f2019-07-23 17:13:19 +0900613 return nsSet;
Jian Li0f459612019-07-11 10:38:02 +0900614 }
615
Jian Lie1a5b8f2019-07-23 17:13:19 +0900616 private Set<Namespace> namespacesByPolicyPeer(NetworkPolicyPeer peer) {
617 if (peer.getNamespaceSelector() != null) {
618 Map<String, String> labels = peer.getNamespaceSelector().getMatchLabels();
619 if (labels == null || labels.size() == 0) {
620 // if none of match labels are specified, it means the
621 // target PODs are from any namespaces
622 return k8sNamespaceService.namespaces();
623 } else {
624 return namespacesByLabels(labels);
625 }
626 }
627
628 return Sets.newConcurrentHashSet();
629 }
630
631 private void setAllowNamespaceRules(int tunnelId, Set<Namespace> nsSet,
632 String direction, boolean install) {
633
634 nsSet.forEach(ns -> {
635 setAllowNamespaceRulesBase(tunnelId, ns.hashCode(), direction, install);
636 });
637
638 }
639
640 private void setAllowAllRule(int nsHash, String direction, boolean install) {
641 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
642 .matchTunnelId(nsHash);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900643 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900644 .setTunnelId(DEFAULT_SEGMENT_ID)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900645 .transition(ROUTING_TABLE);
646
647 int table = 0;
648
649 if (DIRECTION_INGRESS.equalsIgnoreCase(direction)) {
650 table = ACL_INGRESS_WHITE_TABLE;
Jian Lib7dfb5b2019-07-15 17:37:12 +0900651 }
652
Jian Lie1a5b8f2019-07-23 17:13:19 +0900653 setPolicyRulesBase(sBuilder, tBuilder, table, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900654 }
655
Jian Lie1a5b8f2019-07-23 17:13:19 +0900656 private void setAllowRules(int namespaceHash,
657 Map<String, Map<String, List<NetworkPolicyPort>>> white,
Jian Li0f459612019-07-11 10:38:02 +0900658 boolean install) {
Jian Li73d3b6a2019-07-08 18:07:53 +0900659 white.forEach((k, v) -> {
660 v.forEach((pk, pv) -> {
661 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900662 .matchTunnelId(namespaceHash)
663 .matchEthType(TYPE_IPV4);
Jian Li73d3b6a2019-07-08 18:07:53 +0900664 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900665 tBuilder.setTunnelId(DEFAULT_SEGMENT_ID);
666
Jian Li73d3b6a2019-07-08 18:07:53 +0900667 if (pk.equalsIgnoreCase(DIRECTION_INGRESS)) {
Jian Li0f459612019-07-11 10:38:02 +0900668 sBuilder.matchIPSrc(IpPrefix.valueOf(k));
669 tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
670 .transition(ACL_INGRESS_BLACK_TABLE);
Jian Li73d3b6a2019-07-08 18:07:53 +0900671
672 if (pv.size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900673 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900674 } else {
675 pv.forEach(p -> {
676 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
677 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900678
679 if (p.getPort() != null &&
680 p.getPort().getIntVal() != null) {
681 sBuilder.matchTcpDst(TpPort.tpPort(p.getPort().getIntVal()));
682 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900683 }
684 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
685 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900686
687 if (p.getPort() != null &&
688 p.getPort().getIntVal() != null) {
689 sBuilder.matchUdpDst(TpPort.tpPort(p.getPort().getIntVal()));
690 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900691 }
692
Jian Lie1a5b8f2019-07-23 17:13:19 +0900693 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
694 });
695 }
696 } else if (pk.equalsIgnoreCase(DIRECTION_EGRESS)) {
697 sBuilder.matchIPDst(IpPrefix.valueOf(k));
698 tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
699 .transition(ACL_EGRESS_BLACK_TABLE);
700
701 if (pv.size() == 0) {
702 setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
703 } else {
704 pv.forEach(p -> {
705 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
706 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
707
708 if (p.getPort() != null &&
709 p.getPort().getIntVal() != null) {
710 sBuilder.matchTcpSrc(TpPort.tpPort(p.getPort().getIntVal()));
711 }
712 }
713 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
714 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
715
716 if (p.getPort() != null &&
717 p.getPort().getIntVal() != null) {
718 sBuilder.matchUdpSrc(TpPort.tpPort(p.getPort().getIntVal()));
719 }
720 }
721 setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900722 });
723 }
724
725 } else {
726 log.error("In correct direction has been specified at network policy.");
727 }
728 });
729 });
730 }
731
Jian Lie1a5b8f2019-07-23 17:13:19 +0900732 private void setPolicyRulesBase(TrafficSelector.Builder sBuilder,
733 TrafficTreatment.Builder tBuilder,
734 int table,
735 boolean install) {
736 k8sNodeService.completeNodes().forEach(n -> {
737 k8sFlowRuleService.setRule(
738 appId,
739 n.intgBridge(),
740 sBuilder.build(),
741 tBuilder.build(),
742 PRIORITY_CIDR_RULE,
743 table,
744 install
745 );
746 });
747 }
748
Jian Li0f459612019-07-11 10:38:02 +0900749 private void setBlackRules(String whiteIpCidr, String direction,
750 List<String> except, boolean install) {
751 k8sNodeService.completeNodes().forEach(n -> {
752 except.forEach(blkIp -> {
753 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900754 .matchEthType(TYPE_IPV4)
Jian Li0f459612019-07-11 10:38:02 +0900755 .matchMetadata(whiteIpCidr.hashCode());
756 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
757 .drop();
758 int table = 0;
759 if (direction.equalsIgnoreCase(DIRECTION_INGRESS)) {
760 sBuilder.matchIPSrc(IpPrefix.valueOf(blkIp));
761 table = ACL_INGRESS_BLACK_TABLE;
762 }
763 if (direction.equalsIgnoreCase(DIRECTION_EGRESS)) {
764 sBuilder.matchIPDst(IpPrefix.valueOf(blkIp));
765 table = ACL_EGRESS_BLACK_TABLE;
766 }
767
Jian Lie1a5b8f2019-07-23 17:13:19 +0900768 setPolicyRulesBase(sBuilder, tBuilder, table, install);
Jian Li0f459612019-07-11 10:38:02 +0900769 });
770 });
771 }
772
773 private void setBlackToRouteRules(boolean install) {
774
775 k8sNodeService.completeNodes().forEach(n -> {
776 ImmutableSet.of(ACL_INGRESS_BLACK_TABLE, ACL_EGRESS_BLACK_TABLE).forEach(t -> {
777 k8sFlowRuleService.setRule(
778 appId,
779 n.intgBridge(),
780 DefaultTrafficSelector.builder().build(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900781 DefaultTrafficTreatment.builder()
782 .transition(ROUTING_TABLE).build(),
Jian Li0f459612019-07-11 10:38:02 +0900783 0,
784 t,
785 install
786 );
787 });
788 });
789 }
790
Jian Lie1a5b8f2019-07-23 17:13:19 +0900791 private void setNamespaceRulesByPod(Pod pod, boolean install) {
792 String podIp = pod.getStatus().getPodIP();
793
794 if (podIp == null) {
795 return;
796 }
797
798 Integer nsHash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, podIp);
799
800 if (nsHash == null) {
801 return;
802 }
803
804 setNamespaceRulesBase(podIp, nsHash, install);
805 }
806
807 private void setNamespaceRulesByService(Service service, boolean install) {
808 String clusterIp = service.getSpec().getClusterIP();
809
810 if (clusterIp == null) {
811 return;
812 }
813
814 setNamespaceRulesBase(clusterIp, namespaceHashByServiceIp(k8sServiceService,
815 k8sNamespaceService, clusterIp), install);
816 }
817
818 private void setNamespaceRulesBase(String ip, Integer nsHash, boolean install) {
819
820 k8sNodeService.completeNodes().forEach(n -> {
821 TrafficSelector.Builder origBuilder = DefaultTrafficSelector.builder()
822 .matchEthType(TYPE_IPV4)
823 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(ip), HOST_PREFIX));
824 TrafficSelector.Builder convBuilder = DefaultTrafficSelector.builder()
825 .matchEthType(TYPE_IPV4)
826 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(
827 shiftIpDomain(ip, SHIFTED_IP_PREFIX)), HOST_PREFIX));
828 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
829 .writeMetadata(nsHash, DEFAULT_METADATA_MASK)
830 .transition(GROUPING_TABLE);
831
832 k8sFlowRuleService.setRule(
833 appId,
834 n.intgBridge(),
835 origBuilder.build(),
836 tBuilder.build(),
837 PRIORITY_NAMESPACE_RULE,
838 NAMESPACE_TABLE,
839 install
840 );
841
842 k8sFlowRuleService.setRule(
843 appId,
844 n.intgBridge(),
845 convBuilder.build(),
846 tBuilder.build(),
847 PRIORITY_NAMESPACE_RULE,
848 NAMESPACE_TABLE,
849 install
850 );
851 });
852 }
853
854 private class InternalServiceListener implements K8sServiceListener {
855
856 private boolean isRelevantHelper() {
857 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
858 }
859
860 @Override
861 public void event(K8sServiceEvent event) {
862 Service service = event.subject();
863 switch (event.type()) {
864 case K8S_SERVICE_CREATED:
865 case K8S_SERVICE_UPDATED:
866 eventExecutor.execute(() -> processServiceCreation(service));
867 break;
868 case K8S_SERVICE_REMOVED:
869 eventExecutor.execute(() -> processServiceRemoval(service));
870 break;
871 default:
872 break;
873 }
874 }
875
876 private void processServiceCreation(Service service) {
877 if (!isRelevantHelper()) {
878 return;
879 }
880
881 setNamespaceRulesByService(service, true);
882 }
883
884 private void processServiceRemoval(Service service) {
885 if (!isRelevantHelper()) {
886 return;
887 }
888
889 setNamespaceRulesByService(service, false);
890 }
891 }
892
Jian Li73d3b6a2019-07-08 18:07:53 +0900893 private class InternalPodListener implements K8sPodListener {
894
895 private boolean isRelevantHelper() {
896 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
897 }
898
899 @Override
900 public void event(K8sPodEvent event) {
901 Pod pod = event.subject();
902 switch (event.type()) {
903 case K8S_POD_CREATED:
904 case K8S_POD_UPDATED:
905 eventExecutor.execute(() -> processPodCreation(pod));
906 break;
907 case K8S_POD_REMOVED:
908 eventExecutor.execute(() -> processPodRemoval(pod));
909 break;
910 default:
911 break;
912 }
913 }
914
915 private void processPodCreation(Pod pod) {
916 if (!isRelevantHelper()) {
917 return;
918 }
919
920 setBlockRulesByPod(pod, true);
921 setAllowRulesByPod(pod, true);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900922 setNamespaceRulesByPod(pod, true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900923 }
924
925 private void processPodRemoval(Pod pod) {
926 if (!isRelevantHelper()) {
927 return;
928 }
929
930 setBlockRulesByPod(pod, false);
931 setAllowRulesByPod(pod, false);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900932 setNamespaceRulesByPod(pod, false);
Jian Li73d3b6a2019-07-08 18:07:53 +0900933 }
934 }
935
936 private class InternalNetworkPolicyListener implements K8sNetworkPolicyListener {
937
938 private boolean isRelevantHelper() {
939 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
940 }
941
942 @Override
943 public void event(K8sNetworkPolicyEvent event) {
944 NetworkPolicy policy = event.subject();
945 switch (event.type()) {
946 case K8S_NETWORK_POLICY_CREATED:
947 case K8S_NETWORK_POLICY_UPDATED:
948 eventExecutor.execute(() -> processNetworkPolicyCreation(policy));
949 break;
950 case K8S_NETWORK_POLICY_REMOVED:
951 eventExecutor.execute(() -> processNetworkPolicyRemoval(policy));
952 break;
953 default:
954 break;
955 }
956 }
957
958 private void processNetworkPolicyCreation(NetworkPolicy policy) {
959 if (!isRelevantHelper()) {
960 return;
961 }
962
963 setBlockRulesByPolicy(policy, true);
964 setAllowRulesByPolicy(policy, true);
965 }
966
967 private void processNetworkPolicyRemoval(NetworkPolicy policy) {
968 if (!isRelevantHelper()) {
969 return;
970 }
971
972 setBlockRulesByPolicy(policy, false);
973 setAllowRulesByPolicy(policy, false);
974 }
975 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900976
977 private class InternalNamespaceListener implements K8sNamespaceListener {
978
979 private boolean isRelevantHelper() {
980 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
981 }
982
983 @Override
984 public void event(K8sNamespaceEvent event) {
985 Namespace ns = event.subject();
986 switch (event.type()) {
987 case K8S_NAMESPACE_CREATED:
988 case K8S_NAMESPACE_UPDATED:
989 eventExecutor.execute(() -> processNamespaceCreation(ns));
990 break;
991 case K8S_NAMESPACE_REMOVED:
992 eventExecutor.execute(() -> processNamespaceRemoval(ns));
993 break;
994 default:
995 break;
996 }
997 }
998
999 private void processNamespaceCreation(Namespace namespace) {
1000 if (!isRelevantHelper()) {
1001 return;
1002 }
1003
1004 setDefaultAllowNamespaceRules(namespace, true);
1005 setDefaultAllowServiceRules(true);
1006 }
1007
1008 private void processNamespaceRemoval(Namespace namespace) {
1009 if (!isRelevantHelper()) {
1010 return;
1011 }
1012
1013 // do nothing for now
1014 }
1015 }
Jian Li73d3b6a2019-07-08 18:07:53 +09001016}