blob: 58ca9f872f8fbe0b785998b7653643bafea227a0 [file] [log] [blame]
Jian Li810f58c2021-02-27 01:10:50 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
Daniel Park68f9c322021-06-29 16:37:56 +090031import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
Jian Li810f58c2021-02-27 01:10:50 +090032import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
33import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
Daniel Park2884b232021-03-04 18:58:47 +090034import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
Jian Li810f58c2021-02-27 01:10:50 +090035import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
37import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
38import org.onosproject.mastership.MastershipService;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.io.IOException;
47import java.util.Objects;
48import java.util.concurrent.ExecutorService;
49
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
53import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
54import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
55import static org.slf4j.LoggerFactory.getLogger;
56
57/**
58 * Kubevirt virtual router watcher used for feeding kubevirt router information.
59 */
60@Component(immediate = true)
61public class KubevirtRouterWatcher extends AbstractWatcher {
62
63 private final Logger log = getLogger(getClass());
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY)
66 protected CoreService coreService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY)
69 protected MastershipService mastershipService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected ClusterService clusterService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected LeadershipService leadershipService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected KubevirtRouterAdminService adminService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected KubevirtApiConfigService configService;
82
Daniel Park2884b232021-03-04 18:58:47 +090083 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected KubevirtRouterService routerService;
85
Jian Li810f58c2021-02-27 01:10:50 +090086 private final ExecutorService eventExecutor = newSingleThreadExecutor(
87 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
88
89 private final InternalVirtualRouterWatcher
90 watcher = new InternalVirtualRouterWatcher();
91 private final InternalKubevirtApiConfigListener
92 configListener = new InternalKubevirtApiConfigListener();
93
94 CustomResourceDefinitionContext routerCrdCxt = new CustomResourceDefinitionContext
95 .Builder()
96 .withGroup("kubevirt.io")
97 .withScope("Cluster")
98 .withVersion("v1")
99 .withPlural("virtualrouters")
100 .build();
101
102 private ApplicationId appId;
103 private NodeId localNodeId;
104
105 @Activate
106 protected void activate() {
107 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
108 localNodeId = clusterService.getLocalNode().id();
109 leadershipService.runForLeadership(appId.name());
110 configService.addListener(configListener);
111
112 log.info("Started");
113 }
114
115 @Deactivate
116 protected void deactivate() {
117 configService.removeListener(configListener);
118 leadershipService.withdraw(appId.name());
119 eventExecutor.shutdown();
120
121 log.info("Stopped");
122 }
123
124 private void instantiateWatcher() {
125 KubernetesClient client = k8sClient(configService);
126
127 if (client != null) {
128 try {
129 client.customResource(routerCrdCxt).watch(watcher);
130 } catch (IOException e) {
131 e.printStackTrace();
132 }
133 }
134 }
135
136 private KubevirtRouter parseKubevirtRouter(String resource) {
137 try {
138 ObjectMapper mapper = new ObjectMapper();
139 JsonNode json = mapper.readTree(resource);
140 ObjectNode spec = (ObjectNode) json.get("spec");
Daniel Park2884b232021-03-04 18:58:47 +0900141 KubevirtRouter router = codec(KubevirtRouter.class).decode(spec, this);
142 KubevirtRouter existing = routerService.router(router.name());
143
144 if (existing == null) {
145 return router;
146 } else {
147 return router.updatedElectedGateway(existing.electedGateway());
148 }
Jian Li810f58c2021-02-27 01:10:50 +0900149 } catch (IOException e) {
150 log.error("Failed to parse kubevirt router object");
151 }
152
153 return null;
154 }
155
156 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
157
158 private boolean isRelevantHelper() {
159 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
160 }
161
162 @Override
163 public void event(KubevirtApiConfigEvent event) {
164
165 switch (event.type()) {
166 case KUBEVIRT_API_CONFIG_UPDATED:
167 eventExecutor.execute(this::processConfigUpdate);
168 break;
169 case KUBEVIRT_API_CONFIG_CREATED:
170 case KUBEVIRT_API_CONFIG_REMOVED:
171 default:
172 // do nothing
173 break;
174 }
175 }
176
177 private void processConfigUpdate() {
178 if (!isRelevantHelper()) {
179 return;
180 }
181
182 instantiateWatcher();
183 }
184 }
185
186 private class InternalVirtualRouterWatcher implements Watcher<String> {
187
188 @Override
189 public void eventReceived(Action action, String resource) {
190 switch (action) {
191 case ADDED:
192 eventExecutor.execute(() -> processAddition(resource));
193 break;
194 case MODIFIED:
195 eventExecutor.execute(() -> processModification(resource));
196 break;
197 case DELETED:
198 eventExecutor.execute(() -> processDeletion(resource));
199 break;
200 case ERROR:
201 log.warn("Failures processing virtual router manipulation.");
202 break;
203 default:
204 // do nothing
205 break;
206 }
207 }
208
209 @Override
210 public void onClose(WatcherException e) {
211 // due to the bugs in fabric8, the watcher might be closed,
212 // we will re-instantiate the watcher in this case
213 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
214 log.warn("Virtual Router watcher OnClose, re-instantiate the watcher...");
215
216 instantiateWatcher();
217 }
218
219 private void processAddition(String resource) {
220 if (!isMaster()) {
221 return;
222 }
223
224 String name = parseResourceName(resource);
225
226 log.trace("Process Virtual Router {} creating event from API server.",
227 name);
228
229 KubevirtRouter router = parseKubevirtRouter(resource);
230 if (router != null) {
231 if (adminService.router(router.name()) == null) {
232 adminService.createRouter(router);
233 }
234 }
235 }
236
237 private void processModification(String resource) {
238 if (!isMaster()) {
239 return;
240 }
241
242 String name = parseResourceName(resource);
243
244 log.trace("Process Virtual Router {} updating event from API server.",
245 name);
246
247 KubevirtRouter router = parseKubevirtRouter(resource);
Daniel Park68f9c322021-06-29 16:37:56 +0900248
249 KubevirtPeerRouter oldPeerRouter = adminService.router(router.name()).peerRouter();
250 if (oldPeerRouter != null
251 && Objects.equals(oldPeerRouter.ipAddress(), router.peerRouter().ipAddress())
252 && oldPeerRouter.macAddress() != null
253 && router.peerRouter().macAddress() == null) {
254
255 router = router.updatePeerRouter(oldPeerRouter);
256 }
257
Jian Li810f58c2021-02-27 01:10:50 +0900258 if (router != null) {
259 adminService.updateRouter(router);
260 }
261 }
262
263 private void processDeletion(String resource) {
264 if (!isMaster()) {
265 return;
266 }
267
268 String name = parseResourceName(resource);
269
270 log.trace("Process Virtual Router {} removal event from API server.",
271 name);
272
273 adminService.removeRouter(name);
274 }
275
276 private boolean isMaster() {
277 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
278 }
279 }
280}