blob: c11fb2dc21baeabbf30a56f710aa5f81061cfebb [file] [log] [blame]
Jian Li810f58c2021-02-27 01:10:50 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
Jian Li073f1ba2021-02-28 03:50:15 +090018import com.google.common.base.Strings;
Jian Li810f58c2021-02-27 01:10:50 +090019import com.google.common.collect.ImmutableSet;
20import org.onlab.util.KryoNamespace;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Li073f1ba2021-02-28 03:50:15 +090023import org.onosproject.kubevirtnetworking.api.DefaultKubevirtFloatingIp;
Jian Li810f58c2021-02-27 01:10:50 +090024import org.onosproject.kubevirtnetworking.api.DefaultKubevirtRouter;
Jian Li073f1ba2021-02-28 03:50:15 +090025import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
Jian Li810f58c2021-02-27 01:10:50 +090026import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
27import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
28import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
29import org.onosproject.kubevirtnetworking.api.KubevirtRouterStore;
30import org.onosproject.kubevirtnetworking.api.KubevirtRouterStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Collection;
Jian Lib636f702021-03-03 14:46:50 +090047import java.util.HashSet;
Jian Li810f58c2021-02-27 01:10:50 +090048import java.util.Set;
49import java.util.concurrent.ExecutorService;
50
51import static com.google.common.base.Preconditions.checkArgument;
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
Jian Li073f1ba2021-02-28 03:50:15 +090054import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_ASSOCIATED;
55import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_CREATED;
56import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_DISASSOCIATED;
57import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_REMOVED;
58import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_UPDATED;
Jian Li810f58c2021-02-27 01:10:50 +090059import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_CREATED;
Jian Lib636f702021-03-03 14:46:50 +090060import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED;
61import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED;
62import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED;
63import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED;
Jian Li810f58c2021-02-27 01:10:50 +090064import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_REMOVED;
65import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_UPDATED;
66import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Implementation of kubevirt router store using consistent map.
70 */
71@Component(immediate = true, service = KubevirtRouterStore.class)
72public class DistributedKubevirtRouterStore
73 extends AbstractStore<KubevirtRouterEvent, KubevirtRouterStoreDelegate>
74 implements KubevirtRouterStore {
75
76 private final Logger log = getLogger(getClass());
77
78 private static final String ERR_NOT_FOUND = " does not exist";
79 private static final String ERR_DUPLICATE = " already exists";
Jian Lib636f702021-03-03 14:46:50 +090080 private static final String MSG_FLOATING_IP = "Kubevirt floating IP %s %s with %s";
81 private static final String MSG_ASSOCIATED = "associated";
82 private static final String MSG_DISASSOCIATED = "disassociated";
83
Jian Li810f58c2021-02-27 01:10:50 +090084 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
85
86 private static final KryoNamespace
87 SERIALIZER_KUBEVIRT_ROUTER = KryoNamespace.newBuilder()
88 .register(KryoNamespaces.API)
89 .register(KubevirtRouter.class)
90 .register(DefaultKubevirtRouter.class)
91 .register(KubevirtPeerRouter.class)
Jian Li073f1ba2021-02-28 03:50:15 +090092 .register(KubevirtFloatingIp.class)
93 .register(DefaultKubevirtFloatingIp.class)
Jian Li810f58c2021-02-27 01:10:50 +090094 .register(Collection.class)
95 .build();
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected CoreService coreService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected StorageService storageService;
102
103 private final ExecutorService eventExecutor = newSingleThreadExecutor(
104 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
105
106 private final MapEventListener<String, KubevirtRouter> routerMapListener =
107 new KubevirtRouterMapListener();
Jian Li073f1ba2021-02-28 03:50:15 +0900108 private final MapEventListener<String, KubevirtFloatingIp> fipMapListener =
109 new KubevirtFloatingIpMapListener();
Jian Li810f58c2021-02-27 01:10:50 +0900110
111 private ConsistentMap<String, KubevirtRouter> routerStore;
Jian Li073f1ba2021-02-28 03:50:15 +0900112 private ConsistentMap<String, KubevirtFloatingIp> fipStore;
Jian Li810f58c2021-02-27 01:10:50 +0900113
114 @Activate
115 protected void activate() {
116 ApplicationId appId = coreService.registerApplication(APP_ID);
117 routerStore = storageService.<String, KubevirtRouter>consistentMapBuilder()
118 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
119 .withName("kubevirt-routerstore")
120 .withApplicationId(appId)
121 .build();
Jian Li073f1ba2021-02-28 03:50:15 +0900122 fipStore = storageService.<String, KubevirtFloatingIp>consistentMapBuilder()
123 .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
124 .withName("kubevirt-fipstore")
125 .withApplicationId(appId)
126 .build();
Jian Li810f58c2021-02-27 01:10:50 +0900127 routerStore.addListener(routerMapListener);
Jian Li073f1ba2021-02-28 03:50:15 +0900128 fipStore.addListener(fipMapListener);
Jian Li810f58c2021-02-27 01:10:50 +0900129 log.info("Started");
130 }
131
132 @Deactivate
133 protected void deactivate() {
134 routerStore.removeListener(routerMapListener);
Jian Li073f1ba2021-02-28 03:50:15 +0900135 fipStore.removeListener(fipMapListener);
Jian Li810f58c2021-02-27 01:10:50 +0900136 eventExecutor.shutdown();
137 log.info("Stopped");
138 }
139
140 @Override
141 public void createRouter(KubevirtRouter router) {
142 routerStore.compute(router.name(), (name, existing) -> {
143 final String error = router.name() + ERR_DUPLICATE;
144 checkArgument(existing == null, error);
145 return router;
146 });
147 }
148
149 @Override
150 public void updateRouter(KubevirtRouter router) {
151 routerStore.compute(router.name(), (name, existing) -> {
152 final String error = router.name() + ERR_NOT_FOUND;
153 checkArgument(existing != null, error);
154 return router;
155 });
156 }
157
158 @Override
159 public KubevirtRouter removeRouter(String name) {
160 Versioned<KubevirtRouter> router = routerStore.remove(name);
161 if (router == null) {
162 final String error = name + ERR_NOT_FOUND;
163 throw new IllegalArgumentException(error);
164 }
165 return router.value();
166 }
167
168 @Override
169 public KubevirtRouter router(String name) {
170 return routerStore.asJavaMap().get(name);
171 }
172
173 @Override
174 public Set<KubevirtRouter> routers() {
175 return ImmutableSet.copyOf(routerStore.asJavaMap().values());
176 }
177
178 @Override
Jian Li073f1ba2021-02-28 03:50:15 +0900179 public void createFloatingIp(KubevirtFloatingIp floatingIp) {
180 fipStore.compute(floatingIp.id(), (id, existing) -> {
181 final String error = floatingIp.id() + ERR_DUPLICATE;
182 checkArgument(existing == null, error);
183 return floatingIp;
184 });
185 }
186
187 @Override
188 public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
189 fipStore.compute(floatingIp.id(), (id, existing) -> {
190 final String error = floatingIp.id() + ERR_NOT_FOUND;
191 checkArgument(existing != null, error);
192 return floatingIp;
193 });
194 }
195
196 @Override
197 public KubevirtFloatingIp removeFloatingIp(String id) {
198 Versioned<KubevirtFloatingIp> floatingIp = fipStore.remove(id);
199 if (floatingIp == null) {
200 final String error = id + ERR_NOT_FOUND;
201 throw new IllegalArgumentException(error);
202 }
203 return floatingIp.value();
204 }
205
206 @Override
207 public KubevirtFloatingIp floatingIp(String id) {
208 return fipStore.asJavaMap().get(id);
209 }
210
211 @Override
212 public Set<KubevirtFloatingIp> floatingIps() {
213 return ImmutableSet.copyOf(fipStore.asJavaMap().values());
214 }
215
216
217 @Override
Jian Li810f58c2021-02-27 01:10:50 +0900218 public void clear() {
219 routerStore.clear();
Jian Li073f1ba2021-02-28 03:50:15 +0900220 fipStore.clear();
Jian Li810f58c2021-02-27 01:10:50 +0900221 }
222
223 private class KubevirtRouterMapListener implements MapEventListener<String, KubevirtRouter> {
224
225 @Override
226 public void event(MapEvent<String, KubevirtRouter> event) {
227 switch (event.type()) {
228 case INSERT:
229 log.debug("Kubevirt router created");
230 eventExecutor.execute(() ->
231 notifyDelegate(new KubevirtRouterEvent(
232 KUBEVIRT_ROUTER_CREATED, event.newValue().value())));
233 break;
234 case UPDATE:
Jian Lib636f702021-03-03 14:46:50 +0900235 eventExecutor.execute(() -> processRouterMapUpdate(event));
Jian Li810f58c2021-02-27 01:10:50 +0900236 break;
237 case REMOVE:
238 log.debug("Kubevirt router removed");
239 eventExecutor.execute(() ->
240 notifyDelegate(new KubevirtRouterEvent(
241 KUBEVIRT_ROUTER_REMOVED, event.oldValue().value())));
242 break;
243 default:
244 // do nothing
245 break;
246 }
247 }
Jian Lib636f702021-03-03 14:46:50 +0900248
249 private void processRouterMapUpdate(MapEvent<String, KubevirtRouter> event) {
250 log.debug("Kubevirt router updated");
251 eventExecutor.execute(() ->
252 notifyDelegate(new KubevirtRouterEvent(
253 KUBEVIRT_ROUTER_UPDATED, event.newValue().value())));
254
255 KubevirtRouter router = Strings.isNullOrEmpty(
256 event.newValue().value().name()) ?
257 null :
258 router(event.newValue().value().name());
259
260 KubevirtRouter oldValue = event.oldValue().value();
261 KubevirtRouter newValue = event.newValue().value();
262
263 if (oldValue.external().size() == 0 && newValue.external().size() > 0) {
264 newValue.external().entrySet().stream().findAny()
265 .ifPresent(entry ->
266 notifyDelegate(new KubevirtRouterEvent(
267 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED,
268 router, entry.getKey(), entry.getValue(),
269 newValue.peerRouter().ipAddress().toString())));
270 }
271
272 if (oldValue.external().size() > 0 && newValue.external().size() == 0) {
273 oldValue.external().entrySet().stream().findAny()
274 .ifPresent(entry ->
275 notifyDelegate(new KubevirtRouterEvent(
276 KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED,
277 router, entry.getKey(), entry.getValue(),
278 oldValue.peerRouter().ipAddress().toString())));
279 }
280
281 Set<String> added = new HashSet<>(newValue.internal());
282 Set<String> oldset = oldValue.internal();
283 added.removeAll(oldset);
284
285 Set<String> removed = new HashSet<>(oldValue.internal());
286 Set<String> newset = newValue.internal();
287 removed.removeAll(newset);
288
289 if (added.size() > 0) {
290 notifyDelegate(new KubevirtRouterEvent(
291 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED,
292 router, added));
293 }
294
295 if (removed.size() > 0) {
296 notifyDelegate(new KubevirtRouterEvent(
297 KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED,
298 router, removed));
299 }
300 }
Jian Li810f58c2021-02-27 01:10:50 +0900301 }
Jian Li073f1ba2021-02-28 03:50:15 +0900302
303 private class KubevirtFloatingIpMapListener implements MapEventListener<String, KubevirtFloatingIp> {
304
305 @Override
306 public void event(MapEvent<String, KubevirtFloatingIp> event) {
307 switch (event.type()) {
308 case INSERT:
309 eventExecutor.execute(() -> processFloatingIpMapInsertion(event));
310 break;
311 case UPDATE:
312 eventExecutor.execute(() -> processFloatingIpMapUpdate(event));
313 break;
314 case REMOVE:
315 eventExecutor.execute(() -> processFloatingIpMapRemoval(event));
316 break;
317 default:
318 break;
319 }
320 }
321
322 private void processFloatingIpMapInsertion(MapEvent<String, KubevirtFloatingIp> event) {
323 log.debug("Kubevirt floating IP created");
324 KubevirtRouter router = Strings.isNullOrEmpty(
325 event.newValue().value().routerName()) ?
326 null :
327 router(event.newValue().value().routerName());
328 notifyDelegate(new KubevirtRouterEvent(
329 KUBEVIRT_FLOATING_IP_CREATED,
330 router,
331 event.newValue().value()));
332 }
333
334 private void processFloatingIpMapUpdate(MapEvent<String, KubevirtFloatingIp> event) {
335 log.debug("Kubevirt floating IP updated");
336 KubevirtRouter router = Strings.isNullOrEmpty(
337 event.newValue().value().routerName()) ?
338 null :
339 router(event.newValue().value().routerName());
340 notifyDelegate(new KubevirtRouterEvent(
341 KUBEVIRT_FLOATING_IP_UPDATED,
342 router,
343 event.newValue().value()));
344 processFloatingIpUpdate(event, router);
345 }
346
347 private void processFloatingIpMapRemoval(MapEvent<String, KubevirtFloatingIp> event) {
348 log.debug("Kubevirt floating IP removed");
349 KubevirtRouter router = Strings.isNullOrEmpty(
350 event.oldValue().value().routerName()) ?
351 null :
352 router(event.oldValue().value().routerName());
353 notifyDelegate(new KubevirtRouterEvent(
354 KUBEVIRT_FLOATING_IP_REMOVED,
355 router,
356 event.oldValue().value()));
357 }
358
359 private void processFloatingIpUpdate(MapEvent<String, KubevirtFloatingIp> event,
360 KubevirtRouter router) {
361 String oldPodName = event.oldValue().value().podName();
362 String newPodName = event.newValue().value().podName();
363
364 if (Strings.isNullOrEmpty(oldPodName) && !Strings.isNullOrEmpty(newPodName)) {
365 notifyDelegate(new KubevirtRouterEvent(
366 KUBEVIRT_FLOATING_IP_ASSOCIATED,
367 router,
368 event.newValue().value(), newPodName));
Jian Lib636f702021-03-03 14:46:50 +0900369 log.info(String.format(MSG_FLOATING_IP,
370 event.newValue().value().floatingIp(), MSG_ASSOCIATED, newPodName));
Jian Li073f1ba2021-02-28 03:50:15 +0900371 }
372
373 if (!Strings.isNullOrEmpty(oldPodName) && Strings.isNullOrEmpty(newPodName)) {
374 notifyDelegate(new KubevirtRouterEvent(
375 KUBEVIRT_FLOATING_IP_DISASSOCIATED,
376 router,
377 event.newValue().value(), oldPodName));
Jian Lib636f702021-03-03 14:46:50 +0900378 log.info(String.format(MSG_FLOATING_IP,
379 event.newValue().value().floatingIp(), MSG_DISASSOCIATED, oldPodName));
Jian Li073f1ba2021-02-28 03:50:15 +0900380 }
381 }
382 }
Jian Li810f58c2021-02-27 01:10:50 +0900383}