Add k8s POD events, trigger svc rule on receiving annotation event
Change-Id: I7fbbd071a9b078337c028791b7441603ae1e7473
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sPodEvent.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sPodEvent.java
index a6b8d4f..a0d972c 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sPodEvent.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sPodEvent.java
@@ -46,6 +46,46 @@
K8S_POD_UPDATED,
/**
+ * Signifies that the kubernetes pod is in Pending phase.
+ */
+ K8S_POD_PENDING,
+
+ /**
+ * Signifies that the kubernetes pod is in Running phase.
+ */
+ K8S_POD_RUNNING,
+
+ /**
+ * Signifies that the kubernetes pod is in Succeeded phase.
+ */
+ K8S_POD_SUCCEEDED,
+
+ /**
+ * Signifies that the kubernetes pod is in Failed phase.
+ */
+ K8S_POD_FAILED,
+
+ /**
+ * Signifies that the kubernetes pod is in Unknown phase.
+ */
+ K8S_POD_UNKNOWN,
+
+ /**
+ * Signifies that the kubernetes pod is in Completed phase.
+ */
+ K8S_POD_COMPLETED,
+
+ /**
+ * Signifies that the kubernetes pod is in CrashLoopBackOff phase.
+ */
+ K8S_POD_CRASH_LOOP_BACK_OFF,
+
+ /**
+ * Signifies that the kubernetes pod annotation is added.
+ */
+ K8S_POD_ANNOTATION_ADDED,
+
+ /**
* Signifies that the kubernetes pod is removed.
*/
K8S_POD_REMOVED,
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
index 138555b..65921dc 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
@@ -96,14 +96,23 @@
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_ANNOTATION_ADDED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_COMPLETED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CRASH_LOOP_BACK_OFF;
import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_FAILED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_PENDING;
import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_RUNNING;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_SUCCEEDED;
+import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UNKNOWN;
import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -121,6 +130,14 @@
private static final String ERR_DUPLICATE = " already exists";
private static final String APP_ID = "org.onosproject.k8snetwork";
+ private static final String PENDING = "Pending";
+ private static final String RUNNING = "Running";
+ private static final String SUCCEEDED = "Succeeded";
+ private static final String FAILED = "Failed";
+ private static final String UNKNOWN = "Unknown";
+ private static final String COMPLETED = "Completed";
+ private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
+
private static final KryoNamespace
SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@@ -275,9 +292,7 @@
break;
case UPDATE:
log.debug("Kubernetes pod updated {}", event.newValue());
- eventExecutor.execute(() ->
- notifyDelegate(new K8sPodEvent(
- K8S_POD_UPDATED, event.newValue().value())));
+ eventExecutor.execute(() -> processUpdate(event));
break;
case REMOVE:
log.debug("Kubernetes pod removed {}", event.oldValue());
@@ -290,5 +305,56 @@
break;
}
}
+
+ private void processUpdate(MapEvent<String, Pod> event) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_UPDATED, event.newValue().value()));
+
+ String oldPhase = event.oldValue().value().getStatus().getPhase();
+ String newPhase = event.newValue().value().getStatus().getPhase();
+
+ if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_PENDING, event.newValue().value()));
+ }
+
+ if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_RUNNING, event.newValue().value()));
+ }
+
+ if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_SUCCEEDED, event.newValue().value()));
+ }
+
+ if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_FAILED, event.newValue().value()));
+ }
+
+ if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_UNKNOWN, event.newValue().value()));
+ }
+
+ if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_COMPLETED, event.newValue().value()));
+ }
+
+ if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
+ }
+
+ Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
+ Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
+
+ if (oldAnnot == null && newAnnot != null) {
+ notifyDelegate(new K8sPodEvent(
+ K8S_POD_ANNOTATION_ADDED, event.newValue().value()));
+ }
+ }
}
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 791cbc9..6a5f942 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -788,8 +788,8 @@
@Override
public void event(K8sPodEvent event) {
switch (event.type()) {
- case K8S_POD_UPDATED:
- eventExecutor.execute(() -> processPodUpdate(event.subject()));
+ case K8S_POD_ANNOTATION_ADDED:
+ eventExecutor.execute(() -> processPodAnnotAddition(event.subject()));
break;
case K8S_POD_REMOVED:
eventExecutor.execute(() -> processPodRemoval(event.subject()));
@@ -799,7 +799,7 @@
}
}
- private void processPodUpdate(Pod pod) {
+ private void processPodAnnotAddition(Pod pod) {
if (!isRelevantHelper()) {
return;
}
diff --git a/apps/k8s-networking/app/src/test/java/org/onosproject/k8snetworking/impl/K8sPodManagerTest.java b/apps/k8s-networking/app/src/test/java/org/onosproject/k8snetworking/impl/K8sPodManagerTest.java
index 3894f9c..06572d0 100644
--- a/apps/k8s-networking/app/src/test/java/org/onosproject/k8snetworking/impl/K8sPodManagerTest.java
+++ b/apps/k8s-networking/app/src/test/java/org/onosproject/k8snetworking/impl/K8sPodManagerTest.java
@@ -19,6 +19,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -181,10 +182,14 @@
meta.setUid(uid);
meta.setName(name);
+ PodStatus status = new PodStatus();
+ status.setPhase("Running");
+
Pod pod = new Pod();
pod.setApiVersion("v1");
pod.setKind("pod");
pod.setMetadata(meta);
+ pod.setStatus(status);
return pod;
}