blob: 1ef99f609e2d6896a1efcf6add9d96b56ec2469c [file] [log] [blame]
Jian Liac31f652021-01-17 02:18:30 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.Affinity;
20import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
Jian Lib6dc08f2021-03-24 15:24:18 +090031import io.fabric8.kubernetes.api.model.DownwardAPIVolumeFile;
32import io.fabric8.kubernetes.api.model.DownwardAPIVolumeSource;
Jian Liac31f652021-01-17 02:18:30 +090033import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
34import io.fabric8.kubernetes.api.model.EnvFromSource;
35import io.fabric8.kubernetes.api.model.EnvVar;
36import io.fabric8.kubernetes.api.model.EnvVarSource;
37import io.fabric8.kubernetes.api.model.ExecAction;
38import io.fabric8.kubernetes.api.model.FieldsV1;
39import io.fabric8.kubernetes.api.model.HTTPGetAction;
40import io.fabric8.kubernetes.api.model.HTTPHeader;
Jian Lifc724252021-01-29 00:01:51 +090041import io.fabric8.kubernetes.api.model.Handler;
Jian Liac31f652021-01-17 02:18:30 +090042import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
43import io.fabric8.kubernetes.api.model.IntOrString;
44import io.fabric8.kubernetes.api.model.KeyToPath;
45import io.fabric8.kubernetes.api.model.LabelSelector;
46import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
47import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Lifc724252021-01-29 00:01:51 +090048import io.fabric8.kubernetes.api.model.LocalObjectReference;
Jian Liac31f652021-01-17 02:18:30 +090049import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
50import io.fabric8.kubernetes.api.model.NodeAffinity;
51import io.fabric8.kubernetes.api.model.NodeSelector;
52import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
53import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
54import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
55import io.fabric8.kubernetes.api.model.ObjectMeta;
56import io.fabric8.kubernetes.api.model.OwnerReference;
57import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
58import io.fabric8.kubernetes.api.model.Pod;
59import io.fabric8.kubernetes.api.model.PodAffinity;
60import io.fabric8.kubernetes.api.model.PodAffinityTerm;
61import io.fabric8.kubernetes.api.model.PodAntiAffinity;
62import io.fabric8.kubernetes.api.model.PodCondition;
63import io.fabric8.kubernetes.api.model.PodIP;
64import io.fabric8.kubernetes.api.model.PodSecurityContext;
65import io.fabric8.kubernetes.api.model.PodSpec;
66import io.fabric8.kubernetes.api.model.PodStatus;
67import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
68import io.fabric8.kubernetes.api.model.Probe;
69import io.fabric8.kubernetes.api.model.Quantity;
70import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
71import io.fabric8.kubernetes.api.model.ResourceRequirements;
72import io.fabric8.kubernetes.api.model.SELinuxOptions;
Jian Li72f3dac2021-01-28 16:14:54 +090073import io.fabric8.kubernetes.api.model.SeccompProfile;
Jian Liac31f652021-01-17 02:18:30 +090074import io.fabric8.kubernetes.api.model.SecretEnvSource;
75import io.fabric8.kubernetes.api.model.SecretKeySelector;
76import io.fabric8.kubernetes.api.model.SecretVolumeSource;
77import io.fabric8.kubernetes.api.model.SecurityContext;
78import io.fabric8.kubernetes.api.model.TCPSocketAction;
79import io.fabric8.kubernetes.api.model.Toleration;
80import io.fabric8.kubernetes.api.model.Volume;
81import io.fabric8.kubernetes.api.model.VolumeDevice;
82import io.fabric8.kubernetes.api.model.VolumeMount;
83import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
84import org.onlab.util.KryoNamespace;
85import org.onosproject.core.ApplicationId;
86import org.onosproject.core.CoreService;
87import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
88import org.onosproject.kubevirtnetworking.api.KubevirtPodStore;
89import org.onosproject.kubevirtnetworking.api.KubevirtPodStoreDelegate;
90import org.onosproject.store.AbstractStore;
91import org.onosproject.store.serializers.KryoNamespaces;
92import org.onosproject.store.service.ConsistentMap;
93import org.onosproject.store.service.MapEvent;
94import org.onosproject.store.service.MapEventListener;
95import org.onosproject.store.service.Serializer;
96import org.onosproject.store.service.StorageService;
97import org.onosproject.store.service.Versioned;
98import org.osgi.service.component.annotations.Activate;
99import org.osgi.service.component.annotations.Component;
100import org.osgi.service.component.annotations.Deactivate;
101import org.osgi.service.component.annotations.Reference;
102import org.osgi.service.component.annotations.ReferenceCardinality;
103import org.slf4j.Logger;
104
105import java.util.Collection;
106import java.util.LinkedHashMap;
107import java.util.Map;
108import java.util.Set;
109import java.util.concurrent.ExecutorService;
110
111import static com.google.common.base.Preconditions.checkArgument;
112import static java.util.concurrent.Executors.newSingleThreadExecutor;
113import static org.onlab.util.Tools.groupedThreads;
114import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_ANNOTATION_ADDED;
115import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_COMPLETED;
116import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CRASH_LOOP_BACK_OFF;
117import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CREATED;
118import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_FAILED;
119import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_PENDING;
120import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_REMOVED;
121import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_RUNNING;
122import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_SUCCEEDED;
123import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UNKNOWN;
124import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UPDATED;
125import static org.slf4j.LoggerFactory.getLogger;
126
127/**
128 * Implementation of kubevirt pod store using consistent map.
129 */
130@Component(immediate = true, service = KubevirtPodStore.class)
131public class DistributedKubevirtPodStore
132 extends AbstractStore<KubevirtPodEvent, KubevirtPodStoreDelegate>
133 implements KubevirtPodStore {
134 private final Logger log = getLogger(getClass());
135
136 private static final String ERR_NOT_FOUND = " does not exist";
137 private static final String ERR_DUPLICATE = " already exists";
138 private static final String APP_ID = "org.onosproject.k8snetwork";
139
140 private static final String PENDING = "Pending";
141 private static final String RUNNING = "Running";
142 private static final String SUCCEEDED = "Succeeded";
143 private static final String FAILED = "Failed";
144 private static final String UNKNOWN = "Unknown";
145 private static final String COMPLETED = "Completed";
146 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
147
148 private static final KryoNamespace
149 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
150 .register(KryoNamespaces.API)
151 .register(Pod.class)
152 .register(ObjectMeta.class)
153 .register(PodSpec.class)
154 .register(PodStatus.class)
155 .register(PodCondition.class)
156 .register(PodIP.class)
157 .register(Container.class)
158 .register(EnvVar.class)
159 .register(EnvVarSource.class)
160 .register(EnvFromSource.class)
161 .register(ConfigMapEnvSource.class)
162 .register(ConfigMapKeySelector.class)
163 .register(ObjectFieldSelector.class)
164 .register(ResourceFieldSelector.class)
165 .register(SecretKeySelector.class)
166 .register(Lifecycle.class)
167 .register(SecretEnvSource.class)
168 .register(ContainerStatus.class)
169 .register(ContainerState.class)
170 .register(ContainerStateRunning.class)
171 .register(ContainerStateTerminated.class)
172 .register(ContainerStateWaiting.class)
173 .register(OwnerReference.class)
174 .register(Probe.class)
175 .register(ExecAction.class)
176 .register(HTTPGetAction.class)
177 .register(HTTPHeader.class)
178 .register(TCPSocketAction.class)
179 .register(ContainerPort.class)
180 .register(ResourceRequirements.class)
181 .register(SecurityContext.class)
182 .register(PodSecurityContext.class)
183 .register(SELinuxOptions.class)
Jian Li72f3dac2021-01-28 16:14:54 +0900184 .register(SeccompProfile.class)
Jian Lifc724252021-01-29 00:01:51 +0900185 .register(Handler.class)
186 .register(ExecAction.class)
187 .register(HTTPGetAction.class)
188 .register(TCPSocketAction.class)
189 .register(HTTPHeader.class)
Jian Liac31f652021-01-17 02:18:30 +0900190 .register(Volume.class)
191 .register(VolumeDevice.class)
192 .register(VolumeMount.class)
193 .register(IntOrString.class)
194 .register(Toleration.class)
195 .register(PersistentVolumeClaimVolumeSource.class)
196 .register(SecretVolumeSource.class)
197 .register(EmptyDirVolumeSource.class)
198 .register(Quantity.class)
199 .register(Capabilities.class)
200 .register(ConfigMapVolumeSource.class)
201 .register(KeyToPath.class)
202 .register(HostPathVolumeSource.class)
203 .register(Affinity.class)
204 .register(NodeAffinity.class)
205 .register(NodeSelector.class)
206 .register(NodeSelectorTerm.class)
207 .register(NodeSelectorRequirement.class)
208 .register(PreferredSchedulingTerm.class)
209 .register(PodAffinity.class)
210 .register(WeightedPodAffinityTerm.class)
211 .register(PodAffinityTerm.class)
212 .register(LabelSelector.class)
213 .register(LabelSelectorRequirement.class)
Jian Lifc724252021-01-29 00:01:51 +0900214 .register(LocalObjectReference.class)
Jian Liac31f652021-01-17 02:18:30 +0900215 .register(PodAntiAffinity.class)
216 .register(ManagedFieldsEntry.class)
Jian Lib6dc08f2021-03-24 15:24:18 +0900217 .register(DownwardAPIVolumeSource.class)
218 .register(DownwardAPIVolumeFile.class)
219 .register(ObjectFieldSelector.class)
Jian Liac31f652021-01-17 02:18:30 +0900220 .register(FieldsV1.class)
221 .register(LinkedHashMap.class)
222 .register(Collection.class)
223 .build();
224
225 @Reference(cardinality = ReferenceCardinality.MANDATORY)
226 protected CoreService coreService;
227
228 @Reference(cardinality = ReferenceCardinality.MANDATORY)
229 protected StorageService storageService;
230
231 private final ExecutorService eventExecutor = newSingleThreadExecutor(
232 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
233
234 private final MapEventListener<String, Pod> podMapListener = new KubevirtPodMapListener();
235
236 private ConsistentMap<String, Pod> podStore;
237
238 @Activate
239 protected void activate() {
240 ApplicationId appId = coreService.registerApplication(APP_ID);
241 podStore = storageService.<String, Pod>consistentMapBuilder()
242 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
243 .withName("kubevirt-pod-store")
244 .withApplicationId(appId)
245 .build();
246
247 podStore.addListener(podMapListener);
248 log.info("Started");
249 }
250
251 @Deactivate
252 protected void deactivate() {
253 podStore.removeListener(podMapListener);
254 eventExecutor.shutdown();
255 log.info("Stopped");
256 }
257
258 @Override
259 public void createPod(Pod pod) {
260 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
261 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
262 checkArgument(existing == null, error);
263 return pod;
264 });
265 }
266
267 @Override
268 public void updatePod(Pod pod) {
269 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
270 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
271 checkArgument(existing != null, error);
272 return pod;
273 });
274 }
275
276 @Override
277 public Pod removePod(String uid) {
278 Versioned<Pod> pod = podStore.remove(uid);
279 if (pod == null) {
280 final String error = uid + ERR_NOT_FOUND;
281 throw new IllegalArgumentException(error);
282 }
283 return pod.value();
284 }
285
286 @Override
287 public Pod pod(String uid) {
288 return podStore.asJavaMap().get(uid);
289 }
290
291 @Override
292 public Set<Pod> pods() {
293 return ImmutableSet.copyOf(podStore.asJavaMap().values());
294 }
295
296 @Override
297 public void clear() {
298 podStore.clear();
299 }
300
301 private class KubevirtPodMapListener implements MapEventListener<String, Pod> {
302
303 @Override
304 public void event(MapEvent<String, Pod> event) {
305
306 switch (event.type()) {
307 case INSERT:
308 log.debug("Kubernetes pod created {}", event.newValue());
309 eventExecutor.execute(() ->
310 notifyDelegate(new KubevirtPodEvent(
311 KUBEVIRT_POD_CREATED, event.newValue().value())));
312 break;
313 case UPDATE:
314 log.debug("Kubernetes pod updated {}", event.newValue());
315 eventExecutor.execute(() -> processUpdate(event));
316 break;
317 case REMOVE:
318 log.debug("Kubernetes pod removed {}", event.oldValue());
319 eventExecutor.execute(() ->
320 notifyDelegate(new KubevirtPodEvent(
321 KUBEVIRT_POD_REMOVED, event.oldValue().value())));
322 break;
323 default:
324 // do nothing
325 break;
326 }
327 }
328
329 private void processUpdate(MapEvent<String, Pod> event) {
330 notifyDelegate(new KubevirtPodEvent(
331 KUBEVIRT_POD_UPDATED, event.newValue().value()));
332
333 String oldPhase = event.oldValue().value().getStatus().getPhase();
334 String newPhase = event.newValue().value().getStatus().getPhase();
335
336 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
337 notifyDelegate(new KubevirtPodEvent(
338 KUBEVIRT_POD_PENDING, event.newValue().value()));
339 }
340
341 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
342 notifyDelegate(new KubevirtPodEvent(
343 KUBEVIRT_POD_RUNNING, event.newValue().value()));
344 }
345
346 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
347 notifyDelegate(new KubevirtPodEvent(
348 KUBEVIRT_POD_SUCCEEDED, event.newValue().value()));
349 }
350
351 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
352 notifyDelegate(new KubevirtPodEvent(
353 KUBEVIRT_POD_FAILED, event.newValue().value()));
354 }
355
356 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
357 notifyDelegate(new KubevirtPodEvent(
358 KUBEVIRT_POD_UNKNOWN, event.newValue().value()));
359 }
360
361 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
362 notifyDelegate(new KubevirtPodEvent(
363 KUBEVIRT_POD_COMPLETED, event.newValue().value()));
364 }
365
366 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
367 notifyDelegate(new KubevirtPodEvent(
368 KUBEVIRT_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
369 }
370
371 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
372 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
373
374 if (oldAnnot == null && newAnnot != null) {
375 notifyDelegate(new KubevirtPodEvent(
376 KUBEVIRT_POD_ANNOTATION_ADDED, event.newValue().value()));
377 }
378 }
379 }
380}