/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Lie48a6172021-02-28 03:50:15 +090018import com.google.common.base.Strings;
Jian Li7eb20782021-02-27 01:10:50 +090019import com.google.common.collect.ImmutableSet;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Lie48a6172021-02-28 03:50:15 +090023import org.onosproject.kubevirtnetworking.api.DefaultKubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090024import org.onosproject.kubevirtnetworking.api.DefaultKubevirtRouter;
Jian Lie48a6172021-02-28 03:50:15 +090025import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
Jian Li7eb20782021-02-27 01:10:50 +090026import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
27import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
28import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtRouterStore;
30import org.onosproject.kubevirtnetworking.api.KubevirtRouterStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Collection;
Jian Li4acd4542021-03-03 14:46:50 +090047import java.util.HashSet;
Daniel Parkb9a22022021-03-04 18:58:47 +090048import java.util.Objects;
Jian Li7eb20782021-02-27 01:10:50 +090049import java.util.Set;
50import java.util.concurrent.ExecutorService;
51
52import static com.google.common.base.Preconditions.checkArgument;
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
Jian Lie48a6172021-02-28 03:50:15 +090055import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_ASSOCIATED;
56import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_CREATED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_DISASSOCIATED;
58import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_REMOVED;
59import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_UPDATED;
Daniel Parkb9a22022021-03-04 18:58:47 +090060import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_ATTACHED;
61import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_CHANGED;
62import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_GATEWAY_NODE_DETACHED;
Jian Li7eb20782021-02-27 01:10:50 +090063import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_CREATED;
Jian Li4acd4542021-03-03 14:46:50 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED;
65import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED;
66import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED;
67import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED;
Jian Li7eb20782021-02-27 01:10:50 +090068import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_REMOVED;
69import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_UPDATED;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Implementation of kubevirt router store using consistent map.
74 */
75@Component(immediate = true, service = KubevirtRouterStore.class)
76public class DistributedKubevirtRouterStore
77 extends AbstractStore<KubevirtRouterEvent, KubevirtRouterStoreDelegate>
78 implements KubevirtRouterStore {
79
80 private final Logger log = getLogger(getClass());
81
82 private static final String ERR_NOT_FOUND = " does not exist";
83 private static final String ERR_DUPLICATE = " already exists";
Jian Li4acd4542021-03-03 14:46:50 +090084 private static final String MSG_FLOATING_IP = "Kubevirt floating IP %s %s with %s";
85 private static final String MSG_ASSOCIATED = "associated";
86 private static final String MSG_DISASSOCIATED = "disassociated";
87
Jian Li7eb20782021-02-27 01:10:50 +090088 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
89
90 private static final KryoNamespace
91 SERIALIZER_KUBEVIRT_ROUTER = KryoNamespace.newBuilder()
92 .register(KryoNamespaces.API)
93 .register(KubevirtRouter.class)
94 .register(DefaultKubevirtRouter.class)
95 .register(KubevirtPeerRouter.class)
Jian Lie48a6172021-02-28 03:50:15 +090096 .register(KubevirtFloatingIp.class)
97 .register(DefaultKubevirtFloatingIp.class)
Jian Li7eb20782021-02-27 01:10:50 +090098 .register(Collection.class)
99 .build();
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected CoreService coreService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected StorageService storageService;
106
107 private final ExecutorService eventExecutor = newSingleThreadExecutor(
108 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
109
110 private final MapEventListener<String, KubevirtRouter> routerMapListener =
111 new KubevirtRouterMapListener();
Jian Lie48a6172021-02-28 03:50:15 +0900112 private final MapEventListener<String, KubevirtFloatingIp> fipMapListener =
113 new KubevirtFloatingIpMapListener();
Jian Li7eb20782021-02-27 01:10:50 +0900114
115 private ConsistentMap<String, KubevirtRouter> routerStore;
Jian Lie48a6172021-02-28 03:50:15 +0900116 private ConsistentMap<String, KubevirtFloatingIp> fipStore;
Jian Li7eb20782021-02-27 01:10:50 +0900117
118 @Activate
119 protected void activate() {
120 ApplicationId appId = coreService.registerApplication(APP_ID);
121 routerStore = storageService.<String, KubevirtRouter>consistentMapBuilder()
122 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
123 .withName("kubevirt-routerstore")
124 .withApplicationId(appId)
125 .build();
Jian Lie48a6172021-02-28 03:50:15 +0900126 fipStore = storageService.<String, KubevirtFloatingIp>consistentMapBuilder()
127 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
128 .withName("kubevirt-fipstore")
129 .withApplicationId(appId)
130 .build();
Jian Li7eb20782021-02-27 01:10:50 +0900131 routerStore.addListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900132 fipStore.addListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900133 log.info("Started");
134 }
135
136 @Deactivate
137 protected void deactivate() {
138 routerStore.removeListener(routerMapListener);
Jian Lie48a6172021-02-28 03:50:15 +0900139 fipStore.removeListener(fipMapListener);
Jian Li7eb20782021-02-27 01:10:50 +0900140 eventExecutor.shutdown();
141 log.info("Stopped");
142 }
143
144 @Override
145 public void createRouter(KubevirtRouter router) {
146 routerStore.compute(router.name(), (name, existing) -> {
147 final String error = router.name() + ERR_DUPLICATE;
148 checkArgument(existing == null, error);
149 return router;
150 });
151 }
152
153 @Override
154 public void updateRouter(KubevirtRouter router) {
155 routerStore.compute(router.name(), (name, existing) -> {
156 final String error = router.name() + ERR_NOT_FOUND;
157 checkArgument(existing != null, error);
158 return router;
159 });
160 }
161
162 @Override
163 public KubevirtRouter removeRouter(String name) {
164 Versioned<KubevirtRouter> router = routerStore.remove(name);
165 if (router == null) {
166 final String error = name + ERR_NOT_FOUND;
167 throw new IllegalArgumentException(error);
168 }
169 return router.value();
170 }
171
172 @Override
173 public KubevirtRouter router(String name) {
174 return routerStore.asJavaMap().get(name);
175 }
176
177 @Override
178 public Set<KubevirtRouter> routers() {
179 return ImmutableSet.copyOf(routerStore.asJavaMap().values());
180 }
181
182 @Override
Jian Lie48a6172021-02-28 03:50:15 +0900183 public void createFloatingIp(KubevirtFloatingIp floatingIp) {
184 fipStore.compute(floatingIp.id(), (id, existing) -> {
185 final String error = floatingIp.id() + ERR_DUPLICATE;
186 checkArgument(existing == null, error);
187 return floatingIp;
188 });
189 }
190
191 @Override
192 public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
193 fipStore.compute(floatingIp.id(), (id, existing) -> {
194 final String error = floatingIp.id() + ERR_NOT_FOUND;
195 checkArgument(existing != null, error);
196 return floatingIp;
197 });
198 }
199
200 @Override
201 public KubevirtFloatingIp removeFloatingIp(String id) {
202 Versioned<KubevirtFloatingIp> floatingIp = fipStore.remove(id);
203 if (floatingIp == null) {
204 final String error = id + ERR_NOT_FOUND;
205 throw new IllegalArgumentException(error);
206 }
207 return floatingIp.value();
208 }
209
210 @Override
211 public KubevirtFloatingIp floatingIp(String id) {
212 return fipStore.asJavaMap().get(id);
213 }
214
215 @Override
216 public Set<KubevirtFloatingIp> floatingIps() {
217 return ImmutableSet.copyOf(fipStore.asJavaMap().values());
218 }
219
220
221 @Override
Jian Li7eb20782021-02-27 01:10:50 +0900222 public void clear() {
223 routerStore.clear();
Jian Lie48a6172021-02-28 03:50:15 +0900224 fipStore.clear();
Jian Li7eb20782021-02-27 01:10:50 +0900225 }
226
227 private class KubevirtRouterMapListener implements MapEventListener<String, KubevirtRouter> {
228
229 @Override
230 public void event(MapEvent<String, KubevirtRouter> event) {
231 switch (event.type()) {
232 case INSERT:
233 log.debug("Kubevirt router created");
234 eventExecutor.execute(() ->
235 notifyDelegate(new KubevirtRouterEvent(
236 KUBEVIRT_ROUTER_CREATED, event.newValue().value())));
237 break;
238 case UPDATE:
Jian Li4acd4542021-03-03 14:46:50 +0900239 eventExecutor.execute(() -> processRouterMapUpdate(event));
Jian Li7eb20782021-02-27 01:10:50 +0900240 break;
241 case REMOVE:
242 log.debug("Kubevirt router removed");
243 eventExecutor.execute(() ->
244 notifyDelegate(new KubevirtRouterEvent(
245 KUBEVIRT_ROUTER_REMOVED, event.oldValue().value())));
246 break;
247 default:
248 // do nothing
249 break;
250 }
251 }
Jian Li4acd4542021-03-03 14:46:50 +0900252
253 private void processRouterMapUpdate(MapEvent<String, KubevirtRouter> event) {
254 log.debug("Kubevirt router updated");
255 eventExecutor.execute(() ->
256 notifyDelegate(new KubevirtRouterEvent(
257 KUBEVIRT_ROUTER_UPDATED, event.newValue().value())));
258
259 KubevirtRouter router = Strings.isNullOrEmpty(
260 event.newValue().value().name()) ?
261 null :
262 router(event.newValue().value().name());
263
264 KubevirtRouter oldValue = event.oldValue().value();
265 KubevirtRouter newValue = event.newValue().value();
266
267 if (oldValue.external().size() == 0 && newValue.external().size() > 0) {
268 newValue.external().entrySet().stream().findAny()
269 .ifPresent(entry ->
270 notifyDelegate(new KubevirtRouterEvent(
271 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED,
272 router, entry.getKey(), entry.getValue(),
Daniel Park54205272021-03-23 08:00:00 +0900273 newValue.peerRouter().ipAddress().toString(),
274 newValue.peerRouter().macAddress())));
Jian Li4acd4542021-03-03 14:46:50 +0900275 }
276
277 if (oldValue.external().size() > 0 && newValue.external().size() == 0) {
278 oldValue.external().entrySet().stream().findAny()
279 .ifPresent(entry ->
280 notifyDelegate(new KubevirtRouterEvent(
281 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED,
282 router, entry.getKey(), entry.getValue(),
Daniel Park54205272021-03-23 08:00:00 +0900283 oldValue.peerRouter().ipAddress().toString(),
284 oldValue.peerRouter().macAddress())));
Jian Li4acd4542021-03-03 14:46:50 +0900285 }
286
287 Set<String> added = new HashSet<>(newValue.internal());
288 Set<String> oldset = oldValue.internal();
289 added.removeAll(oldset);
290
291 Set<String> removed = new HashSet<>(oldValue.internal());
292 Set<String> newset = newValue.internal();
293 removed.removeAll(newset);
294
295 if (added.size() > 0) {
296 notifyDelegate(new KubevirtRouterEvent(
297 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED,
298 router, added));
299 }
300
301 if (removed.size() > 0) {
302 notifyDelegate(new KubevirtRouterEvent(
303 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED,
304 router, removed));
305 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900306 if (oldValue.electedGateway() == null
307 && newValue.electedGateway() != null) {
308 notifyDelegate(new KubevirtRouterEvent(
309 KUBEVIRT_GATEWAY_NODE_ATTACHED,
310 router, newValue.electedGateway()));
311 }
312
313 if (oldValue.electedGateway() != null
314 && newValue.electedGateway() == null) {
315 notifyDelegate(new KubevirtRouterEvent(
316 KUBEVIRT_GATEWAY_NODE_DETACHED,
317 router, oldValue.electedGateway()));
318 }
319
320 if (oldValue.electedGateway() != null
321 && newValue.electedGateway() != null
322 && !Objects.equals(oldValue.electedGateway(), newValue.electedGateway())) {
323 notifyDelegate(new KubevirtRouterEvent(
324 KUBEVIRT_GATEWAY_NODE_CHANGED,
325 router, oldValue.electedGateway()));
326 }
Jian Li4acd4542021-03-03 14:46:50 +0900327 }
Jian Li7eb20782021-02-27 01:10:50 +0900328 }
Jian Lie48a6172021-02-28 03:50:15 +0900329
330 private class KubevirtFloatingIpMapListener implements MapEventListener<String, KubevirtFloatingIp> {
331
332 @Override
333 public void event(MapEvent<String, KubevirtFloatingIp> event) {
334 switch (event.type()) {
335 case INSERT:
336 eventExecutor.execute(() -> processFloatingIpMapInsertion(event));
337 break;
338 case UPDATE:
339 eventExecutor.execute(() -> processFloatingIpMapUpdate(event));
340 break;
341 case REMOVE:
342 eventExecutor.execute(() -> processFloatingIpMapRemoval(event));
343 break;
344 default:
345 break;
346 }
347 }
348
349 private void processFloatingIpMapInsertion(MapEvent<String, KubevirtFloatingIp> event) {
350 log.debug("Kubevirt floating IP created");
351 KubevirtRouter router = Strings.isNullOrEmpty(
352 event.newValue().value().routerName()) ?
353 null :
354 router(event.newValue().value().routerName());
355 notifyDelegate(new KubevirtRouterEvent(
356 KUBEVIRT_FLOATING_IP_CREATED,
357 router,
358 event.newValue().value()));
359 }
360
361 private void processFloatingIpMapUpdate(MapEvent<String, KubevirtFloatingIp> event) {
362 log.debug("Kubevirt floating IP updated");
363 KubevirtRouter router = Strings.isNullOrEmpty(
364 event.newValue().value().routerName()) ?
365 null :
366 router(event.newValue().value().routerName());
367 notifyDelegate(new KubevirtRouterEvent(
368 KUBEVIRT_FLOATING_IP_UPDATED,
369 router,
370 event.newValue().value()));
371 processFloatingIpUpdate(event, router);
372 }
373
374 private void processFloatingIpMapRemoval(MapEvent<String, KubevirtFloatingIp> event) {
375 log.debug("Kubevirt floating IP removed");
376 KubevirtRouter router = Strings.isNullOrEmpty(
377 event.oldValue().value().routerName()) ?
378 null :
379 router(event.oldValue().value().routerName());
380 notifyDelegate(new KubevirtRouterEvent(
381 KUBEVIRT_FLOATING_IP_REMOVED,
382 router,
383 event.oldValue().value()));
384 }
385
386 private void processFloatingIpUpdate(MapEvent<String, KubevirtFloatingIp> event,
387 KubevirtRouter router) {
388 String oldPodName = event.oldValue().value().podName();
389 String newPodName = event.newValue().value().podName();
390
391 if (Strings.isNullOrEmpty(oldPodName) && !Strings.isNullOrEmpty(newPodName)) {
392 notifyDelegate(new KubevirtRouterEvent(
393 KUBEVIRT_FLOATING_IP_ASSOCIATED,
394 router,
395 event.newValue().value(), newPodName));
Jian Li4acd4542021-03-03 14:46:50 +0900396 log.info(String.format(MSG_FLOATING_IP,
397 event.newValue().value().floatingIp(), MSG_ASSOCIATED, newPodName));
Jian Lie48a6172021-02-28 03:50:15 +0900398 }
399
400 if (!Strings.isNullOrEmpty(oldPodName) && Strings.isNullOrEmpty(newPodName)) {
401 notifyDelegate(new KubevirtRouterEvent(
402 KUBEVIRT_FLOATING_IP_DISASSOCIATED,
403 router,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900404 event.oldValue().value(), oldPodName));
Jian Li4acd4542021-03-03 14:46:50 +0900405 log.info(String.format(MSG_FLOATING_IP,
406 event.newValue().value().floatingIp(), MSG_DISASSOCIATED, oldPodName));
Jian Lie48a6172021-02-28 03:50:15 +0900407 }
408 }
409 }
Jian Li7eb20782021-02-27 01:10:50 +0900410}