blob: 1b3fd0b7a55cea9e8631d9559773b386f308d9a8 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.Affinity;
20import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
36import io.fabric8.kubernetes.api.model.FieldsV1;
37import io.fabric8.kubernetes.api.model.HTTPGetAction;
38import io.fabric8.kubernetes.api.model.HTTPHeader;
Jian Lifc724252021-01-29 00:01:51 +090039import io.fabric8.kubernetes.api.model.Handler;
Jian Liac31f652021-01-17 02:18:30 +090040import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
41import io.fabric8.kubernetes.api.model.IntOrString;
42import io.fabric8.kubernetes.api.model.KeyToPath;
43import io.fabric8.kubernetes.api.model.LabelSelector;
44import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
45import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Lifc724252021-01-29 00:01:51 +090046import io.fabric8.kubernetes.api.model.LocalObjectReference;
Jian Liac31f652021-01-17 02:18:30 +090047import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
48import io.fabric8.kubernetes.api.model.NodeAffinity;
49import io.fabric8.kubernetes.api.model.NodeSelector;
50import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
51import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
52import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
53import io.fabric8.kubernetes.api.model.ObjectMeta;
54import io.fabric8.kubernetes.api.model.OwnerReference;
55import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
56import io.fabric8.kubernetes.api.model.Pod;
57import io.fabric8.kubernetes.api.model.PodAffinity;
58import io.fabric8.kubernetes.api.model.PodAffinityTerm;
59import io.fabric8.kubernetes.api.model.PodAntiAffinity;
60import io.fabric8.kubernetes.api.model.PodCondition;
61import io.fabric8.kubernetes.api.model.PodIP;
62import io.fabric8.kubernetes.api.model.PodSecurityContext;
63import io.fabric8.kubernetes.api.model.PodSpec;
64import io.fabric8.kubernetes.api.model.PodStatus;
65import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
66import io.fabric8.kubernetes.api.model.Probe;
67import io.fabric8.kubernetes.api.model.Quantity;
68import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
69import io.fabric8.kubernetes.api.model.ResourceRequirements;
70import io.fabric8.kubernetes.api.model.SELinuxOptions;
Jian Li72f3dac2021-01-28 16:14:54 +090071import io.fabric8.kubernetes.api.model.SeccompProfile;
Jian Liac31f652021-01-17 02:18:30 +090072import io.fabric8.kubernetes.api.model.SecretEnvSource;
73import io.fabric8.kubernetes.api.model.SecretKeySelector;
74import io.fabric8.kubernetes.api.model.SecretVolumeSource;
75import io.fabric8.kubernetes.api.model.SecurityContext;
76import io.fabric8.kubernetes.api.model.TCPSocketAction;
77import io.fabric8.kubernetes.api.model.Toleration;
78import io.fabric8.kubernetes.api.model.Volume;
79import io.fabric8.kubernetes.api.model.VolumeDevice;
80import io.fabric8.kubernetes.api.model.VolumeMount;
81import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
82import org.onlab.util.KryoNamespace;
83import org.onosproject.core.ApplicationId;
84import org.onosproject.core.CoreService;
85import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
86import org.onosproject.kubevirtnetworking.api.KubevirtPodStore;
87import org.onosproject.kubevirtnetworking.api.KubevirtPodStoreDelegate;
88import org.onosproject.store.AbstractStore;
89import org.onosproject.store.serializers.KryoNamespaces;
90import org.onosproject.store.service.ConsistentMap;
91import org.onosproject.store.service.MapEvent;
92import org.onosproject.store.service.MapEventListener;
93import org.onosproject.store.service.Serializer;
94import org.onosproject.store.service.StorageService;
95import org.onosproject.store.service.Versioned;
96import org.osgi.service.component.annotations.Activate;
97import org.osgi.service.component.annotations.Component;
98import org.osgi.service.component.annotations.Deactivate;
99import org.osgi.service.component.annotations.Reference;
100import org.osgi.service.component.annotations.ReferenceCardinality;
101import org.slf4j.Logger;
102
103import java.util.Collection;
104import java.util.LinkedHashMap;
105import java.util.Map;
106import java.util.Set;
107import java.util.concurrent.ExecutorService;
108
109import static com.google.common.base.Preconditions.checkArgument;
110import static java.util.concurrent.Executors.newSingleThreadExecutor;
111import static org.onlab.util.Tools.groupedThreads;
112import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_ANNOTATION_ADDED;
113import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_COMPLETED;
114import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CRASH_LOOP_BACK_OFF;
115import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_CREATED;
116import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_FAILED;
117import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_PENDING;
118import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_REMOVED;
119import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_RUNNING;
120import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_SUCCEEDED;
121import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UNKNOWN;
122import static org.onosproject.kubevirtnetworking.api.KubevirtPodEvent.Type.KUBEVIRT_POD_UPDATED;
123import static org.slf4j.LoggerFactory.getLogger;
124
125/**
126 * Implementation of kubevirt pod store using consistent map.
127 */
128@Component(immediate = true, service = KubevirtPodStore.class)
129public class DistributedKubevirtPodStore
130 extends AbstractStore<KubevirtPodEvent, KubevirtPodStoreDelegate>
131 implements KubevirtPodStore {
132 private final Logger log = getLogger(getClass());
133
134 private static final String ERR_NOT_FOUND = " does not exist";
135 private static final String ERR_DUPLICATE = " already exists";
136 private static final String APP_ID = "org.onosproject.k8snetwork";
137
138 private static final String PENDING = "Pending";
139 private static final String RUNNING = "Running";
140 private static final String SUCCEEDED = "Succeeded";
141 private static final String FAILED = "Failed";
142 private static final String UNKNOWN = "Unknown";
143 private static final String COMPLETED = "Completed";
144 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
145
146 private static final KryoNamespace
147 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
148 .register(KryoNamespaces.API)
149 .register(Pod.class)
150 .register(ObjectMeta.class)
151 .register(PodSpec.class)
152 .register(PodStatus.class)
153 .register(PodCondition.class)
154 .register(PodIP.class)
155 .register(Container.class)
156 .register(EnvVar.class)
157 .register(EnvVarSource.class)
158 .register(EnvFromSource.class)
159 .register(ConfigMapEnvSource.class)
160 .register(ConfigMapKeySelector.class)
161 .register(ObjectFieldSelector.class)
162 .register(ResourceFieldSelector.class)
163 .register(SecretKeySelector.class)
164 .register(Lifecycle.class)
165 .register(SecretEnvSource.class)
166 .register(ContainerStatus.class)
167 .register(ContainerState.class)
168 .register(ContainerStateRunning.class)
169 .register(ContainerStateTerminated.class)
170 .register(ContainerStateWaiting.class)
171 .register(OwnerReference.class)
172 .register(Probe.class)
173 .register(ExecAction.class)
174 .register(HTTPGetAction.class)
175 .register(HTTPHeader.class)
176 .register(TCPSocketAction.class)
177 .register(ContainerPort.class)
178 .register(ResourceRequirements.class)
179 .register(SecurityContext.class)
180 .register(PodSecurityContext.class)
181 .register(SELinuxOptions.class)
Jian Li72f3dac2021-01-28 16:14:54 +0900182 .register(SeccompProfile.class)
Jian Lifc724252021-01-29 00:01:51 +0900183 .register(Handler.class)
184 .register(ExecAction.class)
185 .register(HTTPGetAction.class)
186 .register(TCPSocketAction.class)
187 .register(HTTPHeader.class)
Jian Liac31f652021-01-17 02:18:30 +0900188 .register(Volume.class)
189 .register(VolumeDevice.class)
190 .register(VolumeMount.class)
191 .register(IntOrString.class)
192 .register(Toleration.class)
193 .register(PersistentVolumeClaimVolumeSource.class)
194 .register(SecretVolumeSource.class)
195 .register(EmptyDirVolumeSource.class)
196 .register(Quantity.class)
197 .register(Capabilities.class)
198 .register(ConfigMapVolumeSource.class)
199 .register(KeyToPath.class)
200 .register(HostPathVolumeSource.class)
201 .register(Affinity.class)
202 .register(NodeAffinity.class)
203 .register(NodeSelector.class)
204 .register(NodeSelectorTerm.class)
205 .register(NodeSelectorRequirement.class)
206 .register(PreferredSchedulingTerm.class)
207 .register(PodAffinity.class)
208 .register(WeightedPodAffinityTerm.class)
209 .register(PodAffinityTerm.class)
210 .register(LabelSelector.class)
211 .register(LabelSelectorRequirement.class)
Jian Lifc724252021-01-29 00:01:51 +0900212 .register(LocalObjectReference.class)
Jian Liac31f652021-01-17 02:18:30 +0900213 .register(PodAntiAffinity.class)
214 .register(ManagedFieldsEntry.class)
215 .register(FieldsV1.class)
216 .register(LinkedHashMap.class)
217 .register(Collection.class)
218 .build();
219
220 @Reference(cardinality = ReferenceCardinality.MANDATORY)
221 protected CoreService coreService;
222
223 @Reference(cardinality = ReferenceCardinality.MANDATORY)
224 protected StorageService storageService;
225
226 private final ExecutorService eventExecutor = newSingleThreadExecutor(
227 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
228
229 private final MapEventListener<String, Pod> podMapListener = new KubevirtPodMapListener();
230
231 private ConsistentMap<String, Pod> podStore;
232
233 @Activate
234 protected void activate() {
235 ApplicationId appId = coreService.registerApplication(APP_ID);
236 podStore = storageService.<String, Pod>consistentMapBuilder()
237 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
238 .withName("kubevirt-pod-store")
239 .withApplicationId(appId)
240 .build();
241
242 podStore.addListener(podMapListener);
243 log.info("Started");
244 }
245
246 @Deactivate
247 protected void deactivate() {
248 podStore.removeListener(podMapListener);
249 eventExecutor.shutdown();
250 log.info("Stopped");
251 }
252
253 @Override
254 public void createPod(Pod pod) {
255 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
256 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
257 checkArgument(existing == null, error);
258 return pod;
259 });
260 }
261
262 @Override
263 public void updatePod(Pod pod) {
264 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
265 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
266 checkArgument(existing != null, error);
267 return pod;
268 });
269 }
270
271 @Override
272 public Pod removePod(String uid) {
273 Versioned<Pod> pod = podStore.remove(uid);
274 if (pod == null) {
275 final String error = uid + ERR_NOT_FOUND;
276 throw new IllegalArgumentException(error);
277 }
278 return pod.value();
279 }
280
281 @Override
282 public Pod pod(String uid) {
283 return podStore.asJavaMap().get(uid);
284 }
285
286 @Override
287 public Set<Pod> pods() {
288 return ImmutableSet.copyOf(podStore.asJavaMap().values());
289 }
290
291 @Override
292 public void clear() {
293 podStore.clear();
294 }
295
296 private class KubevirtPodMapListener implements MapEventListener<String, Pod> {
297
298 @Override
299 public void event(MapEvent<String, Pod> event) {
300
301 switch (event.type()) {
302 case INSERT:
303 log.debug("Kubernetes pod created {}", event.newValue());
304 eventExecutor.execute(() ->
305 notifyDelegate(new KubevirtPodEvent(
306 KUBEVIRT_POD_CREATED, event.newValue().value())));
307 break;
308 case UPDATE:
309 log.debug("Kubernetes pod updated {}", event.newValue());
310 eventExecutor.execute(() -> processUpdate(event));
311 break;
312 case REMOVE:
313 log.debug("Kubernetes pod removed {}", event.oldValue());
314 eventExecutor.execute(() ->
315 notifyDelegate(new KubevirtPodEvent(
316 KUBEVIRT_POD_REMOVED, event.oldValue().value())));
317 break;
318 default:
319 // do nothing
320 break;
321 }
322 }
323
324 private void processUpdate(MapEvent<String, Pod> event) {
325 notifyDelegate(new KubevirtPodEvent(
326 KUBEVIRT_POD_UPDATED, event.newValue().value()));
327
328 String oldPhase = event.oldValue().value().getStatus().getPhase();
329 String newPhase = event.newValue().value().getStatus().getPhase();
330
331 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
332 notifyDelegate(new KubevirtPodEvent(
333 KUBEVIRT_POD_PENDING, event.newValue().value()));
334 }
335
336 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
337 notifyDelegate(new KubevirtPodEvent(
338 KUBEVIRT_POD_RUNNING, event.newValue().value()));
339 }
340
341 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
342 notifyDelegate(new KubevirtPodEvent(
343 KUBEVIRT_POD_SUCCEEDED, event.newValue().value()));
344 }
345
346 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
347 notifyDelegate(new KubevirtPodEvent(
348 KUBEVIRT_POD_FAILED, event.newValue().value()));
349 }
350
351 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
352 notifyDelegate(new KubevirtPodEvent(
353 KUBEVIRT_POD_UNKNOWN, event.newValue().value()));
354 }
355
356 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
357 notifyDelegate(new KubevirtPodEvent(
358 KUBEVIRT_POD_COMPLETED, event.newValue().value()));
359 }
360
361 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
362 notifyDelegate(new KubevirtPodEvent(
363 KUBEVIRT_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
364 }
365
366 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
367 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
368
369 if (oldAnnot == null && newAnnot != null) {
370 notifyDelegate(new KubevirtPodEvent(
371 KUBEVIRT_POD_ANNOTATION_ADDED, event.newValue().value()));
372 }
373 }
374 }
375}