blob: 906c2dce1884cd1fcf9c93c01f6bda10a9b75ade [file] [log] [blame]
Jian Li747e1362019-02-19 22:59:46 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Li4a7ce672019-04-09 15:20:25 +090019import io.fabric8.kubernetes.api.model.Affinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090020import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
Jian Li096262a2020-09-22 23:25:06 +090036import io.fabric8.kubernetes.api.model.FieldsV1;
Jian Lib1cd0b02019-02-21 16:41:40 +090037import io.fabric8.kubernetes.api.model.HTTPGetAction;
38import io.fabric8.kubernetes.api.model.HTTPHeader;
39import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
40import io.fabric8.kubernetes.api.model.IntOrString;
41import io.fabric8.kubernetes.api.model.KeyToPath;
Jian Li4a7ce672019-04-09 15:20:25 +090042import io.fabric8.kubernetes.api.model.LabelSelector;
43import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Jian Lib1cd0b02019-02-21 16:41:40 +090044import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Li096262a2020-09-22 23:25:06 +090045import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
Jian Li4a7ce672019-04-09 15:20:25 +090046import io.fabric8.kubernetes.api.model.NodeAffinity;
47import io.fabric8.kubernetes.api.model.NodeSelector;
48import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
49import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090051import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090052import io.fabric8.kubernetes.api.model.OwnerReference;
53import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090054import io.fabric8.kubernetes.api.model.Pod;
Jian Li4a7ce672019-04-09 15:20:25 +090055import io.fabric8.kubernetes.api.model.PodAffinity;
56import io.fabric8.kubernetes.api.model.PodAffinityTerm;
57import io.fabric8.kubernetes.api.model.PodAntiAffinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090058import io.fabric8.kubernetes.api.model.PodCondition;
Jian Li14493892020-01-20 10:23:33 +090059import io.fabric8.kubernetes.api.model.PodIP;
Jian Lib1cd0b02019-02-21 16:41:40 +090060import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090061import io.fabric8.kubernetes.api.model.PodSpec;
62import io.fabric8.kubernetes.api.model.PodStatus;
Jian Li4a7ce672019-04-09 15:20:25 +090063import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090064import io.fabric8.kubernetes.api.model.Probe;
65import io.fabric8.kubernetes.api.model.Quantity;
66import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
67import io.fabric8.kubernetes.api.model.ResourceRequirements;
68import io.fabric8.kubernetes.api.model.SELinuxOptions;
Jian Liacf88ae2021-08-26 22:04:08 +090069import io.fabric8.kubernetes.api.model.SeccompProfile;
Jian Lib1cd0b02019-02-21 16:41:40 +090070import io.fabric8.kubernetes.api.model.SecretEnvSource;
71import io.fabric8.kubernetes.api.model.SecretKeySelector;
72import io.fabric8.kubernetes.api.model.SecretVolumeSource;
73import io.fabric8.kubernetes.api.model.SecurityContext;
74import io.fabric8.kubernetes.api.model.TCPSocketAction;
75import io.fabric8.kubernetes.api.model.Toleration;
76import io.fabric8.kubernetes.api.model.Volume;
77import io.fabric8.kubernetes.api.model.VolumeDevice;
78import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li4a7ce672019-04-09 15:20:25 +090079import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
Jian Li747e1362019-02-19 22:59:46 +090080import org.onlab.util.KryoNamespace;
81import org.onosproject.core.ApplicationId;
82import org.onosproject.core.CoreService;
83import org.onosproject.k8snetworking.api.K8sPodEvent;
84import org.onosproject.k8snetworking.api.K8sPodStore;
85import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
86import org.onosproject.store.AbstractStore;
87import org.onosproject.store.serializers.KryoNamespaces;
88import org.onosproject.store.service.ConsistentMap;
89import org.onosproject.store.service.MapEvent;
90import org.onosproject.store.service.MapEventListener;
91import org.onosproject.store.service.Serializer;
92import org.onosproject.store.service.StorageService;
93import org.onosproject.store.service.Versioned;
94import org.osgi.service.component.annotations.Activate;
95import org.osgi.service.component.annotations.Component;
96import org.osgi.service.component.annotations.Deactivate;
97import org.osgi.service.component.annotations.Reference;
98import org.osgi.service.component.annotations.ReferenceCardinality;
99import org.slf4j.Logger;
100
101import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +0900102import java.util.LinkedHashMap;
Jian Li5c755832019-04-11 23:24:28 +0900103import java.util.Map;
Jian Li747e1362019-02-19 22:59:46 +0900104import java.util.Set;
105import java.util.concurrent.ExecutorService;
106
107import static com.google.common.base.Preconditions.checkArgument;
108import static java.util.concurrent.Executors.newSingleThreadExecutor;
109import static org.onlab.util.Tools.groupedThreads;
Jian Li5c755832019-04-11 23:24:28 +0900110import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_ANNOTATION_ADDED;
111import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_COMPLETED;
112import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CRASH_LOOP_BACK_OFF;
Jian Li747e1362019-02-19 22:59:46 +0900113import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
Jian Li5c755832019-04-11 23:24:28 +0900114import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_FAILED;
115import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_PENDING;
Jian Li747e1362019-02-19 22:59:46 +0900116import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
Jian Li5c755832019-04-11 23:24:28 +0900117import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_RUNNING;
118import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_SUCCEEDED;
119import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UNKNOWN;
Jian Li747e1362019-02-19 22:59:46 +0900120import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
121import static org.slf4j.LoggerFactory.getLogger;
122
123/**
124 * Implementation of kubernetes pod store using consistent map.
125 */
126@Component(immediate = true, service = K8sPodStore.class)
127public class DistributedK8sPodStore
128 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
129 implements K8sPodStore {
130
131 private final Logger log = getLogger(getClass());
132
133 private static final String ERR_NOT_FOUND = " does not exist";
134 private static final String ERR_DUPLICATE = " already exists";
135 private static final String APP_ID = "org.onosproject.k8snetwork";
136
Jian Li5c755832019-04-11 23:24:28 +0900137 private static final String PENDING = "Pending";
138 private static final String RUNNING = "Running";
139 private static final String SUCCEEDED = "Succeeded";
140 private static final String FAILED = "Failed";
141 private static final String UNKNOWN = "Unknown";
142 private static final String COMPLETED = "Completed";
143 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
144
Jian Li747e1362019-02-19 22:59:46 +0900145 private static final KryoNamespace
146 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
147 .register(KryoNamespaces.API)
148 .register(Pod.class)
149 .register(ObjectMeta.class)
150 .register(PodSpec.class)
151 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900152 .register(PodCondition.class)
Jian Li14493892020-01-20 10:23:33 +0900153 .register(PodIP.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900154 .register(Container.class)
155 .register(EnvVar.class)
156 .register(EnvVarSource.class)
157 .register(EnvFromSource.class)
158 .register(ConfigMapEnvSource.class)
159 .register(ConfigMapKeySelector.class)
160 .register(ObjectFieldSelector.class)
161 .register(ResourceFieldSelector.class)
162 .register(SecretKeySelector.class)
163 .register(Lifecycle.class)
164 .register(SecretEnvSource.class)
165 .register(ContainerStatus.class)
166 .register(ContainerState.class)
167 .register(ContainerStateRunning.class)
168 .register(ContainerStateTerminated.class)
169 .register(ContainerStateWaiting.class)
170 .register(OwnerReference.class)
171 .register(Probe.class)
172 .register(ExecAction.class)
173 .register(HTTPGetAction.class)
174 .register(HTTPHeader.class)
175 .register(TCPSocketAction.class)
176 .register(ContainerPort.class)
177 .register(ResourceRequirements.class)
178 .register(SecurityContext.class)
179 .register(PodSecurityContext.class)
180 .register(SELinuxOptions.class)
181 .register(Volume.class)
182 .register(VolumeDevice.class)
183 .register(VolumeMount.class)
184 .register(IntOrString.class)
185 .register(Toleration.class)
186 .register(PersistentVolumeClaimVolumeSource.class)
187 .register(SecretVolumeSource.class)
188 .register(EmptyDirVolumeSource.class)
189 .register(Quantity.class)
190 .register(Capabilities.class)
191 .register(ConfigMapVolumeSource.class)
192 .register(KeyToPath.class)
193 .register(HostPathVolumeSource.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900194 .register(Affinity.class)
195 .register(NodeAffinity.class)
196 .register(NodeSelector.class)
197 .register(NodeSelectorTerm.class)
198 .register(NodeSelectorRequirement.class)
199 .register(PreferredSchedulingTerm.class)
Jian Liacf88ae2021-08-26 22:04:08 +0900200 .register(SeccompProfile.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900201 .register(PodAffinity.class)
202 .register(WeightedPodAffinityTerm.class)
203 .register(PodAffinityTerm.class)
204 .register(LabelSelector.class)
205 .register(LabelSelectorRequirement.class)
206 .register(PodAntiAffinity.class)
Jian Li096262a2020-09-22 23:25:06 +0900207 .register(ManagedFieldsEntry.class)
208 .register(FieldsV1.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900209 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900210 .register(Collection.class)
211 .build();
212
213 @Reference(cardinality = ReferenceCardinality.MANDATORY)
214 protected CoreService coreService;
215
216 @Reference(cardinality = ReferenceCardinality.MANDATORY)
217 protected StorageService storageService;
218
219 private final ExecutorService eventExecutor = newSingleThreadExecutor(
220 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
221
222 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
223
224 private ConsistentMap<String, Pod> podStore;
225
226 @Activate
227 protected void activate() {
228 ApplicationId appId = coreService.registerApplication(APP_ID);
229 podStore = storageService.<String, Pod>consistentMapBuilder()
230 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
231 .withName("k8s-pod-store")
232 .withApplicationId(appId)
233 .build();
234
235 podStore.addListener(podMapListener);
236 log.info("Started");
237 }
238
239 @Deactivate
240 protected void deactivate() {
241 podStore.removeListener(podMapListener);
242 eventExecutor.shutdown();
243 log.info("Stopped");
244 }
245
246 @Override
247 public void createPod(Pod pod) {
248 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
249 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
250 checkArgument(existing == null, error);
251 return pod;
252 });
253 }
254
255 @Override
256 public void updatePod(Pod pod) {
257 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
258 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
259 checkArgument(existing != null, error);
260 return pod;
261 });
262 }
263
264 @Override
265 public Pod removePod(String uid) {
266 Versioned<Pod> pod = podStore.remove(uid);
267 if (pod == null) {
268 final String error = uid + ERR_NOT_FOUND;
269 throw new IllegalArgumentException(error);
270 }
271 return pod.value();
272 }
273
274 @Override
275 public Pod pod(String uid) {
276 return podStore.asJavaMap().get(uid);
277 }
278
279 @Override
280 public Set<Pod> pods() {
281 return ImmutableSet.copyOf(podStore.asJavaMap().values());
282 }
283
284 @Override
285 public void clear() {
286 podStore.clear();
287 }
288
289 private class K8sPodMapListener implements MapEventListener<String, Pod> {
290
291 @Override
292 public void event(MapEvent<String, Pod> event) {
293
294 switch (event.type()) {
295 case INSERT:
296 log.debug("Kubernetes pod created {}", event.newValue());
297 eventExecutor.execute(() ->
298 notifyDelegate(new K8sPodEvent(
299 K8S_POD_CREATED, event.newValue().value())));
300 break;
301 case UPDATE:
302 log.debug("Kubernetes pod updated {}", event.newValue());
Jian Li5c755832019-04-11 23:24:28 +0900303 eventExecutor.execute(() -> processUpdate(event));
Jian Li747e1362019-02-19 22:59:46 +0900304 break;
305 case REMOVE:
306 log.debug("Kubernetes pod removed {}", event.oldValue());
307 eventExecutor.execute(() ->
308 notifyDelegate(new K8sPodEvent(
309 K8S_POD_REMOVED, event.oldValue().value())));
310 break;
311 default:
312 // do nothing
313 break;
314 }
315 }
Jian Li5c755832019-04-11 23:24:28 +0900316
317 private void processUpdate(MapEvent<String, Pod> event) {
318 notifyDelegate(new K8sPodEvent(
319 K8S_POD_UPDATED, event.newValue().value()));
320
321 String oldPhase = event.oldValue().value().getStatus().getPhase();
322 String newPhase = event.newValue().value().getStatus().getPhase();
323
324 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
325 notifyDelegate(new K8sPodEvent(
326 K8S_POD_PENDING, event.newValue().value()));
327 }
328
329 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
330 notifyDelegate(new K8sPodEvent(
331 K8S_POD_RUNNING, event.newValue().value()));
332 }
333
334 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
335 notifyDelegate(new K8sPodEvent(
336 K8S_POD_SUCCEEDED, event.newValue().value()));
337 }
338
339 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
340 notifyDelegate(new K8sPodEvent(
341 K8S_POD_FAILED, event.newValue().value()));
342 }
343
344 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
345 notifyDelegate(new K8sPodEvent(
346 K8S_POD_UNKNOWN, event.newValue().value()));
347 }
348
349 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
350 notifyDelegate(new K8sPodEvent(
351 K8S_POD_COMPLETED, event.newValue().value()));
352 }
353
354 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
355 notifyDelegate(new K8sPodEvent(
356 K8S_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
357 }
358
359 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
360 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
361
362 if (oldAnnot == null && newAnnot != null) {
363 notifyDelegate(new K8sPodEvent(
364 K8S_POD_ANNOTATION_ADDED, event.newValue().value()));
365 }
366 }
Jian Li747e1362019-02-19 22:59:46 +0900367 }
368}