blob: 2d566fc6fbfd4763352e9b95ce6253a869d13a2a [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Lie48a6172021-02-28 03:50:15 +090018import com.google.common.base.Strings;
Jian Li7eb20782021-02-27 01:10:50 +090019import com.google.common.collect.ImmutableSet;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Lie48a6172021-02-28 03:50:15 +090023import org.onosproject.kubevirtnetworking.api.DefaultKubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090024import org.onosproject.kubevirtnetworking.api.DefaultKubevirtRouter;
Jian Lie48a6172021-02-28 03:50:15 +090025import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090026import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
27import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
28import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtRouterStore;
30import org.onosproject.kubevirtnetworking.api.KubevirtRouterStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Collection;
47import java.util.Set;
48import java.util.concurrent.ExecutorService;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static java.util.concurrent.Executors.newSingleThreadExecutor;
52import static org.onlab.util.Tools.groupedThreads;
Jian Lie48a6172021-02-28 03:50:15 +090053import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_ASSOCIATED;
54import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_CREATED;
55import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_DISASSOCIATED;
56import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_REMOVED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_UPDATED;
Jian Li7eb20782021-02-27 01:10:50 +090058import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_CREATED;
59import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_REMOVED;
60import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_UPDATED;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Implementation of kubevirt router store using consistent map.
65 */
66@Component(immediate = true, service = KubevirtRouterStore.class)
67public class DistributedKubevirtRouterStore
68 extends AbstractStore<KubevirtRouterEvent, KubevirtRouterStoreDelegate>
69 implements KubevirtRouterStore {
70
71 private final Logger log = getLogger(getClass());
72
73 private static final String ERR_NOT_FOUND = " does not exist";
74 private static final String ERR_DUPLICATE = " already exists";
75 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
76
77 private static final KryoNamespace
78 SERIALIZER_KUBEVIRT_ROUTER = KryoNamespace.newBuilder()
79 .register(KryoNamespaces.API)
80 .register(KubevirtRouter.class)
81 .register(DefaultKubevirtRouter.class)
82 .register(KubevirtPeerRouter.class)
Jian Lie48a6172021-02-28 03:50:15 +090083 .register(KubevirtFloatingIp.class)
84 .register(DefaultKubevirtFloatingIp.class)
Jian Li7eb20782021-02-27 01:10:50 +090085 .register(Collection.class)
86 .build();
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected CoreService coreService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected StorageService storageService;
93
94 private final ExecutorService eventExecutor = newSingleThreadExecutor(
95 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
96
97 private final MapEventListener<String, KubevirtRouter> routerMapListener =
98 new KubevirtRouterMapListener();
Jian Lie48a6172021-02-28 03:50:15 +090099 private final MapEventListener<String, KubevirtFloatingIp> fipMapListener =
100 new KubevirtFloatingIpMapListener();
Jian Li7eb20782021-02-27 01:10:50 +0900101
102 private ConsistentMap<String, KubevirtRouter> routerStore;
Jian Lie48a6172021-02-28 03:50:15 +0900103 private ConsistentMap<String, KubevirtFloatingIp> fipStore;
Jian Li7eb20782021-02-27 01:10:50 +0900104
105 @Activate
106 protected void activate() {
107 ApplicationId appId = coreService.registerApplication(APP_ID);
108 routerStore = storageService.<String, KubevirtRouter>consistentMapBuilder()
109 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
110 .withName("kubevirt-routerstore")
111 .withApplicationId(appId)
112 .build();
Jian Lie48a6172021-02-28 03:50:15 +0900113 fipStore = storageService.<String, KubevirtFloatingIp>consistentMapBuilder()
114 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
115 .withName("kubevirt-fipstore")
116 .withApplicationId(appId)
117 .build();
Jian Li7eb20782021-02-27 01:10:50 +0900118 routerStore.addListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900119 fipStore.addListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900120 log.info("Started");
121 }
122
123 @Deactivate
124 protected void deactivate() {
125 routerStore.removeListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900126 fipStore.removeListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900127 eventExecutor.shutdown();
128 log.info("Stopped");
129 }
130
131 @Override
132 public void createRouter(KubevirtRouter router) {
133 routerStore.compute(router.name(), (name, existing) -> {
134 final String error = router.name() + ERR_DUPLICATE;
135 checkArgument(existing == null, error);
136 return router;
137 });
138 }
139
140 @Override
141 public void updateRouter(KubevirtRouter router) {
142 routerStore.compute(router.name(), (name, existing) -> {
143 final String error = router.name() + ERR_NOT_FOUND;
144 checkArgument(existing != null, error);
145 return router;
146 });
147 }
148
149 @Override
150 public KubevirtRouter removeRouter(String name) {
151 Versioned<KubevirtRouter> router = routerStore.remove(name);
152 if (router == null) {
153 final String error = name + ERR_NOT_FOUND;
154 throw new IllegalArgumentException(error);
155 }
156 return router.value();
157 }
158
159 @Override
160 public KubevirtRouter router(String name) {
161 return routerStore.asJavaMap().get(name);
162 }
163
164 @Override
165 public Set<KubevirtRouter> routers() {
166 return ImmutableSet.copyOf(routerStore.asJavaMap().values());
167 }
168
169 @Override
Jian Lie48a6172021-02-28 03:50:15 +0900170 public void createFloatingIp(KubevirtFloatingIp floatingIp) {
171 fipStore.compute(floatingIp.id(), (id, existing) -> {
172 final String error = floatingIp.id() + ERR_DUPLICATE;
173 checkArgument(existing == null, error);
174 return floatingIp;
175 });
176 }
177
178 @Override
179 public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
180 fipStore.compute(floatingIp.id(), (id, existing) -> {
181 final String error = floatingIp.id() + ERR_NOT_FOUND;
182 checkArgument(existing != null, error);
183 return floatingIp;
184 });
185 }
186
187 @Override
188 public KubevirtFloatingIp removeFloatingIp(String id) {
189 Versioned<KubevirtFloatingIp> floatingIp = fipStore.remove(id);
190 if (floatingIp == null) {
191 final String error = id + ERR_NOT_FOUND;
192 throw new IllegalArgumentException(error);
193 }
194 return floatingIp.value();
195 }
196
197 @Override
198 public KubevirtFloatingIp floatingIp(String id) {
199 return fipStore.asJavaMap().get(id);
200 }
201
202 @Override
203 public Set<KubevirtFloatingIp> floatingIps() {
204 return ImmutableSet.copyOf(fipStore.asJavaMap().values());
205 }
206
207
208 @Override
Jian Li7eb20782021-02-27 01:10:50 +0900209 public void clear() {
210 routerStore.clear();
Jian Lie48a6172021-02-28 03:50:15 +0900211 fipStore.clear();
Jian Li7eb20782021-02-27 01:10:50 +0900212 }
213
214 private class KubevirtRouterMapListener implements MapEventListener<String, KubevirtRouter> {
215
216 @Override
217 public void event(MapEvent<String, KubevirtRouter> event) {
218 switch (event.type()) {
219 case INSERT:
220 log.debug("Kubevirt router created");
221 eventExecutor.execute(() ->
222 notifyDelegate(new KubevirtRouterEvent(
223 KUBEVIRT_ROUTER_CREATED, event.newValue().value())));
224 break;
225 case UPDATE:
226 log.debug("Kubevirt router updated");
227 eventExecutor.execute(() ->
228 notifyDelegate(new KubevirtRouterEvent(
229 KUBEVIRT_ROUTER_UPDATED, event.newValue().value())));
230 break;
231 case REMOVE:
232 log.debug("Kubevirt router removed");
233 eventExecutor.execute(() ->
234 notifyDelegate(new KubevirtRouterEvent(
235 KUBEVIRT_ROUTER_REMOVED, event.oldValue().value())));
236 break;
237 default:
238 // do nothing
239 break;
240 }
241 }
242 }
Jian Lie48a6172021-02-28 03:50:15 +0900243
244 private class KubevirtFloatingIpMapListener implements MapEventListener<String, KubevirtFloatingIp> {
245
246 @Override
247 public void event(MapEvent<String, KubevirtFloatingIp> event) {
248 switch (event.type()) {
249 case INSERT:
250 eventExecutor.execute(() -> processFloatingIpMapInsertion(event));
251 break;
252 case UPDATE:
253 eventExecutor.execute(() -> processFloatingIpMapUpdate(event));
254 break;
255 case REMOVE:
256 eventExecutor.execute(() -> processFloatingIpMapRemoval(event));
257 break;
258 default:
259 break;
260 }
261 }
262
263 private void processFloatingIpMapInsertion(MapEvent<String, KubevirtFloatingIp> event) {
264 log.debug("Kubevirt floating IP created");
265 KubevirtRouter router = Strings.isNullOrEmpty(
266 event.newValue().value().routerName()) ?
267 null :
268 router(event.newValue().value().routerName());
269 notifyDelegate(new KubevirtRouterEvent(
270 KUBEVIRT_FLOATING_IP_CREATED,
271 router,
272 event.newValue().value()));
273 }
274
275 private void processFloatingIpMapUpdate(MapEvent<String, KubevirtFloatingIp> event) {
276 log.debug("Kubevirt floating IP updated");
277 KubevirtRouter router = Strings.isNullOrEmpty(
278 event.newValue().value().routerName()) ?
279 null :
280 router(event.newValue().value().routerName());
281 notifyDelegate(new KubevirtRouterEvent(
282 KUBEVIRT_FLOATING_IP_UPDATED,
283 router,
284 event.newValue().value()));
285 processFloatingIpUpdate(event, router);
286 }
287
288 private void processFloatingIpMapRemoval(MapEvent<String, KubevirtFloatingIp> event) {
289 log.debug("Kubevirt floating IP removed");
290 KubevirtRouter router = Strings.isNullOrEmpty(
291 event.oldValue().value().routerName()) ?
292 null :
293 router(event.oldValue().value().routerName());
294 notifyDelegate(new KubevirtRouterEvent(
295 KUBEVIRT_FLOATING_IP_REMOVED,
296 router,
297 event.oldValue().value()));
298 }
299
300 private void processFloatingIpUpdate(MapEvent<String, KubevirtFloatingIp> event,
301 KubevirtRouter router) {
302 String oldPodName = event.oldValue().value().podName();
303 String newPodName = event.newValue().value().podName();
304
305 if (Strings.isNullOrEmpty(oldPodName) && !Strings.isNullOrEmpty(newPodName)) {
306 notifyDelegate(new KubevirtRouterEvent(
307 KUBEVIRT_FLOATING_IP_ASSOCIATED,
308 router,
309 event.newValue().value(), newPodName));
310 }
311
312 if (!Strings.isNullOrEmpty(oldPodName) && Strings.isNullOrEmpty(newPodName)) {
313 notifyDelegate(new KubevirtRouterEvent(
314 KUBEVIRT_FLOATING_IP_DISASSOCIATED,
315 router,
316 event.newValue().value(), oldPodName));
317 }
318 }
319 }
Jian Li7eb20782021-02-27 01:10:50 +0900320}