Support injecting an external bridge into k8s nodes for external routing
1. Add group-bucket-related rules on receiving endpoints events
rather than pod events.
Change-Id: I1152343cf8ff6bbccaed3dc34908a3affbc70980
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
index d91b0be..adc040a 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -51,6 +51,9 @@
/**
* Configures buckets to the existing group.
+ * When the install flag is true, the given buckets are added to the
+ * group's existing buckets; when it is false, the given buckets are
+ * removed from the group's existing buckets.
*
* @param appId application ID
* @param deviceId device ID
@@ -60,4 +63,15 @@
*/
void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
List<GroupBucket> buckets, boolean install);
+
+ /**
+ * Sets the buckets of the existing group to the given list of buckets.
+ *
+ * @param appId application ID
+ * @param deviceId device ID
+ * @param groupId group ID
+ * @param buckets a list of group buckets
+ */
+ void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+ List<GroupBucket> buckets);
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
index 51dc9c8..cb2b864 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -97,4 +97,11 @@
new GroupBuckets(buckets), getGroupKey(groupId), appId);
}
}
+
+ @Override
+ public void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+ List<GroupBucket> buckets) {
+ groupService.setBucketsForGroup(deviceId, getGroupKey(groupId),
+ new GroupBuckets(buckets), getGroupKey(groupId), appId);
+ }
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 76785df..799ecb8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -21,7 +21,6 @@
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
-import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import org.onlab.packet.Ethernet;
@@ -47,9 +46,6 @@
import org.onosproject.k8snetworking.api.K8sNetworkEvent;
import org.onosproject.k8snetworking.api.K8sNetworkListener;
import org.onosproject.k8snetworking.api.K8sNetworkService;
-import org.onosproject.k8snetworking.api.K8sPodEvent;
-import org.onosproject.k8snetworking.api.K8sPodListener;
-import org.onosproject.k8snetworking.api.K8sPodService;
import org.onosproject.k8snetworking.api.K8sServiceEvent;
import org.onosproject.k8snetworking.api.K8sServiceListener;
import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -70,7 +66,6 @@
import org.onosproject.net.flow.criteria.ExtensionSelector;
import org.onosproject.net.flow.instructions.ExtensionTreatment;
import org.onosproject.net.group.GroupBucket;
-import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -142,17 +137,11 @@
private static final int HOST_CIDR_NUM = 32;
- private static final String NONE = "None";
private static final String CLUSTER_IP = "ClusterIP";
private static final String TCP = "TCP";
private static final String UDP = "UDP";
private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
- private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
-
- private static final String IP_ADDRESS = "ipAddress";
- private static final String KUBERNETES = "kubernetes";
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -192,8 +181,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sServiceService k8sServiceService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected K8sPodService k8sPodService;
/** Service IP address translation mode. */
private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
@@ -207,15 +194,11 @@
new InternalNodeEventListener();
private final InternalK8sServiceListener internalK8sServiceListener =
new InternalK8sServiceListener();
- private final InternalK8sPodListener internalK8sPodListener =
- new InternalK8sPodListener();
private final InternalK8sEndpointsListener internalK8sEndpointsListener =
new InternalK8sEndpointsListener();
private final InternalK8sNetworkListener internalK8sNetworkListener =
new InternalK8sNetworkListener();
- private AtomicCounter groupIdCounter;
-
private ApplicationId appId;
private NodeId localNodeId;
@@ -227,19 +210,15 @@
leadershipService.runForLeadership(appId.name());
k8sNodeService.addListener(internalNodeEventListener);
k8sServiceService.addListener(internalK8sServiceListener);
- k8sPodService.addListener(internalK8sPodListener);
k8sEndpointsService.addListener(internalK8sEndpointsListener);
k8sNetworkService.addListener(internalK8sNetworkListener);
- groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
-
log.info("Started");
}
@Deactivate
protected void deactivate() {
leadershipService.withdraw(appId.name());
- k8sPodService.removeListener(internalK8sPodListener);
k8sNodeService.removeListener(internalNodeEventListener);
k8sServiceService.removeListener(internalK8sServiceListener);
k8sEndpointsService.removeListener(internalK8sEndpointsListener);
@@ -376,64 +355,55 @@
return map;
}
- private void setGroupBucketsFromPod(DeviceId deviceId, Service service,
- Pod pod, boolean install) {
- if (pod.getMetadata().getAnnotations() == null) {
- return;
- }
-
- String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
-
- setGroupBuckets(deviceId, service, podIpStr, install);
- }
-
- private void setGroupBuckets(DeviceId deviceId, Service service,
- String podIp, boolean install) {
+ private void setGroupBuckets(Service service, boolean install) {
Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
Map<String, String> nodeIpGatewayIpMap =
nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
- spEpasMap.forEach((sp, epas) -> {
- List<GroupBucket> bkts = Lists.newArrayList();
+ for (K8sNode node : k8sNodeService.completeNodes()) {
+ spEpasMap.forEach((sp, epas) -> {
+ List<GroupBucket> bkts = Lists.newArrayList();
- if (install) {
- if (epas.contains(podIp)) {
- bkts = buildBuckets(deviceId,
- nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
- }
- } else {
- bkts = buildBuckets(deviceId,
- nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
- }
-
- spGrpBkts.put(sp, bkts);
- });
-
- String serviceIp = service.getSpec().getClusterIP();
- spGrpBkts.forEach((sp, bkts) -> {
- String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
- int groupId = svcStr.hashCode();
-
- k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
- });
-
- spEpasMap.forEach((sp, epas) ->
- // add flow rules for unshifting IP domain
- epas.forEach(epa -> {
- setUnshiftDomainRules(deviceId, POD_TABLE,
- PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
- sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
- sp.getTargetPort().getIntVal(), install);
+ for (String ip : epas) {
+ if (install) {
+ bkts.add(buildBuckets(node.intgBridge(),
+ nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
+ } else {
+ bkts.add(buildBuckets(node.intgBridge(),
+ nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
}
- ));
+ }
+
+ spGrpBkts.put(sp, bkts);
+ });
+
+ String serviceIp = service.getSpec().getClusterIP();
+ spGrpBkts.forEach((sp, bkts) -> {
+ String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
+ int groupId = svcStr.hashCode();
+
+ if (bkts.size() > 0) {
+ k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
+ }
+ });
+
+ spEpasMap.forEach((sp, epas) ->
+ // add flow rules for unshifting IP domain
+ epas.forEach(epa -> {
+ setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
+ PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+ sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
+ sp.getTargetPort().getIntVal(), install);
+ }
+ )
+ );
+ }
}
- private List<GroupBucket> buildBuckets(DeviceId deviceId,
+ private GroupBucket buildBuckets(DeviceId deviceId,
String podIpStr,
ServicePort sp) {
- List<GroupBucket> bkts = Lists.newArrayList();
-
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
.setIpDst(IpAddress.valueOf(podIpStr));
@@ -447,9 +417,7 @@
deviceService.getDevice(deviceId), ROUTING_TABLE);
tBuilder.extension(resubmitTreatment, deviceId);
- bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
-
- return bkts;
+ return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
}
private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
@@ -622,8 +590,6 @@
private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
long ctMask, Service service,
boolean install) {
- int groupId = (int) groupIdCounter.incrementAndGet();
-
List<GroupBucket> buckets = Lists.newArrayList();
String serviceName = service.getMetadata().getName();
@@ -631,6 +597,10 @@
// TODO: multi-ports case should be addressed
Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+ String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
+
+ String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
+ int groupId = svcStr.hashCode();
List<Endpoints> endpointses = k8sEndpointsService.endpointses()
.stream()
@@ -763,29 +733,17 @@
install);
}
- private void setK8sApiRules(K8sNode k8sNode, Endpoints endpoints, boolean install) {
- if (KUBERNETES.equals(endpoints.getMetadata().getName())) {
- Service service = k8sServiceService.services().stream().filter(s ->
- KUBERNETES.equals(s.getMetadata().getName()))
- .findFirst().orElse(null);
- if (service == null) {
- return;
- }
+ private void setEndpointsRules(Endpoints endpoints, boolean install) {
+ String appName = endpoints.getMetadata().getName();
+ Service service = k8sServiceService.services().stream().filter(s ->
+ appName.equals(s.getMetadata().getName()))
+ .findFirst().orElse(null);
- if (k8sNode != null) {
- endpoints.getSubsets().forEach(s -> {
- s.getAddresses().forEach(a -> setGroupBuckets(
- k8sNode.intgBridge(), service, a.getIp(), install));
- });
- }
-
- k8sNodeService.completeNodes().forEach(n -> {
- endpoints.getSubsets().forEach(s -> {
- s.getAddresses().forEach(a -> setGroupBuckets(
- n.intgBridge(), service, a.getIp(), install));
- });
- });
+ if (service == null) {
+ return;
}
+
+ setGroupBuckets(service, install);
}
private String servicePortStr(String ip, int port, String protocol) {
@@ -820,20 +778,6 @@
}
}
- private void setServiceRuleFromPod(Pod pod, boolean install) {
- k8sServiceService.services().forEach(s -> {
- pod.getMetadata().getLabels().forEach((pk, pv) -> {
- Map<String, String> selectors = s.getSpec().getSelector();
- if (selectors != null && selectors.containsKey(pk)) {
- if (pv.equals(selectors.get(pk))) {
- k8sNodeService.completeNodes().forEach(n ->
- setGroupBucketsFromPod(n.intgBridge(), s, pod, install));
- }
- }
- });
- });
- }
-
private class InternalK8sServiceListener implements K8sServiceListener {
private boolean isRelevantHelper() {
@@ -893,44 +837,6 @@
}
}
- private class InternalK8sPodListener implements K8sPodListener {
-
- private boolean isRelevantHelper() {
- return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
- }
-
- @Override
- public void event(K8sPodEvent event) {
- switch (event.type()) {
- case K8S_POD_CREATED:
- case K8S_POD_UPDATED:
- eventExecutor.execute(() -> processPodDetection(event.subject()));
- break;
- case K8S_POD_REMOVED:
- eventExecutor.execute(() -> processPodRemoval(event.subject()));
- break;
- default:
- break;
- }
- }
-
- private void processPodDetection(Pod pod) {
- if (!isRelevantHelper()) {
- return;
- }
-
- setServiceRuleFromPod(pod, true);
- }
-
- private void processPodRemoval(Pod pod) {
- if (!isRelevantHelper()) {
- return;
- }
-
- setServiceRuleFromPod(pod, false);
- }
- }
-
private class InternalK8sEndpointsListener implements K8sEndpointsListener {
private boolean isRelevantHelper() {
@@ -943,6 +849,7 @@
switch (event.type()) {
case K8S_ENDPOINTS_CREATED:
+ case K8S_ENDPOINTS_UPDATED:
eventExecutor.execute(() -> processEndpointsCreation(endpoints));
break;
case K8S_ENDPOINTS_REMOVED:
@@ -958,7 +865,7 @@
return;
}
- setK8sApiRules(null, endpoints, true);
+ setEndpointsRules(endpoints, true);
}
private void processEndpointsRemoval(Endpoints endpoints) {
@@ -966,7 +873,7 @@
return;
}
- setK8sApiRules(null, endpoints, false);
+ setEndpointsRules(endpoints, false);
}
}
@@ -996,7 +903,7 @@
}
setServiceNatRules(node.intgBridge(), true);
- k8sEndpointsService.endpointses().forEach(e -> setK8sApiRules(node, e, true));
+ k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
}
}