/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.Affinity;
20import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
Jian Lic42f3cb2021-07-21 18:32:31 +090023import io.fabric8.kubernetes.api.model.ConfigMapProjection;
Jian Liac31f652021-01-17 02:18:30 +090024import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
25import io.fabric8.kubernetes.api.model.Container;
26import io.fabric8.kubernetes.api.model.ContainerPort;
27import io.fabric8.kubernetes.api.model.ContainerState;
28import io.fabric8.kubernetes.api.model.ContainerStateRunning;
29import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
30import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
31import io.fabric8.kubernetes.api.model.ContainerStatus;
Jian Lic42f3cb2021-07-21 18:32:31 +090032import io.fabric8.kubernetes.api.model.DownwardAPIProjection;
Jian Lib6dc08f2021-03-24 15:24:18 +090033import io.fabric8.kubernetes.api.model.DownwardAPIVolumeFile;
34import io.fabric8.kubernetes.api.model.DownwardAPIVolumeSource;
Jian Liac31f652021-01-17 02:18:30 +090035import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
36import io.fabric8.kubernetes.api.model.EnvFromSource;
37import io.fabric8.kubernetes.api.model.EnvVar;
38import io.fabric8.kubernetes.api.model.EnvVarSource;
39import io.fabric8.kubernetes.api.model.ExecAction;
40import io.fabric8.kubernetes.api.model.FieldsV1;
41import io.fabric8.kubernetes.api.model.HTTPGetAction;
42import io.fabric8.kubernetes.api.model.HTTPHeader;
Jian Lifc724252021-01-29 00:01:51 +090043import io.fabric8.kubernetes.api.model.Handler;
Jian Liac31f652021-01-17 02:18:30 +090044import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
45import io.fabric8.kubernetes.api.model.IntOrString;
46import io.fabric8.kubernetes.api.model.KeyToPath;
47import io.fabric8.kubernetes.api.model.LabelSelector;
48import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
49import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Lifc724252021-01-29 00:01:51 +090050import io.fabric8.kubernetes.api.model.LocalObjectReference;
Jian Liac31f652021-01-17 02:18:30 +090051import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
52import io.fabric8.kubernetes.api.model.NodeAffinity;
53import io.fabric8.kubernetes.api.model.NodeSelector;
54import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
55import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
56import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
57import io.fabric8.kubernetes.api.model.ObjectMeta;
58import io.fabric8.kubernetes.api.model.OwnerReference;
59import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
60import io.fabric8.kubernetes.api.model.Pod;
61import io.fabric8.kubernetes.api.model.PodAffinity;
62import io.fabric8.kubernetes.api.model.PodAffinityTerm;
63import io.fabric8.kubernetes.api.model.PodAntiAffinity;
64import io.fabric8.kubernetes.api.model.PodCondition;
65import io.fabric8.kubernetes.api.model.PodIP;
66import io.fabric8.kubernetes.api.model.PodSecurityContext;
67import io.fabric8.kubernetes.api.model.PodSpec;
68import io.fabric8.kubernetes.api.model.PodStatus;
69import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
70import io.fabric8.kubernetes.api.model.Probe;
Jian Lic42f3cb2021-07-21 18:32:31 +090071import io.fabric8.kubernetes.api.model.ProjectedVolumeSource;
Jian Liac31f652021-01-17 02:18:30 +090072import io.fabric8.kubernetes.api.model.Quantity;
73import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
74import io.fabric8.kubernetes.api.model.ResourceRequirements;
75import io.fabric8.kubernetes.api.model.SELinuxOptions;
Jian Li72f3dac2021-01-28 16:14:54 +090076import io.fabric8.kubernetes.api.model.SeccompProfile;
Jian Liac31f652021-01-17 02:18:30 +090077import io.fabric8.kubernetes.api.model.SecretEnvSource;
78import io.fabric8.kubernetes.api.model.SecretKeySelector;
Jian Lic42f3cb2021-07-21 18:32:31 +090079import io.fabric8.kubernetes.api.model.SecretProjection;
Jian Liac31f652021-01-17 02:18:30 +090080import io.fabric8.kubernetes.api.model.SecretVolumeSource;
81import io.fabric8.kubernetes.api.model.SecurityContext;
Jian Lic42f3cb2021-07-21 18:32:31 +090082import io.fabric8.kubernetes.api.model.ServiceAccountTokenProjection;
Jian Liac31f652021-01-17 02:18:30 +090083import io.fabric8.kubernetes.api.model.TCPSocketAction;
84import io.fabric8.kubernetes.api.model.Toleration;
85import io.fabric8.kubernetes.api.model.Volume;
86import io.fabric8.kubernetes.api.model.VolumeDevice;
87import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Lic42f3cb2021-07-21 18:32:31 +090088import io.fabric8.kubernetes.api.model.VolumeProjection;
Jian Liac31f652021-01-17 02:18:30 +090089import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
90import org.onlab.util.KryoNamespace;
91import org.onosproject.core.ApplicationId;
92import org.onosproject.core.CoreService;
93import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
94import org.onosproject.kubevirtnetworking.api.KubevirtPodStore;
95import org.onosproject.kubevirtnetworking.api.KubevirtPodStoreDelegate;
96import org.onosproject.store.AbstractStore;
97import org.onosproject.store.serializers.KryoNamespaces;
98import org.onosproject.store.service.ConsistentMap;
99import org.onosproject.store.service.MapEvent;
100import org.onosproject.store.service.MapEventListener;
101import org.onosproject.store.service.Serializer;
102import org.onosproject.store.service.StorageService;
103import org.onosproject.store.service.Versioned;
104import org.osgi.service.component.annotations.Activate;
105import org.osgi.service.component.annotations.Component;
106import org.osgi.service.component.annotations.Deactivate;
107import org.osgi.service.component.annotations.Reference;
108import org.osgi.service.component.annotations.ReferenceCardinality;
109import org.slf4j.Logger;
110
111import java.util.Collection;
112import java.util.LinkedHashMap;
113import java.util.Map;
114import java.util.Set;
115import java.util.concurrent.ExecutorService;
116
117import static com.google.common.base.Preconditions.checkArgument;
118import static java.util.concurrent.Executors.newSingleThreadExecutor;
119import static org.onlab.util.Tools.groupedThreads;
120import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_ANNOTATION_ADDED;
121import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_COMPLETED;
122import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CRASH_LOOP_BACK_OFF;
123import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CREATED;
124import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_FAILED;
125import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_PENDING;
126import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_REMOVED;
127import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_RUNNING;
128import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_SUCCEEDED;
129import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UNKNOWN;
130import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UPDATED;
131import static org.slf4j.LoggerFactory.getLogger;
132
133/**
134 * Implementation of kubevirt pod store using consistent map.
135 */
136@Component(immediate = true, service = KubevirtPodStore.class)
137public class DistributedKubevirtPodStore
138 extends AbstractStore<KubevirtPodEvent, KubevirtPodStoreDelegate>
139 implements KubevirtPodStore {
140 private final Logger log = getLogger(getClass());
141
142 private static final String ERR_NOT_FOUND = " does not exist";
143 private static final String ERR_DUPLICATE = " already exists";
144 private static final String APP_ID = "org.onosproject.k8snetwork";
145
146 private static final String PENDING = "Pending";
147 private static final String RUNNING = "Running";
148 private static final String SUCCEEDED = "Succeeded";
149 private static final String FAILED = "Failed";
150 private static final String UNKNOWN = "Unknown";
151 private static final String COMPLETED = "Completed";
152 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
153
154 private static final KryoNamespace
155 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
156 .register(KryoNamespaces.API)
157 .register(Pod.class)
158 .register(ObjectMeta.class)
159 .register(PodSpec.class)
160 .register(PodStatus.class)
161 .register(PodCondition.class)
162 .register(PodIP.class)
163 .register(Container.class)
164 .register(EnvVar.class)
165 .register(EnvVarSource.class)
166 .register(EnvFromSource.class)
167 .register(ConfigMapEnvSource.class)
168 .register(ConfigMapKeySelector.class)
169 .register(ObjectFieldSelector.class)
170 .register(ResourceFieldSelector.class)
171 .register(SecretKeySelector.class)
172 .register(Lifecycle.class)
173 .register(SecretEnvSource.class)
174 .register(ContainerStatus.class)
175 .register(ContainerState.class)
176 .register(ContainerStateRunning.class)
177 .register(ContainerStateTerminated.class)
178 .register(ContainerStateWaiting.class)
179 .register(OwnerReference.class)
180 .register(Probe.class)
181 .register(ExecAction.class)
182 .register(HTTPGetAction.class)
183 .register(HTTPHeader.class)
184 .register(TCPSocketAction.class)
185 .register(ContainerPort.class)
186 .register(ResourceRequirements.class)
187 .register(SecurityContext.class)
188 .register(PodSecurityContext.class)
189 .register(SELinuxOptions.class)
Jian Li72f3dac2021-01-28 16:14:54 +0900190 .register(SeccompProfile.class)
Jian Lifc724252021-01-29 00:01:51 +0900191 .register(Handler.class)
192 .register(ExecAction.class)
193 .register(HTTPGetAction.class)
194 .register(TCPSocketAction.class)
195 .register(HTTPHeader.class)
Jian Liac31f652021-01-17 02:18:30 +0900196 .register(Volume.class)
197 .register(VolumeDevice.class)
198 .register(VolumeMount.class)
199 .register(IntOrString.class)
200 .register(Toleration.class)
201 .register(PersistentVolumeClaimVolumeSource.class)
Jian Lic42f3cb2021-07-21 18:32:31 +0900202 .register(ProjectedVolumeSource.class)
203 .register(VolumeProjection.class)
204 .register(ConfigMapProjection.class)
205 .register(KeyToPath.class)
206 .register(DownwardAPIProjection.class)
207 .register(DownwardAPIVolumeFile.class)
208 .register(ObjectFieldSelector.class)
209 .register(ResourceFieldSelector.class)
210 .register(SecretProjection.class)
211 .register(Quantity.class)
212 .register(ServiceAccountTokenProjection.class)
Jian Liac31f652021-01-17 02:18:30 +0900213 .register(SecretVolumeSource.class)
214 .register(EmptyDirVolumeSource.class)
215 .register(Quantity.class)
216 .register(Capabilities.class)
217 .register(ConfigMapVolumeSource.class)
218 .register(KeyToPath.class)
219 .register(HostPathVolumeSource.class)
220 .register(Affinity.class)
221 .register(NodeAffinity.class)
222 .register(NodeSelector.class)
223 .register(NodeSelectorTerm.class)
224 .register(NodeSelectorRequirement.class)
225 .register(PreferredSchedulingTerm.class)
226 .register(PodAffinity.class)
227 .register(WeightedPodAffinityTerm.class)
228 .register(PodAffinityTerm.class)
229 .register(LabelSelector.class)
230 .register(LabelSelectorRequirement.class)
Jian Lifc724252021-01-29 00:01:51 +0900231 .register(LocalObjectReference.class)
Jian Liac31f652021-01-17 02:18:30 +0900232 .register(PodAntiAffinity.class)
233 .register(ManagedFieldsEntry.class)
Jian Lib6dc08f2021-03-24 15:24:18 +0900234 .register(DownwardAPIVolumeSource.class)
235 .register(DownwardAPIVolumeFile.class)
236 .register(ObjectFieldSelector.class)
Jian Liac31f652021-01-17 02:18:30 +0900237 .register(FieldsV1.class)
238 .register(LinkedHashMap.class)
239 .register(Collection.class)
240 .build();
241
242 @Reference(cardinality = ReferenceCardinality.MANDATORY)
243 protected CoreService coreService;
244
245 @Reference(cardinality = ReferenceCardinality.MANDATORY)
246 protected StorageService storageService;
247
248 private final ExecutorService eventExecutor = newSingleThreadExecutor(
249 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
250
251 private final MapEventListener<String, Pod> podMapListener = new KubevirtPodMapListener();
252
253 private ConsistentMap<String, Pod> podStore;
254
255 @Activate
256 protected void activate() {
257 ApplicationId appId = coreService.registerApplication(APP_ID);
258 podStore = storageService.<String, Pod>consistentMapBuilder()
259 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
260 .withName("kubevirt-pod-store")
261 .withApplicationId(appId)
262 .build();
263
264 podStore.addListener(podMapListener);
265 log.info("Started");
266 }
267
268 @Deactivate
269 protected void deactivate() {
270 podStore.removeListener(podMapListener);
271 eventExecutor.shutdown();
272 log.info("Stopped");
273 }
274
275 @Override
276 public void createPod(Pod pod) {
277 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
278 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
279 checkArgument(existing == null, error);
280 return pod;
281 });
282 }
283
284 @Override
285 public void updatePod(Pod pod) {
286 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
287 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
288 checkArgument(existing != null, error);
289 return pod;
290 });
291 }
292
293 @Override
294 public Pod removePod(String uid) {
295 Versioned<Pod> pod = podStore.remove(uid);
296 if (pod == null) {
297 final String error = uid + ERR_NOT_FOUND;
298 throw new IllegalArgumentException(error);
299 }
300 return pod.value();
301 }
302
303 @Override
304 public Pod pod(String uid) {
305 return podStore.asJavaMap().get(uid);
306 }
307
308 @Override
309 public Set<Pod> pods() {
310 return ImmutableSet.copyOf(podStore.asJavaMap().values());
311 }
312
313 @Override
314 public void clear() {
315 podStore.clear();
316 }
317
318 private class KubevirtPodMapListener implements MapEventListener<String, Pod> {
319
320 @Override
321 public void event(MapEvent<String, Pod> event) {
322
323 switch (event.type()) {
324 case INSERT:
325 log.debug("Kubernetes pod created {}", event.newValue());
326 eventExecutor.execute(() ->
327 notifyDelegate(new KubevirtPodEvent(
328 KUBEVIRT_POD_CREATED, event.newValue().value())));
329 break;
330 case UPDATE:
331 log.debug("Kubernetes pod updated {}", event.newValue());
332 eventExecutor.execute(() -> processUpdate(event));
333 break;
334 case REMOVE:
335 log.debug("Kubernetes pod removed {}", event.oldValue());
336 eventExecutor.execute(() ->
337 notifyDelegate(new KubevirtPodEvent(
338 KUBEVIRT_POD_REMOVED, event.oldValue().value())));
339 break;
340 default:
341 // do nothing
342 break;
343 }
344 }
345
346 private void processUpdate(MapEvent<String, Pod> event) {
347 notifyDelegate(new KubevirtPodEvent(
348 KUBEVIRT_POD_UPDATED, event.newValue().value()));
349
350 String oldPhase = event.oldValue().value().getStatus().getPhase();
351 String newPhase = event.newValue().value().getStatus().getPhase();
352
353 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
354 notifyDelegate(new KubevirtPodEvent(
355 KUBEVIRT_POD_PENDING, event.newValue().value()));
356 }
357
358 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
359 notifyDelegate(new KubevirtPodEvent(
360 KUBEVIRT_POD_RUNNING, event.newValue().value()));
361 }
362
363 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
364 notifyDelegate(new KubevirtPodEvent(
365 KUBEVIRT_POD_SUCCEEDED, event.newValue().value()));
366 }
367
368 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
369 notifyDelegate(new KubevirtPodEvent(
370 KUBEVIRT_POD_FAILED, event.newValue().value()));
371 }
372
373 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
374 notifyDelegate(new KubevirtPodEvent(
375 KUBEVIRT_POD_UNKNOWN, event.newValue().value()));
376 }
377
378 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
379 notifyDelegate(new KubevirtPodEvent(
380 KUBEVIRT_POD_COMPLETED, event.newValue().value()));
381 }
382
383 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
384 notifyDelegate(new KubevirtPodEvent(
385 KUBEVIRT_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
386 }
387
388 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
389 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
390
391 if (oldAnnot == null && newAnnot != null) {
392 notifyDelegate(new KubevirtPodEvent(
393 KUBEVIRT_POD_ANNOTATION_ADDED, event.newValue().value()));
394 }
395 }
396 }
397}