blob: 53d9d18f64ddc30096e8e079eed308ce690a9f7b [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Li4a7ce672019-04-09 15:20:25 +090019import io.fabric8.kubernetes.api.model.Affinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090020import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
Jian Li096262a2020-09-22 23:25:06 +090036import io.fabric8.kubernetes.api.model.FieldsV1;
Jian Lib1cd0b02019-02-21 16:41:40 +090037import io.fabric8.kubernetes.api.model.HTTPGetAction;
38import io.fabric8.kubernetes.api.model.HTTPHeader;
39import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
40import io.fabric8.kubernetes.api.model.IntOrString;
41import io.fabric8.kubernetes.api.model.KeyToPath;
Jian Li4a7ce672019-04-09 15:20:25 +090042import io.fabric8.kubernetes.api.model.LabelSelector;
43import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Jian Lib1cd0b02019-02-21 16:41:40 +090044import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Li096262a2020-09-22 23:25:06 +090045import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
Jian Li4a7ce672019-04-09 15:20:25 +090046import io.fabric8.kubernetes.api.model.NodeAffinity;
47import io.fabric8.kubernetes.api.model.NodeSelector;
48import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
49import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090051import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090052import io.fabric8.kubernetes.api.model.OwnerReference;
53import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090054import io.fabric8.kubernetes.api.model.Pod;
Jian Li4a7ce672019-04-09 15:20:25 +090055import io.fabric8.kubernetes.api.model.PodAffinity;
56import io.fabric8.kubernetes.api.model.PodAffinityTerm;
57import io.fabric8.kubernetes.api.model.PodAntiAffinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090058import io.fabric8.kubernetes.api.model.PodCondition;
Jian Li14493892020-01-20 10:23:33 +090059import io.fabric8.kubernetes.api.model.PodIP;
Jian Lib1cd0b02019-02-21 16:41:40 +090060import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090061import io.fabric8.kubernetes.api.model.PodSpec;
62import io.fabric8.kubernetes.api.model.PodStatus;
Jian Li4a7ce672019-04-09 15:20:25 +090063import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090064import io.fabric8.kubernetes.api.model.Probe;
65import io.fabric8.kubernetes.api.model.Quantity;
66import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
67import io.fabric8.kubernetes.api.model.ResourceRequirements;
68import io.fabric8.kubernetes.api.model.SELinuxOptions;
69import io.fabric8.kubernetes.api.model.SecretEnvSource;
70import io.fabric8.kubernetes.api.model.SecretKeySelector;
71import io.fabric8.kubernetes.api.model.SecretVolumeSource;
72import io.fabric8.kubernetes.api.model.SecurityContext;
73import io.fabric8.kubernetes.api.model.TCPSocketAction;
74import io.fabric8.kubernetes.api.model.Toleration;
75import io.fabric8.kubernetes.api.model.Volume;
76import io.fabric8.kubernetes.api.model.VolumeDevice;
77import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li4a7ce672019-04-09 15:20:25 +090078import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
Jian Li747e1362019-02-19 22:59:46 +090079import org.onlab.util.KryoNamespace;
80import org.onosproject.core.ApplicationId;
81import org.onosproject.core.CoreService;
82import org.onosproject.k8snetworking.api.K8sPodEvent;
83import org.onosproject.k8snetworking.api.K8sPodStore;
84import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
85import org.onosproject.store.AbstractStore;
86import org.onosproject.store.serializers.KryoNamespaces;
87import org.onosproject.store.service.ConsistentMap;
88import org.onosproject.store.service.MapEvent;
89import org.onosproject.store.service.MapEventListener;
90import org.onosproject.store.service.Serializer;
91import org.onosproject.store.service.StorageService;
92import org.onosproject.store.service.Versioned;
93import org.osgi.service.component.annotations.Activate;
94import org.osgi.service.component.annotations.Component;
95import org.osgi.service.component.annotations.Deactivate;
96import org.osgi.service.component.annotations.Reference;
97import org.osgi.service.component.annotations.ReferenceCardinality;
98import org.slf4j.Logger;
99
100import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +0900101import java.util.LinkedHashMap;
Jian Li5c755832019-04-11 23:24:28 +0900102import java.util.Map;
Jian Li747e1362019-02-19 22:59:46 +0900103import java.util.Set;
104import java.util.concurrent.ExecutorService;
105
106import static com.google.common.base.Preconditions.checkArgument;
107import static java.util.concurrent.Executors.newSingleThreadExecutor;
108import static org.onlab.util.Tools.groupedThreads;
Jian Li5c755832019-04-11 23:24:28 +0900109import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_ANNOTATION_ADDED;
110import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_COMPLETED;
111import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CRASH_LOOP_BACK_OFF;
Jian Li747e1362019-02-19 22:59:46 +0900112import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
Jian Li5c755832019-04-11 23:24:28 +0900113import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_FAILED;
114import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_PENDING;
Jian Li747e1362019-02-19 22:59:46 +0900115import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
Jian Li5c755832019-04-11 23:24:28 +0900116import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_RUNNING;
117import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_SUCCEEDED;
118import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UNKNOWN;
Jian Li747e1362019-02-19 22:59:46 +0900119import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
120import static org.slf4j.LoggerFactory.getLogger;
121
122/**
123 * Implementation of kubernetes pod store using consistent map.
124 */
125@Component(immediate = true, service = K8sPodStore.class)
126public class DistributedK8sPodStore
127 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
128 implements K8sPodStore {
129
130 private final Logger log = getLogger(getClass());
131
132 private static final String ERR_NOT_FOUND = " does not exist";
133 private static final String ERR_DUPLICATE = " already exists";
134 private static final String APP_ID = "org.onosproject.k8snetwork";
135
Jian Li5c755832019-04-11 23:24:28 +0900136 private static final String PENDING = "Pending";
137 private static final String RUNNING = "Running";
138 private static final String SUCCEEDED = "Succeeded";
139 private static final String FAILED = "Failed";
140 private static final String UNKNOWN = "Unknown";
141 private static final String COMPLETED = "Completed";
142 private static final String CRASH_LOOP_BACK_OFF = "CrashLoopBackOff";
143
Jian Li747e1362019-02-19 22:59:46 +0900144 private static final KryoNamespace
145 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
146 .register(KryoNamespaces.API)
147 .register(Pod.class)
148 .register(ObjectMeta.class)
149 .register(PodSpec.class)
150 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900151 .register(PodCondition.class)
Jian Li14493892020-01-20 10:23:33 +0900152 .register(PodIP.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900153 .register(Container.class)
154 .register(EnvVar.class)
155 .register(EnvVarSource.class)
156 .register(EnvFromSource.class)
157 .register(ConfigMapEnvSource.class)
158 .register(ConfigMapKeySelector.class)
159 .register(ObjectFieldSelector.class)
160 .register(ResourceFieldSelector.class)
161 .register(SecretKeySelector.class)
162 .register(Lifecycle.class)
163 .register(SecretEnvSource.class)
164 .register(ContainerStatus.class)
165 .register(ContainerState.class)
166 .register(ContainerStateRunning.class)
167 .register(ContainerStateTerminated.class)
168 .register(ContainerStateWaiting.class)
169 .register(OwnerReference.class)
170 .register(Probe.class)
171 .register(ExecAction.class)
172 .register(HTTPGetAction.class)
173 .register(HTTPHeader.class)
174 .register(TCPSocketAction.class)
175 .register(ContainerPort.class)
176 .register(ResourceRequirements.class)
177 .register(SecurityContext.class)
178 .register(PodSecurityContext.class)
179 .register(SELinuxOptions.class)
180 .register(Volume.class)
181 .register(VolumeDevice.class)
182 .register(VolumeMount.class)
183 .register(IntOrString.class)
184 .register(Toleration.class)
185 .register(PersistentVolumeClaimVolumeSource.class)
186 .register(SecretVolumeSource.class)
187 .register(EmptyDirVolumeSource.class)
188 .register(Quantity.class)
189 .register(Capabilities.class)
190 .register(ConfigMapVolumeSource.class)
191 .register(KeyToPath.class)
192 .register(HostPathVolumeSource.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900193 .register(Affinity.class)
194 .register(NodeAffinity.class)
195 .register(NodeSelector.class)
196 .register(NodeSelectorTerm.class)
197 .register(NodeSelectorRequirement.class)
198 .register(PreferredSchedulingTerm.class)
199 .register(PodAffinity.class)
200 .register(WeightedPodAffinityTerm.class)
201 .register(PodAffinityTerm.class)
202 .register(LabelSelector.class)
203 .register(LabelSelectorRequirement.class)
204 .register(PodAntiAffinity.class)
Jian Li096262a2020-09-22 23:25:06 +0900205 .register(ManagedFieldsEntry.class)
206 .register(FieldsV1.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900207 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900208 .register(Collection.class)
209 .build();
210
211 @Reference(cardinality = ReferenceCardinality.MANDATORY)
212 protected CoreService coreService;
213
214 @Reference(cardinality = ReferenceCardinality.MANDATORY)
215 protected StorageService storageService;
216
217 private final ExecutorService eventExecutor = newSingleThreadExecutor(
218 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
219
220 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
221
222 private ConsistentMap<String, Pod> podStore;
223
224 @Activate
225 protected void activate() {
226 ApplicationId appId = coreService.registerApplication(APP_ID);
227 podStore = storageService.<String, Pod>consistentMapBuilder()
228 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
229 .withName("k8s-pod-store")
230 .withApplicationId(appId)
231 .build();
232
233 podStore.addListener(podMapListener);
234 log.info("Started");
235 }
236
237 @Deactivate
238 protected void deactivate() {
239 podStore.removeListener(podMapListener);
240 eventExecutor.shutdown();
241 log.info("Stopped");
242 }
243
244 @Override
245 public void createPod(Pod pod) {
246 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
247 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
248 checkArgument(existing == null, error);
249 return pod;
250 });
251 }
252
253 @Override
254 public void updatePod(Pod pod) {
255 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
256 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
257 checkArgument(existing != null, error);
258 return pod;
259 });
260 }
261
262 @Override
263 public Pod removePod(String uid) {
264 Versioned<Pod> pod = podStore.remove(uid);
265 if (pod == null) {
266 final String error = uid + ERR_NOT_FOUND;
267 throw new IllegalArgumentException(error);
268 }
269 return pod.value();
270 }
271
272 @Override
273 public Pod pod(String uid) {
274 return podStore.asJavaMap().get(uid);
275 }
276
277 @Override
278 public Set<Pod> pods() {
279 return ImmutableSet.copyOf(podStore.asJavaMap().values());
280 }
281
282 @Override
283 public void clear() {
284 podStore.clear();
285 }
286
287 private class K8sPodMapListener implements MapEventListener<String, Pod> {
288
289 @Override
290 public void event(MapEvent<String, Pod> event) {
291
292 switch (event.type()) {
293 case INSERT:
294 log.debug("Kubernetes pod created {}", event.newValue());
295 eventExecutor.execute(() ->
296 notifyDelegate(new K8sPodEvent(
297 K8S_POD_CREATED, event.newValue().value())));
298 break;
299 case UPDATE:
300 log.debug("Kubernetes pod updated {}", event.newValue());
Jian Li5c755832019-04-11 23:24:28 +0900301 eventExecutor.execute(() -> processUpdate(event));
Jian Li747e1362019-02-19 22:59:46 +0900302 break;
303 case REMOVE:
304 log.debug("Kubernetes pod removed {}", event.oldValue());
305 eventExecutor.execute(() ->
306 notifyDelegate(new K8sPodEvent(
307 K8S_POD_REMOVED, event.oldValue().value())));
308 break;
309 default:
310 // do nothing
311 break;
312 }
313 }
Jian Li5c755832019-04-11 23:24:28 +0900314
315 private void processUpdate(MapEvent<String, Pod> event) {
316 notifyDelegate(new K8sPodEvent(
317 K8S_POD_UPDATED, event.newValue().value()));
318
319 String oldPhase = event.oldValue().value().getStatus().getPhase();
320 String newPhase = event.newValue().value().getStatus().getPhase();
321
322 if (!PENDING.equals(oldPhase) && PENDING.equals(newPhase)) {
323 notifyDelegate(new K8sPodEvent(
324 K8S_POD_PENDING, event.newValue().value()));
325 }
326
327 if (!RUNNING.equals(oldPhase) && RUNNING.equals(newPhase)) {
328 notifyDelegate(new K8sPodEvent(
329 K8S_POD_RUNNING, event.newValue().value()));
330 }
331
332 if (!SUCCEEDED.equals(oldPhase) && SUCCEEDED.equals(newPhase)) {
333 notifyDelegate(new K8sPodEvent(
334 K8S_POD_SUCCEEDED, event.newValue().value()));
335 }
336
337 if (!FAILED.equals(oldPhase) && FAILED.equals(newPhase)) {
338 notifyDelegate(new K8sPodEvent(
339 K8S_POD_FAILED, event.newValue().value()));
340 }
341
342 if (!UNKNOWN.equals(oldPhase) && UNKNOWN.equals(newPhase)) {
343 notifyDelegate(new K8sPodEvent(
344 K8S_POD_UNKNOWN, event.newValue().value()));
345 }
346
347 if (!COMPLETED.equals(oldPhase) && COMPLETED.equals(newPhase)) {
348 notifyDelegate(new K8sPodEvent(
349 K8S_POD_COMPLETED, event.newValue().value()));
350 }
351
352 if (!CRASH_LOOP_BACK_OFF.equals(oldPhase) && CRASH_LOOP_BACK_OFF.equals(newPhase)) {
353 notifyDelegate(new K8sPodEvent(
354 K8S_POD_CRASH_LOOP_BACK_OFF, event.newValue().value()));
355 }
356
357 Map<String, String> oldAnnot = event.oldValue().value().getMetadata().getAnnotations();
358 Map<String, String> newAnnot = event.newValue().value().getMetadata().getAnnotations();
359
360 if (oldAnnot == null && newAnnot != null) {
361 notifyDelegate(new K8sPodEvent(
362 K8S_POD_ANNOTATION_ADDED, event.newValue().value()));
363 }
364 }
Jian Li747e1362019-02-19 22:59:46 +0900365 }
366}