blob: 4ac793ddfca54dc709d058f28a1b53eaed6cf0a2 [file] [log] [blame]
Jian Li4b249702021-02-19 18:13:10 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.client.KubernetesClient;
20import io.fabric8.kubernetes.client.Watcher;
21import io.fabric8.kubernetes.client.WatcherException;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
28import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
29import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
30import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
31import org.onosproject.kubevirtnode.api.KubevirtNode;
32import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
33import org.onosproject.mastership.MastershipService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
41import java.util.Objects;
42import java.util.concurrent.ExecutorService;
43
44import static java.util.concurrent.Executors.newSingleThreadExecutor;
45import static org.onlab.util.Tools.groupedThreads;
Daniel Park515f5f32021-02-22 17:12:20 +090046import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
Jian Li4b249702021-02-19 18:13:10 +090047import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
48import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
49import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
50import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
51import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
52import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * Kubernetes node watcher used for feeding node information.
56 */
57@Component(immediate = true)
58public class KubevirtNodeWatcher {
59
60 private final Logger log = getLogger(getClass());
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY)
63 protected CoreService coreService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY)
66 protected MastershipService mastershipService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY)
69 protected ClusterService clusterService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected LeadershipService leadershipService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected KubevirtNodeAdminService kubevirtNodeAdminService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected KubevirtApiConfigService kubevirtApiConfigService;
79
80 private final ExecutorService eventExecutor = newSingleThreadExecutor(
81 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
82 private final Watcher<Node> internalKubevirtNodeWatcher = new InternalKubevirtNodeWatcher();
83 private final InternalKubevirtApiConfigListener
84 internalKubevirtApiConfigListener = new InternalKubevirtApiConfigListener();
85
86 private ApplicationId appId;
87 private NodeId localNodeId;
88
89 @Activate
90 protected void activate() {
91 appId = coreService.registerApplication(APP_ID);
92 localNodeId = clusterService.getLocalNode().id();
93 leadershipService.runForLeadership(appId.name());
94 kubevirtApiConfigService.addListener(internalKubevirtApiConfigListener);
95
96 log.info("Started");
97 }
98
99 @Deactivate
100 protected void deactivate() {
101 kubevirtApiConfigService.removeListener(internalKubevirtApiConfigListener);
102 leadershipService.withdraw(appId.name());
103 eventExecutor.shutdown();
104
105 log.info("Stopped");
106 }
107
108 private void instantiateNodeWatcher() {
109 KubevirtApiConfig config = kubevirtApiConfigService.apiConfig();
110 if (config == null) {
111 return;
112 }
113 KubernetesClient client = k8sClient(config);
114
115 if (client != null) {
116 client.nodes().watch(internalKubevirtNodeWatcher);
117 }
118 }
119
120 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
121
122 private boolean isRelevantHelper() {
123 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
124 }
125
126 @Override
127 public void event(KubevirtApiConfigEvent event) {
128
129 switch (event.type()) {
130 case KUBEVIRT_API_CONFIG_UPDATED:
131 eventExecutor.execute(this::processConfigUpdating);
132 break;
133 case KUBEVIRT_API_CONFIG_CREATED:
134 case KUBEVIRT_API_CONFIG_REMOVED:
135 default:
136 // do nothing
137 break;
138 }
139 }
140
141 private void processConfigUpdating() {
142 if (!isRelevantHelper()) {
143 return;
144 }
145
146 instantiateNodeWatcher();
147 }
148 }
149
150 private class InternalKubevirtNodeWatcher implements Watcher<Node> {
151
152 @Override
153 public void eventReceived(Action action, Node node) {
154 switch (action) {
155 case ADDED:
156 eventExecutor.execute(() -> processAddition(node));
157 break;
158 case MODIFIED:
159 eventExecutor.execute(() -> processModification(node));
160 break;
161 case DELETED:
162 eventExecutor.execute(() -> processDeletion(node));
163 break;
164 case ERROR:
165 log.warn("Failures processing node manipulation.");
166 break;
167 default:
168 // do nothing
169 break;
170 }
171 }
172
173 @Override
174 public void onClose(WatcherException e) {
175 // due to the bugs in fabric8, node watcher might be closed,
176 // we will re-instantiate the node watcher in this case
177 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
178 log.warn("Node watcher OnClose, re-instantiate the node watcher...");
179 instantiateNodeWatcher();
180 }
181
182 private void processAddition(Node node) {
183 if (!isMaster()) {
184 return;
185 }
186
187 log.trace("Process node {} creating event from API server.",
188 node.getMetadata().getName());
189
190 KubevirtNode kubevirtNode = buildKubevirtNode(node);
Daniel Park515f5f32021-02-22 17:12:20 +0900191 if (kubevirtNode.type() == WORKER || kubevirtNode.type() == GATEWAY) {
Jian Li4b249702021-02-19 18:13:10 +0900192 if (!kubevirtNodeAdminService.hasNode(kubevirtNode.hostname())) {
193 kubevirtNodeAdminService.createNode(kubevirtNode);
194 }
195 }
196 }
197
198 private void processModification(Node node) {
199 if (!isMaster()) {
200 return;
201 }
202
203 log.trace("Process node {} updating event from API server.",
204 node.getMetadata().getName());
205
206 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
207
208 if (existing != null) {
209 KubevirtNode kubevirtNode = buildKubevirtNode(node);
210
211 // we update the kubevirt node and re-run bootstrapping,
212 // only if the updated node has different phyInts and data IP
213 // this means we assume that the node's hostname, type and mgmt IP
214 // are immutable
215 if (!kubevirtNode.phyIntfs().equals(existing.phyIntfs()) ||
216 !kubevirtNode.dataIp().equals(existing.dataIp())) {
217 kubevirtNodeAdminService.updateNode(kubevirtNode.updateState(INIT));
218 }
219 }
220 }
221
222 private void processDeletion(Node node) {
223 if (!isMaster()) {
224 return;
225 }
226
227 log.trace("Process node {} removal event from API server.",
228 node.getMetadata().getName());
229
230 KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
231
232 if (existing != null) {
233 kubevirtNodeAdminService.removeNode(node.getMetadata().getName());
234 }
235 }
236
237 private boolean isMaster() {
238 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
239 }
240 }
241}