blob: 7f2a51dfa9ce753242badf0f24cf42533cadcce5 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Li4a7ce672019-04-09 15:20:25 +090019import io.fabric8.kubernetes.api.model.Affinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090020import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
36import io.fabric8.kubernetes.api.model.HTTPGetAction;
37import io.fabric8.kubernetes.api.model.HTTPHeader;
38import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
39import io.fabric8.kubernetes.api.model.IntOrString;
40import io.fabric8.kubernetes.api.model.KeyToPath;
Jian Li4a7ce672019-04-09 15:20:25 +090041import io.fabric8.kubernetes.api.model.LabelSelector;
42import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Jian Lib1cd0b02019-02-21 16:41:40 +090043import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Li4a7ce672019-04-09 15:20:25 +090044import io.fabric8.kubernetes.api.model.NodeAffinity;
45import io.fabric8.kubernetes.api.model.NodeSelector;
46import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
47import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090048import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090049import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.OwnerReference;
51import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090052import io.fabric8.kubernetes.api.model.Pod;
Jian Li4a7ce672019-04-09 15:20:25 +090053import io.fabric8.kubernetes.api.model.PodAffinity;
54import io.fabric8.kubernetes.api.model.PodAffinityTerm;
55import io.fabric8.kubernetes.api.model.PodAntiAffinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090056import io.fabric8.kubernetes.api.model.PodCondition;
Jian Li599075a2020-01-20 10:23:33 +090057import io.fabric8.kubernetes.api.model.PodIP;
Jian Lib1cd0b02019-02-21 16:41:40 +090058import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090059import io.fabric8.kubernetes.api.model.PodSpec;
60import io.fabric8.kubernetes.api.model.PodStatus;
Jian Li4a7ce672019-04-09 15:20:25 +090061import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090062import io.fabric8.kubernetes.api.model.Probe;
63import io.fabric8.kubernetes.api.model.Quantity;
64import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
65import io.fabric8.kubernetes.api.model.ResourceRequirements;
66import io.fabric8.kubernetes.api.model.SELinuxOptions;
67import io.fabric8.kubernetes.api.model.SecretEnvSource;
68import io.fabric8.kubernetes.api.model.SecretKeySelector;
69import io.fabric8.kubernetes.api.model.SecretVolumeSource;
70import io.fabric8.kubernetes.api.model.SecurityContext;
71import io.fabric8.kubernetes.api.model.TCPSocketAction;
72import io.fabric8.kubernetes.api.model.Toleration;
73import io.fabric8.kubernetes.api.model.Volume;
74import io.fabric8.kubernetes.api.model.VolumeDevice;
75import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li4a7ce672019-04-09 15:20:25 +090076import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
Jian Li747e1362019-02-19 22:59:46 +090077import org.onlab.util.KryoNamespace;
78import org.onosproject.core.ApplicationId;
79import org.onosproject.core.CoreService;
80import org.onosproject.k8snetworking.api.K8sPodEvent;
81import org.onosproject.k8snetworking.api.K8sPodStore;
82import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
83import org.onosproject.store.AbstractStore;
84import org.onosproject.store.serializers.KryoNamespaces;
85import org.onosproject.store.service.ConsistentMap;
86import org.onosproject.store.service.MapEvent;
87import org.onosproject.store.service.MapEventListener;
88import org.onosproject.store.service.Serializer;
89import org.onosproject.store.service.StorageService;
90import org.onosproject.store.service.Versioned;
91import org.osgi.service.component.annotations.Activate;
92import org.osgi.service.component.annotations.Component;
93import org.osgi.service.component.annotations.Deactivate;
94import org.osgi.service.component.annotations.Reference;
95import org.osgi.service.component.annotations.ReferenceCardinality;
96import org.slf4j.Logger;
97
98import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090099import java.util.LinkedHashMap;
Jian Li5c755832019-04-11 23:24:28 +0900100import java.util.Map;
Jian Li747e1362019-02-19 22:59:46 +0900101import java.util.Set;
102import java.util.concurrent.ExecutorService;
103
104import static com.google.common.base.Preconditions.checkArgument;
105import static java.util.concurrent.Executors.newSingleThreadExecutor;
106import static org.onlab.util.Tools.groupedThreads;
Jian Li5c755832019-04-11 23:24:28 +0900107import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_ANNOTATION_ADDED;
108import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_COMPLETED;
109import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CRASH_LOOP_BACK_OFF;
Jian Li747e1362019-02-19 22:59:46 +0900110import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
Jian Li5c755832019-04-11 23:24:28 +0900111import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_FAILED;
112import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_PENDING;
Jian Li747e1362019-02-19 22:59:46 +0900113import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
Jian Li5c755832019-04-11 23:24:28 +0900114import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_RUNNING;
115import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_SUCCEEDED;
116import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UNKNOWN;
Jian Li747e1362019-02-19 22:59:46 +0900117import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
118import static org.slf4j.LoggerFactory.getLogger;
119
120/**
121 * Implementation of kubernetes pod store using consistent map.
122 */
123@Component(immediate = true, service = K8sPodStore.class)
124public class DistributedK8sPodStore
125 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
126 implements K8sPodStore {
127
128 private final Logger log = getLogger(getClass());
129
130 private static final String ERR_NOT_FOUND = " does not exist";
131 private static final String ERR_DUPLICATE = " already exists";
132 private static final String APP_ID = "org.onosproject.k8snetwork";
133
Jian Li5c755832019-04-11 23:24:28 +0900134 private static final String PENDING = "Pending";
135 private static final String RUNNING = "Running";
136 private static final String SUCCEEDED = "Succeeded";
137 private static final String FAILED = "Failed";
138 private static final String UNKNOWN = "Unknown";
139 private static final String COMPLETED = "Completed";
140 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
141
Jian Li747e1362019-02-19 22:59:46 +0900142 private static final KryoNamespace
143 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
144 .register(KryoNamespaces.API)
145 .register(Pod.class)
146 .register(ObjectMeta.class)
147 .register(PodSpec.class)
148 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900149 .register(PodCondition.class)
Jian Li599075a2020-01-20 10:23:33 +0900150 .register(PodIP.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900151 .register(Container.class)
152 .register(EnvVar.class)
153 .register(EnvVarSource.class)
154 .register(EnvFromSource.class)
155 .register(ConfigMapEnvSource.class)
156 .register(ConfigMapKeySelector.class)
157 .register(ObjectFieldSelector.class)
158 .register(ResourceFieldSelector.class)
159 .register(SecretKeySelector.class)
160 .register(Lifecycle.class)
161 .register(SecretEnvSource.class)
162 .register(ContainerStatus.class)
163 .register(ContainerState.class)
164 .register(ContainerStateRunning.class)
165 .register(ContainerStateTerminated.class)
166 .register(ContainerStateWaiting.class)
167 .register(OwnerReference.class)
168 .register(Probe.class)
169 .register(ExecAction.class)
170 .register(HTTPGetAction.class)
171 .register(HTTPHeader.class)
172 .register(TCPSocketAction.class)
173 .register(ContainerPort.class)
174 .register(ResourceRequirements.class)
175 .register(SecurityContext.class)
176 .register(PodSecurityContext.class)
177 .register(SELinuxOptions.class)
178 .register(Volume.class)
179 .register(VolumeDevice.class)
180 .register(VolumeMount.class)
181 .register(IntOrString.class)
182 .register(Toleration.class)
183 .register(PersistentVolumeClaimVolumeSource.class)
184 .register(SecretVolumeSource.class)
185 .register(EmptyDirVolumeSource.class)
186 .register(Quantity.class)
187 .register(Capabilities.class)
188 .register(ConfigMapVolumeSource.class)
189 .register(KeyToPath.class)
190 .register(HostPathVolumeSource.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900191 .register(Affinity.class)
192 .register(NodeAffinity.class)
193 .register(NodeSelector.class)
194 .register(NodeSelectorTerm.class)
195 .register(NodeSelectorRequirement.class)
196 .register(PreferredSchedulingTerm.class)
197 .register(PodAffinity.class)
198 .register(WeightedPodAffinityTerm.class)
199 .register(PodAffinityTerm.class)
200 .register(LabelSelector.class)
201 .register(LabelSelectorRequirement.class)
202 .register(PodAntiAffinity.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900203 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900204 .register(Collection.class)
205 .build();
206
207 @Reference(cardinality = ReferenceCardinality.MANDATORY)
208 protected CoreService coreService;
209
210 @Reference(cardinality = ReferenceCardinality.MANDATORY)
211 protected StorageService storageService;
212
213 private final ExecutorService eventExecutor = newSingleThreadExecutor(
214 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
215
216 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
217
218 private ConsistentMap<String, Pod> podStore;
219
220 @Activate
221 protected void activate() {
222 ApplicationId appId = coreService.registerApplication(APP_ID);
223 podStore = storageService.<String, Pod>consistentMapBuilder()
224 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
225 .withName("k8s-pod-store")
226 .withApplicationId(appId)
227 .build();
228
229 podStore.addListener(podMapListener);
230 log.info("Started");
231 }
232
233 @Deactivate
234 protected void deactivate() {
235 podStore.removeListener(podMapListener);
236 eventExecutor.shutdown();
237 log.info("Stopped");
238 }
239
240 @Override
241 public void createPod(Pod pod) {
242 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
243 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
244 checkArgument(existing == null, error);
245 return pod;
246 });
247 }
248
249 @Override
250 public void updatePod(Pod pod) {
251 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
252 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
253 checkArgument(existing != null, error);
254 return pod;
255 });
256 }
257
258 @Override
259 public Pod removePod(String uid) {
260 Versioned<Pod> pod = podStore.remove(uid);
261 if (pod == null) {
262 final String error = uid + ERR_NOT_FOUND;
263 throw new IllegalArgumentException(error);
264 }
265 return pod.value();
266 }
267
268 @Override
269 public Pod pod(String uid) {
270 return podStore.asJavaMap().get(uid);
271 }
272
273 @Override
274 public Set<Pod> pods() {
275 return ImmutableSet.copyOf(podStore.asJavaMap().values());
276 }
277
278 @Override
279 public void clear() {
280 podStore.clear();
281 }
282
283 private class K8sPodMapListener implements MapEventListener<String, Pod> {
284
285 @Override
286 public void event(MapEvent<String, Pod> event) {
287
288 switch (event.type()) {
289 case INSERT:
290 log.debug("Kubernetes pod created {}", event.newValue());
291 eventExecutor.execute(() ->
292 notifyDelegate(new K8sPodEvent(
293 K8S_POD_CREATED, event.newValue().value())));
294 break;
295 case UPDATE:
296 log.debug("Kubernetes pod updated {}", event.newValue());
Jian Li5c755832019-04-11 23:24:28 +0900297 eventExecutor.execute(() -> processUpdate(event));
Jian Li747e1362019-02-19 22:59:46 +0900298 break;
299 case REMOVE:
300 log.debug("Kubernetes pod removed {}", event.oldValue());
301 eventExecutor.execute(() ->
302 notifyDelegate(new K8sPodEvent(
303 K8S_POD_REMOVED, event.oldValue().value())));
304 break;
305 default:
306 // do nothing
307 break;
308 }
309 }
Jian Li5c755832019-04-11 23:24:28 +0900310
311 private void processUpdate(MapEvent<String, Pod> event) {
312 notifyDelegate(new K8sPodEvent(
313 K8S_POD_UPDATED, event.newValue().value()));
314
315 String oldPhase = event.oldValue().value().getStatus().getPhase();
316 String newPhase = event.newValue().value().getStatus().getPhase();
317
318 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
319 notifyDelegate(new K8sPodEvent(
320 K8S_POD_PENDING, event.newValue().value()));
321 }
322
323 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
324 notifyDelegate(new K8sPodEvent(
325 K8S_POD_RUNNING, event.newValue().value()));
326 }
327
328 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
329 notifyDelegate(new K8sPodEvent(
330 K8S_POD_SUCCEEDED, event.newValue().value()));
331 }
332
333 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
334 notifyDelegate(new K8sPodEvent(
335 K8S_POD_FAILED, event.newValue().value()));
336 }
337
338 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
339 notifyDelegate(new K8sPodEvent(
340 K8S_POD_UNKNOWN, event.newValue().value()));
341 }
342
343 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
344 notifyDelegate(new K8sPodEvent(
345 K8S_POD_COMPLETED, event.newValue().value()));
346 }
347
348 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
349 notifyDelegate(new K8sPodEvent(
350 K8S_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
351 }
352
353 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
354 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
355
356 if (oldAnnot == null && newAnnot != null) {
357 notifyDelegate(new K8sPodEvent(
358 K8S_POD_ANNOTATION_ADDED, event.newValue().value()));
359 }
360 }
Jian Li747e1362019-02-19 22:59:46 +0900361 }
362}