blob: 8f3fdbd0d4dcb8b745a8322484b3a3673cca6964 [file] [log] [blame]
Jian Li7eb20782021-02-27 01:10:50 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Lie48a6172021-02-28 03:50:15 +090018import com.google.common.base.Strings;
Jian Li7eb20782021-02-27 01:10:50 +090019import com.google.common.collect.ImmutableSet;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Lie48a6172021-02-28 03:50:15 +090023import org.onosproject.kubevirtnetworking.api.DefaultKubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090024import org.onosproject.kubevirtnetworking.api.DefaultKubevirtRouter;
Jian Lie48a6172021-02-28 03:50:15 +090025import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090026import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
27import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
28import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtRouterStore;
30import org.onosproject.kubevirtnetworking.api.KubevirtRouterStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Collection;
Jian Li4acd4542021-03-03 14:46:50 +090047import java.util.HashSet;
Daniel Parkb9a22022021-03-04 18:58:47 +090048import java.util.Objects;
Jian Li7eb20782021-02-27 01:10:50 +090049import java.util.Set;
50import java.util.concurrent.ExecutorService;
51
52import static com.google.common.base.Preconditions.checkArgument;
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
Jian Lie48a6172021-02-28 03:50:15 +090055import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_ASSOCIATED;
56import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_CREATED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_DISASSOCIATED;
58import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_REMOVED;
59import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_UPDATED;
Daniel Parkb9a22022021-03-04 18:58:47 +090060import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_ATTACHED;
61import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_CHANGED;
62import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_DETACHED;
Jian Li7eb20782021-02-27 01:10:50 +090063import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_CREATED;
Jian Li4acd4542021-03-03 14:46:50 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED;
65import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED;
66import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED;
67import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED;
Jian Li7eb20782021-02-27 01:10:50 +090068import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_REMOVED;
69import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_UPDATED;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Implementation of kubevirt router store using consistent map.
74 */
75@Component(immediate = true, service = KubevirtRouterStore.class)
76public class DistributedKubevirtRouterStore
77 extends AbstractStore<KubevirtRouterEvent, KubevirtRouterStoreDelegate>
78 implements KubevirtRouterStore {
79
80 private final Logger log = getLogger(getClass());
81
82 private static final String ERR_NOT_FOUND = " does not exist";
83 private static final String ERR_DUPLICATE = " already exists";
Jian Li4acd4542021-03-03 14:46:50 +090084 private static final String MSG_FLOATING_IP = "Kubevirt floating IP %s %s with %s";
85 private static final String MSG_ASSOCIATED = "associated";
86 private static final String MSG_DISASSOCIATED = "disassociated";
87
Jian Li7eb20782021-02-27 01:10:50 +090088 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
89
90 private static final KryoNamespace
91 SERIALIZER_KUBEVIRT_ROUTER = KryoNamespace.newBuilder()
92 .register(KryoNamespaces.API)
93 .register(KubevirtRouter.class)
94 .register(DefaultKubevirtRouter.class)
95 .register(KubevirtPeerRouter.class)
Jian Lie48a6172021-02-28 03:50:15 +090096 .register(KubevirtFloatingIp.class)
97 .register(DefaultKubevirtFloatingIp.class)
Jian Li7eb20782021-02-27 01:10:50 +090098 .register(Collection.class)
99 .build();
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected CoreService coreService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected StorageService storageService;
106
107 private final ExecutorService eventExecutor = newSingleThreadExecutor(
108 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
109
110 private final MapEventListener<String, KubevirtRouter> routerMapListener =
111 new KubevirtRouterMapListener();
Jian Lie48a6172021-02-28 03:50:15 +0900112 private final MapEventListener<String, KubevirtFloatingIp> fipMapListener =
113 new KubevirtFloatingIpMapListener();
Jian Li7eb20782021-02-27 01:10:50 +0900114
115 private ConsistentMap<String, KubevirtRouter> routerStore;
Jian Lie48a6172021-02-28 03:50:15 +0900116 private ConsistentMap<String, KubevirtFloatingIp> fipStore;
Jian Li7eb20782021-02-27 01:10:50 +0900117
118 @Activate
119 protected void activate() {
120 ApplicationId appId = coreService.registerApplication(APP_ID);
121 routerStore = storageService.<String, KubevirtRouter>consistentMapBuilder()
122 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
123 .withName("kubevirt-routerstore")
124 .withApplicationId(appId)
125 .build();
Jian Lie48a6172021-02-28 03:50:15 +0900126 fipStore = storageService.<String, KubevirtFloatingIp>consistentMapBuilder()
127 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
128 .withName("kubevirt-fipstore")
129 .withApplicationId(appId)
130 .build();
Jian Li7eb20782021-02-27 01:10:50 +0900131 routerStore.addListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900132 fipStore.addListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900133 log.info("Started");
134 }
135
136 @Deactivate
137 protected void deactivate() {
138 routerStore.removeListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900139 fipStore.removeListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900140 eventExecutor.shutdown();
141 log.info("Stopped");
142 }
143
144 @Override
145 public void createRouter(KubevirtRouter router) {
146 routerStore.compute(router.name(), (name, existing) -> {
147 final String error = router.name() + ERR_DUPLICATE;
148 checkArgument(existing == null, error);
149 return router;
150 });
151 }
152
153 @Override
154 public void updateRouter(KubevirtRouter router) {
155 routerStore.compute(router.name(), (name, existing) -> {
156 final String error = router.name() + ERR_NOT_FOUND;
157 checkArgument(existing != null, error);
158 return router;
159 });
160 }
161
162 @Override
163 public KubevirtRouter removeRouter(String name) {
164 Versioned<KubevirtRouter> router = routerStore.remove(name);
165 if (router == null) {
166 final String error = name + ERR_NOT_FOUND;
167 throw new IllegalArgumentException(error);
168 }
169 return router.value();
170 }
171
172 @Override
173 public KubevirtRouter router(String name) {
174 return routerStore.asJavaMap().get(name);
175 }
176
177 @Override
178 public Set<KubevirtRouter> routers() {
179 return ImmutableSet.copyOf(routerStore.asJavaMap().values());
180 }
181
182 @Override
Jian Lie48a6172021-02-28 03:50:15 +0900183 public void createFloatingIp(KubevirtFloatingIp floatingIp) {
184 fipStore.compute(floatingIp.id(), (id, existing) -> {
185 final String error = floatingIp.id() + ERR_DUPLICATE;
186 checkArgument(existing == null, error);
187 return floatingIp;
188 });
189 }
190
191 @Override
192 public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
193 fipStore.compute(floatingIp.id(), (id, existing) -> {
194 final String error = floatingIp.id() + ERR_NOT_FOUND;
195 checkArgument(existing != null, error);
196 return floatingIp;
197 });
198 }
199
200 @Override
201 public KubevirtFloatingIp removeFloatingIp(String id) {
202 Versioned<KubevirtFloatingIp> floatingIp = fipStore.remove(id);
203 if (floatingIp == null) {
204 final String error = id + ERR_NOT_FOUND;
205 throw new IllegalArgumentException(error);
206 }
207 return floatingIp.value();
208 }
209
210 @Override
211 public KubevirtFloatingIp floatingIp(String id) {
212 return fipStore.asJavaMap().get(id);
213 }
214
215 @Override
216 public Set<KubevirtFloatingIp> floatingIps() {
217 return ImmutableSet.copyOf(fipStore.asJavaMap().values());
218 }
219
220
221 @Override
Jian Li7eb20782021-02-27 01:10:50 +0900222 public void clear() {
223 routerStore.clear();
Jian Lie48a6172021-02-28 03:50:15 +0900224 fipStore.clear();
Jian Li7eb20782021-02-27 01:10:50 +0900225 }
226
227 private class KubevirtRouterMapListener implements MapEventListener<String, KubevirtRouter> {
228
229 @Override
230 public void event(MapEvent<String, KubevirtRouter> event) {
231 switch (event.type()) {
232 case INSERT:
233 log.debug("Kubevirt router created");
234 eventExecutor.execute(() ->
235 notifyDelegate(new KubevirtRouterEvent(
236 KUBEVIRT_ROUTER_CREATED, event.newValue().value())));
237 break;
238 case UPDATE:
Jian Li4acd4542021-03-03 14:46:50 +0900239 eventExecutor.execute(() -> processRouterMapUpdate(event));
Jian Li7eb20782021-02-27 01:10:50 +0900240 break;
241 case REMOVE:
242 log.debug("Kubevirt router removed");
243 eventExecutor.execute(() ->
244 notifyDelegate(new KubevirtRouterEvent(
245 KUBEVIRT_ROUTER_REMOVED, event.oldValue().value())));
246 break;
247 default:
248 // do nothing
249 break;
250 }
251 }
Jian Li4acd4542021-03-03 14:46:50 +0900252
253 private void processRouterMapUpdate(MapEvent<String, KubevirtRouter> event) {
254 log.debug("Kubevirt router updated");
255 eventExecutor.execute(() ->
256 notifyDelegate(new KubevirtRouterEvent(
257 KUBEVIRT_ROUTER_UPDATED, event.newValue().value())));
258
259 KubevirtRouter router = Strings.isNullOrEmpty(
260 event.newValue().value().name()) ?
261 null :
262 router(event.newValue().value().name());
263
264 KubevirtRouter oldValue = event.oldValue().value();
265 KubevirtRouter newValue = event.newValue().value();
266
267 if (oldValue.external().size() == 0 && newValue.external().size() > 0) {
268 newValue.external().entrySet().stream().findAny()
269 .ifPresent(entry ->
270 notifyDelegate(new KubevirtRouterEvent(
271 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED,
272 router, entry.getKey(), entry.getValue(),
273 newValue.peerRouter().ipAddress().toString())));
274 }
275
276 if (oldValue.external().size() > 0 && newValue.external().size() == 0) {
277 oldValue.external().entrySet().stream().findAny()
278 .ifPresent(entry ->
279 notifyDelegate(new KubevirtRouterEvent(
280 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED,
281 router, entry.getKey(), entry.getValue(),
282 oldValue.peerRouter().ipAddress().toString())));
283 }
284
285 Set<String> added = new HashSet<>(newValue.internal());
286 Set<String> oldset = oldValue.internal();
287 added.removeAll(oldset);
288
289 Set<String> removed = new HashSet<>(oldValue.internal());
290 Set<String> newset = newValue.internal();
291 removed.removeAll(newset);
292
293 if (added.size() > 0) {
294 notifyDelegate(new KubevirtRouterEvent(
295 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED,
296 router, added));
297 }
298
299 if (removed.size() > 0) {
300 notifyDelegate(new KubevirtRouterEvent(
301 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED,
302 router, removed));
303 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900304 if (oldValue.electedGateway() == null
305 && newValue.electedGateway() != null) {
306 notifyDelegate(new KubevirtRouterEvent(
307 KUBEVIRT_GATEWAY_NODE_ATTACHED,
308 router, newValue.electedGateway()));
309 }
310
311 if (oldValue.electedGateway() != null
312 && newValue.electedGateway() == null) {
313 notifyDelegate(new KubevirtRouterEvent(
314 KUBEVIRT_GATEWAY_NODE_DETACHED,
315 router, oldValue.electedGateway()));
316 }
317
318 if (oldValue.electedGateway() != null
319 && newValue.electedGateway() != null
320 && !Objects.equals(oldValue.electedGateway(), newValue.electedGateway())) {
321 notifyDelegate(new KubevirtRouterEvent(
322 KUBEVIRT_GATEWAY_NODE_CHANGED,
323 router, oldValue.electedGateway()));
324 }
Jian Li4acd4542021-03-03 14:46:50 +0900325 }
Jian Li7eb20782021-02-27 01:10:50 +0900326 }
Jian Lie48a6172021-02-28 03:50:15 +0900327
328 private class KubevirtFloatingIpMapListener implements MapEventListener<String, KubevirtFloatingIp> {
329
330 @Override
331 public void event(MapEvent<String, KubevirtFloatingIp> event) {
332 switch (event.type()) {
333 case INSERT:
334 eventExecutor.execute(() -> processFloatingIpMapInsertion(event));
335 break;
336 case UPDATE:
337 eventExecutor.execute(() -> processFloatingIpMapUpdate(event));
338 break;
339 case REMOVE:
340 eventExecutor.execute(() -> processFloatingIpMapRemoval(event));
341 break;
342 default:
343 break;
344 }
345 }
346
347 private void processFloatingIpMapInsertion(MapEvent<String, KubevirtFloatingIp> event) {
348 log.debug("Kubevirt floating IP created");
349 KubevirtRouter router = Strings.isNullOrEmpty(
350 event.newValue().value().routerName()) ?
351 null :
352 router(event.newValue().value().routerName());
353 notifyDelegate(new KubevirtRouterEvent(
354 KUBEVIRT_FLOATING_IP_CREATED,
355 router,
356 event.newValue().value()));
357 }
358
359 private void processFloatingIpMapUpdate(MapEvent<String, KubevirtFloatingIp> event) {
360 log.debug("Kubevirt floating IP updated");
361 KubevirtRouter router = Strings.isNullOrEmpty(
362 event.newValue().value().routerName()) ?
363 null :
364 router(event.newValue().value().routerName());
365 notifyDelegate(new KubevirtRouterEvent(
366 KUBEVIRT_FLOATING_IP_UPDATED,
367 router,
368 event.newValue().value()));
369 processFloatingIpUpdate(event, router);
370 }
371
372 private void processFloatingIpMapRemoval(MapEvent<String, KubevirtFloatingIp> event) {
373 log.debug("Kubevirt floating IP removed");
374 KubevirtRouter router = Strings.isNullOrEmpty(
375 event.oldValue().value().routerName()) ?
376 null :
377 router(event.oldValue().value().routerName());
378 notifyDelegate(new KubevirtRouterEvent(
379 KUBEVIRT_FLOATING_IP_REMOVED,
380 router,
381 event.oldValue().value()));
382 }
383
384 private void processFloatingIpUpdate(MapEvent<String, KubevirtFloatingIp> event,
385 KubevirtRouter router) {
386 String oldPodName = event.oldValue().value().podName();
387 String newPodName = event.newValue().value().podName();
388
389 if (Strings.isNullOrEmpty(oldPodName) && !Strings.isNullOrEmpty(newPodName)) {
390 notifyDelegate(new KubevirtRouterEvent(
391 KUBEVIRT_FLOATING_IP_ASSOCIATED,
392 router,
393 event.newValue().value(), newPodName));
Jian Li4acd4542021-03-03 14:46:50 +0900394 log.info(String.format(MSG_FLOATING_IP,
395 event.newValue().value().floatingIp(), MSG_ASSOCIATED, newPodName));
Jian Lie48a6172021-02-28 03:50:15 +0900396 }
397
398 if (!Strings.isNullOrEmpty(oldPodName) && Strings.isNullOrEmpty(newPodName)) {
399 notifyDelegate(new KubevirtRouterEvent(
400 KUBEVIRT_FLOATING_IP_DISASSOCIATED,
401 router,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900402 event.oldValue().value(), oldPodName));
Jian Li4acd4542021-03-03 14:46:50 +0900403 log.info(String.format(MSG_FLOATING_IP,
404 event.newValue().value().floatingIp(), MSG_DISASSOCIATED, oldPodName));
Jian Lie48a6172021-02-28 03:50:15 +0900405 }
406 }
407 }
Jian Li7eb20782021-02-27 01:10:50 +0900408}