blob: 501099fee46be7028dbcbbf03c7c107f890ba935 [file] [log] [blame]
Jian Li7eb20782021-02-27 01:10:50 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
31import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
32import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
33import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
36import org.onosproject.mastership.MastershipService;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.io.IOException;
45import java.util.Objects;
46import java.util.concurrent.ExecutorService;
47
48import static java.util.concurrent.Executors.newSingleThreadExecutor;
49import static org.onlab.util.Tools.groupedThreads;
50import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
51import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
52import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Kubevirt virtual router watcher used for feeding kubevirt router information.
57 */
58@Component(immediate = true)
59public class KubevirtRouterWatcher extends AbstractWatcher {
60
61 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected LeadershipService leadershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected KubevirtRouterAdminService adminService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected KubevirtApiConfigService configService;
80
81 private final ExecutorService eventExecutor = newSingleThreadExecutor(
82 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
83
84 private final InternalVirtualRouterWatcher
85 watcher = new InternalVirtualRouterWatcher();
86 private final InternalKubevirtApiConfigListener
87 configListener = new InternalKubevirtApiConfigListener();
88
89 CustomResourceDefinitionContext routerCrdCxt = new CustomResourceDefinitionContext
90 .Builder()
91 .withGroup("kubevirt.io")
92 .withScope("Cluster")
93 .withVersion("v1")
94 .withPlural("virtualrouters")
95 .build();
96
97 private ApplicationId appId;
98 private NodeId localNodeId;
99
100 @Activate
101 protected void activate() {
102 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
103 localNodeId = clusterService.getLocalNode().id();
104 leadershipService.runForLeadership(appId.name());
105 configService.addListener(configListener);
106
107 log.info("Started");
108 }
109
110 @Deactivate
111 protected void deactivate() {
112 configService.removeListener(configListener);
113 leadershipService.withdraw(appId.name());
114 eventExecutor.shutdown();
115
116 log.info("Stopped");
117 }
118
119 private void instantiateWatcher() {
120 KubernetesClient client = k8sClient(configService);
121
122 if (client != null) {
123 try {
124 client.customResource(routerCrdCxt).watch(watcher);
125 } catch (IOException e) {
126 e.printStackTrace();
127 }
128 }
129 }
130
131 private KubevirtRouter parseKubevirtRouter(String resource) {
132 try {
133 ObjectMapper mapper = new ObjectMapper();
134 JsonNode json = mapper.readTree(resource);
135 ObjectNode spec = (ObjectNode) json.get("spec");
136 return codec(KubevirtRouter.class).decode(spec, this);
137 } catch (IOException e) {
138 log.error("Failed to parse kubevirt router object");
139 }
140
141 return null;
142 }
143
144 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
145
146 private boolean isRelevantHelper() {
147 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
148 }
149
150 @Override
151 public void event(KubevirtApiConfigEvent event) {
152
153 switch (event.type()) {
154 case KUBEVIRT_API_CONFIG_UPDATED:
155 eventExecutor.execute(this::processConfigUpdate);
156 break;
157 case KUBEVIRT_API_CONFIG_CREATED:
158 case KUBEVIRT_API_CONFIG_REMOVED:
159 default:
160 // do nothing
161 break;
162 }
163 }
164
165 private void processConfigUpdate() {
166 if (!isRelevantHelper()) {
167 return;
168 }
169
170 instantiateWatcher();
171 }
172 }
173
174 private class InternalVirtualRouterWatcher implements Watcher<String> {
175
176 @Override
177 public void eventReceived(Action action, String resource) {
178 switch (action) {
179 case ADDED:
180 eventExecutor.execute(() -> processAddition(resource));
181 break;
182 case MODIFIED:
183 eventExecutor.execute(() -> processModification(resource));
184 break;
185 case DELETED:
186 eventExecutor.execute(() -> processDeletion(resource));
187 break;
188 case ERROR:
189 log.warn("Failures processing virtual router manipulation.");
190 break;
191 default:
192 // do nothing
193 break;
194 }
195 }
196
197 @Override
198 public void onClose(WatcherException e) {
199 // due to the bugs in fabric8, the watcher might be closed,
200 // we will re-instantiate the watcher in this case
201 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
202 log.warn("Virtual Router watcher OnClose, re-instantiate the watcher...");
203
204 instantiateWatcher();
205 }
206
207 private void processAddition(String resource) {
208 if (!isMaster()) {
209 return;
210 }
211
212 String name = parseResourceName(resource);
213
214 log.trace("Process Virtual Router {} creating event from API server.",
215 name);
216
217 KubevirtRouter router = parseKubevirtRouter(resource);
218 if (router != null) {
219 if (adminService.router(router.name()) == null) {
220 adminService.createRouter(router);
221 }
222 }
223 }
224
225 private void processModification(String resource) {
226 if (!isMaster()) {
227 return;
228 }
229
230 String name = parseResourceName(resource);
231
232 log.trace("Process Virtual Router {} updating event from API server.",
233 name);
234
235 KubevirtRouter router = parseKubevirtRouter(resource);
236 if (router != null) {
237 adminService.updateRouter(router);
238 }
239 }
240
241 private void processDeletion(String resource) {
242 if (!isMaster()) {
243 return;
244 }
245
246 String name = parseResourceName(resource);
247
248 log.trace("Process Virtual Router {} removal event from API server.",
249 name);
250
251 adminService.removeRouter(name);
252 }
253
254 private boolean isMaster() {
255 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
256 }
257 }
258}