Isolate POD traffic for each namespace using a segment ID
Change-Id: Icb3e4379c3b211678d56e1eeee4ff70a2b02fba0
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
index 4a0c64c..25d8a93 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
@@ -57,20 +57,21 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC;
import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SNAT_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.STAT_INGRESS_TABLE;
import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.STAT_INGRESS_TABLE;
import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.VTAP_INGRESS_TABLE;
import static org.onosproject.k8snetworking.api.Constants.VTAP_EGRESS_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.VTAP_INGRESS_TABLE;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
import static org.slf4j.LoggerFactory.getLogger;
@@ -230,10 +231,14 @@
// for vTag and ARP table transition
connectTables(deviceId, VTAG_TABLE, ARP_TABLE);
- connectTables(deviceId, JUMP_TABLE, GROUPING_TABLE);
+ // for jump and namespace table transition
+ connectTables(deviceId, JUMP_TABLE, NAMESPACE_TABLE);
// for ARP and ACL table transition
- connectTables(deviceId, ARP_TABLE, GROUPING_TABLE);
+ connectTables(deviceId, ARP_TABLE, NAMESPACE_TABLE);
+
+ // for namespace table transition to grouping table
+ connectTables(deviceId, NAMESPACE_TABLE, GROUPING_TABLE);
// for grouping table transition to ACL table
connectTables(deviceId, GROUPING_TABLE, ACL_TABLE);
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java
index dda9b58..ce31f04 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sNetworkPolicyHandler.java
@@ -18,12 +18,15 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
+import io.fabric8.kubernetes.api.model.Namespace;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyEgressRule;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRule;
+import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeer;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
-import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
@@ -35,6 +38,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNamespaceEvent;
+import org.onosproject.k8snetworking.api.K8sNamespaceListener;
import org.onosproject.k8snetworking.api.K8sNamespaceService;
import org.onosproject.k8snetworking.api.K8sNetworkPolicyEvent;
import org.onosproject.k8snetworking.api.K8sNetworkPolicyListener;
@@ -43,6 +48,9 @@
import org.onosproject.k8snetworking.api.K8sPodEvent;
import org.onosproject.k8snetworking.api.K8sPodListener;
import org.onosproject.k8snetworking.api.K8sPodService;
+import org.onosproject.k8snetworking.api.K8sServiceEvent;
+import org.onosproject.k8snetworking.api.K8sServiceListener;
+import org.onosproject.k8snetworking.api.K8sServiceService;
import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
@@ -63,19 +71,30 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_BLACK_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_WHITE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_BLACK_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_WHITE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ACL_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.DEFAULT_METADATA_MASK;
+import static org.onosproject.k8snetworking.api.Constants.DEFAULT_NAMESPACE_HASH;
+import static org.onosproject.k8snetworking.api.Constants.DEFAULT_SEGMENT_ID;
+import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAMESPACE_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByNamespace;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByPodIp;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.namespaceHashByServiceIp;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.shiftIpDomain;
import static org.slf4j.LoggerFactory.getLogger;
@@ -93,8 +112,9 @@
private static final String PROTOCOL_TCP = "tcp";
private static final String PROTOCOL_UDP = "udp";
+ private static final String KUBE_SYSTEM = "kube-system";
+
private static final int HOST_PREFIX = 32;
- private static final long DEFAULT_METADATA_MASK = 0xffffffffffffffffL;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -130,6 +150,9 @@
protected K8sPodService k8sPodService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sServiceService k8sServiceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sNetworkPolicyService k8sNetworkPolicyService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -139,8 +162,12 @@
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final InternalPodListener internalPodListener =
new InternalPodListener();
+ private final InternalServiceListener internalServiceListener =
+ new InternalServiceListener();
private final InternalNetworkPolicyListener internalNetworkPolicyListener =
new InternalNetworkPolicyListener();
+ private final InternalNamespaceListener internalNamespaceListener =
+ new InternalNamespaceListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -152,7 +179,9 @@
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
k8sPodService.addListener(internalPodListener);
+ k8sServiceService.addListener(internalServiceListener);
k8sNetworkPolicyService.addListener(internalNetworkPolicyListener);
+ k8sNamespaceService.addListener(internalNamespaceListener);
log.info("Started");
}
@@ -161,7 +190,9 @@
protected void deactivate() {
leadershipService.withdraw(appId.name());
k8sPodService.removeListener(internalPodListener);
+ k8sServiceService.removeListener(internalServiceListener);
k8sNetworkPolicyService.removeListener(internalNetworkPolicyListener);
+ k8sNamespaceService.removeListener(internalNamespaceListener);
eventExecutor.shutdown();
log.info("Stopped");
@@ -186,6 +217,13 @@
}
private Map<String, List<String>> getBlockRuleFilter(Pod pod, NetworkPolicy policy) {
+
+ // if the POD does not belong to the namespace of the given policy,
+ // the policy does not apply to it, so we return an empty block filter
+ if (!pod.getMetadata().getNamespace().equals(policy.getMetadata().getNamespace())) {
+ return Maps.newConcurrentMap();
+ }
+
Map<String, String> labels = policy.getSpec().getPodSelector().getMatchLabels();
Map<String, List<String>> filter = Maps.newConcurrentMap();
String podIp = pod.getStatus().getPodIP();
@@ -210,8 +248,14 @@
filter.forEach((k, v) -> {
v.forEach(d -> {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4);
+ .matchEthType(TYPE_IPV4);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+ Integer nsHash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, k);
+ if (nsHash != null) {
+ tBuilder.setTunnelId(nsHash);
+ }
+
if (d.equalsIgnoreCase(DIRECTION_INGRESS)) {
sBuilder.matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(k), HOST_PREFIX));
tBuilder.transition(ACL_INGRESS_WHITE_TABLE);
@@ -220,75 +264,112 @@
tBuilder.transition(ACL_EGRESS_WHITE_TABLE);
}
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- ACL_TABLE,
- install
- );
- });
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_TABLE, install);
});
});
}
+ private void setDefaultAllowNamespaceRules(Namespace namespace, boolean install) {
+
+ String ns = namespace.getMetadata().getName();
+ if (KUBE_SYSTEM.equalsIgnoreCase(ns)) {
+ setAllowNamespaceRulesBase(0, namespace.hashCode(),
+ DIRECTION_INGRESS, install);
+ }
+ }
+
+ private void setDefaultAllowServiceRules(boolean install) {
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchEthType(TYPE_IPV4)
+ .matchIPSrc(IpPrefix.valueOf(SERVICE_IP_CIDR_DEFAULT));
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setTunnelId(DEFAULT_SEGMENT_ID)
+ .transition(ROUTING_TABLE);
+
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
+ }
+
+ private void setAllowNamespaceRulesBase(int tunnelId, int metadata,
+ String direction, boolean install) {
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+
+ if (tunnelId != 0) {
+ sBuilder.matchTunnelId(tunnelId);
+ }
+
+ sBuilder.matchMetadata(metadata);
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setTunnelId(DEFAULT_SEGMENT_ID)
+ .transition(ROUTING_TABLE);
+
+ int table = 0;
+ if (DIRECTION_INGRESS.equals(direction)) {
+ table = ACL_INGRESS_WHITE_TABLE;
+ }
+
+ setPolicyRulesBase(sBuilder, tBuilder, table, install);
+ }
+
private void setAllowRulesByPolicy(NetworkPolicy policy, boolean install) {
Map<String, Map<String, List<NetworkPolicyPort>>>
white = Maps.newConcurrentMap();
+ int nsHash = namespaceHashByNamespace(k8sNamespaceService,
+ policy.getMetadata().getNamespace());
+
List<NetworkPolicyIngressRule> ingress = policy.getSpec().getIngress();
if (ingress != null && ingress.size() == 1) {
NetworkPolicyIngressRule rule = ingress.get(0);
if (rule.getFrom().size() == 0 && rule.getPorts().size() == 0) {
- setAllowAllRule(DIRECTION_INGRESS, install);
+ setAllowAllRule(nsHash, DIRECTION_INGRESS, install);
}
}
policy.getSpec().getIngress().forEach(i -> {
Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
direction.put(DIRECTION_INGRESS, i.getPorts());
- i.getFrom().forEach(p -> {
- if (p.getIpBlock() != null) {
- if (p.getIpBlock().getExcept() != null &&
- p.getIpBlock().getExcept().size() > 0) {
+ i.getFrom().forEach(peer -> {
+
+ // IP block
+ if (peer.getIpBlock() != null) {
+ if (peer.getIpBlock().getExcept() != null &&
+ peer.getIpBlock().getExcept().size() > 0) {
Map<String, List<NetworkPolicyPort>>
blkDirection = Maps.newConcurrentMap();
blkDirection.put(DIRECTION_INGRESS, i.getPorts());
- white.compute(p.getIpBlock().getCidr(), (k, v) -> blkDirection);
+ white.compute(peer.getIpBlock().getCidr(), (k, v) -> blkDirection);
- setBlackRules(p.getIpBlock().getCidr(), DIRECTION_INGRESS,
- p.getIpBlock().getExcept(), install);
+ setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_INGRESS,
+ peer.getIpBlock().getExcept(), install);
} else {
- white.compute(p.getIpBlock().getCidr(), (k, v) -> direction);
+ white.compute(peer.getIpBlock().getCidr(), (k, v) -> direction);
}
}
+ // POD selector
Set<Pod> pods = Sets.newConcurrentHashSet();
- if (p.getPodSelector() != null) {
- Map<String, String> podLabels = p.getPodSelector().getMatchLabels();
+ if (peer.getPodSelector() != null) {
+ Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
+ List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
- k8sPodService.pods().forEach(pod -> {
- pod.getMetadata().getLabels().forEach((k, v) -> {
- if (podLabels != null && podLabels.get(k) != null &&
- podLabels.get(k).equals(v)) {
- pods.add(pod);
- }
- });
- });
- }
-
- if (p.getNamespaceSelector() != null) {
- if (p.getNamespaceSelector().getMatchLabels() == null ||
- p.getNamespaceSelector().getMatchLabels().size() == 0) {
- // if none of match labels are specified, it means the
- // target PODs are from any namespaces
- pods.addAll(k8sPodService.pods());
+ if (podLabels == null && matchExps.size() == 0) {
+ k8sPodService.pods().stream()
+ .filter(pod -> pod.getMetadata().getNamespace().equals(
+ policy.getMetadata().getNamespace()))
+ .forEach(pods::add);
} else {
- pods.addAll(podsFromNamespace(p.getNamespaceSelector().getMatchLabels()));
+ k8sPodService.pods().stream()
+ .filter(pod -> pod.getMetadata().getNamespace().equals(
+ policy.getMetadata().getNamespace()))
+ .forEach(pod -> {
+ pod.getMetadata().getLabels().forEach((k, v) -> {
+ if (podLabels != null && podLabels.get(k) != null &&
+ podLabels.get(k).equals(v)) {
+ pods.add(pod);
+ }
+ });
+ });
}
}
@@ -297,6 +378,10 @@
SHIFTED_IP_PREFIX) + "/" + HOST_PREFIX, (m, n) -> direction);
white.compute(pod.getStatus().getPodIP() + "/" + HOST_PREFIX, (m, n) -> direction);
});
+
+ // Namespace selector
+ setAllowNamespaceRules(nsHash,
+ namespacesByPolicyPeer(peer), DIRECTION_INGRESS, install);
});
});
@@ -304,22 +389,24 @@
if (egress != null && egress.size() == 1) {
NetworkPolicyEgressRule rule = egress.get(0);
if (rule.getTo().size() == 0 && rule.getPorts().size() == 0) {
- setAllowAllRule(DIRECTION_EGRESS, install);
+ setAllowAllRule(nsHash, DIRECTION_EGRESS, install);
}
}
policy.getSpec().getEgress().forEach(e -> {
Map<String, List<NetworkPolicyPort>> direction = Maps.newConcurrentMap();
direction.put(DIRECTION_EGRESS, e.getPorts());
- e.getTo().forEach(p -> {
- if (p.getIpBlock() != null) {
- if (p.getIpBlock().getExcept() != null &&
- p.getIpBlock().getExcept().size() > 0) {
+ e.getTo().forEach(peer -> {
+
+ // IP block
+ if (peer.getIpBlock() != null) {
+ if (peer.getIpBlock().getExcept() != null &&
+ peer.getIpBlock().getExcept().size() > 0) {
Map<String, List<NetworkPolicyPort>>
blkDirection = Maps.newConcurrentMap();
blkDirection.put(DIRECTION_EGRESS, e.getPorts());
- white.compute(p.getIpBlock().getCidr(), (k, v) -> {
+ white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
if (v != null) {
v.put(DIRECTION_EGRESS, e.getPorts());
return v;
@@ -328,10 +415,10 @@
}
});
- setBlackRules(p.getIpBlock().getCidr(), DIRECTION_EGRESS,
- p.getIpBlock().getExcept(), install);
+ setBlackRules(peer.getIpBlock().getCidr(), DIRECTION_EGRESS,
+ peer.getIpBlock().getExcept(), install);
} else {
- white.compute(p.getIpBlock().getCidr(), (k, v) -> {
+ white.compute(peer.getIpBlock().getCidr(), (k, v) -> {
if (v != null) {
v.put(DIRECTION_EGRESS, e.getPorts());
return v;
@@ -344,26 +431,29 @@
Set<Pod> pods = Sets.newConcurrentHashSet();
- if (p.getPodSelector() != null) {
- Map<String, String> podLabels = p.getPodSelector().getMatchLabels();
- k8sPodService.pods().forEach(pod -> {
- pod.getMetadata().getLabels().forEach((k, v) -> {
- if (podLabels != null && podLabels.get(k) != null &&
- podLabels.get(k).equals(v)) {
- pods.add(pod);
- }
- });
- });
- }
+ // POD selector
+ if (peer.getPodSelector() != null) {
+ Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
- if (p.getNamespaceSelector() != null) {
- if (p.getNamespaceSelector().getMatchLabels() == null ||
- p.getNamespaceSelector().getMatchLabels().size() == 0) {
- // if none of match labels are specified, it means the
- // target PODs are from any namespaces
- pods.addAll(k8sPodService.pods());
+ List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
+
+ if (podLabels == null && matchExps.size() == 0) {
+ k8sPodService.pods().stream()
+ .filter(pod -> pod.getMetadata().getNamespace().equals(
+ policy.getMetadata().getNamespace()))
+ .forEach(pods::add);
} else {
- pods.addAll(podsFromNamespace(p.getNamespaceSelector().getMatchLabels()));
+ k8sPodService.pods().stream()
+ .filter(pod -> pod.getMetadata().getNamespace().equals(
+ policy.getMetadata().getNamespace()))
+ .forEach(pod -> {
+ pod.getMetadata().getLabels().forEach((k, v) -> {
+ if (podLabels != null && podLabels.get(k) != null &&
+ podLabels.get(k).equals(v)) {
+ pods.add(pod);
+ }
+ });
+ });
}
}
@@ -388,17 +478,26 @@
}
});
});
+
+ // Namespace selector
+ setAllowNamespaceRules(nsHash,
+ namespacesByPolicyPeer(peer), DIRECTION_EGRESS, install);
});
});
- setAllowRules(white, install);
+ setAllowRules(namespaceHashByNamespace(k8sNamespaceService,
+ policy.getMetadata().getNamespace()), white, install);
setBlackToRouteRules(true);
}
private void setAllowRulesByPod(Pod pod, boolean install) {
Map<String, Map<String, List<NetworkPolicyPort>>>
white = Maps.newConcurrentMap();
- k8sNetworkPolicyService.networkPolicies().forEach(policy -> {
+ AtomicReference<NetworkPolicy> selectedPolicy = new AtomicReference<>();
+ k8sNetworkPolicyService.networkPolicies().stream()
+ .filter(policy -> policy.getMetadata().getNamespace().equals(
+ pod.getMetadata().getNamespace()))
+ .forEach(policy -> {
String podIp = pod.getStatus().getPodIP();
policy.getSpec().getIngress().forEach(i -> {
@@ -408,32 +507,37 @@
i.getFrom().forEach(peer -> {
if (peer.getPodSelector() != null) {
Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
- pod.getMetadata().getLabels().forEach((k, v) -> {
- if (podLabels != null && podLabels.get(k) != null &&
- podLabels.get(k).equals(v) && podIp != null) {
- white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
- "/" + HOST_PREFIX, (m, n) -> direction);
- white.compute(podIp + "/" +
- HOST_PREFIX, (m, n) -> direction);
- }
- });
+ List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
- if (peer.getNamespaceSelector() != null &&
- peer.getNamespaceSelector().getMatchLabels() != null) {
- Set<Pod> pods = podsFromNamespace(
- peer.getNamespaceSelector().getMatchLabels());
- if ((pods != null && pods.contains(pod)) ||
- (peer.getNamespaceSelector().getMatchLabels().size() == 0)) {
- white.compute(pod.getStatus().getPodIP() + "/" +
- HOST_PREFIX, (m, n) -> direction);
- }
+ if (podLabels == null && matchExps.size() == 0 && podIp != null) {
+ white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
+ "/" + HOST_PREFIX, (m, n) -> direction);
+ white.compute(podIp + "/" +
+ HOST_PREFIX, (m, n) -> direction);
+
+ selectedPolicy.set(policy);
+ } else {
+ pod.getMetadata().getLabels().forEach((k, v) -> {
+ if (podLabels != null && podLabels.get(k) != null &&
+ podLabels.get(k).equals(v) && podIp != null) {
+ white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
+ "/" + HOST_PREFIX, (m, n) -> direction);
+ white.compute(podIp + "/" +
+ HOST_PREFIX, (m, n) -> direction);
+
+ selectedPolicy.set(policy);
+ }
+ });
}
}
});
});
});
- k8sNetworkPolicyService.networkPolicies().forEach(policy -> {
+ k8sNetworkPolicyService.networkPolicies().stream()
+ .filter(policy -> policy.getMetadata().getNamespace().equals(
+ pod.getMetadata().getNamespace()))
+ .forEach(policy -> {
String podIp = pod.getStatus().getPodIP();
policy.getSpec().getEgress().forEach(e -> {
@@ -442,185 +546,144 @@
e.getTo().forEach(peer -> {
if (peer.getPodSelector() != null) {
Map<String, String> podLabels = peer.getPodSelector().getMatchLabels();
- pod.getMetadata().getLabels().forEach((k, v) -> {
- if (podLabels != null && podLabels.get(k) != null &&
- podLabels.get(k).equals(v) && podIp != null) {
- white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
- "/" + HOST_PREFIX, (m, n) -> {
- if (n != null) {
- n.put(DIRECTION_EGRESS, e.getPorts());
- return n;
- } else {
- return direction;
- }
- });
+ List<LabelSelectorRequirement> matchExps = peer.getPodSelector().getMatchExpressions();
- white.compute(podIp + "/" +
- HOST_PREFIX, (m, n) -> {
- if (n != null) {
- n.put(DIRECTION_EGRESS, e.getPorts());
- return n;
- } else {
- return direction;
- }
- });
- }
- });
+ if (podLabels == null && matchExps.size() == 0 && podIp != null) {
+ white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
+ "/" + HOST_PREFIX, (m, n) -> {
+ if (n != null) {
+ n.put(DIRECTION_EGRESS, e.getPorts());
+ return n;
+ } else {
+ return direction;
+ }
+ });
- if (peer.getNamespaceSelector() != null &&
- peer.getNamespaceSelector().getMatchLabels() != null) {
- Set<Pod> pods = podsFromNamespace(
- peer.getNamespaceSelector().getMatchLabels());
- if ((pods != null && pods.contains(pod)) ||
- (peer.getNamespaceSelector().getMatchLabels().size() == 0)) {
- white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
- "/" + HOST_PREFIX, (m, n) -> {
- if (n != null) {
- n.put(DIRECTION_EGRESS, e.getPorts());
- return n;
- } else {
- return direction;
- }
- });
+ white.compute(podIp + "/" +
+ HOST_PREFIX, (m, n) -> {
+ if (n != null) {
+ n.put(DIRECTION_EGRESS, e.getPorts());
+ return n;
+ } else {
+ return direction;
+ }
+ });
- white.compute(podIp + "/" + HOST_PREFIX, (m, n) -> {
- if (n != null) {
- n.put(DIRECTION_EGRESS, e.getPorts());
- return n;
- } else {
- return direction;
- }
- });
- }
+ selectedPolicy.set(policy);
+ } else {
+ pod.getMetadata().getLabels().forEach((k, v) -> {
+ if (podLabels != null && podLabels.get(k) != null &&
+ podLabels.get(k).equals(v) && podIp != null) {
+ white.compute(shiftIpDomain(podIp, SHIFTED_IP_PREFIX) +
+ "/" + HOST_PREFIX, (m, n) -> {
+ if (n != null) {
+ n.put(DIRECTION_EGRESS, e.getPorts());
+ return n;
+ } else {
+ return direction;
+ }
+ });
+
+ white.compute(podIp + "/" +
+ HOST_PREFIX, (m, n) -> {
+ if (n != null) {
+ n.put(DIRECTION_EGRESS, e.getPorts());
+ return n;
+ } else {
+ return direction;
+ }
+ });
+
+ selectedPolicy.set(policy);
+ }
+ });
}
}
});
});
});
- setAllowRules(white, install);
+ int nsHash = selectedPolicy.get() != null ? namespaceHashByNamespace(k8sNamespaceService,
+ selectedPolicy.get().getMetadata().getNamespace()) : DEFAULT_NAMESPACE_HASH;
+
+ setAllowRules(nsHash, white, install);
setBlackToRouteRules(true);
}
- private Set<Pod> podsFromNamespace(Map<String, String> nsLabels) {
- Set<Pod> pods = Sets.newConcurrentHashSet();
+ private Set<Namespace> namespacesByLabels(Map<String, String> labels) {
+ Set<Namespace> nsSet = Sets.newConcurrentHashSet();
k8sNamespaceService.namespaces().forEach(ns -> {
if (ns != null && ns.getMetadata() != null &&
- ns.getMetadata().getLabels() != null && nsLabels != null) {
+ ns.getMetadata().getLabels() != null && labels != null) {
ns.getMetadata().getLabels().forEach((k, v) -> {
- if (nsLabels.get(k) != null && nsLabels.get(k).equals(v)) {
- pods.addAll(k8sPodService.pods().stream()
- .filter(pod -> pod.getMetadata().getNamespace()
- .equals(ns.getMetadata().getName()))
- .collect(Collectors.toSet()));
+ if (labels.get(k) != null && labels.get(k).equals(v)) {
+ nsSet.add(ns);
}
});
}
});
- return pods;
+ return nsSet;
}
- private void setAllowAllRule(String direction, boolean install) {
- TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+ private Set<Namespace> namespacesByPolicyPeer(NetworkPolicyPeer peer) {
+ if (peer.getNamespaceSelector() != null) {
+ Map<String, String> labels = peer.getNamespaceSelector().getMatchLabels();
+ if (labels == null || labels.size() == 0) {
+ // if no match labels are specified, the namespace selector
+ // matches every namespace, so all namespaces are returned
+ return k8sNamespaceService.namespaces();
+ } else {
+ return namespacesByLabels(labels);
+ }
+ }
+
+ return Sets.newConcurrentHashSet();
+ }
+
+ private void setAllowNamespaceRules(int tunnelId, Set<Namespace> nsSet,
+ String direction, boolean install) {
+
+ nsSet.forEach(ns -> {
+ setAllowNamespaceRulesBase(tunnelId, ns.hashCode(), direction, install);
+ });
+
+ }
+
+ private void setAllowAllRule(int nsHash, String direction, boolean install) {
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchTunnelId(nsHash);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setTunnelId(DEFAULT_SEGMENT_ID)
.transition(ROUTING_TABLE);
int table = 0;
if (DIRECTION_INGRESS.equalsIgnoreCase(direction)) {
table = ACL_INGRESS_WHITE_TABLE;
- } else if (DIRECTION_EGRESS.equalsIgnoreCase(direction)) {
- table = ACL_EGRESS_WHITE_TABLE;
}
- int finalTable = table;
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- finalTable,
- install
- );
- });
+ setPolicyRulesBase(sBuilder, tBuilder, table, install);
}
- private void setAllowRules(Map<String, Map<String, List<NetworkPolicyPort>>> white,
+ private void setAllowRules(int namespaceHash,
+ Map<String, Map<String, List<NetworkPolicyPort>>> white,
boolean install) {
white.forEach((k, v) -> {
v.forEach((pk, pv) -> {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4);
+ .matchTunnelId(namespaceHash)
+ .matchEthType(TYPE_IPV4);
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ tBuilder.setTunnelId(DEFAULT_SEGMENT_ID);
+
if (pk.equalsIgnoreCase(DIRECTION_INGRESS)) {
sBuilder.matchIPSrc(IpPrefix.valueOf(k));
tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
.transition(ACL_INGRESS_BLACK_TABLE);
if (pv.size() == 0) {
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- ACL_INGRESS_WHITE_TABLE,
- install
- );
- });
- } else {
- pv.forEach(p -> {
- if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
- sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
-
- if (p.getPort() != null &&
- p.getPort().getIntVal() != null) {
- sBuilder.matchTcpSrc(TpPort.tpPort(p.getPort().getIntVal()));
- }
- }
- if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
- sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
-
- if (p.getPort() != null &&
- p.getPort().getIntVal() != null) {
- sBuilder.matchUdpSrc(TpPort.tpPort(p.getPort().getIntVal()));
- }
- }
-
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- ACL_INGRESS_WHITE_TABLE,
- install
- );
- });
- });
- }
- } else if (pk.equalsIgnoreCase(DIRECTION_EGRESS)) {
- sBuilder.matchIPDst(IpPrefix.valueOf(k));
- tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
- .transition(ACL_EGRESS_BLACK_TABLE);
-
- if (pv.size() == 0) {
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- ACL_EGRESS_WHITE_TABLE,
- install
- );
- });
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
} else {
pv.forEach(p -> {
if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
@@ -640,17 +703,35 @@
}
}
- k8sNodeService.completeNodes().forEach(n -> {
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- ACL_EGRESS_WHITE_TABLE,
- install
- );
- });
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_INGRESS_WHITE_TABLE, install);
+ });
+ }
+ } else if (pk.equalsIgnoreCase(DIRECTION_EGRESS)) {
+ sBuilder.matchIPDst(IpPrefix.valueOf(k));
+ tBuilder.writeMetadata(k.hashCode(), DEFAULT_METADATA_MASK)
+ .transition(ACL_EGRESS_BLACK_TABLE);
+
+ if (pv.size() == 0) {
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
+ } else {
+ pv.forEach(p -> {
+ if (p.getProtocol().equalsIgnoreCase(PROTOCOL_TCP)) {
+ sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
+
+ if (p.getPort() != null &&
+ p.getPort().getIntVal() != null) {
+ sBuilder.matchTcpSrc(TpPort.tpPort(p.getPort().getIntVal()));
+ }
+ }
+ if (p.getProtocol().equalsIgnoreCase(PROTOCOL_UDP)) {
+ sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
+
+ if (p.getPort() != null &&
+ p.getPort().getIntVal() != null) {
+ sBuilder.matchUdpSrc(TpPort.tpPort(p.getPort().getIntVal()));
+ }
+ }
+ setPolicyRulesBase(sBuilder, tBuilder, ACL_EGRESS_WHITE_TABLE, install);
});
}
@@ -661,12 +742,29 @@
});
}
+ private void setPolicyRulesBase(TrafficSelector.Builder sBuilder,
+ TrafficTreatment.Builder tBuilder,
+ int table,
+ boolean install) {
+ k8sNodeService.completeNodes().forEach(n -> {
+ k8sFlowRuleService.setRule(
+ appId,
+ n.intgBridge(),
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_CIDR_RULE,
+ table,
+ install
+ );
+ });
+ }
+
private void setBlackRules(String whiteIpCidr, String direction,
List<String> except, boolean install) {
k8sNodeService.completeNodes().forEach(n -> {
except.forEach(blkIp -> {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
+ .matchEthType(TYPE_IPV4)
.matchMetadata(whiteIpCidr.hashCode());
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.drop();
@@ -680,15 +778,7 @@
table = ACL_EGRESS_BLACK_TABLE;
}
- k8sFlowRuleService.setRule(
- appId,
- n.intgBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_CIDR_RULE,
- table,
- install
- );
+ setPolicyRulesBase(sBuilder, tBuilder, table, install);
});
});
}
@@ -701,7 +791,8 @@
appId,
n.intgBridge(),
DefaultTrafficSelector.builder().build(),
- DefaultTrafficTreatment.builder().transition(ROUTING_TABLE).build(),
+ DefaultTrafficTreatment.builder()
+ .transition(ROUTING_TABLE).build(),
0,
t,
install
@@ -710,6 +801,108 @@
});
}
+    /**
+     * Applies the namespace rules for the IP address of the given POD.
+     * Nothing happens when the POD has no IP address yet, or when no
+     * namespace hash can be resolved for that address.
+     *
+     * @param pod       kubernetes POD
+     * @param install   install flag forwarded to the namespace rule setter
+     */
+    private void setNamespaceRulesByPod(Pod pod, boolean install) {
+        String podIp = pod.getStatus().getPodIP();
+
+        if (podIp != null) {
+            Integer hash = namespaceHashByPodIp(k8sPodService, k8sNamespaceService, podIp);
+
+            if (hash != null) {
+                setNamespaceRulesBase(podIp, hash, install);
+            }
+        }
+    }
+
+    /**
+     * Applies the namespace rules for the cluster IP of the given service.
+     * Services without a usable cluster IP are skipped: the field may be
+     * unset or empty, and headless services carry the literal value "None",
+     * which cannot be parsed as an IP address.
+     *
+     * @param service   kubernetes service
+     * @param install   install flag forwarded to the namespace rule setter
+     */
+    private void setNamespaceRulesByService(Service service, boolean install) {
+        String clusterIp = service.getSpec().getClusterIP();
+
+        // "None" (headless service) or an empty value would make
+        // IpAddress.valueOf() throw while the rules are being built
+        if (clusterIp == null || clusterIp.isEmpty() ||
+                "None".equalsIgnoreCase(clusterIp)) {
+            return;
+        }
+
+        int nsHash = namespaceHashByServiceIp(k8sServiceService,
+                k8sNamespaceService, clusterIp);
+
+        setNamespaceRulesBase(clusterIp, nsHash, install);
+    }
+
+    /**
+     * Sets a pair of namespace table rules for the given IP address on the
+     * integration bridge of every complete node. One rule matches the
+     * address itself as IPv4 source, the other matches the address produced
+     * by shiftIpDomain() for it. Both rules write the namespace hash into
+     * the metadata and then transition to the grouping table.
+     *
+     * @param ip        IP address to match as source
+     * @param nsHash    namespace hash written into the metadata
+     * @param install   install flag forwarded to the flow rule service
+     */
+    private void setNamespaceRulesBase(String ip, Integer nsHash, boolean install) {
+        k8sNodeService.completeNodes().forEach(node -> {
+            IpPrefix origPrefix = IpPrefix.valueOf(IpAddress.valueOf(ip), HOST_PREFIX);
+            IpPrefix convPrefix = IpPrefix.valueOf(
+                    IpAddress.valueOf(shiftIpDomain(ip, SHIFTED_IP_PREFIX)), HOST_PREFIX);
+
+            TrafficSelector origSelector = DefaultTrafficSelector.builder()
+                    .matchEthType(TYPE_IPV4)
+                    .matchIPSrc(origPrefix)
+                    .build();
+            TrafficSelector convSelector = DefaultTrafficSelector.builder()
+                    .matchEthType(TYPE_IPV4)
+                    .matchIPSrc(convPrefix)
+                    .build();
+
+            // both rules share the very same treatment
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .writeMetadata(nsHash, DEFAULT_METADATA_MASK)
+                    .transition(GROUPING_TABLE)
+                    .build();
+
+            // rule for the original address
+            k8sFlowRuleService.setRule(appId, node.intgBridge(), origSelector,
+                    treatment, PRIORITY_NAMESPACE_RULE, NAMESPACE_TABLE, install);
+
+            // rule for the shifted address
+            k8sFlowRuleService.setRule(appId, node.intgBridge(), convSelector,
+                    treatment, PRIORITY_NAMESPACE_RULE, NAMESPACE_TABLE, install);
+        });
+    }
+
+    /**
+     * Listener reacting to kubernetes service events by setting or
+     * unsetting the namespace rules of the affected service. The work is
+     * handed to the event executor and is only carried out when the local
+     * node is the leader of the application's leadership topic.
+     */
+    private class InternalServiceListener implements K8sServiceListener {
+
+        // true only when the local node currently holds the leadership
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sServiceEvent event) {
+            final Service subject = event.subject();
+
+            switch (event.type()) {
+                case K8S_SERVICE_CREATED:
+                case K8S_SERVICE_UPDATED:
+                    // an update is handled exactly like a creation
+                    eventExecutor.execute(() -> processServiceCreation(subject));
+                    break;
+                case K8S_SERVICE_REMOVED:
+                    eventExecutor.execute(() -> processServiceRemoval(subject));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processServiceCreation(Service service) {
+            if (isRelevantHelper()) {
+                setNamespaceRulesByService(service, true);
+            }
+        }
+
+        private void processServiceRemoval(Service service) {
+            if (isRelevantHelper()) {
+                setNamespaceRulesByService(service, false);
+            }
+        }
+    }
+
private class InternalPodListener implements K8sPodListener {
private boolean isRelevantHelper() {
@@ -739,6 +932,7 @@
setBlockRulesByPod(pod, true);
setAllowRulesByPod(pod, true);
+ setNamespaceRulesByPod(pod, true);
}
private void processPodRemoval(Pod pod) {
@@ -748,6 +942,7 @@
setBlockRulesByPod(pod, false);
setAllowRulesByPod(pod, false);
+ setNamespaceRulesByPod(pod, false);
}
}
@@ -791,4 +986,44 @@
setAllowRulesByPolicy(policy, false);
}
}
+
+    /**
+     * Listener reacting to kubernetes namespace events. Created and updated
+     * namespaces trigger the default allow rules; removals are currently
+     * ignored. The work is handed to the event executor and is only carried
+     * out when the local node is the leader of the application's
+     * leadership topic.
+     */
+    private class InternalNamespaceListener implements K8sNamespaceListener {
+
+        // true only when the local node currently holds the leadership
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNamespaceEvent event) {
+            final Namespace subject = event.subject();
+
+            switch (event.type()) {
+                case K8S_NAMESPACE_CREATED:
+                case K8S_NAMESPACE_UPDATED:
+                    // an update is handled exactly like a creation
+                    eventExecutor.execute(() -> processNamespaceCreation(subject));
+                    break;
+                case K8S_NAMESPACE_REMOVED:
+                    eventExecutor.execute(() -> processNamespaceRemoval(subject));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNamespaceCreation(Namespace namespace) {
+            if (isRelevantHelper()) {
+                setDefaultAllowNamespaceRules(namespace, true);
+                setDefaultAllowServiceRules(true);
+            }
+        }
+
+        private void processNamespaceRemoval(Namespace namespace) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            // do nothing for now
+        }
+    }
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 5660b56..2640508 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -21,6 +21,7 @@
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import org.onlab.packet.Ethernet;
@@ -46,6 +47,7 @@
import org.onosproject.k8snetworking.api.K8sNetworkEvent;
import org.onosproject.k8snetworking.api.K8sNetworkListener;
import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodService;
import org.onosproject.k8snetworking.api.K8sServiceEvent;
import org.onosproject.k8snetworking.api.K8sServiceListener;
import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -92,6 +94,7 @@
import static org.onosproject.k8snetworking.api.Constants.DST;
import static org.onosproject.k8snetworking.api.Constants.GROUPING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAMESPACE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
@@ -103,18 +106,19 @@
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
-import static org.onosproject.k8snetworking.api.Constants.SERVICE_PORT_MAP;
import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
import static org.onosproject.k8snetworking.api.Constants.SRC;
+import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_CIDR;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_CIDR_DEFAULT;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
-import static org.onosproject.k8snetworking.api.Constants.STAT_EGRESS_TABLE;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.podByIp;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.portNumberByName;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
@@ -192,6 +196,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sServiceService k8sServiceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sPodService k8sPodService;
/** Service IP address translation mode. */
private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
@@ -257,7 +263,7 @@
setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), serviceCidr,
GROUPING_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
- GROUPING_TABLE, ACL_TABLE, PRIORITY_CT_RULE, install);
+ GROUPING_TABLE, NAMESPACE_TABLE, PRIORITY_CT_RULE, install);
});
// +trk-new CT rules
@@ -371,28 +377,40 @@
.filter(Objects::nonNull)
.filter(sp -> sp.getTargetPort() != null)
.filter(sp -> sp.getTargetPort().getIntVal() != null ||
- (sp.getTargetPort().getStrVal() != null &&
- SERVICE_PORT_MAP.get(sp.getTargetPort().getStrVal()) != null))
+ sp.getTargetPort().getStrVal() != null)
.forEach(sp -> {
- int targetPort = sp.getTargetPort().getIntVal() != null ?
- sp.getTargetPort().getIntVal() :
- SERVICE_PORT_MAP.get(sp.getTargetPort().getStrVal());
- String targetProtocol = sp.getProtocol();
+ Integer targetPortInt = sp.getTargetPort().getIntVal() != null ?
+ sp.getTargetPort().getIntVal() : 0;
+ String targetPortName = sp.getTargetPort().getStrVal() != null ?
+ sp.getTargetPort().getStrVal() : "";
+ String targetProtocol = sp.getProtocol();
- for (Endpoints endpoints : endpointses) {
- for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
- for (EndpointPort endpointPort : endpointSubset.getPorts()) {
- if (targetProtocol.equals(endpointPort.getProtocol()) &&
- targetPort == endpointPort.getPort()) {
- Set<String> addresses = endpointSubset.getAddresses()
- .stream().map(EndpointAddress::getIp)
- .collect(Collectors.toSet());
- map.put(sp, addresses);
+ for (Endpoints endpoints : endpointses) {
+ for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+
+ // in case service port name is specified but not port number
+ // we will lookup the container port number and use it
+ // as the target port number
+ if (!targetPortName.equals("") && targetPortInt == 0) {
+ for (EndpointAddress addr : endpointSubset.getAddresses()) {
+ Pod pod = podByIp(k8sPodService, addr.getIp());
+ targetPortInt = portNumberByName(pod, targetPortName);
+ }
+ }
+
+ for (EndpointPort endpointPort : endpointSubset.getPorts()) {
+ if (targetProtocol.equals(endpointPort.getProtocol()) &&
+ (targetPortInt.equals(endpointPort.getPort()) ||
+ targetPortName.equals(endpointPort.getName()))) {
+ Set<String> addresses = endpointSubset.getAddresses()
+ .stream().map(EndpointAddress::getIp)
+ .collect(Collectors.toSet());
+ map.put(sp, addresses);
+ }
+ }
}
}
- }
- }
- });
+ });
return map;
}
@@ -431,17 +449,24 @@
});
spEpasMap.forEach((sp, epas) ->
- // add flow rules for unshifting IP domain
- epas.forEach(epa -> {
- int targetPort = sp.getTargetPort().getIntVal() != null ?
- sp.getTargetPort().getIntVal() :
- SERVICE_PORT_MAP.get(sp.getTargetPort().getStrVal());
+ // add flow rules for unshifting IP domain
+ epas.forEach(epa -> {
- setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
- PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
- sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
- targetPort, install);
- })
+ String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
+
+ int targetPort;
+ if (sp.getTargetPort().getIntVal() == null) {
+ Pod pod = podByIp(k8sPodService, podIp);
+ targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
+ } else {
+ targetPort = sp.getTargetPort().getIntVal();
+ }
+
+ setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
+ PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+ sp.getProtocol(), podIp,
+ targetPort, install);
+ })
);
}
}
@@ -450,9 +475,13 @@
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setIpDst(IpAddress.valueOf(podIpStr));
- int targetPort = sp.getTargetPort().getIntVal() != null ?
- sp.getTargetPort().getIntVal() :
- SERVICE_PORT_MAP.get(sp.getTargetPort().getStrVal());
+ int targetPort;
+ if (sp.getTargetPort().getIntVal() == null) {
+ Pod pod = podByIp(k8sPodService, podIpStr);
+ targetPort = portNumberByName(pod, sp.getTargetPort().getStrVal());
+ } else {
+ targetPort = sp.getTargetPort().getIntVal();
+ }
if (TCP.equals(sp.getProtocol())) {
tBuilder.setTcpDst(TpPort.tpPort(targetPort));
@@ -474,8 +503,7 @@
.filter(Objects::nonNull)
.filter(sp -> sp.getTargetPort() != null)
.filter(sp -> sp.getTargetPort().getIntVal() != null ||
- (sp.getTargetPort().getStrVal() != null &&
- SERVICE_PORT_MAP.get(sp.getTargetPort().getStrVal()) != null))
+ sp.getTargetPort().getStrVal() != null)
.collect(Collectors.toSet());
String serviceIp = service.getSpec().getClusterIP();
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
index 4b9adc7..56c4398 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
@@ -56,10 +56,10 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SWITCHING_RULE;
import static org.onosproject.k8snetworking.api.Constants.PRIORITY_TUNNEL_TAG_RULE;
@@ -244,7 +244,6 @@
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setTunnelId(getVni(port));
-
if (ethType == Ethernet.TYPE_ARP) {
tBuilder.transition(ARP_TABLE);
} else if (ethType == Ethernet.TYPE_IPV4) {
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index b9ef48a..754efdf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -20,6 +20,10 @@
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -28,8 +32,11 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.k8snetworking.api.K8sNamespaceService;
import org.onosproject.k8snetworking.api.K8sNetwork;
import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodService;
+import org.onosproject.k8snetworking.api.K8sServiceService;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.k8snode.api.K8sNode;
@@ -49,6 +56,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import static org.onosproject.k8snetworking.api.Constants.DEFAULT_NAMESPACE_HASH;
import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
/**
@@ -382,6 +390,106 @@
return portMaskMap;
}
+    /**
+     * Returns the namespace hash value by given POD IP.
+     *
+     * @param k8sPodService         kubernetes POD service
+     * @param k8sNamespaceService   kubernetes namespace service
+     * @param podIp                 POD IP address
+     * @return namespace hash value; null if no POD with a namespace owns the
+     *         given IP, or if no namespace with the POD's namespace name is
+     *         known to the namespace service
+     */
+    public static Integer namespaceHashByPodIp(K8sPodService k8sPodService,
+                                               K8sNamespaceService k8sNamespaceService,
+                                               String podIp) {
+        String ns = k8sPodService.pods().stream()
+                .filter(pod -> pod.getStatus().getPodIP() != null)
+                .filter(pod -> pod.getStatus().getPodIP().equals(podIp))
+                .map(pod -> pod.getMetadata().getNamespace())
+                // findAny() throws NullPointerException on a null element
+                .filter(name -> name != null)
+                .findAny().orElse(null);
+
+        if (ns == null) {
+            return null;
+        }
+
+        // compare from the non-null side, so that a namespace without a name
+        // is skipped instead of raising a NullPointerException
+        return k8sNamespaceService.namespaces().stream()
+                .filter(n -> ns.equals(n.getMetadata().getName()))
+                .map(Namespace::hashCode).findAny().orElse(null);
+    }
+
+    /**
+     * Returns the namespace hash value by given service IP.
+     *
+     * @param k8sServiceService     kubernetes service service
+     * @param k8sNamespaceService   kubernetes namespace service
+     * @param serviceIp             service IP address
+     * @return namespace hash value; DEFAULT_NAMESPACE_HASH if no service
+     *         with the given cluster IP yields a namespace name
+     */
+    public static int namespaceHashByServiceIp(K8sServiceService k8sServiceService,
+                                               K8sNamespaceService k8sNamespaceService,
+                                               String serviceIp) {
+        String namespaceName = k8sServiceService.services().stream()
+                .filter(svc -> {
+                    String clusterIp = svc.getSpec().getClusterIP();
+                    return clusterIp != null && clusterIp.equalsIgnoreCase(serviceIp);
+                })
+                .map(svc -> svc.getMetadata().getNamespace())
+                .findAny().orElse(null);
+
+        return namespaceName == null ? DEFAULT_NAMESPACE_HASH :
+                namespaceHashByNamespace(k8sNamespaceService, namespaceName);
+    }
+
+    /**
+     * Returns the namespace hash value by given namespace name.
+     * The name is compared case-insensitively.
+     *
+     * @param k8sNamespaceService   kubernetes namespace service
+     * @param ns                    namespace name
+     * @return hash code of the matching namespace; DEFAULT_NAMESPACE_HASH
+     *         if no namespace carries the given name
+     */
+    public static int namespaceHashByNamespace(K8sNamespaceService k8sNamespaceService,
+                                               String ns) {
+        for (Namespace candidate : k8sNamespaceService.namespaces()) {
+            String name = candidate.getMetadata().getName();
+
+            if (name != null && name.equalsIgnoreCase(ns)) {
+                return candidate.hashCode();
+            }
+        }
+
+        return DEFAULT_NAMESPACE_HASH;
+    }
+
+    /**
+     * Returns POD instance by POD IP address.
+     *
+     * @param podService    kubernetes POD service
+     * @param podIp         POD IP address
+     * @return POD instance whose status reports the given IP address;
+     *         null if there is no such POD
+     */
+    public static Pod podByIp(K8sPodService podService, String podIp) {
+        for (Pod candidate : podService.pods()) {
+            String ip = candidate.getStatus().getPodIP();
+
+            if (ip != null && ip.equals(podIp)) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the container port number by given container port name.
+     *
+     * @param pod       kubernetes POD, may be null
+     * @param portName  port name
+     * @return container port number,
+     * return 0 if there is no port number mapped with the given port name,
+     * which includes a null POD and a POD without spec or containers
+     */
+    public static int portNumberByName(Pod pod, String portName) {
+        // podByIp() yields null for an unknown address and callers pass the
+        // result on unchecked; treat that as "no mapping" rather than failing
+        if (pod == null || portName == null || pod.getSpec() == null ||
+                pod.getSpec().getContainers() == null) {
+            return 0;
+        }
+
+        for (Container container : pod.getSpec().getContainers()) {
+            if (container.getPorts() == null) {
+                continue;
+            }
+
+            for (ContainerPort cp : container.getPorts()) {
+                // the port number is a boxed Integer; skip entries without
+                // one to avoid a NullPointerException on unboxing
+                if (portName.equals(cp.getName()) && cp.getContainerPort() != null) {
+                    return cp.getContainerPort();
+                }
+            }
+        }
+
+        return 0;
+    }
+
private static int binLower(String binStr, int bits) {
StringBuilder outBin = new StringBuilder(
binStr.substring(MASK_BEGIN_IDX, MASK_MAX_IDX - bits));