blob: 07b122fa70fc02e59ce5a72457e4dd519a90669d [file] [log] [blame]
Jian Li965de272019-02-19 15:35:55 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
Jian Lib1cd0b02019-02-21 16:41:40 +090019import io.fabric8.kubernetes.api.model.ClientIPConfig;
Jian Li096262a2020-09-22 23:25:06 +090020import io.fabric8.kubernetes.api.model.FieldsV1;
Jian Lib1cd0b02019-02-21 16:41:40 +090021import io.fabric8.kubernetes.api.model.IntOrString;
22import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
23import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
Jian Li096262a2020-09-22 23:25:06 +090024import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
Jian Li965de272019-02-19 15:35:55 +090025import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Li07c27f32020-10-08 02:57:45 +090026import io.fabric8.kubernetes.api.model.OwnerReference;
Jian Li965de272019-02-19 15:35:55 +090027import io.fabric8.kubernetes.api.model.Service;
Jian Lib1cd0b02019-02-21 16:41:40 +090028import io.fabric8.kubernetes.api.model.ServicePort;
Jian Li965de272019-02-19 15:35:55 +090029import io.fabric8.kubernetes.api.model.ServiceSpec;
30import io.fabric8.kubernetes.api.model.ServiceStatus;
Jian Lib1cd0b02019-02-21 16:41:40 +090031import io.fabric8.kubernetes.api.model.SessionAffinityConfig;
Jian Li965de272019-02-19 15:35:55 +090032import org.onlab.util.KryoNamespace;
33import org.onosproject.core.ApplicationId;
34import org.onosproject.core.CoreService;
35import org.onosproject.k8snetworking.api.K8sServiceEvent;
36import org.onosproject.k8snetworking.api.K8sServiceStore;
37import org.onosproject.k8snetworking.api.K8sServiceStoreDelegate;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.ConsistentMap;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.StorageService;
45import org.onosproject.store.service.Versioned;
46import org.osgi.service.component.annotations.Activate;
47import org.osgi.service.component.annotations.Component;
48import org.osgi.service.component.annotations.Deactivate;
49import org.osgi.service.component.annotations.Reference;
50import org.osgi.service.component.annotations.ReferenceCardinality;
51import org.slf4j.Logger;
52
53import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090054import java.util.LinkedHashMap;
Jian Li965de272019-02-19 15:35:55 +090055import java.util.Set;
56import java.util.concurrent.ExecutorService;
57
58import static com.google.common.base.Preconditions.checkArgument;
59import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.onosproject.k8snetworking.api.K8sServiceEvent.Type.K8S_SERVICE_CREATED;
62import static org.onosproject.k8snetworking.api.K8sServiceEvent.Type.K8S_SERVICE_REMOVED;
63import static org.onosproject.k8snetworking.api.K8sServiceEvent.Type.K8S_SERVICE_UPDATED;
64import static org.slf4j.LoggerFactory.getLogger;
65
66/**
67 * Implementation of kubernetes service store using consistent map.
68 */
69@Component(immediate = true, service = K8sServiceStore.class)
70public class DistributedK8sServiceStore
71 extends AbstractStore<K8sServiceEvent, K8sServiceStoreDelegate>
72 implements K8sServiceStore {
73
74 private final Logger log = getLogger(getClass());
75
76 private static final String ERR_NOT_FOUND = " does not exist";
77 private static final String ERR_DUPLICATE = " already exists";
78 private static final String APP_ID = "org.onosproject.k8snetwork";
79
80 private static final KryoNamespace
81 SERIALIZER_K8S_SERVICE = KryoNamespace.newBuilder()
82 .register(KryoNamespaces.API)
83 .register(Service.class)
84 .register(ObjectMeta.class)
85 .register(ServiceSpec.class)
86 .register(ServiceStatus.class)
Jian Lib1cd0b02019-02-21 16:41:40 +090087 .register(LoadBalancerStatus.class)
88 .register(LoadBalancerIngress.class)
89 .register(ServicePort.class)
90 .register(IntOrString.class)
91 .register(SessionAffinityConfig.class)
92 .register(ClientIPConfig.class)
Jian Li096262a2020-09-22 23:25:06 +090093 .register(ManagedFieldsEntry.class)
94 .register(FieldsV1.class)
Jian Li07c27f32020-10-08 02:57:45 +090095 .register(OwnerReference.class)
Jian Lib1cd0b02019-02-21 16:41:40 +090096 .register(LinkedHashMap.class)
Jian Li965de272019-02-19 15:35:55 +090097 .register(Collection.class)
98 .build();
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected CoreService coreService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected StorageService storageService;
105
106 private final ExecutorService eventExecutor = newSingleThreadExecutor(
107 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
108
109 private final MapEventListener<String, Service> serviceMapListener =
110 new K8sServiceMapListener();
111
112 private ConsistentMap<String, Service> serviceStore;
113
114 @Activate
115 protected void activate() {
116 ApplicationId appId = coreService.registerApplication(APP_ID);
117 serviceStore = storageService.<String, Service>consistentMapBuilder()
118 .withSerializer(Serializer.using(SERIALIZER_K8S_SERVICE))
119 .withName("k8s-service-store")
120 .withApplicationId(appId)
121 .build();
122
123 serviceStore.addListener(serviceMapListener);
124 log.info("Started");
125 }
126
127 @Deactivate
128 protected void deactivate() {
129 serviceStore.removeListener(serviceMapListener);
130 eventExecutor.shutdown();
131 log.info("Stopped");
132 }
133
134 @Override
135 public void createService(Service service) {
136 serviceStore.compute(service.getMetadata().getUid(), (uid, existing) -> {
137 final String error = service.getMetadata().getUid() + ERR_DUPLICATE;
138 checkArgument(existing == null, error);
139 return service;
140 });
141 }
142
143 @Override
144 public void updateService(Service service) {
145 serviceStore.compute(service.getMetadata().getUid(), (uid, existing) -> {
146 final String error = service.getMetadata().getUid() + ERR_NOT_FOUND;
147 checkArgument(existing != null, error);
148 return service;
149 });
150 }
151
152 @Override
153 public Service removeService(String uid) {
154 Versioned<Service> service = serviceStore.remove(uid);
155 if (service == null) {
156 final String error = uid + ERR_NOT_FOUND;
157 throw new IllegalArgumentException(error);
158 }
159 return service.value();
160 }
161
162 @Override
163 public Service service(String uid) {
164 return serviceStore.asJavaMap().get(uid);
165 }
166
167 @Override
168 public Set<Service> services() {
169 return ImmutableSet.copyOf(serviceStore.asJavaMap().values());
170 }
171
172 @Override
173 public void clear() {
174 serviceStore.clear();
175 }
176
177 private class K8sServiceMapListener implements MapEventListener<String, Service> {
178
179 @Override
180 public void event(MapEvent<String, Service> event) {
181
182 switch (event.type()) {
183 case INSERT:
184 log.debug("Kubernetes service created {}", event.newValue());
185 eventExecutor.execute(() ->
186 notifyDelegate(new K8sServiceEvent(
187 K8S_SERVICE_CREATED, event.newValue().value())));
188 break;
189 case UPDATE:
190 log.debug("Kubernetes service updated {}", event.newValue());
191 eventExecutor.execute(() ->
192 notifyDelegate(new K8sServiceEvent(
193 K8S_SERVICE_UPDATED, event.newValue().value())));
194 break;
195 case REMOVE:
196 log.debug("Kubernetes service removed {}", event.oldValue());
197 eventExecutor.execute(() ->
198 notifyDelegate(new K8sServiceEvent(
199 K8S_SERVICE_REMOVED, event.oldValue().value())));
200 break;
201 default:
202 // do nothing
203 break;
204 }
205 }
206 }
207}