blob: fe1634676a53bdd17f6559f832a3803c656d7a71 [file] [log] [blame]
Jian Li747e1362019-02-19 22:59:46 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Lib1cd0b02019-02-21 16:41:40 +090019import io.fabric8.kubernetes.api.model.Capabilities;
20import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
21import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
22import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
23import io.fabric8.kubernetes.api.model.Container;
24import io.fabric8.kubernetes.api.model.ContainerPort;
25import io.fabric8.kubernetes.api.model.ContainerState;
26import io.fabric8.kubernetes.api.model.ContainerStateRunning;
27import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
28import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
29import io.fabric8.kubernetes.api.model.ContainerStatus;
30import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
31import io.fabric8.kubernetes.api.model.EnvFromSource;
32import io.fabric8.kubernetes.api.model.EnvVar;
33import io.fabric8.kubernetes.api.model.EnvVarSource;
34import io.fabric8.kubernetes.api.model.ExecAction;
35import io.fabric8.kubernetes.api.model.HTTPGetAction;
36import io.fabric8.kubernetes.api.model.HTTPHeader;
37import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
38import io.fabric8.kubernetes.api.model.IntOrString;
39import io.fabric8.kubernetes.api.model.KeyToPath;
40import io.fabric8.kubernetes.api.model.Lifecycle;
41import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Jian Li747e1362019-02-19 22:59:46 +090042import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090043import io.fabric8.kubernetes.api.model.OwnerReference;
44import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
Jian Li747e1362019-02-19 22:59:46 +090045import io.fabric8.kubernetes.api.model.Pod;
Jian Lib1cd0b02019-02-21 16:41:40 +090046import io.fabric8.kubernetes.api.model.PodCondition;
47import io.fabric8.kubernetes.api.model.PodSecurityContext;
Jian Li747e1362019-02-19 22:59:46 +090048import io.fabric8.kubernetes.api.model.PodSpec;
49import io.fabric8.kubernetes.api.model.PodStatus;
Jian Lib1cd0b02019-02-21 16:41:40 +090050import io.fabric8.kubernetes.api.model.Probe;
51import io.fabric8.kubernetes.api.model.Quantity;
52import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
53import io.fabric8.kubernetes.api.model.ResourceRequirements;
54import io.fabric8.kubernetes.api.model.SELinuxOptions;
55import io.fabric8.kubernetes.api.model.SecretEnvSource;
56import io.fabric8.kubernetes.api.model.SecretKeySelector;
57import io.fabric8.kubernetes.api.model.SecretVolumeSource;
58import io.fabric8.kubernetes.api.model.SecurityContext;
59import io.fabric8.kubernetes.api.model.TCPSocketAction;
60import io.fabric8.kubernetes.api.model.Toleration;
61import io.fabric8.kubernetes.api.model.Volume;
62import io.fabric8.kubernetes.api.model.VolumeDevice;
63import io.fabric8.kubernetes.api.model.VolumeMount;
Jian Li747e1362019-02-19 22:59:46 +090064import org.onlab.util.KryoNamespace;
65import org.onosproject.core.ApplicationId;
66import org.onosproject.core.CoreService;
67import org.onosproject.k8snetworking.api.K8sPodEvent;
68import org.onosproject.k8snetworking.api.K8sPodStore;
69import org.onosproject.k8snetworking.api.K8sPodStoreDelegate;
70import org.onosproject.store.AbstractStore;
71import org.onosproject.store.serializers.KryoNamespaces;
72import org.onosproject.store.service.ConsistentMap;
73import org.onosproject.store.service.MapEvent;
74import org.onosproject.store.service.MapEventListener;
75import org.onosproject.store.service.Serializer;
76import org.onosproject.store.service.StorageService;
77import org.onosproject.store.service.Versioned;
78import org.osgi.service.component.annotations.Activate;
79import org.osgi.service.component.annotations.Component;
80import org.osgi.service.component.annotations.Deactivate;
81import org.osgi.service.component.annotations.Reference;
82import org.osgi.service.component.annotations.ReferenceCardinality;
83import org.slf4j.Logger;
84
85import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090086import java.util.LinkedHashMap;
Jian Li747e1362019-02-19 22:59:46 +090087import java.util.Set;
88import java.util.concurrent.ExecutorService;
89
90import static com.google.common.base.Preconditions.checkArgument;
91import static java.util.concurrent.Executors.newSingleThreadExecutor;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_CREATED;
94import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_REMOVED;
95import static org.onosproject.k8snetworking.api.K8sPodEvent.Type.K8S_POD_UPDATED;
96import static org.slf4j.LoggerFactory.getLogger;
97
98/**
99 * Implementation of kubernetes pod store using consistent map.
100 */
101@Component(immediate = true, service = K8sPodStore.class)
102public class DistributedK8sPodStore
103 extends AbstractStore<K8sPodEvent, K8sPodStoreDelegate>
104 implements K8sPodStore {
105
106 private final Logger log = getLogger(getClass());
107
108 private static final String ERR_NOT_FOUND = " does not exist";
109 private static final String ERR_DUPLICATE = " already exists";
110 private static final String APP_ID = "org.onosproject.k8snetwork";
111
112 private static final KryoNamespace
113 SERIALIZER_K8S_POD = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(Pod.class)
116 .register(ObjectMeta.class)
117 .register(PodSpec.class)
118 .register(PodStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +0900119 .register(PodCondition.class)
120 .register(Container.class)
121 .register(EnvVar.class)
122 .register(EnvVarSource.class)
123 .register(EnvFromSource.class)
124 .register(ConfigMapEnvSource.class)
125 .register(ConfigMapKeySelector.class)
126 .register(ObjectFieldSelector.class)
127 .register(ResourceFieldSelector.class)
128 .register(SecretKeySelector.class)
129 .register(Lifecycle.class)
130 .register(SecretEnvSource.class)
131 .register(ContainerStatus.class)
132 .register(ContainerState.class)
133 .register(ContainerStateRunning.class)
134 .register(ContainerStateTerminated.class)
135 .register(ContainerStateWaiting.class)
136 .register(OwnerReference.class)
137 .register(Probe.class)
138 .register(ExecAction.class)
139 .register(HTTPGetAction.class)
140 .register(HTTPHeader.class)
141 .register(TCPSocketAction.class)
142 .register(ContainerPort.class)
143 .register(ResourceRequirements.class)
144 .register(SecurityContext.class)
145 .register(PodSecurityContext.class)
146 .register(SELinuxOptions.class)
147 .register(Volume.class)
148 .register(VolumeDevice.class)
149 .register(VolumeMount.class)
150 .register(IntOrString.class)
151 .register(Toleration.class)
152 .register(PersistentVolumeClaimVolumeSource.class)
153 .register(SecretVolumeSource.class)
154 .register(EmptyDirVolumeSource.class)
155 .register(Quantity.class)
156 .register(Capabilities.class)
157 .register(ConfigMapVolumeSource.class)
158 .register(KeyToPath.class)
159 .register(HostPathVolumeSource.class)
160 .register(LinkedHashMap.class)
Jian Li747e1362019-02-19 22:59:46 +0900161 .register(Collection.class)
162 .build();
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
165 protected CoreService coreService;
166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
168 protected StorageService storageService;
169
170 private final ExecutorService eventExecutor = newSingleThreadExecutor(
171 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
172
173 private final MapEventListener<String, Pod> podMapListener = new K8sPodMapListener();
174
175 private ConsistentMap<String, Pod> podStore;
176
177 @Activate
178 protected void activate() {
179 ApplicationId appId = coreService.registerApplication(APP_ID);
180 podStore = storageService.<String, Pod>consistentMapBuilder()
181 .withSerializer(Serializer.using(SERIALIZER_K8S_POD))
182 .withName("k8s-pod-store")
183 .withApplicationId(appId)
184 .build();
185
186 podStore.addListener(podMapListener);
187 log.info("Started");
188 }
189
190 @Deactivate
191 protected void deactivate() {
192 podStore.removeListener(podMapListener);
193 eventExecutor.shutdown();
194 log.info("Stopped");
195 }
196
197 @Override
198 public void createPod(Pod pod) {
199 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
200 final String error = pod.getMetadata().getUid() + ERR_DUPLICATE;
201 checkArgument(existing == null, error);
202 return pod;
203 });
204 }
205
206 @Override
207 public void updatePod(Pod pod) {
208 podStore.compute(pod.getMetadata().getUid(), (uid, existing) -> {
209 final String error = pod.getMetadata().getUid() + ERR_NOT_FOUND;
210 checkArgument(existing != null, error);
211 return pod;
212 });
213 }
214
215 @Override
216 public Pod removePod(String uid) {
217 Versioned<Pod> pod = podStore.remove(uid);
218 if (pod == null) {
219 final String error = uid + ERR_NOT_FOUND;
220 throw new IllegalArgumentException(error);
221 }
222 return pod.value();
223 }
224
225 @Override
226 public Pod pod(String uid) {
227 return podStore.asJavaMap().get(uid);
228 }
229
230 @Override
231 public Set<Pod> pods() {
232 return ImmutableSet.copyOf(podStore.asJavaMap().values());
233 }
234
235 @Override
236 public void clear() {
237 podStore.clear();
238 }
239
240 private class K8sPodMapListener implements MapEventListener<String, Pod> {
241
242 @Override
243 public void event(MapEvent<String, Pod> event) {
244
245 switch (event.type()) {
246 case INSERT:
247 log.debug("Kubernetes pod created {}", event.newValue());
248 eventExecutor.execute(() ->
249 notifyDelegate(new K8sPodEvent(
250 K8S_POD_CREATED, event.newValue().value())));
251 break;
252 case UPDATE:
253 log.debug("Kubernetes pod updated {}", event.newValue());
254 eventExecutor.execute(() ->
255 notifyDelegate(new K8sPodEvent(
256 K8S_POD_UPDATED, event.newValue().value())));
257 break;
258 case REMOVE:
259 log.debug("Kubernetes pod removed {}", event.oldValue());
260 eventExecutor.execute(() ->
261 notifyDelegate(new K8sPodEvent(
262 K8S_POD_REMOVED, event.oldValue().value())));
263 break;
264 default:
265 // do nothing
266 break;
267 }
268 }
269 }
270}