Fix: register a set of Affinity classes into the distributed store
1. Purge group rules when the k8s purge rules command is issued.
2. Remove the k8s port when the corresponding OVS port is removed.
3. Re-install group rules during rule synchronization.
4. Install order: 1) install the group rule; 2) install the flow rule.
   Uninstall order: 1) uninstall the flow rule; 2) uninstall the group rule.
   (A sketch of this ordering follows the list.)
5. Add/remove group buckets when receiving POD update/remove events.
6. Lower the endpoint update logging level.
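The ordering in item 4 and the bucket handling in item 5 can be sketched against
the K8sGroupRuleService API added in this change. This is a minimal illustration
only: the sketch class, its method names, and the two shift-domain helpers are
hypothetical; the actual logic lives in K8sServiceHandler.

import java.util.List;

import org.onosproject.core.ApplicationId;
import org.onosproject.k8snetworking.api.K8sGroupRuleService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupDescription;

// Illustrative sketch only; not part of this patch.
public class GroupRuleOrderingSketch {

    private final K8sGroupRuleService groupRuleService;

    public GroupRuleOrderingSketch(K8sGroupRuleService groupRuleService) {
        this.groupRuleService = groupRuleService;
    }

    public void install(ApplicationId appId, DeviceId deviceId, int groupId,
                        List<GroupBucket> buckets) {
        // 1) install the group rule first
        groupRuleService.setRule(appId, deviceId, groupId,
                GroupDescription.Type.SELECT, buckets, true);

        // 2) install the flow rule only after the group shows up in the store,
        //    so a failed group install cannot leave a dangling flow entry
        if (groupRuleService.hasGroup(deviceId, groupId)) {
            installShiftDomainRule(deviceId, groupId);   // hypothetical helper
        }
    }

    public void uninstall(ApplicationId appId, DeviceId deviceId, int groupId,
                          List<GroupBucket> buckets) {
        // 1) remove the flow rule first, 2) then remove the group rule
        removeShiftDomainRule(deviceId, groupId);        // hypothetical helper
        groupRuleService.setRule(appId, deviceId, groupId,
                GroupDescription.Type.SELECT, buckets, false);
    }

    public void onPodEvent(ApplicationId appId, DeviceId deviceId, int groupId,
                           List<GroupBucket> buckets, boolean podAdded) {
        // POD update/remove only adjusts the buckets of the existing group
        groupRuleService.setBuckets(appId, deviceId, groupId, buckets, podAdded);
    }

    private void installShiftDomainRule(DeviceId deviceId, int groupId) {
        // placeholder for the SERVICE_TABLE NAT flow rule installation
    }

    private void removeShiftDomainRule(DeviceId deviceId, int groupId) {
        // placeholder for the SERVICE_TABLE NAT flow rule removal
    }
}

Keeping the group/flow ordering symmetric avoids a window in which a flow rule
points at a group that does not yet, or no longer, exist.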
Change-Id: Ib50e359a9b2c0cd9cb1490c6172864ad118b2247
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
index fc935c0..d91b0be 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -39,4 +39,25 @@
*/
void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
Type type, List<GroupBucket> buckets, boolean install);
+
+ /**
+ * Checks whether the group with the given device ID and group ID exists in the store.
+ *
+ * @param deviceId device ID
+ * @param groupId group ID
+ * @return true if the group exists, false otherwise
+ */
+ boolean hasGroup(DeviceId deviceId, int groupId);
+
+ /**
+ * Configures buckets for an existing group.
+ *
+ * @param appId application ID
+ * @param deviceId device ID
+ * @param groupId group ID
+ * @param buckets a list of group buckets
+ * @param install true for bucket addition, false for bucket removal
+ */
+ void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+ List<GroupBucket> buckets, boolean install);
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
index 1658a35..477ec77 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
@@ -20,7 +20,11 @@
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupService;
import static java.lang.Thread.sleep;
import static java.util.stream.StreamSupport.stream;
@@ -40,7 +44,9 @@
@Override
protected void doExecute() {
FlowRuleService flowRuleService = get(FlowRuleService.class);
+ GroupService groupService = get(GroupService.class);
CoreService coreService = get(CoreService.class);
+ K8sNodeService k8sNodeService = get(K8sNodeService.class);
ApplicationId appId = coreService.getAppId(K8S_NETWORKING_APP_ID);
if (appId == null) {
@@ -80,6 +86,12 @@
}
}
+ for (K8sNode node : k8sNodeService.completeNodes()) {
+ for (Group group : groupService.getGroups(node.intgBridge(), appId)) {
+ groupService.removeGroup(node.intgBridge(), group.appCookie(), appId);
+ }
+ }
+
if (result) {
print("Successfully purged flow rules!");
} else {
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
index fe16346..138555b 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
@@ -16,6 +16,7 @@
package org.onosproject.k8snetworking.impl;
import com.google.common.collect.ImmutableSet;
+import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.Capabilities;
import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
@@ -37,16 +38,26 @@
import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
import io.fabric8.kubernetes.api.model.Lifecycle;
+import io.fabric8.kubernetes.api.model.NodeAffinity;
+import io.fabric8.kubernetes.api.model.NodeSelector;
+import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
+import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodAffinity;
+import io.fabric8.kubernetes.api.model.PodAffinityTerm;
+import io.fabric8.kubernetes.api.model.PodAntiAffinity;
import io.fabric8.kubernetes.api.model.PodCondition;
import io.fabric8.kubernetes.api.model.PodSecurityContext;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
@@ -61,6 +72,7 @@
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeDevice;
import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -157,6 +169,18 @@
.register(ConfigMapVolumeSource.class)
.register(KeyToPath.class)
.register(HostPathVolumeSource.class)
+ .register(Affinity.class)
+ .register(NodeAffinity.class)
+ .register(NodeSelector.class)
+ .register(NodeSelectorTerm.class)
+ .register(NodeSelectorRequirement.class)
+ .register(PreferredSchedulingTerm.class)
+ .register(PodAffinity.class)
+ .register(WeightedPodAffinityTerm.class)
+ .register(PodAffinityTerm.class)
+ .register(LabelSelector.class)
+ .register(LabelSelectorRequirement.class)
+ .register(PodAntiAffinity.class)
.register(LinkedHashMap.class)
.register(Collection.class)
.build();
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
index de594aa..ac9e2c0 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
@@ -108,7 +108,7 @@
k8sEndpointsStore.updateEndpoints(endpoints);
- log.info(String.format(MSG_ENDPOINTS, endpoints.getMetadata().getName(), MSG_UPDATED));
+ log.debug(String.format(MSG_ENDPOINTS, endpoints.getMetadata().getName(), MSG_UPDATED));
}
@Override
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 683b070..ecc35fe 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -143,9 +143,7 @@
eventExecutor.execute(() -> processAddition(endpoints));
break;
case MODIFIED:
- // FIXME: there are too frequent endpoints update events
- // issued from kubernetes API server, we disable update for now
- // eventExecutor.execute(() -> processModification(endpoints));
+ eventExecutor.execute(() -> processModification(endpoints));
break;
case DELETED:
eventExecutor.execute(() -> processDeletion(endpoints));
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
index aa2b90f..51dc9c8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -70,13 +70,31 @@
@Override
public void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
Type type, List<GroupBucket> buckets, boolean install) {
- GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
- type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
if (install) {
+ GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+ type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
groupService.addGroup(groupDesc);
} else {
groupService.removeGroup(deviceId, getGroupKey(groupId), appId);
}
}
+
+ @Override
+ public boolean hasGroup(DeviceId deviceId, int groupId) {
+ return groupService.getGroup(deviceId, getGroupKey(groupId)) != null;
+ }
+
+ @Override
+ public void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+ List<GroupBucket> buckets, boolean install) {
+
+ if (install) {
+ groupService.addBucketsToGroup(deviceId, getGroupKey(groupId),
+ new GroupBuckets(buckets), getGroupKey(groupId), appId);
+ } else {
+ groupService.removeBucketsFromGroup(deviceId, getGroupKey(groupId),
+ new GroupBuckets(buckets), getGroupKey(groupId), appId);
+ }
+ }
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index f2e56e0..791cbc9 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -21,6 +21,7 @@
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import org.onlab.packet.Ethernet;
@@ -40,6 +41,9 @@
import org.onosproject.k8snetworking.api.K8sFlowRuleService;
import org.onosproject.k8snetworking.api.K8sGroupRuleService;
import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodEvent;
+import org.onosproject.k8snetworking.api.K8sPodListener;
+import org.onosproject.k8snetworking.api.K8sPodService;
import org.onosproject.k8snetworking.api.K8sServiceEvent;
import org.onosproject.k8snetworking.api.K8sServiceListener;
import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -59,10 +63,7 @@
import org.onosproject.net.flow.criteria.ExtensionSelector;
import org.onosproject.net.flow.instructions.ExtensionTreatment;
import org.onosproject.net.group.GroupBucket;
-import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -133,6 +134,8 @@
private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
+ private static final String IP_ADDRESS = "ipAddress";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -172,6 +175,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected K8sServiceService k8sServiceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sPodService k8sPodService;
+
/** Service IP address translation mode. */
private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
@@ -181,12 +187,11 @@
new InternalNodeEventListener();
private final InternalK8sServiceListener internalK8sServiceListener =
new InternalK8sServiceListener();
+ private final InternalK8sPodListener internalK8sPodListener =
+ new InternalK8sPodListener();
private AtomicCounter groupIdCounter;
- // service IP ports has following format IP_PORT_PROTO
- private ConsistentMap<String, Integer> servicePortGroupIdMap;
-
private ApplicationId appId;
private NodeId localNodeId;
@@ -198,21 +203,17 @@
leadershipService.runForLeadership(appId.name());
k8sNodeService.addListener(internalNodeEventListener);
k8sServiceService.addListener(internalK8sServiceListener);
+ k8sPodService.addListener(internalK8sPodListener);
groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
- servicePortGroupIdMap = storageService.<String, Integer>consistentMapBuilder()
- .withName("k8s-service-ip-port-set")
- .withSerializer(Serializer.using(KryoNamespaces.API))
- .withApplicationId(appId)
- .build();
-
log.info("Started");
}
@Deactivate
protected void deactivate() {
leadershipService.withdraw(appId.name());
+ k8sPodService.removeListener(internalK8sPodListener);
k8sNodeService.removeListener(internalNodeEventListener);
k8sServiceService.removeListener(internalK8sServiceListener);
configService.unregisterProperties(getClass(), false);
@@ -345,8 +346,65 @@
return map;
}
- private void setStatelessGroupFlowRules(DeviceId deviceId, Service service,
- boolean install) {
+ private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
+
+ if (pod.getMetadata().getAnnotations() == null) {
+ return;
+ }
+
+ String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
+
+ Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
+ Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
+
+ spEpasMap.forEach((sp, epas) -> {
+ List<GroupBucket> bkts = Lists.newArrayList();
+
+ if (install) {
+ if (epas.contains(podIpStr)) {
+ bkts = buildBuckets(deviceId, podIpStr, sp);
+ }
+ } else {
+ bkts = buildBuckets(deviceId, podIpStr, sp);
+ }
+
+ spGrpBkts.put(sp, bkts);
+ });
+
+ String serviceIp = service.getSpec().getClusterIP();
+ spGrpBkts.forEach((sp, bkts) -> {
+ String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
+ int groupId = svcStr.hashCode();
+
+ k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
+ });
+ }
+
+ private List<GroupBucket> buildBuckets(DeviceId deviceId,
+ String podIpStr,
+ ServicePort sp) {
+ List<GroupBucket> bkts = Lists.newArrayList();
+
+ ExtensionTreatment resubmitTreatment = buildResubmitExtension(
+ deviceService.getDevice(deviceId), ROUTING_TABLE);
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setIpDst(IpAddress.valueOf(podIpStr))
+ .extension(resubmitTreatment, deviceId);
+
+ if (TCP.equals(sp.getProtocol())) {
+ tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
+ } else if (UDP.equals(sp.getProtocol())) {
+ tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
+ }
+
+ bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
+
+ return bkts;
+ }
+
+ private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
+ Service service,
+ boolean install) {
Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
Map<String, String> nodeIpGatewayIpMap =
nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
@@ -376,23 +434,36 @@
String serviceIp = service.getSpec().getClusterIP();
spGrpBkts.forEach((sp, bkts) -> {
String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
- int groupId;
+ int groupId = svcStr.hashCode();
- if (servicePortGroupIdMap.asJavaMap().containsKey(svcStr)) {
- groupId = servicePortGroupIdMap.asJavaMap().get(svcStr);
+ if (install) {
+
+ // add group table rules
+ k8sGroupRuleService.setRule(appId, deviceId, groupId,
+ SELECT, bkts, true);
+
+ log.info("Adding group rule {}", groupId);
+
+ // if we failed to add group rule, we will not install flow rules
+ // as this might cause rule inconsistency
+ if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
+ // add flow rules for shifting IP domain
+ setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
+ PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+ sp.getProtocol(), true);
+ }
} else {
- groupId = (int) groupIdCounter.incrementAndGet();
- servicePortGroupIdMap.put(svcStr, groupId);
+ // remove flow rules for shifting IP domain
+ setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
+ PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+ sp.getProtocol(), false);
+
+ // remove group table rules
+ k8sGroupRuleService.setRule(appId, deviceId, groupId,
+ SELECT, bkts, false);
+
+ log.info("Removing group rule {}", groupId);
}
-
- // add group table rules
- k8sGroupRuleService.setRule(appId, deviceId, groupId,
- SELECT, bkts, install);
-
- // add flow rules for shifting IP domain
- setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
- PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
- sp.getProtocol(), install);
});
spEpasMap.forEach((sp, epas) ->
@@ -639,6 +710,16 @@
log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
}
+ private void setServiceNatRules(DeviceId deviceId, boolean install) {
+ if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+ setStatefulServiceNatRules(deviceId, install);
+ } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+ setStatelessServiceNatRules(deviceId, install);
+ } else {
+ log.warn("Service IP NAT mode was not configured!");
+ }
+ }
+
private class InternalK8sServiceListener implements K8sServiceListener {
private boolean isRelevantHelper() {
@@ -649,6 +730,7 @@
public void event(K8sServiceEvent event) {
switch (event.type()) {
case K8S_SERVICE_CREATED:
+ case K8S_SERVICE_UPDATED:
eventExecutor.execute(() -> processServiceCreation(event.subject()));
break;
case K8S_SERVICE_REMOVED:
@@ -697,6 +779,57 @@
}
}
+ private class InternalK8sPodListener implements K8sPodListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sPodEvent event) {
+ switch (event.type()) {
+ case K8S_POD_UPDATED:
+ eventExecutor.execute(() -> processPodUpdate(event.subject()));
+ break;
+ case K8S_POD_REMOVED:
+ eventExecutor.execute(() -> processPodRemoval(event.subject()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processPodUpdate(Pod pod) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setServiceRuleFromPod(pod, true);
+ }
+
+ private void processPodRemoval(Pod pod) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setServiceRuleFromPod(pod, false);
+ }
+
+ private void setServiceRuleFromPod(Pod pod, boolean install) {
+ k8sServiceService.services().forEach(s -> {
+ pod.getMetadata().getLabels().forEach((pk, pv) -> {
+ Map<String, String> selectors = s.getSpec().getSelector();
+ if (selectors != null && selectors.containsKey(pk)) {
+ if (pv.equals(selectors.get(pk))) {
+ k8sNodeService.completeNodes().forEach(n ->
+ setGroupBuckets(n.intgBridge(), s, pod, install));
+ }
+ }
+ });
+ });
+ }
+ }
+
private class InternalNodeEventListener implements K8sNodeListener {
private boolean isRelevantHelper() {
@@ -711,8 +844,7 @@
eventExecutor.execute(() -> processNodeCompletion(k8sNode));
break;
case K8S_NODE_INCOMPLETE:
- eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
- break;
+ case K8S_NODE_REMOVED:
default:
break;
}
@@ -723,28 +855,7 @@
return;
}
- if (NAT_STATEFUL.equals(serviceIpNatMode)) {
- setStatefulServiceNatRules(node.intgBridge(), true);
- } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
- setStatelessServiceNatRules(node.intgBridge(), true);
- } else {
- log.warn("Service IP NAT mode was not configured!");
- }
-
- }
-
- private void processNodeIncompletion(K8sNode node) {
- if (!isRelevantHelper()) {
- return;
- }
-
- if (NAT_STATEFUL.equals(serviceIpNatMode)) {
- setStatefulServiceNatRules(node.intgBridge(), false);
- } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
- setStatelessServiceNatRules(node.intgBridge(), false);
- } else {
- log.warn("Service IP NAT mode was not configured!");
- }
+ setServiceNatRules(node.intgBridge(), true);
}
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
index 1dec9d1..7510aaf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
@@ -68,6 +68,7 @@
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
/**
@@ -230,6 +231,31 @@
Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
+
+ K8sPort k8sPort = portToK8sPort(port);
+
+ if (k8sPort == null) {
+ log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+ return;
+ }
+
+ k8sNetworkService.removePort(k8sPort.portId());
+ }
+
+ /**
+ * Processes the port inactivation event.
+ *
+ * @param port ONOS port
+ */
+ private void processPortInactivated(Port port) {
+ K8sPort k8sPort = portToK8sPort(port);
+
+ if (k8sPort == null) {
+ log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+ return;
+ }
+
+ k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
}
/**
@@ -349,14 +375,16 @@
switch (event.type()) {
case K8S_NODE_COMPLETE:
- executor.execute(() -> processCompleteNode(event, event.subject()));
+ executor.execute(() -> processCompleteNode(event, k8sNode));
break;
- case K8S_NODE_INCOMPLETE:
- log.warn("{} is changed to INCOMPLETE state", k8sNode);
+ case K8S_NODE_UPDATED:
+ if (k8sNode.state() == INIT) {
+ executor.execute(() -> processIncompleteNode(event, k8sNode));
+ }
break;
case K8S_NODE_CREATED:
- case K8S_NODE_UPDATED:
case K8S_NODE_REMOVED:
+ case K8S_NODE_INCOMPLETE:
default:
break;
}
@@ -388,5 +416,23 @@
hostProviderService.hostVanished(host.id());
});
}
+
+ private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
+ if (!isRelevantHelper(event)) {
+ return;
+ }
+
+ log.info("INIT node {} is detected", k8sNode.hostname());
+
+ deviceService.getPorts(k8sNode.intgBridge()).stream()
+ .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
+ .filter(Port::isEnabled)
+ .forEach(port -> {
+ log.debug("Container port {} is detected from {}",
+ port.annotations().value(PORT_NAME),
+ k8sNode.hostname());
+ processPortInactivated(port);
+ });
+ }
}
}