blob: 138555ba6c5ca1c7c8868ae18459b27ac984042e [file] [log] [blame]
Jian Li747e1362019-02-19 22:59:46 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Li4a7ce672019-04-09 15:20:25 +090019import io.fabric8.kubernetes.api.model.Affinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090020import io.fabric8.kubernetes.api.model.Capabilities;
21import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
22import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
23import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
24import io.fabric8.kubernetes.api.model.Container;
25import io.fabric8.kubernetes.api.model.ContainerPort;
26import io.fabric8.kubernetes.api.model.ContainerState;
27import io.fabric8.kubernetes.api.model.ContainerStateRunning;
28import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
29import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
30import io.fabric8.kubernetes.api.model.ContainerStatus;
31import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
32import io.fabric8.kubernetes.api.model.EnvFromSource;
33import io.fabric8.kubernetes.api.model.EnvVar;
34import io.fabric8.kubernetes.api.model.EnvVarSource;
35import io.fabric8.kubernetes.api.model.ExecAction;
36import io.fabric8.kubernetes.api.model.HTTPGetAction;
37import io.fabric8.kubernetes.api.model.HTTPHeader;
38import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
39import io.fabric8.kubernetes.api.model.IntOrString;
40import io.fabric8.kubernetes.api.model.KeyToPath;
Jian Li4a7ce672019-04-09 15:20:25 +090041import io.fabric8.kubernetes.api.model.LabelSelector;
42import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
Jian Lib1cd0b02019-02-21 16:41:40 +090043import io.fabric8.kubernetes.api.model.Lifecycle;
Jian Li4a7ce672019-04-09 15:20:25 +090044import io.fabric8.kubernetes.api.model.NodeAffinity;
45import io.fabric8.kubernetes.api.model.NodeSelector;
46import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
47import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090048import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090049import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.OwnerReference;
51import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090052import io.fabric8.kubernetes.api.model.Pod;
Jian Li4a7ce672019-04-09 15:20:25 +090053import io.fabric8.kubernetes.api.model.PodAffinity;
54import io.fabric8.kubernetes.api.model.PodAffinityTerm;
55import io.fabric8.kubernetes.api.model.PodAntiAffinity;
Jian Lib1cd0b02019-02-21 16:41:40 +090056import io.fabric8.kubernetes.api.model.PodCondition;
57import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090058import io.fabric8.kubernetes.api.model.PodSpec;
59import io.fabric8.kubernetes.api.model.PodStatus;
Jian Li4a7ce672019-04-09 15:20:25 +090060import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
Jian Lib1cd0b02019-02-21 16:41:40 +090061import io.fabric8.kubernetes.api.model.Probe;
62import io.fabric8.kubernetes.api.model.Quantity;
63import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
64import io.fabric8.kubernetes.api.model.ResourceRequirements;
65import io.fabric8.kubernetes.api.model.SELinuxOptions;
66import io.fabric8.kubernetes.api.model.SecretEnvSource;
67import io.fabric8.kubernetes.api.model.SecretKeySelector;
68import io.fabric8.kubernetes.api.model.SecretVolumeSource;
69import io.fabric8.kubernetes.api.model.SecurityContext;
70import io.fabric8.kubernetes.api.model.TCPSocketAction;
71import io.fabric8.kubernetes.api.model.Toleration;
72import io.fabric8.kubernetes.api.model.Volume;
73import io.fabric8.kubernetes.api.model.VolumeDevice;
74import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li4a7ce672019-04-09 15:20:25 +090075import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
Jian Li747e1362019-02-19 22:59:46 +090076import org.onlab.util.KryoNamespace;
77import org.onosproject.core.ApplicationId;
78import org.onosproject.core.CoreService;
79import org.onosproject.k8snetworking.api.K8sPodEvent;
80import org.onosproject.k8snetworking.api.K8sPodStore;
81import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
82import org.onosproject.store.AbstractStore;
83import org.onosproject.store.serializers.KryoNamespaces;
84import org.onosproject.store.service.ConsistentMap;
85import org.onosproject.store.service.MapEvent;
86import org.onosproject.store.service.MapEventListener;
87import org.onosproject.store.service.Serializer;
88import org.onosproject.store.service.StorageService;
89import org.onosproject.store.service.Versioned;
90import org.osgi.service.component.annotations.Activate;
91import org.osgi.service.component.annotations.Component;
92import org.osgi.service.component.annotations.Deactivate;
93import org.osgi.service.component.annotations.Reference;
94import org.osgi.service.component.annotations.ReferenceCardinality;
95import org.slf4j.Logger;
96
97import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090098import java.util.LinkedHashMap;
Jian Li747e1362019-02-19 22:59:46 +090099import java.util.Set;
100import java.util.concurrent.ExecutorService;
101
102import static com.google.common.base.Preconditions.checkArgument;
103import static java.util.concurrent.Executors.newSingleThreadExecutor;
104import static org.onlab.util.Tools.groupedThreads;
105import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
106import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
107import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Implementation of kubernetes pod store using consistent map.
112 */
113@Component(immediate = true, service = K8sPodStore.class)
114public class DistributedK8sPodStore
115 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
116 implements K8sPodStore {
117
118 private final Logger log = getLogger(getClass());
119
120 private static final String ERR_NOT_FOUND = " does not exist";
121 private static final String ERR_DUPLICATE = " already exists";
122 private static final String APP_ID = "org.onosproject.k8snetwork";
123
124 private static final KryoNamespace
125 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
126 .register(KryoNamespaces.API)
127 .register(Pod.class)
128 .register(ObjectMeta.class)
129 .register(PodSpec.class)
130 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900131 .register(PodCondition.class)
132 .register(Container.class)
133 .register(EnvVar.class)
134 .register(EnvVarSource.class)
135 .register(EnvFromSource.class)
136 .register(ConfigMapEnvSource.class)
137 .register(ConfigMapKeySelector.class)
138 .register(ObjectFieldSelector.class)
139 .register(ResourceFieldSelector.class)
140 .register(SecretKeySelector.class)
141 .register(Lifecycle.class)
142 .register(SecretEnvSource.class)
143 .register(ContainerStatus.class)
144 .register(ContainerState.class)
145 .register(ContainerStateRunning.class)
146 .register(ContainerStateTerminated.class)
147 .register(ContainerStateWaiting.class)
148 .register(OwnerReference.class)
149 .register(Probe.class)
150 .register(ExecAction.class)
151 .register(HTTPGetAction.class)
152 .register(HTTPHeader.class)
153 .register(TCPSocketAction.class)
154 .register(ContainerPort.class)
155 .register(ResourceRequirements.class)
156 .register(SecurityContext.class)
157 .register(PodSecurityContext.class)
158 .register(SELinuxOptions.class)
159 .register(Volume.class)
160 .register(VolumeDevice.class)
161 .register(VolumeMount.class)
162 .register(IntOrString.class)
163 .register(Toleration.class)
164 .register(PersistentVolumeClaimVolumeSource.class)
165 .register(SecretVolumeSource.class)
166 .register(EmptyDirVolumeSource.class)
167 .register(Quantity.class)
168 .register(Capabilities.class)
169 .register(ConfigMapVolumeSource.class)
170 .register(KeyToPath.class)
171 .register(HostPathVolumeSource.class)
Jian Li4a7ce672019-04-09 15:20:25 +0900172 .register(Affinity.class)
173 .register(NodeAffinity.class)
174 .register(NodeSelector.class)
175 .register(NodeSelectorTerm.class)
176 .register(NodeSelectorRequirement.class)
177 .register(PreferredSchedulingTerm.class)
178 .register(PodAffinity.class)
179 .register(WeightedPodAffinityTerm.class)
180 .register(PodAffinityTerm.class)
181 .register(LabelSelector.class)
182 .register(LabelSelectorRequirement.class)
183 .register(PodAntiAffinity.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900184 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900185 .register(Collection.class)
186 .build();
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY)
189 protected CoreService coreService;
190
191 @Reference(cardinality = ReferenceCardinality.MANDATORY)
192 protected StorageService storageService;
193
194 private final ExecutorService eventExecutor = newSingleThreadExecutor(
195 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
196
197 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
198
199 private ConsistentMap<String, Pod> podStore;
200
201 @Activate
202 protected void activate() {
203 ApplicationId appId = coreService.registerApplication(APP_ID);
204 podStore = storageService.<String, Pod>consistentMapBuilder()
205 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
206 .withName("k8s-pod-store")
207 .withApplicationId(appId)
208 .build();
209
210 podStore.addListener(podMapListener);
211 log.info("Started");
212 }
213
214 @Deactivate
215 protected void deactivate() {
216 podStore.removeListener(podMapListener);
217 eventExecutor.shutdown();
218 log.info("Stopped");
219 }
220
221 @Override
222 public void createPod(Pod pod) {
223 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
224 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
225 checkArgument(existing == null, error);
226 return pod;
227 });
228 }
229
230 @Override
231 public void updatePod(Pod pod) {
232 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
233 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
234 checkArgument(existing != null, error);
235 return pod;
236 });
237 }
238
239 @Override
240 public Pod removePod(String uid) {
241 Versioned<Pod> pod = podStore.remove(uid);
242 if (pod == null) {
243 final String error = uid + ERR_NOT_FOUND;
244 throw new IllegalArgumentException(error);
245 }
246 return pod.value();
247 }
248
249 @Override
250 public Pod pod(String uid) {
251 return podStore.asJavaMap().get(uid);
252 }
253
254 @Override
255 public Set<Pod> pods() {
256 return ImmutableSet.copyOf(podStore.asJavaMap().values());
257 }
258
259 @Override
260 public void clear() {
261 podStore.clear();
262 }
263
264 private class K8sPodMapListener implements MapEventListener<String, Pod> {
265
266 @Override
267 public void event(MapEvent<String, Pod> event) {
268
269 switch (event.type()) {
270 case INSERT:
271 log.debug("Kubernetes pod created {}", event.newValue());
272 eventExecutor.execute(() ->
273 notifyDelegate(new K8sPodEvent(
274 K8S_POD_CREATED, event.newValue().value())));
275 break;
276 case UPDATE:
277 log.debug("Kubernetes pod updated {}", event.newValue());
278 eventExecutor.execute(() ->
279 notifyDelegate(new K8sPodEvent(
280 K8S_POD_UPDATED, event.newValue().value())));
281 break;
282 case REMOVE:
283 log.debug("Kubernetes pod removed {}", event.oldValue());
284 eventExecutor.execute(() ->
285 notifyDelegate(new K8sPodEvent(
286 K8S_POD_REMOVED, event.oldValue().value())));
287 break;
288 default:
289 // do nothing
290 break;
291 }
292 }
293 }
294}