blob: 1044e391f66d112407f83879b2940d85889d6b76 [file] [log] [blame]
Jian Li73d3b6a2019-07-08 18:07:53 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
Jian Li0f459612019-07-11 10:38:02 +090018import com.google.common.collect.ImmutableSet;
Jian Li73d3b6a2019-07-08 18:07:53 +090019import com.google.common.collect.Maps;
Jian Li0f459612019-07-11 10:38:02 +090020import com.google.common.collect.Sets;
Jian Lie1a5b8f2019-07-23 17:13:19 +090021import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
22import io.fabric8.kubernetes.api.model.Namespace;
Jian Li73d3b6a2019-07-08 18:07:53 +090023import io.fabric8.kubernetes.api.model.Pod;
Jian Lie1a5b8f2019-07-23 17:13:19 +090024import io.fabric8.kubernetes.api.model.Service;
Jian Li73d3b6a2019-07-08 18:07:53 +090025import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
Jian Lib7dfb5b2019-07-15 17:37:12 +090026import io.fabric8.kubernetes.api.model.networking.NetworkPolicyEgressRule;
27import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRule;
Jian Lie1a5b8f2019-07-23 17:13:19 +090028import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeer;
Jian Li73d3b6a2019-07-08 18:07:53 +090029import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
Jian Li73d3b6a2019-07-08 18:07:53 +090030import org.onlab.packet.IPv4;
31import org.onlab.packet.IpAddress;
32import org.onlab.packet.IpPrefix;
33import org.onlab.packet.TpPort;
34import org.onosproject.cfg.ComponentConfigService;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.k8snetworking.api.K8sFlowRuleService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090041import org.onosproject.k8snetworking.api.K8sNamespaceEvent;
42import org.onosproject.k8snetworking.api.K8sNamespaceListener;
Jian Li0f459612019-07-11 10:38:02 +090043import org.onosproject.k8snetworking.api.K8sNamespaceService;
Jian Li73d3b6a2019-07-08 18:07:53 +090044import org.onosproject.k8snetworking.api.K8sNetworkPolicyEvent;
45import org.onosproject.k8snetworking.api.K8sNetworkPolicyListener;
46import org.onosproject.k8snetworking.api.K8sNetworkPolicyService;
47import org.onosproject.k8snetworking.api.K8sNetworkService;
48import org.onosproject.k8snetworking.api.K8sPodEvent;
49import org.onosproject.k8snetworking.api.K8sPodListener;
50import org.onosproject.k8snetworking.api.K8sPodService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090051import org.onosproject.k8snetworking.api.K8sServiceEvent;
52import org.onosproject.k8snetworking.api.K8sServiceListener;
53import org.onosproject.k8snetworking.api.K8sServiceService;
Jian Li73d3b6a2019-07-08 18:07:53 +090054import org.onosproject.k8snode.api.K8sNodeService;
55import org.onosproject.net.device.DeviceService;
56import org.onosproject.net.driver.DriverService;
57import org.onosproject.net.flow.DefaultTrafficSelector;
58import org.onosproject.net.flow.DefaultTrafficTreatment;
59import org.onosproject.net.flow.TrafficSelector;
60import org.onosproject.net.flow.TrafficTreatment;
61import org.onosproject.store.service.StorageService;
62import org.osgi.service.component.annotations.Activate;
63import org.osgi.service.component.annotations.Component;
64import org.osgi.service.component.annotations.Deactivate;
65import org.osgi.service.component.annotations.Reference;
66import org.osgi.service.component.annotations.ReferenceCardinality;
67import org.slf4j.Logger;
68
69import java.util.List;
70import java.util.Map;
71import java.util.Objects;
Jian Li0f459612019-07-11 10:38:02 +090072import java.util.Set;
Jian Li73d3b6a2019-07-08 18:07:53 +090073import java.util.concurrent.ExecutorService;
Jian Lie1a5b8f2019-07-23 17:13:19 +090074import java.util.concurrent.atomic.AtomicReference;
Jian Li73d3b6a2019-07-08 18:07:53 +090075
76import static java.util.concurrent.Executors.newSingleThreadExecutor;
Jian Lie1a5b8f2019-07-23 17:13:19 +090077import static org.onlab.packet.Ethernet.TYPE_IPV4;
Jian Li73d3b6a2019-07-08 18:07:53 +090078import static org.onlab.util.Tools.groupedThreads;
Jian Li0f459612019-07-11 10:38:02 +090079import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_BLACK_TABLE;
80import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_WHITE_TABLE;
81import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_BLACK_TABLE;
82import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_WHITE_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090083import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
Jian Lie1a5b8f2019-07-23 17:13:19 +090084import static org.onosproject.k8snetworking.api.Constants.DEFAULT_METADATA_MASK;
85import static org.onosproject.k8snetworking.api.Constants.DEFAULT_NAMESPACE_HASH;
86import static org.onosproject.k8snetworking.api.Constants.DEFAULT_SEGMENT_ID;
87import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090088import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Lie1a5b8f2019-07-23 17:13:19 +090089import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
Jian Li73d3b6a2019-07-08 18:07:53 +090090import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Lie1a5b8f2019-07-23 17:13:19 +090091import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAMESPACE_RULE;
Jian Li73d3b6a2019-07-08 18:07:53 +090092import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
93import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Lie1a5b8f2019-07-23 17:13:19 +090094import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
95import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByNamespace;
96import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByPodIp;
97import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByServiceIp;
Jian Li73d3b6a2019-07-08 18:07:53 +090098import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.shiftIpDomain;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Handles the ACL by referring to the network policy defined through kubernetes.
103 */
104@Component(immediate = true)
105public class K8sNetworkPolicyHandler {
106
107 private final Logger log = getLogger(getClass());
108
109 private static final String DIRECTION_INGRESS = "ingress";
110 private static final String DIRECTION_EGRESS = "egress";
111
112 private static final String PROTOCOL_TCP = "tcp";
113 private static final String PROTOCOL_UDP = "udp";
114
Jian Lie1a5b8f2019-07-23 17:13:19 +0900115 private static final String KUBE_SYSTEM = "kube-system";
116
Jian Li73d3b6a2019-07-08 18:07:53 +0900117 private static final int HOST_PREFIX = 32;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
120 protected CoreService coreService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected LeadershipService leadershipService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected DriverService driverService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected DeviceService deviceService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected ComponentConfigService configService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected StorageService storageService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected K8sNetworkService k8sNetworkService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected K8sFlowRuleService k8sFlowRuleService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
147 protected K8sNodeService k8sNodeService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
150 protected K8sPodService k8sPodService;
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lie1a5b8f2019-07-23 17:13:19 +0900153 protected K8sServiceService k8sServiceService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li73d3b6a2019-07-08 18:07:53 +0900156 protected K8sNetworkPolicyService k8sNetworkPolicyService;
157
Jian Li0f459612019-07-11 10:38:02 +0900158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected K8sNamespaceService k8sNamespaceService;
160
Jian Li73d3b6a2019-07-08 18:07:53 +0900161 private final ExecutorService eventExecutor = newSingleThreadExecutor(
162 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
163 private final InternalPodListener internalPodListener =
164 new InternalPodListener();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900165 private final InternalServiceListener internalServiceListener =
166 new InternalServiceListener();
Jian Li73d3b6a2019-07-08 18:07:53 +0900167 private final InternalNetworkPolicyListener internalNetworkPolicyListener =
168 new InternalNetworkPolicyListener();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900169 private final InternalNamespaceListener internalNamespaceListener =
170 new InternalNamespaceListener();
Jian Li73d3b6a2019-07-08 18:07:53 +0900171
172 private ApplicationId appId;
173 private NodeId localNodeId;
174
175 @Activate
176 protected void activate() {
177 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
178
179 localNodeId = clusterService.getLocalNode().id();
180 leadershipService.runForLeadership(appId.name());
181 k8sPodService.addListener(internalPodListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900182 k8sServiceService.addListener(internalServiceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900183 k8sNetworkPolicyService.addListener(internalNetworkPolicyListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900184 k8sNamespaceService.addListener(internalNamespaceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900185
186 log.info("Started");
187 }
188
189 @Deactivate
190 protected void deactivate() {
191 leadershipService.withdraw(appId.name());
192 k8sPodService.removeListener(internalPodListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900193 k8sServiceService.removeListener(internalServiceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900194 k8sNetworkPolicyService.removeListener(internalNetworkPolicyListener);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900195 k8sNamespaceService.removeListener(internalNamespaceListener);
Jian Li73d3b6a2019-07-08 18:07:53 +0900196 eventExecutor.shutdown();
197
198 log.info("Stopped");
199 }
200
201 private void setBlockRulesByPolicy(NetworkPolicy policy, boolean install) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900202 final Map<String, List<String>> filter = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900203
Jian Lib7dfb5b2019-07-15 17:37:12 +0900204 k8sPodService.pods().forEach(pod ->
205 filter.putAll(getBlockRuleFilter(pod, policy)));
Jian Li73d3b6a2019-07-08 18:07:53 +0900206
207 setBlockRules(filter, install);
208 }
209
210 private void setBlockRulesByPod(Pod pod, boolean install) {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900211 final Map<String, List<String>> filter = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900212
Jian Lib7dfb5b2019-07-15 17:37:12 +0900213 k8sNetworkPolicyService.networkPolicies().forEach(policy ->
214 filter.putAll(getBlockRuleFilter(pod, policy)));
Jian Li73d3b6a2019-07-08 18:07:53 +0900215
216 setBlockRules(filter, install);
217 }
218
Jian Lib7dfb5b2019-07-15 17:37:12 +0900219 private Map<String, List<String>> getBlockRuleFilter(Pod pod, NetworkPolicy policy) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900220
221 // if the POD is not included in the namespace of the given policy,
222 // we do not block the POD
223 if (!pod.getMetadata().getNamespace().equals(policy.getMetadata().getNamespace())) {
224 return Maps.newConcurrentMap();
225 }
226
Jian Lib7dfb5b2019-07-15 17:37:12 +0900227 Map<String, String> labels = policy.getSpec().getPodSelector().getMatchLabels();
228 Map<String, List<String>> filter = Maps.newConcurrentMap();
229 String podIp = pod.getStatus().getPodIP();
230 List<String> policyTypes = policy.getSpec().getPolicyTypes();
231
232 if (podIp != null && policyTypes != null) {
233 if (labels == null) {
234 filter.put(podIp, policyTypes);
235 } else {
236 pod.getMetadata().getLabels().forEach((k, v) -> {
237 if (labels.get(k) != null && labels.get(k).equals(v)) {
238 filter.put(podIp, policyTypes);
239 }
240 });
241 }
242 }
243
244 return filter;
245 }
246
Jian Li73d3b6a2019-07-08 18:07:53 +0900247 private void setBlockRules(Map<String, List<String>> filter, boolean install) {
248 filter.forEach((k, v) -> {
249 v.forEach(d -> {
250 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900251 .matchEthType(TYPE_IPV4);
Jian Li73d3b6a2019-07-08 18:07:53 +0900252 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900253
254 Integer nsHash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, k);
255 if (nsHash != null) {
256 tBuilder.setTunnelId(nsHash);
257 }
258
Jian Li73d3b6a2019-07-08 18:07:53 +0900259 if (d.equalsIgnoreCase(DIRECTION_INGRESS)) {
260 sBuilder.matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX));
Jian Li0f459612019-07-11 10:38:02 +0900261 tBuilder.transition(ACL_INGRESS_WHITE_TABLE);
Jian Li73d3b6a2019-07-08 18:07:53 +0900262 } else if (d.equalsIgnoreCase(DIRECTION_EGRESS)) {
263 sBuilder.matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX));
Jian Li0f459612019-07-11 10:38:02 +0900264 tBuilder.transition(ACL_EGRESS_WHITE_TABLE);
Jian Li73d3b6a2019-07-08 18:07:53 +0900265 }
266
Jian Lie1a5b8f2019-07-23 17:13:19 +0900267 setPolicyRulesBase(sBuilder, tBuilder, ACL_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900268 });
269 });
270 }
271
Jian Lie1a5b8f2019-07-23 17:13:19 +0900272 private void setDefaultAllowNamespaceRules(Namespace namespace, boolean install) {
273
274 String ns = namespace.getMetadata().getName();
275 if (KUBE_SYSTEM.equalsIgnoreCase(ns)) {
276 setAllowNamespaceRulesBase(0, namespace.hashCode(),
277 DIRECTION_INGRESS, install);
278 }
279 }
280
281 private void setDefaultAllowServiceRules(boolean install) {
282 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
283 .matchEthType(TYPE_IPV4)
284 .matchIPSrc(IpPrefix.valueOf(SERVICE_IP_CIDR_DEFAULT));
285 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
286 .setTunnelId(DEFAULT_SEGMENT_ID)
287 .transition(ROUTING_TABLE);
288
289 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
290 }
291
292 private void setAllowNamespaceRulesBase(int tunnelId, int metadata,
293 String direction, boolean install) {
294 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
295
296 if (tunnelId != 0) {
297 sBuilder.matchTunnelId(tunnelId);
298 }
299
300 sBuilder.matchMetadata(metadata);
301 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
302 .setTunnelId(DEFAULT_SEGMENT_ID)
303 .transition(ROUTING_TABLE);
304
305 int table = 0;
306 if (DIRECTION_INGRESS.equals(direction)) {
307 table = ACL_INGRESS_WHITE_TABLE;
308 }
309
310 setPolicyRulesBase(sBuilder, tBuilder, table, install);
311 }
312
Jian Li73d3b6a2019-07-08 18:07:53 +0900313 private void setAllowRulesByPolicy(NetworkPolicy policy, boolean install) {
Jian Li0f459612019-07-11 10:38:02 +0900314 Map<String, Map<String, List<NetworkPolicyPort>>>
315 white = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900316
Jian Lie1a5b8f2019-07-23 17:13:19 +0900317 int nsHash = namespaceHashByNamespace(k8sNamespaceService,
318 policy.getMetadata().getNamespace());
319
Jian Lib7dfb5b2019-07-15 17:37:12 +0900320 List<NetworkPolicyIngressRule> ingress = policy.getSpec().getIngress();
321 if (ingress != null && ingress.size() == 1) {
322 NetworkPolicyIngressRule rule = ingress.get(0);
323 if (rule.getFrom().size() == 0 && rule.getPorts().size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900324 setAllowAllRule(nsHash, DIRECTION_INGRESS, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900325 }
326 }
327
Jian Li73d3b6a2019-07-08 18:07:53 +0900328 policy.getSpec().getIngress().forEach(i -> {
329 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
330 direction.put(DIRECTION_INGRESS, i.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900331 i.getFrom().forEach(peer -> {
332
333 // IP block
334 if (peer.getIpBlock() != null) {
335 if (peer.getIpBlock().getExcept() != null &&
336 peer.getIpBlock().getExcept().size() > 0) {
Jian Li0f459612019-07-11 10:38:02 +0900337 Map<String, List<NetworkPolicyPort>>
338 blkDirection = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900339
Jian Li0f459612019-07-11 10:38:02 +0900340 blkDirection.put(DIRECTION_INGRESS, i.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900341 white.compute(peer.getIpBlock().getCidr(), (k, v) -> blkDirection);
Jian Li73d3b6a2019-07-08 18:07:53 +0900342
Jian Lie1a5b8f2019-07-23 17:13:19 +0900343 setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_INGRESS,
344 peer.getIpBlock().getExcept(), install);
Jian Li0f459612019-07-11 10:38:02 +0900345 } else {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900346 white.compute(peer.getIpBlock().getCidr(), (k, v) -> direction);
Jian Li0f459612019-07-11 10:38:02 +0900347 }
348 }
349
Jian Lie1a5b8f2019-07-23 17:13:19 +0900350 // POD selector
Jian Licd934152019-07-28 21:35:49 +0900351 Set<Pod> pods = podsFromPolicyPeer(peer, policy.getMetadata().getNamespace());
Jian Li0f459612019-07-11 10:38:02 +0900352
353 pods.forEach(pod -> {
354 white.compute(shiftIpDomain(pod.getStatus().getPodIP(),
355 SHIFTED_IP_PREFIX) + "/" + HOST_PREFIX, (m, n) -> direction);
356 white.compute(pod.getStatus().getPodIP() + "/" + HOST_PREFIX, (m, n) -> direction);
Jian Li73d3b6a2019-07-08 18:07:53 +0900357 });
Jian Lie1a5b8f2019-07-23 17:13:19 +0900358
359 // Namespace selector
360 setAllowNamespaceRules(nsHash,
361 namespacesByPolicyPeer(peer), DIRECTION_INGRESS, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900362 });
363 });
364
Jian Lib7dfb5b2019-07-15 17:37:12 +0900365 List<NetworkPolicyEgressRule> egress = policy.getSpec().getEgress();
366 if (egress != null && egress.size() == 1) {
367 NetworkPolicyEgressRule rule = egress.get(0);
368 if (rule.getTo().size() == 0 && rule.getPorts().size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900369 setAllowAllRule(nsHash, DIRECTION_EGRESS, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900370 }
371 }
372
Jian Li73d3b6a2019-07-08 18:07:53 +0900373 policy.getSpec().getEgress().forEach(e -> {
374 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
375 direction.put(DIRECTION_EGRESS, e.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900376 e.getTo().forEach(peer -> {
377
378 // IP block
379 if (peer.getIpBlock() != null) {
380 if (peer.getIpBlock().getExcept() != null &&
381 peer.getIpBlock().getExcept().size() > 0) {
Jian Li0f459612019-07-11 10:38:02 +0900382
383 Map<String, List<NetworkPolicyPort>>
384 blkDirection = Maps.newConcurrentMap();
385 blkDirection.put(DIRECTION_EGRESS, e.getPorts());
Jian Lie1a5b8f2019-07-23 17:13:19 +0900386 white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
Jian Li0f459612019-07-11 10:38:02 +0900387 if (v != null) {
388 v.put(DIRECTION_EGRESS, e.getPorts());
389 return v;
390 } else {
391 return blkDirection;
392 }
393 });
394
Jian Lie1a5b8f2019-07-23 17:13:19 +0900395 setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_EGRESS,
396 peer.getIpBlock().getExcept(), install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900397 } else {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900398 white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
Jian Li0f459612019-07-11 10:38:02 +0900399 if (v != null) {
400 v.put(DIRECTION_EGRESS, e.getPorts());
401 return v;
402 } else {
403 return direction;
404 }
405 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900406 }
Jian Li0f459612019-07-11 10:38:02 +0900407 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900408
Jian Lie1a5b8f2019-07-23 17:13:19 +0900409 // POD selector
Jian Licd934152019-07-28 21:35:49 +0900410 Set<Pod> pods = podsFromPolicyPeer(peer, policy.getMetadata().getNamespace());
Jian Li0f459612019-07-11 10:38:02 +0900411
412 pods.forEach(pod -> {
413 white.compute(shiftIpDomain(pod.getStatus().getPodIP(),
414 SHIFTED_IP_PREFIX) + "/" + HOST_PREFIX, (m, n) -> {
415 if (n != null) {
416 n.put(DIRECTION_EGRESS, e.getPorts());
417 return n;
418 } else {
419 return direction;
420 }
421 });
422
423 white.compute(pod.getStatus().getPodIP() + "/" + HOST_PREFIX,
424 (m, n) -> {
425 if (n != null) {
426 n.put(DIRECTION_EGRESS, e.getPorts());
427 return n;
428 } else {
429 return direction;
Jian Li73d3b6a2019-07-08 18:07:53 +0900430 }
431 });
432 });
Jian Lie1a5b8f2019-07-23 17:13:19 +0900433
434 // Namespace selector
435 setAllowNamespaceRules(nsHash,
436 namespacesByPolicyPeer(peer), DIRECTION_EGRESS, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900437 });
438 });
439
Jian Lie1a5b8f2019-07-23 17:13:19 +0900440 setAllowRules(namespaceHashByNamespace(k8sNamespaceService,
441 policy.getMetadata().getNamespace()), white, install);
Jian Li0f459612019-07-11 10:38:02 +0900442 setBlackToRouteRules(true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900443 }
444
Jian Licd934152019-07-28 21:35:49 +0900445 private Set<Pod> podsFromPolicyPeer(NetworkPolicyPeer peer, String namespace) {
446 Set<Pod> pods = Sets.newConcurrentHashSet();
447 if (peer.getPodSelector() != null) {
448 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
449 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
450
451 if (podLabels == null && matchExps.size() == 0) {
452 k8sPodService.pods().stream()
453 .filter(pod -> pod.getMetadata().getNamespace().equals(
454 namespace))
455 .forEach(pods::add);
456 } else {
457 k8sPodService.pods().stream()
458 .filter(pod -> pod.getMetadata().getNamespace().equals(
459 namespace))
460 .forEach(pod -> {
461 pod.getMetadata().getLabels().forEach((k, v) -> {
462 if (podLabels != null && podLabels.get(k) != null &&
463 podLabels.get(k).equals(v)) {
464 pods.add(pod);
465 }
466 });
467 });
468 }
469 }
470 return pods;
471 }
472
Jian Li73d3b6a2019-07-08 18:07:53 +0900473 private void setAllowRulesByPod(Pod pod, boolean install) {
Jian Li0f459612019-07-11 10:38:02 +0900474 Map<String, Map<String, List<NetworkPolicyPort>>>
475 white = Maps.newConcurrentMap();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900476 AtomicReference<NetworkPolicy> selectedPolicy = new AtomicReference<>();
477 k8sNetworkPolicyService.networkPolicies().stream()
478 .filter(policy -> policy.getMetadata().getNamespace().equals(
479 pod.getMetadata().getNamespace()))
480 .forEach(policy -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900481 String podIp = pod.getStatus().getPodIP();
482
Jian Li73d3b6a2019-07-08 18:07:53 +0900483 policy.getSpec().getIngress().forEach(i -> {
Jian Li0f459612019-07-11 10:38:02 +0900484 Map<String, List<NetworkPolicyPort>>
485 direction = Maps.newConcurrentMap();
Jian Li73d3b6a2019-07-08 18:07:53 +0900486 direction.put(DIRECTION_INGRESS, i.getPorts());
487 i.getFrom().forEach(peer -> {
Jian Li0f459612019-07-11 10:38:02 +0900488 if (peer.getPodSelector() != null) {
489 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900490 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
Jian Li73d3b6a2019-07-08 18:07:53 +0900491
Jian Lie1a5b8f2019-07-23 17:13:19 +0900492 if (podLabels == null && matchExps.size() == 0 && podIp != null) {
493 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
494 "/" + HOST_PREFIX, (m, n) -> direction);
495 white.compute(podIp + "/" +
496 HOST_PREFIX, (m, n) -> direction);
497
498 selectedPolicy.set(policy);
499 } else {
500 pod.getMetadata().getLabels().forEach((k, v) -> {
501 if (podLabels != null && podLabels.get(k) != null &&
502 podLabels.get(k).equals(v) && podIp != null) {
503 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
504 "/" + HOST_PREFIX, (m, n) -> direction);
505 white.compute(podIp + "/" +
506 HOST_PREFIX, (m, n) -> direction);
507
508 selectedPolicy.set(policy);
509 }
510 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900511 }
Jian Li0f459612019-07-11 10:38:02 +0900512 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900513 });
514 });
515 });
516
Jian Lie1a5b8f2019-07-23 17:13:19 +0900517 k8sNetworkPolicyService.networkPolicies().stream()
518 .filter(policy -> policy.getMetadata().getNamespace().equals(
519 pod.getMetadata().getNamespace()))
520 .forEach(policy -> {
Jian Lib7dfb5b2019-07-15 17:37:12 +0900521 String podIp = pod.getStatus().getPodIP();
522
Jian Li73d3b6a2019-07-08 18:07:53 +0900523 policy.getSpec().getEgress().forEach(e -> {
524 Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
525 direction.put(DIRECTION_EGRESS, e.getPorts());
Jian Li0f459612019-07-11 10:38:02 +0900526 e.getTo().forEach(peer -> {
527 if (peer.getPodSelector() != null) {
528 Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900529 List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
Jian Li73d3b6a2019-07-08 18:07:53 +0900530
Jian Lie1a5b8f2019-07-23 17:13:19 +0900531 if (podLabels == null && matchExps.size() == 0 && podIp != null) {
532 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
533 "/" + HOST_PREFIX, (m, n) -> {
534 if (n != null) {
535 n.put(DIRECTION_EGRESS, e.getPorts());
536 return n;
537 } else {
538 return direction;
539 }
540 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900541
Jian Lie1a5b8f2019-07-23 17:13:19 +0900542 white.compute(podIp + "/" +
543 HOST_PREFIX, (m, n) -> {
544 if (n != null) {
545 n.put(DIRECTION_EGRESS, e.getPorts());
546 return n;
547 } else {
548 return direction;
549 }
550 });
Jian Li0f459612019-07-11 10:38:02 +0900551
Jian Lie1a5b8f2019-07-23 17:13:19 +0900552 selectedPolicy.set(policy);
553 } else {
554 pod.getMetadata().getLabels().forEach((k, v) -> {
555 if (podLabels != null && podLabels.get(k) != null &&
556 podLabels.get(k).equals(v) && podIp != null) {
557 white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
558 "/" + HOST_PREFIX, (m, n) -> {
559 if (n != null) {
560 n.put(DIRECTION_EGRESS, e.getPorts());
561 return n;
562 } else {
563 return direction;
564 }
565 });
566
567 white.compute(podIp + "/" +
568 HOST_PREFIX, (m, n) -> {
569 if (n != null) {
570 n.put(DIRECTION_EGRESS, e.getPorts());
571 return n;
572 } else {
573 return direction;
574 }
575 });
576
577 selectedPolicy.set(policy);
578 }
579 });
Jian Li73d3b6a2019-07-08 18:07:53 +0900580 }
Jian Li0f459612019-07-11 10:38:02 +0900581 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900582 });
583 });
584 });
585
Jian Lie1a5b8f2019-07-23 17:13:19 +0900586 int nsHash = selectedPolicy.get() != null ? namespaceHashByNamespace(k8sNamespaceService,
587 selectedPolicy.get().getMetadata().getNamespace()) : DEFAULT_NAMESPACE_HASH;
588
589 setAllowRules(nsHash, white, install);
Jian Li0f459612019-07-11 10:38:02 +0900590 setBlackToRouteRules(true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900591 }
592
Jian Lie1a5b8f2019-07-23 17:13:19 +0900593 private Set<Namespace> namespacesByLabels(Map<String, String> labels) {
594 Set<Namespace> nsSet = Sets.newConcurrentHashSet();
Jian Li0f459612019-07-11 10:38:02 +0900595 k8sNamespaceService.namespaces().forEach(ns -> {
596 if (ns != null && ns.getMetadata() != null &&
Jian Lie1a5b8f2019-07-23 17:13:19 +0900597 ns.getMetadata().getLabels() != null && labels != null) {
Jian Li0f459612019-07-11 10:38:02 +0900598 ns.getMetadata().getLabels().forEach((k, v) -> {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900599 if (labels.get(k) != null && labels.get(k).equals(v)) {
600 nsSet.add(ns);
Jian Li0f459612019-07-11 10:38:02 +0900601 }
602 });
603 }
604 });
605
Jian Lie1a5b8f2019-07-23 17:13:19 +0900606 return nsSet;
Jian Li0f459612019-07-11 10:38:02 +0900607 }
608
Jian Lie1a5b8f2019-07-23 17:13:19 +0900609 private Set<Namespace> namespacesByPolicyPeer(NetworkPolicyPeer peer) {
610 if (peer.getNamespaceSelector() != null) {
611 Map<String, String> labels = peer.getNamespaceSelector().getMatchLabels();
612 if (labels == null || labels.size() == 0) {
613 // if none of match labels are specified, it means the
614 // target PODs are from any namespaces
615 return k8sNamespaceService.namespaces();
616 } else {
617 return namespacesByLabels(labels);
618 }
619 }
620
621 return Sets.newConcurrentHashSet();
622 }
623
624 private void setAllowNamespaceRules(int tunnelId, Set<Namespace> nsSet,
625 String direction, boolean install) {
626
627 nsSet.forEach(ns -> {
628 setAllowNamespaceRulesBase(tunnelId, ns.hashCode(), direction, install);
629 });
630
631 }
632
633 private void setAllowAllRule(int nsHash, String direction, boolean install) {
634 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
635 .matchTunnelId(nsHash);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900636 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900637 .setTunnelId(DEFAULT_SEGMENT_ID)
Jian Lib7dfb5b2019-07-15 17:37:12 +0900638 .transition(ROUTING_TABLE);
639
640 int table = 0;
641
642 if (DIRECTION_INGRESS.equalsIgnoreCase(direction)) {
643 table = ACL_INGRESS_WHITE_TABLE;
Jian Lib7dfb5b2019-07-15 17:37:12 +0900644 }
645
Jian Lie1a5b8f2019-07-23 17:13:19 +0900646 setPolicyRulesBase(sBuilder, tBuilder, table, install);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900647 }
648
Jian Lie1a5b8f2019-07-23 17:13:19 +0900649 private void setAllowRules(int namespaceHash,
650 Map<String, Map<String, List<NetworkPolicyPort>>> white,
Jian Li0f459612019-07-11 10:38:02 +0900651 boolean install) {
Jian Li73d3b6a2019-07-08 18:07:53 +0900652 white.forEach((k, v) -> {
653 v.forEach((pk, pv) -> {
654 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900655 .matchTunnelId(namespaceHash)
656 .matchEthType(TYPE_IPV4);
Jian Li73d3b6a2019-07-08 18:07:53 +0900657 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Jian Lie1a5b8f2019-07-23 17:13:19 +0900658 tBuilder.setTunnelId(DEFAULT_SEGMENT_ID);
659
Jian Li73d3b6a2019-07-08 18:07:53 +0900660 if (pk.equalsIgnoreCase(DIRECTION_INGRESS)) {
Jian Li0f459612019-07-11 10:38:02 +0900661 sBuilder.matchIPSrc(IpPrefix.valueOf(k));
662 tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
663 .transition(ACL_INGRESS_BLACK_TABLE);
Jian Li73d3b6a2019-07-08 18:07:53 +0900664
665 if (pv.size() == 0) {
Jian Lie1a5b8f2019-07-23 17:13:19 +0900666 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900667 } else {
668 pv.forEach(p -> {
669 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
670 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900671
672 if (p.getPort() != null &&
673 p.getPort().getIntVal() != null) {
674 sBuilder.matchTcpDst(TpPort.tpPort(p.getPort().getIntVal()));
675 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900676 }
677 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
678 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
Jian Lib7dfb5b2019-07-15 17:37:12 +0900679
680 if (p.getPort() != null &&
681 p.getPort().getIntVal() != null) {
682 sBuilder.matchUdpDst(TpPort.tpPort(p.getPort().getIntVal()));
683 }
Jian Li73d3b6a2019-07-08 18:07:53 +0900684 }
685
Jian Lie1a5b8f2019-07-23 17:13:19 +0900686 setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
687 });
688 }
689 } else if (pk.equalsIgnoreCase(DIRECTION_EGRESS)) {
690 sBuilder.matchIPDst(IpPrefix.valueOf(k));
691 tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
692 .transition(ACL_EGRESS_BLACK_TABLE);
693
694 if (pv.size() == 0) {
695 setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
696 } else {
697 pv.forEach(p -> {
698 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
699 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
700
701 if (p.getPort() != null &&
702 p.getPort().getIntVal() != null) {
703 sBuilder.matchTcpSrc(TpPort.tpPort(p.getPort().getIntVal()));
704 }
705 }
706 if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
707 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
708
709 if (p.getPort() != null &&
710 p.getPort().getIntVal() != null) {
711 sBuilder.matchUdpSrc(TpPort.tpPort(p.getPort().getIntVal()));
712 }
713 }
714 setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
Jian Li73d3b6a2019-07-08 18:07:53 +0900715 });
716 }
717
718 } else {
719 log.error("In correct direction has been specified at network policy.");
720 }
721 });
722 });
723 }
724
Jian Lie1a5b8f2019-07-23 17:13:19 +0900725 private void setPolicyRulesBase(TrafficSelector.Builder sBuilder,
726 TrafficTreatment.Builder tBuilder,
727 int table,
728 boolean install) {
729 k8sNodeService.completeNodes().forEach(n -> {
730 k8sFlowRuleService.setRule(
731 appId,
732 n.intgBridge(),
733 sBuilder.build(),
734 tBuilder.build(),
735 PRIORITY_CIDR_RULE,
736 table,
737 install
738 );
739 });
740 }
741
Jian Li0f459612019-07-11 10:38:02 +0900742 private void setBlackRules(String whiteIpCidr, String direction,
743 List<String> except, boolean install) {
744 k8sNodeService.completeNodes().forEach(n -> {
745 except.forEach(blkIp -> {
746 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
Jian Lie1a5b8f2019-07-23 17:13:19 +0900747 .matchEthType(TYPE_IPV4)
Jian Li0f459612019-07-11 10:38:02 +0900748 .matchMetadata(whiteIpCidr.hashCode());
749 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
750 .drop();
751 int table = 0;
752 if (direction.equalsIgnoreCase(DIRECTION_INGRESS)) {
753 sBuilder.matchIPSrc(IpPrefix.valueOf(blkIp));
754 table = ACL_INGRESS_BLACK_TABLE;
755 }
756 if (direction.equalsIgnoreCase(DIRECTION_EGRESS)) {
757 sBuilder.matchIPDst(IpPrefix.valueOf(blkIp));
758 table = ACL_EGRESS_BLACK_TABLE;
759 }
760
Jian Lie1a5b8f2019-07-23 17:13:19 +0900761 setPolicyRulesBase(sBuilder, tBuilder, table, install);
Jian Li0f459612019-07-11 10:38:02 +0900762 });
763 });
764 }
765
766 private void setBlackToRouteRules(boolean install) {
767
768 k8sNodeService.completeNodes().forEach(n -> {
769 ImmutableSet.of(ACL_INGRESS_BLACK_TABLE, ACL_EGRESS_BLACK_TABLE).forEach(t -> {
770 k8sFlowRuleService.setRule(
771 appId,
772 n.intgBridge(),
773 DefaultTrafficSelector.builder().build(),
Jian Lie1a5b8f2019-07-23 17:13:19 +0900774 DefaultTrafficTreatment.builder()
775 .transition(ROUTING_TABLE).build(),
Jian Li0f459612019-07-11 10:38:02 +0900776 0,
777 t,
778 install
779 );
780 });
781 });
782 }
783
Jian Lie1a5b8f2019-07-23 17:13:19 +0900784 private void setNamespaceRulesByPod(Pod pod, boolean install) {
785 String podIp = pod.getStatus().getPodIP();
786
787 if (podIp == null) {
788 return;
789 }
790
791 Integer nsHash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, podIp);
792
793 if (nsHash == null) {
794 return;
795 }
796
797 setNamespaceRulesBase(podIp, nsHash, install);
798 }
799
800 private void setNamespaceRulesByService(Service service, boolean install) {
801 String clusterIp = service.getSpec().getClusterIP();
802
803 if (clusterIp == null) {
804 return;
805 }
806
807 setNamespaceRulesBase(clusterIp, namespaceHashByServiceIp(k8sServiceService,
808 k8sNamespaceService, clusterIp), install);
809 }
810
811 private void setNamespaceRulesBase(String ip, Integer nsHash, boolean install) {
812
813 k8sNodeService.completeNodes().forEach(n -> {
814 TrafficSelector.Builder origBuilder = DefaultTrafficSelector.builder()
815 .matchEthType(TYPE_IPV4)
816 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(ip), HOST_PREFIX));
817 TrafficSelector.Builder convBuilder = DefaultTrafficSelector.builder()
818 .matchEthType(TYPE_IPV4)
819 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(
820 shiftIpDomain(ip, SHIFTED_IP_PREFIX)), HOST_PREFIX));
821 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
822 .writeMetadata(nsHash, DEFAULT_METADATA_MASK)
823 .transition(GROUPING_TABLE);
824
825 k8sFlowRuleService.setRule(
826 appId,
827 n.intgBridge(),
828 origBuilder.build(),
829 tBuilder.build(),
830 PRIORITY_NAMESPACE_RULE,
831 NAMESPACE_TABLE,
832 install
833 );
834
835 k8sFlowRuleService.setRule(
836 appId,
837 n.intgBridge(),
838 convBuilder.build(),
839 tBuilder.build(),
840 PRIORITY_NAMESPACE_RULE,
841 NAMESPACE_TABLE,
842 install
843 );
844 });
845 }
846
847 private class InternalServiceListener implements K8sServiceListener {
848
849 private boolean isRelevantHelper() {
850 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
851 }
852
853 @Override
854 public void event(K8sServiceEvent event) {
855 Service service = event.subject();
856 switch (event.type()) {
857 case K8S_SERVICE_CREATED:
858 case K8S_SERVICE_UPDATED:
859 eventExecutor.execute(() -> processServiceCreation(service));
860 break;
861 case K8S_SERVICE_REMOVED:
862 eventExecutor.execute(() -> processServiceRemoval(service));
863 break;
864 default:
865 break;
866 }
867 }
868
869 private void processServiceCreation(Service service) {
870 if (!isRelevantHelper()) {
871 return;
872 }
873
874 setNamespaceRulesByService(service, true);
875 }
876
877 private void processServiceRemoval(Service service) {
878 if (!isRelevantHelper()) {
879 return;
880 }
881
882 setNamespaceRulesByService(service, false);
883 }
884 }
885
Jian Li73d3b6a2019-07-08 18:07:53 +0900886 private class InternalPodListener implements K8sPodListener {
887
888 private boolean isRelevantHelper() {
889 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
890 }
891
892 @Override
893 public void event(K8sPodEvent event) {
894 Pod pod = event.subject();
895 switch (event.type()) {
896 case K8S_POD_CREATED:
897 case K8S_POD_UPDATED:
898 eventExecutor.execute(() -> processPodCreation(pod));
899 break;
900 case K8S_POD_REMOVED:
901 eventExecutor.execute(() -> processPodRemoval(pod));
902 break;
903 default:
904 break;
905 }
906 }
907
908 private void processPodCreation(Pod pod) {
909 if (!isRelevantHelper()) {
910 return;
911 }
912
913 setBlockRulesByPod(pod, true);
914 setAllowRulesByPod(pod, true);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900915 setNamespaceRulesByPod(pod, true);
Jian Li73d3b6a2019-07-08 18:07:53 +0900916 }
917
918 private void processPodRemoval(Pod pod) {
919 if (!isRelevantHelper()) {
920 return;
921 }
922
923 setBlockRulesByPod(pod, false);
924 setAllowRulesByPod(pod, false);
Jian Lie1a5b8f2019-07-23 17:13:19 +0900925 setNamespaceRulesByPod(pod, false);
Jian Li73d3b6a2019-07-08 18:07:53 +0900926 }
927 }
928
929 private class InternalNetworkPolicyListener implements K8sNetworkPolicyListener {
930
931 private boolean isRelevantHelper() {
932 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
933 }
934
935 @Override
936 public void event(K8sNetworkPolicyEvent event) {
937 NetworkPolicy policy = event.subject();
938 switch (event.type()) {
939 case K8S_NETWORK_POLICY_CREATED:
940 case K8S_NETWORK_POLICY_UPDATED:
941 eventExecutor.execute(() -> processNetworkPolicyCreation(policy));
942 break;
943 case K8S_NETWORK_POLICY_REMOVED:
944 eventExecutor.execute(() -> processNetworkPolicyRemoval(policy));
945 break;
946 default:
947 break;
948 }
949 }
950
951 private void processNetworkPolicyCreation(NetworkPolicy policy) {
952 if (!isRelevantHelper()) {
953 return;
954 }
955
956 setBlockRulesByPolicy(policy, true);
957 setAllowRulesByPolicy(policy, true);
958 }
959
960 private void processNetworkPolicyRemoval(NetworkPolicy policy) {
961 if (!isRelevantHelper()) {
962 return;
963 }
964
965 setBlockRulesByPolicy(policy, false);
966 setAllowRulesByPolicy(policy, false);
967 }
968 }
Jian Lie1a5b8f2019-07-23 17:13:19 +0900969
970 private class InternalNamespaceListener implements K8sNamespaceListener {
971
972 private boolean isRelevantHelper() {
973 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
974 }
975
976 @Override
977 public void event(K8sNamespaceEvent event) {
978 Namespace ns = event.subject();
979 switch (event.type()) {
980 case K8S_NAMESPACE_CREATED:
981 case K8S_NAMESPACE_UPDATED:
982 eventExecutor.execute(() -> processNamespaceCreation(ns));
983 break;
984 case K8S_NAMESPACE_REMOVED:
985 eventExecutor.execute(() -> processNamespaceRemoval(ns));
986 break;
987 default:
988 break;
989 }
990 }
991
992 private void processNamespaceCreation(Namespace namespace) {
993 if (!isRelevantHelper()) {
994 return;
995 }
996
997 setDefaultAllowNamespaceRules(namespace, true);
998 setDefaultAllowServiceRules(true);
999 }
1000
1001 private void processNamespaceRemoval(Namespace namespace) {
1002 if (!isRelevantHelper()) {
1003 return;
1004 }
1005
1006 // do nothing for now
1007 }
1008 }
Jian Li73d3b6a2019-07-08 18:07:53 +09001009}