blob: 86d103c067dadaa4724fc9b23535fd4697a135f5 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.Affinity;
20import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
Jian Lic42f3cb2021-07-21 18:32:31 +090023import io.fabric8.kubernetes.api.model.ConfigMapProjection;
Jian Liac31f652021-01-17 02:18:30 +090024import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
25import io.fabric8.kubernetes.api.model.Container;
26import io.fabric8.kubernetes.api.model.ContainerPort;
27import io.fabric8.kubernetes.api.model.ContainerState;
28import io.fabric8.kubernetes.api.model.ContainerStateRunning;
29import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
30import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
31import io.fabric8.kubernetes.api.model.ContainerStatus;
Jian Lic42f3cb2021-07-21 18:32:31 +090032import io.fabric8.kubernetes.api.model.DownwardAPIProjection;
Jian Lib6dc08f2021-03-24 15:24:18 +090033import io.fabric8.kubernetes.api.model.DownwardAPIVolumeFile;
34import io.fabric8.kubernetes.api.model.DownwardAPIVolumeSource;
Jian Liac31f652021-01-17 02:18:30 +090035import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
36import io.fabric8.kubernetes.api.model.EnvFromSource;
37import io.fabric8.kubernetes.api.model.EnvVar;
38import io.fabric8.kubernetes.api.model.EnvVarSource;
39import io.fabric8.kubernetes.api.model.ExecAction;
40import io.fabric8.kubernetes.api.model.FieldsV1;
41import io.fabric8.kubernetes.api.model.HTTPGetAction;
42import io.fabric8.kubernetes.api.model.HTTPHeader;
Jian Lifc724252021-01-29 00:01:51 +090043import io.fabric8.kubernetes.api.model.Handler;
Jian Liac31f652021-01-17 02:18:30 +090044import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
45import io.fabric8.kubernetes.api.model.IntOrString;
46import io.fabric8.kubernetes.api.model.KeyToPath;
47import io.fabric8.kubernetes.api.model.LabelSelector;
48import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
49import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Lifc724252021-01-29 00:01:51 +090050import io.fabric8.kubernetes.api.model.LocalObjectReference;
Jian Liac31f652021-01-17 02:18:30 +090051import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
52import io.fabric8.kubernetes.api.model.NodeAffinity;
53import io.fabric8.kubernetes.api.model.NodeSelector;
54import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
55import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
56import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
57import io.fabric8.kubernetes.api.model.ObjectMeta;
58import io.fabric8.kubernetes.api.model.OwnerReference;
59import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
60import io.fabric8.kubernetes.api.model.Pod;
61import io.fabric8.kubernetes.api.model.PodAffinity;
62import io.fabric8.kubernetes.api.model.PodAffinityTerm;
63import io.fabric8.kubernetes.api.model.PodAntiAffinity;
64import io.fabric8.kubernetes.api.model.PodCondition;
65import io.fabric8.kubernetes.api.model.PodIP;
Jian Li106b3822021-12-14 15:54:58 +090066import io.fabric8.kubernetes.api.model.PodReadinessGate;
Jian Liac31f652021-01-17 02:18:30 +090067import io.fabric8.kubernetes.api.model.PodSecurityContext;
68import io.fabric8.kubernetes.api.model.PodSpec;
69import io.fabric8.kubernetes.api.model.PodStatus;
70import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
71import io.fabric8.kubernetes.api.model.Probe;
Jian Lic42f3cb2021-07-21 18:32:31 +090072import io.fabric8.kubernetes.api.model.ProjectedVolumeSource;
Jian Liac31f652021-01-17 02:18:30 +090073import io.fabric8.kubernetes.api.model.Quantity;
74import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
75import io.fabric8.kubernetes.api.model.ResourceRequirements;
76import io.fabric8.kubernetes.api.model.SELinuxOptions;
Jian Li72f3dac2021-01-28 16:14:54 +090077import io.fabric8.kubernetes.api.model.SeccompProfile;
Jian Liac31f652021-01-17 02:18:30 +090078import io.fabric8.kubernetes.api.model.SecretEnvSource;
79import io.fabric8.kubernetes.api.model.SecretKeySelector;
Jian Lic42f3cb2021-07-21 18:32:31 +090080import io.fabric8.kubernetes.api.model.SecretProjection;
Jian Liac31f652021-01-17 02:18:30 +090081import io.fabric8.kubernetes.api.model.SecretVolumeSource;
82import io.fabric8.kubernetes.api.model.SecurityContext;
Jian Lic42f3cb2021-07-21 18:32:31 +090083import io.fabric8.kubernetes.api.model.ServiceAccountTokenProjection;
Jian Liac31f652021-01-17 02:18:30 +090084import io.fabric8.kubernetes.api.model.TCPSocketAction;
85import io.fabric8.kubernetes.api.model.Toleration;
86import io.fabric8.kubernetes.api.model.Volume;
87import io.fabric8.kubernetes.api.model.VolumeDevice;
88import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Lic42f3cb2021-07-21 18:32:31 +090089import io.fabric8.kubernetes.api.model.VolumeProjection;
Jian Liac31f652021-01-17 02:18:30 +090090import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
91import org.onlab.util.KryoNamespace;
92import org.onosproject.core.ApplicationId;
93import org.onosproject.core.CoreService;
94import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
95import org.onosproject.kubevirtnetworking.api.KubevirtPodStore;
96import org.onosproject.kubevirtnetworking.api.KubevirtPodStoreDelegate;
97import org.onosproject.store.AbstractStore;
98import org.onosproject.store.serializers.KryoNamespaces;
99import org.onosproject.store.service.ConsistentMap;
100import org.onosproject.store.service.MapEvent;
101import org.onosproject.store.service.MapEventListener;
102import org.onosproject.store.service.Serializer;
103import org.onosproject.store.service.StorageService;
104import org.onosproject.store.service.Versioned;
105import org.osgi.service.component.annotations.Activate;
106import org.osgi.service.component.annotations.Component;
107import org.osgi.service.component.annotations.Deactivate;
108import org.osgi.service.component.annotations.Reference;
109import org.osgi.service.component.annotations.ReferenceCardinality;
110import org.slf4j.Logger;
111
112import java.util.Collection;
113import java.util.LinkedHashMap;
114import java.util.Map;
115import java.util.Set;
116import java.util.concurrent.ExecutorService;
117
118import static com.google.common.base.Preconditions.checkArgument;
119import static java.util.concurrent.Executors.newSingleThreadExecutor;
120import static org.onlab.util.Tools.groupedThreads;
121import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_ANNOTATION_ADDED;
122import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_COMPLETED;
123import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CRASH_LOOP_BACK_OFF;
124import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CREATED;
125import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_FAILED;
126import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_PENDING;
127import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_REMOVED;
128import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_RUNNING;
129import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_SUCCEEDED;
130import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UNKNOWN;
131import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UPDATED;
132import static org.slf4j.LoggerFactory.getLogger;
133
134/**
135 * Implementation of kubevirt pod store using consistent map.
136 */
137@Component(immediate = true, service = KubevirtPodStore.class)
138public class DistributedKubevirtPodStore
139 extends AbstractStore<KubevirtPodEvent, KubevirtPodStoreDelegate>
140 implements KubevirtPodStore {
141 private final Logger log = getLogger(getClass());
142
143 private static final String ERR_NOT_FOUND = " does not exist";
144 private static final String ERR_DUPLICATE = " already exists";
145 private static final String APP_ID = "org.onosproject.k8snetwork";
146
147 private static final String PENDING = "Pending";
148 private static final String RUNNING = "Running";
149 private static final String SUCCEEDED = "Succeeded";
150 private static final String FAILED = "Failed";
151 private static final String UNKNOWN = "Unknown";
152 private static final String COMPLETED = "Completed";
153 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
154
155 private static final KryoNamespace
156 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
157 .register(KryoNamespaces.API)
158 .register(Pod.class)
159 .register(ObjectMeta.class)
160 .register(PodSpec.class)
161 .register(PodStatus.class)
162 .register(PodCondition.class)
163 .register(PodIP.class)
164 .register(Container.class)
165 .register(EnvVar.class)
166 .register(EnvVarSource.class)
167 .register(EnvFromSource.class)
168 .register(ConfigMapEnvSource.class)
169 .register(ConfigMapKeySelector.class)
170 .register(ObjectFieldSelector.class)
171 .register(ResourceFieldSelector.class)
172 .register(SecretKeySelector.class)
173 .register(Lifecycle.class)
174 .register(SecretEnvSource.class)
175 .register(ContainerStatus.class)
176 .register(ContainerState.class)
177 .register(ContainerStateRunning.class)
178 .register(ContainerStateTerminated.class)
179 .register(ContainerStateWaiting.class)
180 .register(OwnerReference.class)
181 .register(Probe.class)
182 .register(ExecAction.class)
183 .register(HTTPGetAction.class)
184 .register(HTTPHeader.class)
185 .register(TCPSocketAction.class)
186 .register(ContainerPort.class)
187 .register(ResourceRequirements.class)
188 .register(SecurityContext.class)
189 .register(PodSecurityContext.class)
190 .register(SELinuxOptions.class)
Jian Li72f3dac2021-01-28 16:14:54 +0900191 .register(SeccompProfile.class)
Jian Lifc724252021-01-29 00:01:51 +0900192 .register(Handler.class)
193 .register(ExecAction.class)
194 .register(HTTPGetAction.class)
195 .register(TCPSocketAction.class)
196 .register(HTTPHeader.class)
Jian Liac31f652021-01-17 02:18:30 +0900197 .register(Volume.class)
198 .register(VolumeDevice.class)
199 .register(VolumeMount.class)
200 .register(IntOrString.class)
201 .register(Toleration.class)
202 .register(PersistentVolumeClaimVolumeSource.class)
Jian Lic42f3cb2021-07-21 18:32:31 +0900203 .register(ProjectedVolumeSource.class)
204 .register(VolumeProjection.class)
205 .register(ConfigMapProjection.class)
206 .register(KeyToPath.class)
207 .register(DownwardAPIProjection.class)
208 .register(DownwardAPIVolumeFile.class)
209 .register(ObjectFieldSelector.class)
210 .register(ResourceFieldSelector.class)
211 .register(SecretProjection.class)
212 .register(Quantity.class)
213 .register(ServiceAccountTokenProjection.class)
Jian Liac31f652021-01-17 02:18:30 +0900214 .register(SecretVolumeSource.class)
215 .register(EmptyDirVolumeSource.class)
216 .register(Quantity.class)
217 .register(Capabilities.class)
218 .register(ConfigMapVolumeSource.class)
219 .register(KeyToPath.class)
220 .register(HostPathVolumeSource.class)
221 .register(Affinity.class)
222 .register(NodeAffinity.class)
223 .register(NodeSelector.class)
224 .register(NodeSelectorTerm.class)
225 .register(NodeSelectorRequirement.class)
226 .register(PreferredSchedulingTerm.class)
227 .register(PodAffinity.class)
228 .register(WeightedPodAffinityTerm.class)
229 .register(PodAffinityTerm.class)
230 .register(LabelSelector.class)
231 .register(LabelSelectorRequirement.class)
Jian Lifc724252021-01-29 00:01:51 +0900232 .register(LocalObjectReference.class)
Jian Liac31f652021-01-17 02:18:30 +0900233 .register(PodAntiAffinity.class)
234 .register(ManagedFieldsEntry.class)
Jian Lib6dc08f2021-03-24 15:24:18 +0900235 .register(DownwardAPIVolumeSource.class)
236 .register(DownwardAPIVolumeFile.class)
237 .register(ObjectFieldSelector.class)
Jian Liac31f652021-01-17 02:18:30 +0900238 .register(FieldsV1.class)
Jian Li106b3822021-12-14 15:54:58 +0900239 .register(PodReadinessGate.class)
Jian Liac31f652021-01-17 02:18:30 +0900240 .register(LinkedHashMap.class)
241 .register(Collection.class)
242 .build();
243
244 @Reference(cardinality = ReferenceCardinality.MANDATORY)
245 protected CoreService coreService;
246
247 @Reference(cardinality = ReferenceCardinality.MANDATORY)
248 protected StorageService storageService;
249
250 private final ExecutorService eventExecutor = newSingleThreadExecutor(
251 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
252
253 private final MapEventListener<String, Pod> podMapListener = new KubevirtPodMapListener();
254
255 private ConsistentMap<String, Pod> podStore;
256
257 @Activate
258 protected void activate() {
259 ApplicationId appId = coreService.registerApplication(APP_ID);
260 podStore = storageService.<String, Pod>consistentMapBuilder()
261 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
262 .withName("kubevirt-pod-store")
263 .withApplicationId(appId)
264 .build();
265
266 podStore.addListener(podMapListener);
267 log.info("Started");
268 }
269
270 @Deactivate
271 protected void deactivate() {
272 podStore.removeListener(podMapListener);
273 eventExecutor.shutdown();
274 log.info("Stopped");
275 }
276
277 @Override
278 public void createPod(Pod pod) {
279 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
280 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
281 checkArgument(existing == null, error);
282 return pod;
283 });
284 }
285
286 @Override
287 public void updatePod(Pod pod) {
288 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
289 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
290 checkArgument(existing != null, error);
291 return pod;
292 });
293 }
294
295 @Override
296 public Pod removePod(String uid) {
297 Versioned<Pod> pod = podStore.remove(uid);
298 if (pod == null) {
299 final String error = uid + ERR_NOT_FOUND;
300 throw new IllegalArgumentException(error);
301 }
302 return pod.value();
303 }
304
305 @Override
306 public Pod pod(String uid) {
307 return podStore.asJavaMap().get(uid);
308 }
309
310 @Override
311 public Set<Pod> pods() {
312 return ImmutableSet.copyOf(podStore.asJavaMap().values());
313 }
314
315 @Override
316 public void clear() {
317 podStore.clear();
318 }
319
320 private class KubevirtPodMapListener implements MapEventListener<String, Pod> {
321
322 @Override
323 public void event(MapEvent<String, Pod> event) {
324
325 switch (event.type()) {
326 case INSERT:
327 log.debug("Kubernetes pod created {}", event.newValue());
328 eventExecutor.execute(() ->
329 notifyDelegate(new KubevirtPodEvent(
330 KUBEVIRT_POD_CREATED, event.newValue().value())));
331 break;
332 case UPDATE:
333 log.debug("Kubernetes pod updated {}", event.newValue());
334 eventExecutor.execute(() -> processUpdate(event));
335 break;
336 case REMOVE:
337 log.debug("Kubernetes pod removed {}", event.oldValue());
338 eventExecutor.execute(() ->
339 notifyDelegate(new KubevirtPodEvent(
340 KUBEVIRT_POD_REMOVED, event.oldValue().value())));
341 break;
342 default:
343 // do nothing
344 break;
345 }
346 }
347
348 private void processUpdate(MapEvent<String, Pod> event) {
349 notifyDelegate(new KubevirtPodEvent(
350 KUBEVIRT_POD_UPDATED, event.newValue().value()));
351
352 String oldPhase = event.oldValue().value().getStatus().getPhase();
353 String newPhase = event.newValue().value().getStatus().getPhase();
354
355 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
356 notifyDelegate(new KubevirtPodEvent(
357 KUBEVIRT_POD_PENDING, event.newValue().value()));
358 }
359
360 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
361 notifyDelegate(new KubevirtPodEvent(
362 KUBEVIRT_POD_RUNNING, event.newValue().value()));
363 }
364
365 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
366 notifyDelegate(new KubevirtPodEvent(
367 KUBEVIRT_POD_SUCCEEDED, event.newValue().value()));
368 }
369
370 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
371 notifyDelegate(new KubevirtPodEvent(
372 KUBEVIRT_POD_FAILED, event.newValue().value()));
373 }
374
375 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
376 notifyDelegate(new KubevirtPodEvent(
377 KUBEVIRT_POD_UNKNOWN, event.newValue().value()));
378 }
379
380 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
381 notifyDelegate(new KubevirtPodEvent(
382 KUBEVIRT_POD_COMPLETED, event.newValue().value()));
383 }
384
385 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
386 notifyDelegate(new KubevirtPodEvent(
387 KUBEVIRT_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
388 }
389
390 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
391 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
392
393 if (oldAnnot == null && newAnnot != null) {
394 notifyDelegate(new KubevirtPodEvent(
395 KUBEVIRT_POD_ANNOTATION_ADDED, event.newValue().value()));
396 }
397 }
398 }
399}