Fix: set dummy group rules on receiving service add/remove events
Change-Id: I5e33bfaedc827b9563d08f60adf73ddf4de8ca73
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 5809aab..26d8400 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -37,6 +37,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.GroupId;
+import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
+import org.onosproject.k8snetworking.api.K8sEndpointsListener;
import org.onosproject.k8snetworking.api.K8sEndpointsService;
import org.onosproject.k8snetworking.api.K8sFlowRuleService;
import org.onosproject.k8snetworking.api.K8sGroupRuleService;
@@ -137,6 +139,7 @@
private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
private static final String IP_ADDRESS = "ipAddress";
+ private static final String KUBERNETES = "kubernetes";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -191,6 +194,8 @@
new InternalK8sServiceListener();
private final InternalK8sPodListener internalK8sPodListener =
new InternalK8sPodListener();
+ private final InternalK8sEndpointsListener internalK8sEndpointsListener =
+ new InternalK8sEndpointsListener();
private AtomicCounter groupIdCounter;
@@ -206,6 +211,7 @@
k8sNodeService.addListener(internalNodeEventListener);
k8sServiceService.addListener(internalK8sServiceListener);
k8sPodService.addListener(internalK8sPodListener);
+ k8sEndpointsService.addListener(internalK8sEndpointsListener);
groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
@@ -218,6 +224,7 @@
k8sPodService.removeListener(internalK8sPodListener);
k8sNodeService.removeListener(internalNodeEventListener);
k8sServiceService.removeListener(internalK8sServiceListener);
+ k8sEndpointsService.removeListener(internalK8sEndpointsListener);
configService.unregisterProperties(getClass(), false);
eventExecutor.shutdown();
@@ -354,26 +361,35 @@
return map;
}
- private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
-
+ private void setGroupBucketsFromPod(DeviceId deviceId, Service service,
+ Pod pod, boolean install) {
if (pod.getMetadata().getAnnotations() == null) {
return;
}
String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
+ setGroupBuckets(deviceId, service, podIpStr, install);
+ }
+
+ private void setGroupBuckets(DeviceId deviceId, Service service,
+ String podIp, boolean install) {
Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
+ Map<String, String> nodeIpGatewayIpMap =
+ nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
spEpasMap.forEach((sp, epas) -> {
List<GroupBucket> bkts = Lists.newArrayList();
if (install) {
- if (epas.contains(podIpStr)) {
- bkts = buildBuckets(deviceId, podIpStr, sp);
+ if (epas.contains(podIp)) {
+ bkts = buildBuckets(deviceId,
+ nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
}
} else {
- bkts = buildBuckets(deviceId, podIpStr, sp);
+ bkts = buildBuckets(deviceId,
+ nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
}
spGrpBkts.put(sp, bkts);
@@ -386,6 +402,16 @@
k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
});
+
+ spEpasMap.forEach((sp, epas) ->
+ // add flow rules for unshifting IP domain
+ epas.forEach(epa -> {
+ setUnshiftDomainRules(deviceId, POD_TABLE,
+ PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+ sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
+ sp.getTargetPort().getIntVal(), install);
+ }
+ ));
}
private List<GroupBucket> buildBuckets(DeviceId deviceId,
@@ -414,36 +440,14 @@
private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
Service service,
boolean install) {
- Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
- Map<String, String> nodeIpGatewayIpMap =
- nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
- Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
-
- spEpasMap.forEach((sp, epas) -> {
- List<GroupBucket> bkts = Lists.newArrayList();
- epas.forEach(epa -> {
- String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
-
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
- .setIpDst(IpAddress.valueOf(podIp));
-
- if (TCP.equals(sp.getProtocol())) {
- tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
- } else if (UDP.equals(sp.getProtocol())) {
- tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
- }
-
- ExtensionTreatment resubmitTreatment = buildResubmitExtension(
- deviceService.getDevice(deviceId), ROUTING_TABLE);
- tBuilder.extension(resubmitTreatment, deviceId);
-
- bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
- });
- spGrpBkts.put(sp, bkts);
- });
+ Set<ServicePort> sps = service.getSpec().getPorts().stream()
+ .filter(Objects::nonNull)
+ .filter(sp -> sp.getTargetPort() != null)
+ .filter(sp -> sp.getTargetPort().getIntVal() != null)
+ .collect(Collectors.toSet());
String serviceIp = service.getSpec().getClusterIP();
- spGrpBkts.forEach((sp, bkts) -> {
+ sps.forEach(sp -> {
String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
int groupId = svcStr.hashCode();
@@ -451,7 +455,7 @@
// add group table rules
k8sGroupRuleService.setRule(appId, deviceId, groupId,
- SELECT, bkts, true);
+ SELECT, Lists.newArrayList(), true);
log.info("Adding group rule {}", groupId);
@@ -471,21 +475,11 @@
// remove group table rules
k8sGroupRuleService.setRule(appId, deviceId, groupId,
- SELECT, bkts, false);
+ SELECT, Lists.newArrayList(), false);
log.info("Removing group rule {}", groupId);
}
});
-
- spEpasMap.forEach((sp, epas) ->
- // add flow rules for unshifting IP domain
- epas.forEach(epa -> {
- String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
- setUnshiftDomainRules(deviceId, POD_TABLE,
- PRIORITY_NAT_RULE, serviceIp, sp.getPort(), sp.getProtocol(),
- podIp, sp.getTargetPort().getIntVal(), install);
- }
- ));
}
private void setShiftDomainRules(DeviceId deviceId, int installTable,
@@ -708,6 +702,31 @@
install);
}
+ private void setK8sApiRules(K8sNode k8sNode, Endpoints endpoints, boolean install) {
+ if (KUBERNETES.equals(endpoints.getMetadata().getName())) {
+ Service service = k8sServiceService.services().stream().filter(s ->
+ KUBERNETES.equals(s.getMetadata().getName()))
+ .findFirst().orElse(null);
+ if (service == null) {
+ return;
+ }
+
+ if (k8sNode != null) {
+ endpoints.getSubsets().forEach(s -> {
+ s.getAddresses().forEach(a -> setGroupBuckets(
+ k8sNode.intgBridge(), service, a.getIp(), install));
+ });
+ }
+
+ k8sNodeService.completeNodes().forEach(n -> {
+ endpoints.getSubsets().forEach(s -> {
+ s.getAddresses().forEach(a -> setGroupBuckets(
+ n.intgBridge(), service, a.getIp(), install));
+ });
+ });
+ }
+ }
+
/**
* Extracts properties from the component configuration context.
*
@@ -731,6 +750,20 @@
}
}
+ private void setServiceRuleFromPod(Pod pod, boolean install) {
+ k8sServiceService.services().forEach(s -> {
+ pod.getMetadata().getLabels().forEach((pk, pv) -> {
+ Map<String, String> selectors = s.getSpec().getSelector();
+ if (selectors != null && selectors.containsKey(pk)) {
+ if (pv.equals(selectors.get(pk))) {
+ k8sNodeService.completeNodes().forEach(n ->
+ setGroupBucketsFromPod(n.intgBridge(), s, pod, install));
+ }
+ }
+ });
+ });
+ }
+
private class InternalK8sServiceListener implements K8sServiceListener {
private boolean isRelevantHelper() {
@@ -799,8 +832,9 @@
@Override
public void event(K8sPodEvent event) {
switch (event.type()) {
- case K8S_POD_ANNOTATION_ADDED:
- eventExecutor.execute(() -> processPodAnnotAddition(event.subject()));
+ case K8S_POD_CREATED:
+ case K8S_POD_UPDATED:
+ eventExecutor.execute(() -> processPodDetection(event.subject()));
break;
case K8S_POD_REMOVED:
eventExecutor.execute(() -> processPodRemoval(event.subject()));
@@ -810,7 +844,7 @@
}
}
- private void processPodAnnotAddition(Pod pod) {
+ private void processPodDetection(Pod pod) {
if (!isRelevantHelper()) {
return;
}
@@ -825,19 +859,44 @@
setServiceRuleFromPod(pod, false);
}
+ }
- private void setServiceRuleFromPod(Pod pod, boolean install) {
- k8sServiceService.services().forEach(s -> {
- pod.getMetadata().getLabels().forEach((pk, pv) -> {
- Map<String, String> selectors = s.getSpec().getSelector();
- if (selectors != null && selectors.containsKey(pk)) {
- if (pv.equals(selectors.get(pk))) {
- k8sNodeService.completeNodes().forEach(n ->
- setGroupBuckets(n.intgBridge(), s, pod, install));
- }
- }
- });
- });
+ private class InternalK8sEndpointsListener implements K8sEndpointsListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sEndpointsEvent event) {
+ Endpoints endpoints = event.subject();
+
+ switch (event.type()) {
+ case K8S_ENDPOINTS_CREATED:
+ eventExecutor.execute(() -> processEndpointsCreation(endpoints));
+ break;
+ case K8S_ENDPOINTS_REMOVED:
+ eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processEndpointsCreation(Endpoints endpoints) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setK8sApiRules(null, endpoints, true);
+ }
+
+ private void processEndpointsRemoval(Endpoints endpoints) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setK8sApiRules(null, endpoints, false);
}
}
@@ -867,6 +926,8 @@
}
setServiceNatRules(node.intgBridge(), true);
+
+ k8sEndpointsService.endpointses().forEach(e -> setK8sApiRules(node, e, true));
}
}
}