/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.kubevirtnode.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.kubevirtnode.api.KubevirtNodeState;
import org.onosproject.kubevirtnode.api.KubevirtNodeStore;
import org.onosproject.kubevirtnode.api.KubevirtNodeStoreDelegate;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.Dictionary;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.genDpid;
import static org.slf4j.LoggerFactory.getLogger;

Jian Liaaf44b52020-12-27 23:22:46 +090065/**
66 * Service administering the inventory of kubevirt nodes.
67 */
Jian Lib230e07c2020-12-21 11:25:12 +090068@Component(
69 immediate = true,
70 service = {KubevirtNodeService.class, KubevirtNodeAdminService.class},
71 property = {
72 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
73 }
74)
75public class KubevirtNodeManager
76 extends ListenerRegistry<KubevirtNodeEvent, KubevirtNodeListener>
77 implements KubevirtNodeService, KubevirtNodeAdminService {
78
79 private final Logger log = getLogger(getClass());
80
81 private static final String MSG_NODE = "KubeVirt node %s %s";
82 private static final String MSG_CREATED = "created";
83 private static final String MSG_UPDATED = "updated";
84 private static final String MSG_REMOVED = "removed";
85
86 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
87
88 private static final String ERR_NULL_NODE = "KubeVirt node cannot be null";
89 private static final String ERR_NULL_HOSTNAME = "KubeVirt node hostname cannot be null";
90 private static final String ERR_NULL_DEVICE_ID = "KubeVirt node device ID cannot be null";
91
92 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNodeStore nodeStore;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected CoreService coreService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected ClusterService clusterService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected LeadershipService leadershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected StorageService storageService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected OvsdbController ovsdbController;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected DeviceService deviceService;
114
115 /**
116 * OVSDB server listen port.
117 */
118 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
119
120 private final ExecutorService eventExecutor = newSingleThreadExecutor(
121 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
122
123 private final KubevirtNodeStoreDelegate delegate = new InternalNodeStoreDelegate();
124
125 private AtomicCounter deviceIdCounter;
126
127 private ApplicationId appId;
128
129 @Activate
130 protected void activate() {
131 appId = coreService.registerApplication(APP_ID);
132 nodeStore.setDelegate(delegate);
133
134 leadershipService.runForLeadership(appId.name());
135
136 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
137
138 log.info("Started");
139 }
140
141 @Deactivate
142 protected void deactivate() {
143 nodeStore.unsetDelegate(delegate);
144
145 leadershipService.withdraw(appId.name());
146 eventExecutor.shutdown();
147
148 log.info("Stopped");
149 }
150
151 @Modified
152 protected void modified(ComponentContext context) {
153 Dictionary<?, ?> properties = context.getProperties();
154 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
155 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
156 ovsdbPortNum = updatedOvsdbPort;
157 }
158
159 log.info("Modified");
160 }
161
162 @Override
163 public void createNode(KubevirtNode node) {
164 checkNotNull(node, ERR_NULL_NODE);
165
166 KubevirtNode intNode;
Jian Li4fe40e52021-01-06 03:29:58 +0900167 KubevirtNode tunNode;
Jian Lib230e07c2020-12-21 11:25:12 +0900168
169 if (node.intgBridge() == null) {
170 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
171 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
172 intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
173 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
174 NOT_DUPLICATED_MSG, intNode.intgBridge());
175 } else {
176 intNode = node;
177 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
178 NOT_DUPLICATED_MSG, intNode.intgBridge());
179 }
180
Jian Li4fe40e52021-01-06 03:29:58 +0900181 if (node.tunBridge() == null) {
182 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
183 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
184 tunNode = intNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
185 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
186 NOT_DUPLICATED_MSG, tunNode.tunBridge());
187 } else {
188 tunNode = intNode;
189 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
190 NOT_DUPLICATED_MSG, tunNode.tunBridge());
191 }
Jian Lib230e07c2020-12-21 11:25:12 +0900192
Jian Li4fe40e52021-01-06 03:29:58 +0900193 nodeStore.createNode(tunNode);
194
195 log.info(String.format(MSG_NODE, tunNode.hostname(), MSG_CREATED));
Jian Lib230e07c2020-12-21 11:25:12 +0900196 }
197
198 @Override
199 public void updateNode(KubevirtNode node) {
200 checkNotNull(node, ERR_NULL_NODE);
201
202 KubevirtNode intNode;
Jian Li4fe40e52021-01-06 03:29:58 +0900203 KubevirtNode tunNode;
Jian Lib230e07c2020-12-21 11:25:12 +0900204
205 KubevirtNode existingNode = nodeStore.node(node.hostname());
206 checkNotNull(existingNode, ERR_NULL_NODE);
207
208 DeviceId existIntgBridge = nodeStore.node(node.hostname()).intgBridge();
209
210 if (node.intgBridge() == null) {
211 intNode = node.updateIntgBridge(existIntgBridge);
212 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
213 NOT_DUPLICATED_MSG, intNode.intgBridge());
214 } else {
215 intNode = node;
216 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
217 NOT_DUPLICATED_MSG, intNode.intgBridge());
218 }
219
Jian Li4fe40e52021-01-06 03:29:58 +0900220 DeviceId existTunBridge = nodeStore.node(node.hostname()).tunBridge();
221 if (intNode.tunBridge() == null) {
222 tunNode = intNode.updateTunBridge(existTunBridge);
223 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
224 NOT_DUPLICATED_MSG, tunNode.tunBridge());
225 } else {
226 tunNode = intNode;
227 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
228 NOT_DUPLICATED_MSG, tunNode.tunBridge());
229 }
230 nodeStore.updateNode(tunNode);
Jian Lib230e07c2020-12-21 11:25:12 +0900231
Jian Li4fe40e52021-01-06 03:29:58 +0900232 log.info(String.format(MSG_NODE, tunNode.hostname(), MSG_UPDATED));
Jian Lib230e07c2020-12-21 11:25:12 +0900233 }
234
235 @Override
236 public KubevirtNode removeNode(String hostname) {
237 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
238 KubevirtNode node = nodeStore.removeNode(hostname);
239 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
240 return node;
241 }
242
243 @Override
244 public Set<KubevirtNode> nodes() {
245 return nodeStore.nodes();
246 }
247
248 @Override
249 public Set<KubevirtNode> nodes(KubevirtNode.Type type) {
250 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
251 .filter(node -> Objects.equals(node.type(), type))
252 .collect(Collectors.toSet());
253 return ImmutableSet.copyOf(nodes);
254 }
255
256 @Override
257 public Set<KubevirtNode> completeNodes() {
258 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
259 .filter(node -> node.state() == KubevirtNodeState.COMPLETE)
260 .collect(Collectors.toSet());
261 return ImmutableSet.copyOf(nodes);
262 }
263
264 @Override
265 public Set<KubevirtNode> completeNodes(KubevirtNode.Type type) {
266 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
267 .filter(node -> node.type() == type &&
268 node.state() == KubevirtNodeState.COMPLETE)
269 .collect(Collectors.toSet());
270 return ImmutableSet.copyOf(nodes);
271 }
272
273 @Override
274 public KubevirtNode node(String hostname) {
275 return nodeStore.node(hostname);
276 }
277
278 @Override
279 public KubevirtNode node(DeviceId deviceId) {
280 return nodeStore.nodes().stream()
281 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
282 Objects.equals(node.ovsdb(), deviceId))
283 .findFirst().orElse(null);
284 }
285
286 @Override
287 public KubevirtNode node(IpAddress mgmtIp) {
288 return nodeStore.nodes().stream()
289 .filter(node -> Objects.equals(node.managementIp(), mgmtIp))
290 .findFirst().orElse(null);
291 }
292
Jian Li4fe40e52021-01-06 03:29:58 +0900293 @Override
294 public KubevirtNode nodeByTunBridge(DeviceId deviceId) {
295 return nodeStore.nodes().stream()
296 .filter(node -> Objects.equals(node.tunBridge(), deviceId))
297 .findFirst().orElse(null);
298 }
299
Jian Lib230e07c2020-12-21 11:25:12 +0900300 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
301 Optional<KubevirtNode> existNode = nodeStore.nodes().stream()
302 .filter(n -> !n.hostname().equals(hostname))
303 .filter(n -> deviceId.equals(n.intgBridge()))
304 .findFirst();
305
306 return existNode.isPresent();
307 }
308
Jian Li4fe40e52021-01-06 03:29:58 +0900309 private boolean hasTunBridge(DeviceId deviceId, String hostname) {
310 Optional<KubevirtNode> existNode = nodeStore.nodes().stream()
311 .filter(n -> !n.hostname().equals(hostname))
312 .filter(n -> deviceId.equals(n.tunBridge()))
313 .findFirst();
314
315 return existNode.isPresent();
316 }
317
Jian Lib230e07c2020-12-21 11:25:12 +0900318 private class InternalNodeStoreDelegate implements KubevirtNodeStoreDelegate {
319
320 @Override
321 public void notify(KubevirtNodeEvent event) {
322 if (event != null) {
323 log.trace("send kubevirt node event {}", event);
324 process(event);
325 }
326 }
327 }
328}