blob: 3ef16f4b97c931345883dad0e7a718609f5c3a80 [file] [log] [blame]
Jian Li034820d2021-01-15 16:58:48 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.client.KubernetesClient;
19import io.fabric8.kubernetes.client.Watcher;
20import io.fabric8.kubernetes.client.WatcherException;
21import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
Jian Li034820d2021-01-15 16:58:48 +090022import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Li034820d2021-01-15 16:58:48 +090027import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Jian Li034820d2021-01-15 16:58:48 +090028import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
29import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
30import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
31import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
32import org.onosproject.mastership.MastershipService;
33import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Component;
35import org.osgi.service.component.annotations.Deactivate;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
38import org.slf4j.Logger;
39
40import java.io.IOException;
41import java.util.Objects;
42import java.util.concurrent.ExecutorService;
43
44import static java.util.concurrent.Executors.newSingleThreadExecutor;
45import static org.onlab.util.Tools.groupedThreads;
46import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Li034820d2021-01-15 16:58:48 +090047import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Li27996912022-10-18 22:43:15 +090048import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseKubevirtNetwork;
Jian Li810f58c2021-02-27 01:10:50 +090049import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
Jian Li034820d2021-01-15 16:58:48 +090050import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Kubernetes network-attachment-definition watcher used for feeding kubevirt network information.
54 */
55@Component(immediate = true)
56public class NetworkAttachmentDefinitionWatcher {
57
58 private final Logger log = getLogger(getClass());
59
Jian Li034820d2021-01-15 16:58:48 +090060 @Reference(cardinality = ReferenceCardinality.MANDATORY)
61 protected CoreService coreService;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected MastershipService mastershipService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected LeadershipService leadershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected KubevirtNetworkAdminService adminService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected KubevirtApiConfigService configService;
77
78 private final ExecutorService eventExecutor = newSingleThreadExecutor(
79 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
80
81 private final InternalNetworkAttachmentDefinitionWatcher
82 watcher = new InternalNetworkAttachmentDefinitionWatcher();
83 private final InternalKubevirtApiConfigListener
84 configListener = new InternalKubevirtApiConfigListener();
85
86 CustomResourceDefinitionContext nadCrdCxt = new CustomResourceDefinitionContext
87 .Builder()
88 .withGroup("k8s.cni.cncf.io")
89 .withScope("Namespaced")
90 .withVersion("v1")
91 .withPlural("network-attachment-definitions")
92 .build();
93
94 private ApplicationId appId;
95 private NodeId localNodeId;
96
97 @Activate
98 protected void activate() {
99 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
100 localNodeId = clusterService.getLocalNode().id();
101 leadershipService.runForLeadership(appId.name());
102 configService.addListener(configListener);
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 configService.removeListener(configListener);
110 leadershipService.withdraw(appId.name());
111 eventExecutor.shutdown();
112
113 log.info("Stopped");
114 }
115
Jian Lid2ade0f2021-02-19 14:00:07 +0900116 private void instantiateWatcher() {
117 KubernetesClient client = k8sClient(configService);
118
119 if (client != null) {
120 try {
121 client.customResource(nadCrdCxt).watch(watcher);
122 } catch (IOException e) {
123 e.printStackTrace();
124 }
125 }
126 }
127
Jian Li034820d2021-01-15 16:58:48 +0900128 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
129
130 private boolean isRelevantHelper() {
131 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
132 }
133
134 @Override
135 public void event(KubevirtApiConfigEvent event) {
136
137 switch (event.type()) {
138 case KUBEVIRT_API_CONFIG_UPDATED:
139 eventExecutor.execute(this::processConfigUpdate);
140 break;
141 case KUBEVIRT_API_CONFIG_CREATED:
142 case KUBEVIRT_API_CONFIG_REMOVED:
143 default:
144 // do nothing
145 break;
146 }
147 }
148
149 private void processConfigUpdate() {
150 if (!isRelevantHelper()) {
151 return;
152 }
153
Jian Lid2ade0f2021-02-19 14:00:07 +0900154 instantiateWatcher();
Jian Li034820d2021-01-15 16:58:48 +0900155 }
156 }
157
158 private class InternalNetworkAttachmentDefinitionWatcher
159 implements Watcher<String> {
160
161 @Override
162 public void eventReceived(Action action, String resource) {
163 switch (action) {
164 case ADDED:
165 eventExecutor.execute(() -> processAddition(resource));
166 break;
167 case MODIFIED:
168 eventExecutor.execute(() -> processModification(resource));
169 break;
170 case DELETED:
171 eventExecutor.execute(() -> processDeletion(resource));
172 break;
173 case ERROR:
174 log.warn("Failures processing network-attachment-definition manipulation.");
175 break;
176 default:
177 break;
178 }
179 }
180
181 @Override
182 public void onClose(WatcherException e) {
Jian Lid2ade0f2021-02-19 14:00:07 +0900183 // due to the bugs in fabric8, the watcher might be closed,
184 // we will re-instantiate the watcher in this case
185 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
186 log.warn("Network-attachment-definition watcher OnClose, re-instantiate the watcher...");
187
188 instantiateWatcher();
Jian Li034820d2021-01-15 16:58:48 +0900189 }
190
191 private void processAddition(String resource) {
192 if (!isMaster()) {
193 return;
194 }
195
Jian Li810f58c2021-02-27 01:10:50 +0900196 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900197
198 log.trace("Process NetworkAttachmentDefinition {} creating event from API server.",
199 name);
200
201 KubevirtNetwork network = parseKubevirtNetwork(resource);
202 if (network != null) {
Jian Liea585882021-01-19 23:42:55 +0900203 if (adminService.network(network.networkId()) == null) {
204 adminService.createNetwork(network);
205 }
Jian Li034820d2021-01-15 16:58:48 +0900206 }
207 }
208
209 private void processModification(String resource) {
210 if (!isMaster()) {
211 return;
212 }
213
Jian Li810f58c2021-02-27 01:10:50 +0900214 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900215
216 log.trace("Process NetworkAttachmentDefinition {} updating event from API server.",
217 name);
218
219 KubevirtNetwork network = parseKubevirtNetwork(resource);
220 if (network != null) {
221 adminService.updateNetwork(network);
222 }
223 }
224
225 private void processDeletion(String resource) {
226 if (!isMaster()) {
227 return;
228 }
229
Jian Li810f58c2021-02-27 01:10:50 +0900230 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900231
232 log.trace("Process NetworkAttachmentDefinition {} removal event from API server.",
233 name);
234
Jian Li362bd8b2021-06-11 14:47:57 +0900235 if (adminService.network(name) != null) {
236 adminService.removeNetwork(name);
237 }
Jian Li034820d2021-01-15 16:58:48 +0900238 }
239
240 private boolean isMaster() {
241 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
242 }
Jian Li034820d2021-01-15 16:58:48 +0900243 }
244}