blob: c346c5888968a1b2192b32f2b2420a8d0cf9124f [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
31import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
32import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
Daniel Parkb9a22022021-03-04 18:58:47 +090033import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
Jian Li7eb20782021-02-27 01:10:50 +090034import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
37import org.onosproject.mastership.MastershipService;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.io.IOException;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
52import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
53import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
54import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * Kubevirt virtual router watcher used for feeding kubevirt router information.
58 */
59@Component(immediate = true)
60public class KubevirtRouterWatcher extends AbstractWatcher {
61
62 private final Logger log = getLogger(getClass());
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected CoreService coreService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected MastershipService mastershipService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected ClusterService clusterService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected LeadershipService leadershipService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected KubevirtRouterAdminService adminService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected KubevirtApiConfigService configService;
81
Daniel Parkb9a22022021-03-04 18:58:47 +090082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected KubevirtRouterService routerService;
84
Jian Li7eb20782021-02-27 01:10:50 +090085 private final ExecutorService eventExecutor = newSingleThreadExecutor(
86 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
87
88 private final InternalVirtualRouterWatcher
89 watcher = new InternalVirtualRouterWatcher();
90 private final InternalKubevirtApiConfigListener
91 configListener = new InternalKubevirtApiConfigListener();
92
93 CustomResourceDefinitionContext routerCrdCxt = new CustomResourceDefinitionContext
94 .Builder()
95 .withGroup("kubevirt.io")
96 .withScope("Cluster")
97 .withVersion("v1")
98 .withPlural("virtualrouters")
99 .build();
100
101 private ApplicationId appId;
102 private NodeId localNodeId;
103
104 @Activate
105 protected void activate() {
106 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
107 localNodeId = clusterService.getLocalNode().id();
108 leadershipService.runForLeadership(appId.name());
109 configService.addListener(configListener);
110
111 log.info("Started");
112 }
113
114 @Deactivate
115 protected void deactivate() {
116 configService.removeListener(configListener);
117 leadershipService.withdraw(appId.name());
118 eventExecutor.shutdown();
119
120 log.info("Stopped");
121 }
122
123 private void instantiateWatcher() {
124 KubernetesClient client = k8sClient(configService);
125
126 if (client != null) {
127 try {
128 client.customResource(routerCrdCxt).watch(watcher);
129 } catch (IOException e) {
130 e.printStackTrace();
131 }
132 }
133 }
134
135 private KubevirtRouter parseKubevirtRouter(String resource) {
136 try {
137 ObjectMapper mapper = new ObjectMapper();
138 JsonNode json = mapper.readTree(resource);
139 ObjectNode spec = (ObjectNode) json.get("spec");
Daniel Parkb9a22022021-03-04 18:58:47 +0900140 KubevirtRouter router = codec(KubevirtRouter.class).decode(spec, this);
141 KubevirtRouter existing = routerService.router(router.name());
142
143 if (existing == null) {
144 return router;
145 } else {
146 return router.updatedElectedGateway(existing.electedGateway());
147 }
Jian Li7eb20782021-02-27 01:10:50 +0900148 } catch (IOException e) {
149 log.error("Failed to parse kubevirt router object");
150 }
151
152 return null;
153 }
154
155 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
156
157 private boolean isRelevantHelper() {
158 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
159 }
160
161 @Override
162 public void event(KubevirtApiConfigEvent event) {
163
164 switch (event.type()) {
165 case KUBEVIRT_API_CONFIG_UPDATED:
166 eventExecutor.execute(this::processConfigUpdate);
167 break;
168 case KUBEVIRT_API_CONFIG_CREATED:
169 case KUBEVIRT_API_CONFIG_REMOVED:
170 default:
171 // do nothing
172 break;
173 }
174 }
175
176 private void processConfigUpdate() {
177 if (!isRelevantHelper()) {
178 return;
179 }
180
181 instantiateWatcher();
182 }
183 }
184
185 private class InternalVirtualRouterWatcher implements Watcher<String> {
186
187 @Override
188 public void eventReceived(Action action, String resource) {
189 switch (action) {
190 case ADDED:
191 eventExecutor.execute(() -> processAddition(resource));
192 break;
193 case MODIFIED:
194 eventExecutor.execute(() -> processModification(resource));
195 break;
196 case DELETED:
197 eventExecutor.execute(() -> processDeletion(resource));
198 break;
199 case ERROR:
200 log.warn("Failures processing virtual router manipulation.");
201 break;
202 default:
203 // do nothing
204 break;
205 }
206 }
207
208 @Override
209 public void onClose(WatcherException e) {
210 // due to the bugs in fabric8, the watcher might be closed,
211 // we will re-instantiate the watcher in this case
212 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
213 log.warn("Virtual Router watcher OnClose, re-instantiate the watcher...");
214
215 instantiateWatcher();
216 }
217
218 private void processAddition(String resource) {
219 if (!isMaster()) {
220 return;
221 }
222
223 String name = parseResourceName(resource);
224
225 log.trace("Process Virtual Router {} creating event from API server.",
226 name);
227
228 KubevirtRouter router = parseKubevirtRouter(resource);
229 if (router != null) {
230 if (adminService.router(router.name()) == null) {
231 adminService.createRouter(router);
232 }
233 }
234 }
235
236 private void processModification(String resource) {
237 if (!isMaster()) {
238 return;
239 }
240
241 String name = parseResourceName(resource);
242
243 log.trace("Process Virtual Router {} updating event from API server.",
244 name);
245
246 KubevirtRouter router = parseKubevirtRouter(resource);
247 if (router != null) {
248 adminService.updateRouter(router);
249 }
250 }
251
252 private void processDeletion(String resource) {
253 if (!isMaster()) {
254 return;
255 }
256
257 String name = parseResourceName(resource);
258
259 log.trace("Process Virtual Router {} removal event from API server.",
260 name);
261
262 adminService.removeRouter(name);
263 }
264
265 private boolean isMaster() {
266 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
267 }
268 }
269}