blob: 3bb88e4595ba0aee716ab1163a0e381cb6e29655 [file] [log] [blame]
Jian Li3e1b8872019-02-19 21:45:04 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import io.fabric8.kubernetes.api.model.EndpointAddress;
20import io.fabric8.kubernetes.api.model.EndpointPort;
21import io.fabric8.kubernetes.api.model.EndpointSubset;
22import io.fabric8.kubernetes.api.model.Endpoints;
23import io.fabric8.kubernetes.api.model.ObjectMeta;
Jian Lib1cd0b02019-02-21 16:41:40 +090024import io.fabric8.kubernetes.api.model.ObjectReference;
Jian Li3e1b8872019-02-19 21:45:04 +090025import org.onlab.util.KryoNamespace;
26import org.onosproject.core.ApplicationId;
27import org.onosproject.core.CoreService;
28import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
29import org.onosproject.k8snetworking.api.K8sEndpointsStore;
30import org.onosproject.k8snetworking.api.K8sEndpointsStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Collection;
Jian Lib1cd0b02019-02-21 16:41:40 +090047import java.util.LinkedHashMap;
Jian Li3e1b8872019-02-19 21:45:04 +090048import java.util.Set;
49import java.util.concurrent.ExecutorService;
50
51import static com.google.common.base.Preconditions.checkArgument;
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.k8snetworking.api.K8sEndpointsEvent.Type.K8S_ENDPOINTS_CREATED;
55import static org.onosproject.k8snetworking.api.K8sEndpointsEvent.Type.K8S_ENDPOINTS_REMOVED;
56import static org.onosproject.k8snetworking.api.K8sEndpointsEvent.Type.K8S_ENDPOINTS_UPDATED;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Implementation of kubernetes service store using consistent map.
61 */
62@Component(immediate = true, service = K8sEndpointsStore.class)
63public class DistributedK8sEndpointsStore
64 extends AbstractStore<K8sEndpointsEvent, K8sEndpointsStoreDelegate>
65 implements K8sEndpointsStore {
66
67 private final Logger log = getLogger(getClass());
68
69 private static final String ERR_NOT_FOUND = " does not exist";
70 private static final String ERR_DUPLICATE = " already exists";
71 private static final String APP_ID = "org.onosproject.k8snetwork";
72
73 private static final KryoNamespace
74 SERIALIZER_K8S_ENDPOINTS = KryoNamespace.newBuilder()
75 .register(KryoNamespaces.API)
76 .register(Endpoints.class)
77 .register(ObjectMeta.class)
78 .register(EndpointSubset.class)
79 .register(EndpointAddress.class)
Jian Lib1cd0b02019-02-21 16:41:40 +090080 .register(ObjectReference.class)
Jian Li3e1b8872019-02-19 21:45:04 +090081 .register(EndpointPort.class)
Jian Lib1cd0b02019-02-21 16:41:40 +090082 .register(LinkedHashMap.class)
Jian Li3e1b8872019-02-19 21:45:04 +090083 .register(Collection.class)
84 .build();
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected StorageService storageService;
91
92 private final ExecutorService eventExecutor = newSingleThreadExecutor(
93 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
94
95 private final MapEventListener<String, Endpoints> endpointsMapListener =
96 new K8sEndpointsMapListener();
97
98 private ConsistentMap<String, Endpoints> endpointsStore;
99
100 @Activate
101 protected void activate() {
102 ApplicationId appId = coreService.registerApplication(APP_ID);
103 endpointsStore = storageService.<String, Endpoints>consistentMapBuilder()
104 .withSerializer(Serializer.using(SERIALIZER_K8S_ENDPOINTS))
105 .withName("k8s-endpoints-store")
106 .withApplicationId(appId)
107 .build();
108
109 endpointsStore.addListener(endpointsMapListener);
110 log.info("Started");
111 }
112
113 @Deactivate
114 protected void deactivate() {
115 endpointsStore.removeListener(endpointsMapListener);
116 eventExecutor.shutdown();
117 log.info("Stopped");
118 }
119
120 @Override
121 public void createEndpoints(Endpoints endpoints) {
122 endpointsStore.compute(endpoints.getMetadata().getUid(), (uid, existing) -> {
123 final String error = endpoints.getMetadata().getUid() + ERR_DUPLICATE;
124 checkArgument(existing == null, error);
125 return endpoints;
126 });
127 }
128
129 @Override
130 public void updateEndpoints(Endpoints endpoints) {
131 endpointsStore.compute(endpoints.getMetadata().getUid(), (uid, existing) -> {
132 final String error = endpoints.getMetadata().getUid() + ERR_NOT_FOUND;
133 checkArgument(existing != null, error);
134 return endpoints;
135 });
136 }
137
138 @Override
139 public Endpoints removeEndpoints(String uid) {
140 Versioned<Endpoints> endpoints = endpointsStore.remove(uid);
141 if (endpoints == null) {
142 final String error = uid + ERR_NOT_FOUND;
143 throw new IllegalArgumentException(error);
144 }
145 return endpoints.value();
146 }
147
148 @Override
149 public Endpoints endpoints(String uid) {
150 return endpointsStore.asJavaMap().get(uid);
151 }
152
153 @Override
154 public Set<Endpoints> endpointses() {
155 return ImmutableSet.copyOf(endpointsStore.asJavaMap().values());
156 }
157
158 @Override
159 public void clear() {
160 endpointsStore.clear();
161 }
162
163 private class K8sEndpointsMapListener implements MapEventListener<String, Endpoints> {
164
165 @Override
166 public void event(MapEvent<String, Endpoints> event) {
167
168 switch (event.type()) {
169 case INSERT:
170 log.debug("Kubernetes endpoints created {}", event.newValue());
171 eventExecutor.execute(() ->
172 notifyDelegate(new K8sEndpointsEvent(
173 K8S_ENDPOINTS_CREATED, event.newValue().value())));
174 break;
175 case UPDATE:
176 log.debug("Kubernetes endpoints updated {}", event.newValue());
177 eventExecutor.execute(() ->
178 notifyDelegate(new K8sEndpointsEvent(
179 K8S_ENDPOINTS_UPDATED, event.newValue().value())));
180 break;
181 case REMOVE:
182 log.debug("Kubernetes endpoints removed {}", event.oldValue());
183 eventExecutor.execute(() ->
184 notifyDelegate(new K8sEndpointsEvent(
185 K8S_ENDPOINTS_REMOVED, event.oldValue().value())));
186 break;
187 default:
188 // do nothing
189 break;
190 }
191 }
192 }
193}