blob: c77efa3b8952280bf5c81c57fbf83d8cf34badb6 [file] [log] [blame]
Jian Li7c4a8822020-12-21 11:25:12 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Li7c4a8822020-12-21 11:25:12 +090018import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.IpAddress;
21import org.onlab.util.Tools;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.event.ListenerRegistry;
27import org.onosproject.kubevirtnode.api.KubevirtNode;
28import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
29import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
30import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
31import org.onosproject.kubevirtnode.api.KubevirtNodeService;
32import org.onosproject.kubevirtnode.api.KubevirtNodeState;
33import org.onosproject.kubevirtnode.api.KubevirtNodeStore;
34import org.onosproject.kubevirtnode.api.KubevirtNodeStoreDelegate;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.device.DeviceService;
37import org.onosproject.ovsdb.controller.OvsdbController;
38import org.onosproject.store.service.AtomicCounter;
39import org.onosproject.store.service.StorageService;
40import org.osgi.service.component.ComponentContext;
41import org.osgi.service.component.annotations.Activate;
42import org.osgi.service.component.annotations.Component;
43import org.osgi.service.component.annotations.Deactivate;
44import org.osgi.service.component.annotations.Modified;
45import org.osgi.service.component.annotations.Reference;
46import org.osgi.service.component.annotations.ReferenceCardinality;
47import org.slf4j.Logger;
48
49import java.util.Dictionary;
50import java.util.Objects;
51import java.util.Optional;
52import java.util.Set;
53import java.util.concurrent.ExecutorService;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkArgument;
57import static com.google.common.base.Preconditions.checkNotNull;
58import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
61import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
62import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.genDpid;
63import static org.slf4j.LoggerFactory.getLogger;
64
Jian Li304dca42020-12-27 23:22:46 +090065/**
66 * Service administering the inventory of kubevirt nodes.
67 */
Jian Li7c4a8822020-12-21 11:25:12 +090068@Component(
69 immediate = true,
70 service = {KubevirtNodeService.class, KubevirtNodeAdminService.class},
71 property = {
72 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
73 }
74)
75public class KubevirtNodeManager
76 extends ListenerRegistry<KubevirtNodeEvent, KubevirtNodeListener>
77 implements KubevirtNodeService, KubevirtNodeAdminService {
78
79 private final Logger log = getLogger(getClass());
80
81 private static final String MSG_NODE = "KubeVirt node %s %s";
82 private static final String MSG_CREATED = "created";
83 private static final String MSG_UPDATED = "updated";
84 private static final String MSG_REMOVED = "removed";
85
86 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
87
88 private static final String ERR_NULL_NODE = "KubeVirt node cannot be null";
89 private static final String ERR_NULL_HOSTNAME = "KubeVirt node hostname cannot be null";
90 private static final String ERR_NULL_DEVICE_ID = "KubeVirt node device ID cannot be null";
91
92 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNodeStore nodeStore;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected CoreService coreService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected ClusterService clusterService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected LeadershipService leadershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected StorageService storageService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected OvsdbController ovsdbController;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected DeviceService deviceService;
114
115 /**
116 * OVSDB server listen port.
117 */
118 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
119
120 private final ExecutorService eventExecutor = newSingleThreadExecutor(
121 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
122
123 private final KubevirtNodeStoreDelegate delegate = new InternalNodeStoreDelegate();
124
125 private AtomicCounter deviceIdCounter;
126
127 private ApplicationId appId;
128
129 @Activate
130 protected void activate() {
131 appId = coreService.registerApplication(APP_ID);
132 nodeStore.setDelegate(delegate);
133
134 leadershipService.runForLeadership(appId.name());
135
136 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
137
138 log.info("Started");
139 }
140
141 @Deactivate
142 protected void deactivate() {
143 nodeStore.unsetDelegate(delegate);
144
145 leadershipService.withdraw(appId.name());
146 eventExecutor.shutdown();
147
148 log.info("Stopped");
149 }
150
151 @Modified
152 protected void modified(ComponentContext context) {
153 Dictionary<?, ?> properties = context.getProperties();
154 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
155 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
156 ovsdbPortNum = updatedOvsdbPort;
157 }
158
159 log.info("Modified");
160 }
161
162 @Override
163 public void createNode(KubevirtNode node) {
164 checkNotNull(node, ERR_NULL_NODE);
165
166 KubevirtNode intNode;
167
168 if (node.intgBridge() == null) {
169 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
170 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
171 intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
172 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
173 NOT_DUPLICATED_MSG, intNode.intgBridge());
174 } else {
175 intNode = node;
176 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
177 NOT_DUPLICATED_MSG, intNode.intgBridge());
178 }
179
180 nodeStore.createNode(intNode);
181
182 log.info(String.format(MSG_NODE, intNode.hostname(), MSG_CREATED));
183 }
184
185 @Override
186 public void updateNode(KubevirtNode node) {
187 checkNotNull(node, ERR_NULL_NODE);
188
189 KubevirtNode intNode;
190
191 KubevirtNode existingNode = nodeStore.node(node.hostname());
192 checkNotNull(existingNode, ERR_NULL_NODE);
193
194 DeviceId existIntgBridge = nodeStore.node(node.hostname()).intgBridge();
195
196 if (node.intgBridge() == null) {
197 intNode = node.updateIntgBridge(existIntgBridge);
198 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
199 NOT_DUPLICATED_MSG, intNode.intgBridge());
200 } else {
201 intNode = node;
202 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
203 NOT_DUPLICATED_MSG, intNode.intgBridge());
204 }
205
206 nodeStore.updateNode(intNode);
207
208 log.info(String.format(MSG_NODE, intNode.hostname(), MSG_UPDATED));
209 }
210
211 @Override
212 public KubevirtNode removeNode(String hostname) {
213 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
214 KubevirtNode node = nodeStore.removeNode(hostname);
215 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
216 return node;
217 }
218
219 @Override
220 public Set<KubevirtNode> nodes() {
221 return nodeStore.nodes();
222 }
223
224 @Override
225 public Set<KubevirtNode> nodes(KubevirtNode.Type type) {
226 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
227 .filter(node -> Objects.equals(node.type(), type))
228 .collect(Collectors.toSet());
229 return ImmutableSet.copyOf(nodes);
230 }
231
232 @Override
233 public Set<KubevirtNode> completeNodes() {
234 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
235 .filter(node -> node.state() == KubevirtNodeState.COMPLETE)
236 .collect(Collectors.toSet());
237 return ImmutableSet.copyOf(nodes);
238 }
239
240 @Override
241 public Set<KubevirtNode> completeNodes(KubevirtNode.Type type) {
242 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
243 .filter(node -> node.type() == type &&
244 node.state() == KubevirtNodeState.COMPLETE)
245 .collect(Collectors.toSet());
246 return ImmutableSet.copyOf(nodes);
247 }
248
249 @Override
250 public KubevirtNode node(String hostname) {
251 return nodeStore.node(hostname);
252 }
253
254 @Override
255 public KubevirtNode node(DeviceId deviceId) {
256 return nodeStore.nodes().stream()
257 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
258 Objects.equals(node.ovsdb(), deviceId))
259 .findFirst().orElse(null);
260 }
261
262 @Override
263 public KubevirtNode node(IpAddress mgmtIp) {
264 return nodeStore.nodes().stream()
265 .filter(node -> Objects.equals(node.managementIp(), mgmtIp))
266 .findFirst().orElse(null);
267 }
268
269 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
270 Optional<KubevirtNode> existNode = nodeStore.nodes().stream()
271 .filter(n -> !n.hostname().equals(hostname))
272 .filter(n -> deviceId.equals(n.intgBridge()))
273 .findFirst();
274
275 return existNode.isPresent();
276 }
277
278 private class InternalNodeStoreDelegate implements KubevirtNodeStoreDelegate {
279
280 @Override
281 public void notify(KubevirtNodeEvent event) {
282 if (event != null) {
283 log.trace("send kubevirt node event {}", event);
284 process(event);
285 }
286 }
287 }
288}