blob: 65921dc36901c1957e220dadd606e44419d14d4d [file] [log] [blame]
Jian Li747e1362019-02-19 22:59:46 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Li4a7ce672019-04-09 15:20:25 +090019import io.fabric8.kubernetes.api.model.Affinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090020import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
36import io.fabric8.kubernetes.api.model.HTTPGetAction;
37import io.fabric8.kubernetes.api.model.HTTPHeader;
38import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
39import io.fabric8.kubernetes.api.model.IntOrString;
40import io.fabric8.kubernetes.api.model.KeyToPath;
Jian Li4a7ce672019-04-09 15:20:25 +090041import io.fabric8.kubernetes.api.model.LabelSelector;
42import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Jian Lib1cd0b02019-02-21 16:41:40 +090043import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Li4a7ce672019-04-09 15:20:25 +090044import io.fabric8.kubernetes.api.model.NodeAffinity;
45import io.fabric8.kubernetes.api.model.NodeSelector;
46import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
47import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090048import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090049import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.OwnerReference;
51import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090052import io.fabric8.kubernetes.api.model.Pod;
Jian Li4a7ce672019-04-09 15:20:25 +090053import io.fabric8.kubernetes.api.model.PodAffinity;
54import io.fabric8.kubernetes.api.model.PodAffinityTerm;
55import io.fabric8.kubernetes.api.model.PodAntiAffinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090056import io.fabric8.kubernetes.api.model.PodCondition;
57import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090058import io.fabric8.kubernetes.api.model.PodSpec;
59import io.fabric8.kubernetes.api.model.PodStatus;
Jian Li4a7ce672019-04-09 15:20:25 +090060import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090061import io.fabric8.kubernetes.api.model.Probe;
62import io.fabric8.kubernetes.api.model.Quantity;
63import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
64import io.fabric8.kubernetes.api.model.ResourceRequirements;
65import io.fabric8.kubernetes.api.model.SELinuxOptions;
66import io.fabric8.kubernetes.api.model.SecretEnvSource;
67import io.fabric8.kubernetes.api.model.SecretKeySelector;
68import io.fabric8.kubernetes.api.model.SecretVolumeSource;
69import io.fabric8.kubernetes.api.model.SecurityContext;
70import io.fabric8.kubernetes.api.model.TCPSocketAction;
71import io.fabric8.kubernetes.api.model.Toleration;
72import io.fabric8.kubernetes.api.model.Volume;
73import io.fabric8.kubernetes.api.model.VolumeDevice;
74import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li4a7ce672019-04-09 15:20:25 +090075import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
Jian Li747e1362019-02-19 22:59:46 +090076import org.onlab.util.KryoNamespace;
77import org.onosproject.core.ApplicationId;
78import org.onosproject.core.CoreService;
79import org.onosproject.k8snetworking.api.K8sPodEvent;
80import org.onosproject.k8snetworking.api.K8sPodStore;
81import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
82import org.onosproject.store.AbstractStore;
83import org.onosproject.store.serializers.KryoNamespaces;
84import org.onosproject.store.service.ConsistentMap;
85import org.onosproject.store.service.MapEvent;
86import org.onosproject.store.service.MapEventListener;
87import org.onosproject.store.service.Serializer;
88import org.onosproject.store.service.StorageService;
89import org.onosproject.store.service.Versioned;
90import org.osgi.service.component.annotations.Activate;
91import org.osgi.service.component.annotations.Component;
92import org.osgi.service.component.annotations.Deactivate;
93import org.osgi.service.component.annotations.Reference;
94import org.osgi.service.component.annotations.ReferenceCardinality;
95import org.slf4j.Logger;
96
97import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090098import java.util.LinkedHashMap;
Jian Li5c755832019-04-11 23:24:28 +090099import java.util.Map;
Jian Li747e1362019-02-19 22:59:46 +0900100import java.util.Set;
101import java.util.concurrent.ExecutorService;
102
103import static com.google.common.base.Preconditions.checkArgument;
104import static java.util.concurrent.Executors.newSingleThreadExecutor;
105import static org.onlab.util.Tools.groupedThreads;
Jian Li5c755832019-04-11 23:24:28 +0900106import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_ANNOTATION_ADDED;
107import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_COMPLETED;
108import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CRASH_LOOP_BACK_OFF;
Jian Li747e1362019-02-19 22:59:46 +0900109import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
Jian Li5c755832019-04-11 23:24:28 +0900110import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_FAILED;
111import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_PENDING;
Jian Li747e1362019-02-19 22:59:46 +0900112import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
Jian Li5c755832019-04-11 23:24:28 +0900113import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_RUNNING;
114import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_SUCCEEDED;
115import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UNKNOWN;
Jian Li747e1362019-02-19 22:59:46 +0900116import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
117import static org.slf4j.LoggerFactory.getLogger;
118
119/**
120 * Implementation of kubernetes pod store using consistent map.
121 */
122@Component(immediate = true, service = K8sPodStore.class)
123public class DistributedK8sPodStore
124 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
125 implements K8sPodStore {
126
127 private final Logger log = getLogger(getClass());
128
129 private static final String ERR_NOT_FOUND = " does not exist";
130 private static final String ERR_DUPLICATE = " already exists";
131 private static final String APP_ID = "org.onosproject.k8snetwork";
132
Jian Li5c755832019-04-11 23:24:28 +0900133 private static final String PENDING = "Pending";
134 private static final String RUNNING = "Running";
135 private static final String SUCCEEDED = "Succeeded";
136 private static final String FAILED = "Failed";
137 private static final String UNKNOWN = "Unknown";
138 private static final String COMPLETED = "Completed";
139 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
140
Jian Li747e1362019-02-19 22:59:46 +0900141 private static final KryoNamespace
142 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
143 .register(KryoNamespaces.API)
144 .register(Pod.class)
145 .register(ObjectMeta.class)
146 .register(PodSpec.class)
147 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900148 .register(PodCondition.class)
149 .register(Container.class)
150 .register(EnvVar.class)
151 .register(EnvVarSource.class)
152 .register(EnvFromSource.class)
153 .register(ConfigMapEnvSource.class)
154 .register(ConfigMapKeySelector.class)
155 .register(ObjectFieldSelector.class)
156 .register(ResourceFieldSelector.class)
157 .register(SecretKeySelector.class)
158 .register(Lifecycle.class)
159 .register(SecretEnvSource.class)
160 .register(ContainerStatus.class)
161 .register(ContainerState.class)
162 .register(ContainerStateRunning.class)
163 .register(ContainerStateTerminated.class)
164 .register(ContainerStateWaiting.class)
165 .register(OwnerReference.class)
166 .register(Probe.class)
167 .register(ExecAction.class)
168 .register(HTTPGetAction.class)
169 .register(HTTPHeader.class)
170 .register(TCPSocketAction.class)
171 .register(ContainerPort.class)
172 .register(ResourceRequirements.class)
173 .register(SecurityContext.class)
174 .register(PodSecurityContext.class)
175 .register(SELinuxOptions.class)
176 .register(Volume.class)
177 .register(VolumeDevice.class)
178 .register(VolumeMount.class)
179 .register(IntOrString.class)
180 .register(Toleration.class)
181 .register(PersistentVolumeClaimVolumeSource.class)
182 .register(SecretVolumeSource.class)
183 .register(EmptyDirVolumeSource.class)
184 .register(Quantity.class)
185 .register(Capabilities.class)
186 .register(ConfigMapVolumeSource.class)
187 .register(KeyToPath.class)
188 .register(HostPathVolumeSource.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900189 .register(Affinity.class)
190 .register(NodeAffinity.class)
191 .register(NodeSelector.class)
192 .register(NodeSelectorTerm.class)
193 .register(NodeSelectorRequirement.class)
194 .register(PreferredSchedulingTerm.class)
195 .register(PodAffinity.class)
196 .register(WeightedPodAffinityTerm.class)
197 .register(PodAffinityTerm.class)
198 .register(LabelSelector.class)
199 .register(LabelSelectorRequirement.class)
200 .register(PodAntiAffinity.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900201 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900202 .register(Collection.class)
203 .build();
204
205 @Reference(cardinality = ReferenceCardinality.MANDATORY)
206 protected CoreService coreService;
207
208 @Reference(cardinality = ReferenceCardinality.MANDATORY)
209 protected StorageService storageService;
210
211 private final ExecutorService eventExecutor = newSingleThreadExecutor(
212 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
213
214 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
215
216 private ConsistentMap<String, Pod> podStore;
217
218 @Activate
219 protected void activate() {
220 ApplicationId appId = coreService.registerApplication(APP_ID);
221 podStore = storageService.<String, Pod>consistentMapBuilder()
222 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
223 .withName("k8s-pod-store")
224 .withApplicationId(appId)
225 .build();
226
227 podStore.addListener(podMapListener);
228 log.info("Started");
229 }
230
231 @Deactivate
232 protected void deactivate() {
233 podStore.removeListener(podMapListener);
234 eventExecutor.shutdown();
235 log.info("Stopped");
236 }
237
238 @Override
239 public void createPod(Pod pod) {
240 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
241 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
242 checkArgument(existing == null, error);
243 return pod;
244 });
245 }
246
247 @Override
248 public void updatePod(Pod pod) {
249 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
250 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
251 checkArgument(existing != null, error);
252 return pod;
253 });
254 }
255
256 @Override
257 public Pod removePod(String uid) {
258 Versioned<Pod> pod = podStore.remove(uid);
259 if (pod == null) {
260 final String error = uid + ERR_NOT_FOUND;
261 throw new IllegalArgumentException(error);
262 }
263 return pod.value();
264 }
265
266 @Override
267 public Pod pod(String uid) {
268 return podStore.asJavaMap().get(uid);
269 }
270
271 @Override
272 public Set<Pod> pods() {
273 return ImmutableSet.copyOf(podStore.asJavaMap().values());
274 }
275
276 @Override
277 public void clear() {
278 podStore.clear();
279 }
280
281 private class K8sPodMapListener implements MapEventListener<String, Pod> {
282
283 @Override
284 public void event(MapEvent<String, Pod> event) {
285
286 switch (event.type()) {
287 case INSERT:
288 log.debug("Kubernetes pod created {}", event.newValue());
289 eventExecutor.execute(() ->
290 notifyDelegate(new K8sPodEvent(
291 K8S_POD_CREATED, event.newValue().value())));
292 break;
293 case UPDATE:
294 log.debug("Kubernetes pod updated {}", event.newValue());
Jian Li5c755832019-04-11 23:24:28 +0900295 eventExecutor.execute(() -> processUpdate(event));
Jian Li747e1362019-02-19 22:59:46 +0900296 break;
297 case REMOVE:
298 log.debug("Kubernetes pod removed {}", event.oldValue());
299 eventExecutor.execute(() ->
300 notifyDelegate(new K8sPodEvent(
301 K8S_POD_REMOVED, event.oldValue().value())));
302 break;
303 default:
304 // do nothing
305 break;
306 }
307 }
Jian Li5c755832019-04-11 23:24:28 +0900308
309 private void processUpdate(MapEvent<String, Pod> event) {
310 notifyDelegate(new K8sPodEvent(
311 K8S_POD_UPDATED, event.newValue().value()));
312
313 String oldPhase = event.oldValue().value().getStatus().getPhase();
314 String newPhase = event.newValue().value().getStatus().getPhase();
315
316 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
317 notifyDelegate(new K8sPodEvent(
318 K8S_POD_PENDING, event.newValue().value()));
319 }
320
321 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
322 notifyDelegate(new K8sPodEvent(
323 K8S_POD_RUNNING, event.newValue().value()));
324 }
325
326 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
327 notifyDelegate(new K8sPodEvent(
328 K8S_POD_SUCCEEDED, event.newValue().value()));
329 }
330
331 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
332 notifyDelegate(new K8sPodEvent(
333 K8S_POD_FAILED, event.newValue().value()));
334 }
335
336 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
337 notifyDelegate(new K8sPodEvent(
338 K8S_POD_UNKNOWN, event.newValue().value()));
339 }
340
341 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
342 notifyDelegate(new K8sPodEvent(
343 K8S_POD_COMPLETED, event.newValue().value()));
344 }
345
346 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
347 notifyDelegate(new K8sPodEvent(
348 K8S_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
349 }
350
351 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
352 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
353
354 if (oldAnnot == null && newAnnot != null) {
355 notifyDelegate(new K8sPodEvent(
356 K8S_POD_ANNOTATION_ADDED, event.newValue().value()));
357 }
358 }
Jian Li747e1362019-02-19 22:59:46 +0900359 }
360}