blob: 96662c41b4a8cf7387926c4158ecd1abe050f67f [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.client.KubernetesClient;
20import io.fabric8.kubernetes.client.Watcher;
21import io.fabric8.kubernetes.client.WatcherException;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
28import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
29import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
30import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
31import org.onosproject.kubevirtnode.api.KubevirtNode;
32import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
33import org.onosproject.mastership.MastershipService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
41import java.util.Objects;
42import java.util.concurrent.ExecutorService;
43
44import static java.util.concurrent.Executors.newSingleThreadExecutor;
45import static org.onlab.util.Tools.groupedThreads;
Daniel Park515f5f32021-02-22 17:12:20 +090046import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
Jian Li517597a2021-03-22 11:04:52 +090047import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
Jian Li4b249702021-02-19 18:13:10 +090048import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
49import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
50import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
51import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
52import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Kubernetes node watcher used for feeding node information.
57 */
58@Component(immediate = true)
59public class KubevirtNodeWatcher {
60
61 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected LeadershipService leadershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected KubevirtNodeAdminService kubevirtNodeAdminService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected KubevirtApiConfigService kubevirtApiConfigService;
80
81 private final ExecutorService eventExecutor = newSingleThreadExecutor(
82 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
83 private final Watcher<Node> internalKubevirtNodeWatcher = new InternalKubevirtNodeWatcher();
84 private final InternalKubevirtApiConfigListener
85 internalKubevirtApiConfigListener = new InternalKubevirtApiConfigListener();
86
87 private ApplicationId appId;
88 private NodeId localNodeId;
89
90 @Activate
91 protected void activate() {
92 appId = coreService.registerApplication(APP_ID);
93 localNodeId = clusterService.getLocalNode().id();
94 leadershipService.runForLeadership(appId.name());
95 kubevirtApiConfigService.addListener(internalKubevirtApiConfigListener);
96
97 log.info("Started");
98 }
99
100 @Deactivate
101 protected void deactivate() {
102 kubevirtApiConfigService.removeListener(internalKubevirtApiConfigListener);
103 leadershipService.withdraw(appId.name());
104 eventExecutor.shutdown();
105
106 log.info("Stopped");
107 }
108
109 private void instantiateNodeWatcher() {
110 KubevirtApiConfig config = kubevirtApiConfigService.apiConfig();
111 if (config == null) {
112 return;
113 }
114 KubernetesClient client = k8sClient(config);
115
116 if (client != null) {
117 client.nodes().watch(internalKubevirtNodeWatcher);
118 }
119 }
120
121 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
122
123 private boolean isRelevantHelper() {
124 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
125 }
126
127 @Override
128 public void event(KubevirtApiConfigEvent event) {
129
130 switch (event.type()) {
131 case KUBEVIRT_API_CONFIG_UPDATED:
132 eventExecutor.execute(this::processConfigUpdating);
133 break;
134 case KUBEVIRT_API_CONFIG_CREATED:
135 case KUBEVIRT_API_CONFIG_REMOVED:
136 default:
137 // do nothing
138 break;
139 }
140 }
141
142 private void processConfigUpdating() {
143 if (!isRelevantHelper()) {
144 return;
145 }
146
147 instantiateNodeWatcher();
148 }
149 }
150
151 private class InternalKubevirtNodeWatcher implements Watcher<Node> {
152
153 @Override
154 public void eventReceived(Action action, Node node) {
155 switch (action) {
156 case ADDED:
157 eventExecutor.execute(() -> processAddition(node));
158 break;
159 case MODIFIED:
160 eventExecutor.execute(() -> processModification(node));
161 break;
162 case DELETED:
163 eventExecutor.execute(() -> processDeletion(node));
164 break;
165 case ERROR:
166 log.warn("Failures processing node manipulation.");
167 break;
168 default:
169 // do nothing
170 break;
171 }
172 }
173
174 @Override
175 public void onClose(WatcherException e) {
176 // due to the bugs in fabric8, node watcher might be closed,
177 // we will re-instantiate the node watcher in this case
178 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
179 log.warn("Node watcher OnClose, re-instantiate the node watcher...");
180 instantiateNodeWatcher();
181 }
182
183 private void processAddition(Node node) {
184 if (!isMaster()) {
185 return;
186 }
187
188 log.trace("Process node {} creating event from API server.",
189 node.getMetadata().getName());
190
191 KubevirtNode kubevirtNode = buildKubevirtNode(node);
Daniel Park515f5f32021-02-22 17:12:20 +0900192 if (kubevirtNode.type() == WORKER || kubevirtNode.type() == GATEWAY) {
Jian Li4b249702021-02-19 18:13:10 +0900193 if (!kubevirtNodeAdminService.hasNode(kubevirtNode.hostname())) {
194 kubevirtNodeAdminService.createNode(kubevirtNode);
195 }
196 }
197 }
198
199 private void processModification(Node node) {
200 if (!isMaster()) {
201 return;
202 }
203
204 log.trace("Process node {} updating event from API server.",
205 node.getMetadata().getName());
206
Jian Li517597a2021-03-22 11:04:52 +0900207 KubevirtNode original = buildKubevirtNode(node);
Jian Li4b249702021-02-19 18:13:10 +0900208 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
209
Jian Li517597a2021-03-22 11:04:52 +0900210 // if a master node is annotated as a gateway node, we simply add
211 // the node into the cluster
212 if (original.type() == GATEWAY && existing == null) {
213 kubevirtNodeAdminService.createNode(original);
214 }
Jian Li4b249702021-02-19 18:13:10 +0900215
Jian Li517597a2021-03-22 11:04:52 +0900216 // if a gateway annotation removed from the master node, we simply remove
217 // the node from the cluster
218 if (original.type() == MASTER && existing != null && existing.type() == GATEWAY) {
219 kubevirtNodeAdminService.removeNode(original.hostname());
220 }
221
222 if (existing != null) {
Jian Li4b249702021-02-19 18:13:10 +0900223 // we update the kubevirt node and re-run bootstrapping,
Jian Li517597a2021-03-22 11:04:52 +0900224 // if the updated node has different phyInts and data IP
Jian Li4b249702021-02-19 18:13:10 +0900225 // this means we assume that the node's hostname, type and mgmt IP
226 // are immutable
Jian Li517597a2021-03-22 11:04:52 +0900227 if (!original.phyIntfs().equals(existing.phyIntfs()) ||
228 !original.dataIp().equals(existing.dataIp())) {
229 kubevirtNodeAdminService.updateNode(original.updateState(INIT));
Jian Li4b249702021-02-19 18:13:10 +0900230 }
231 }
232 }
233
234 private void processDeletion(Node node) {
235 if (!isMaster()) {
236 return;
237 }
238
239 log.trace("Process node {} removal event from API server.",
240 node.getMetadata().getName());
241
242 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
243
244 if (existing != null) {
245 kubevirtNodeAdminService.removeNode(node.getMetadata().getName());
246 }
247 }
248
249 private boolean isMaster() {
250 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
251 }
252 }
253}