blob: 89086bfcd27b2e4879066d6b46e2a67be0d4e16b [file] [log] [blame]
Jian Lib9eb11d2021-02-19 18:13:10 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.Objects;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Kubernetes node watcher used for feeding node information.
55 */
56@Component(immediate = true)
57public class KubevirtNodeWatcher {
58
59 private final Logger log = getLogger(getClass());
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY)
62 protected CoreService coreService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected MastershipService mastershipService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected LeadershipService leadershipService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected KubevirtNodeAdminService kubevirtNodeAdminService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected KubevirtApiConfigService kubevirtApiConfigService;
78
79 private final ExecutorService eventExecutor = newSingleThreadExecutor(
80 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
81 private final Watcher<Node> internalKubevirtNodeWatcher = new InternalKubevirtNodeWatcher();
82 private final InternalKubevirtApiConfigListener
83 internalKubevirtApiConfigListener = new InternalKubevirtApiConfigListener();
84
85 private ApplicationId appId;
86 private NodeId localNodeId;
87
88 @Activate
89 protected void activate() {
90 appId = coreService.registerApplication(APP_ID);
91 localNodeId = clusterService.getLocalNode().id();
92 leadershipService.runForLeadership(appId.name());
93 kubevirtApiConfigService.addListener(internalKubevirtApiConfigListener);
94
95 log.info("Started");
96 }
97
98 @Deactivate
99 protected void deactivate() {
100 kubevirtApiConfigService.removeListener(internalKubevirtApiConfigListener);
101 leadershipService.withdraw(appId.name());
102 eventExecutor.shutdown();
103
104 log.info("Stopped");
105 }
106
107 private void instantiateNodeWatcher() {
108 KubevirtApiConfig config = kubevirtApiConfigService.apiConfig();
109 if (config == null) {
110 return;
111 }
112 KubernetesClient client = k8sClient(config);
113
114 if (client != null) {
115 client.nodes().watch(internalKubevirtNodeWatcher);
116 }
117 }
118
119 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
120
121 private boolean isRelevantHelper() {
122 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
123 }
124
125 @Override
126 public void event(KubevirtApiConfigEvent event) {
127
128 switch (event.type()) {
129 case KUBEVIRT_API_CONFIG_UPDATED:
130 eventExecutor.execute(this::processConfigUpdating);
131 break;
132 case KUBEVIRT_API_CONFIG_CREATED:
133 case KUBEVIRT_API_CONFIG_REMOVED:
134 default:
135 // do nothing
136 break;
137 }
138 }
139
140 private void processConfigUpdating() {
141 if (!isRelevantHelper()) {
142 return;
143 }
144
145 instantiateNodeWatcher();
146 }
147 }
148
149 private class InternalKubevirtNodeWatcher implements Watcher<Node> {
150
151 @Override
152 public void eventReceived(Action action, Node node) {
153 switch (action) {
154 case ADDED:
155 eventExecutor.execute(() -> processAddition(node));
156 break;
157 case MODIFIED:
158 eventExecutor.execute(() -> processModification(node));
159 break;
160 case DELETED:
161 eventExecutor.execute(() -> processDeletion(node));
162 break;
163 case ERROR:
164 log.warn("Failures processing node manipulation.");
165 break;
166 default:
167 // do nothing
168 break;
169 }
170 }
171
172 @Override
173 public void onClose(WatcherException e) {
174 // due to the bugs in fabric8, node watcher might be closed,
175 // we will re-instantiate the node watcher in this case
176 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
177 log.warn("Node watcher OnClose, re-instantiate the node watcher...");
178 instantiateNodeWatcher();
179 }
180
181 private void processAddition(Node node) {
182 if (!isMaster()) {
183 return;
184 }
185
186 log.trace("Process node {} creating event from API server.",
187 node.getMetadata().getName());
188
189 KubevirtNode kubevirtNode = buildKubevirtNode(node);
190 if (kubevirtNode.type() == WORKER) {
191 if (!kubevirtNodeAdminService.hasNode(kubevirtNode.hostname())) {
192 kubevirtNodeAdminService.createNode(kubevirtNode);
193 }
194 }
195 }
196
197 private void processModification(Node node) {
198 if (!isMaster()) {
199 return;
200 }
201
202 log.trace("Process node {} updating event from API server.",
203 node.getMetadata().getName());
204
205 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
206
207 if (existing != null) {
208 KubevirtNode kubevirtNode = buildKubevirtNode(node);
209
210 // we update the kubevirt node and re-run bootstrapping,
211 // only if the updated node has different phyInts and data IP
212 // this means we assume that the node's hostname, type and mgmt IP
213 // are immutable
214 if (!kubevirtNode.phyIntfs().equals(existing.phyIntfs()) ||
215 !kubevirtNode.dataIp().equals(existing.dataIp())) {
216 kubevirtNodeAdminService.updateNode(kubevirtNode.updateState(INIT));
217 }
218 }
219 }
220
221 private void processDeletion(Node node) {
222 if (!isMaster()) {
223 return;
224 }
225
226 log.trace("Process node {} removal event from API server.",
227 node.getMetadata().getName());
228
229 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
230
231 if (existing != null) {
232 kubevirtNodeAdminService.removeNode(node.getMetadata().getName());
233 }
234 }
235
236 private boolean isMaster() {
237 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
238 }
239 }
240}