blob: 6578def9f8bcbe011b18599e375751d6325f25ee [file] [log] [blame]
Jian Lib230e07c2020-12-21 11:25:12 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
18/**
19 * Service administering the inventory of kubevirt nodes.
20 */
21
22import com.google.common.base.Strings;
23import com.google.common.collect.ImmutableSet;
24import org.onlab.packet.IpAddress;
25import org.onlab.util.Tools;
26import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.LeadershipService;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.event.ListenerRegistry;
31import org.onosproject.kubevirtnode.api.KubevirtNode;
32import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
33import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
34import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
35import org.onosproject.kubevirtnode.api.KubevirtNodeService;
36import org.onosproject.kubevirtnode.api.KubevirtNodeState;
37import org.onosproject.kubevirtnode.api.KubevirtNodeStore;
38import org.onosproject.kubevirtnode.api.KubevirtNodeStoreDelegate;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.ovsdb.controller.OvsdbController;
42import org.onosproject.store.service.AtomicCounter;
43import org.onosproject.store.service.StorageService;
44import org.osgi.service.component.ComponentContext;
45import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Modified;
49import org.osgi.service.component.annotations.Reference;
50import org.osgi.service.component.annotations.ReferenceCardinality;
51import org.slf4j.Logger;
52
53import java.util.Dictionary;
54import java.util.Objects;
55import java.util.Optional;
56import java.util.Set;
57import java.util.concurrent.ExecutorService;
58import java.util.stream.Collectors;
59
60import static com.google.common.base.Preconditions.checkArgument;
61import static com.google.common.base.Preconditions.checkNotNull;
62import static java.util.concurrent.Executors.newSingleThreadExecutor;
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
65import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
66import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.genDpid;
67import static org.slf4j.LoggerFactory.getLogger;
68
69@Component(
70 immediate = true,
71 service = {KubevirtNodeService.class, KubevirtNodeAdminService.class},
72 property = {
73 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
74 }
75)
76public class KubevirtNodeManager
77 extends ListenerRegistry<KubevirtNodeEvent, KubevirtNodeListener>
78 implements KubevirtNodeService, KubevirtNodeAdminService {
79
80 private final Logger log = getLogger(getClass());
81
82 private static final String MSG_NODE = "KubeVirt node %s %s";
83 private static final String MSG_CREATED = "created";
84 private static final String MSG_UPDATED = "updated";
85 private static final String MSG_REMOVED = "removed";
86
87 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
88
89 private static final String ERR_NULL_NODE = "KubeVirt node cannot be null";
90 private static final String ERR_NULL_HOSTNAME = "KubeVirt node hostname cannot be null";
91 private static final String ERR_NULL_DEVICE_ID = "KubeVirt node device ID cannot be null";
92
93 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected KubevirtNodeStore nodeStore;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected CoreService coreService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected ClusterService clusterService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected LeadershipService leadershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected StorageService storageService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected OvsdbController ovsdbController;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected DeviceService deviceService;
115
116 /**
117 * OVSDB server listen port.
118 */
119 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
120
121 private final ExecutorService eventExecutor = newSingleThreadExecutor(
122 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
123
124 private final KubevirtNodeStoreDelegate delegate = new InternalNodeStoreDelegate();
125
126 private AtomicCounter deviceIdCounter;
127
128 private ApplicationId appId;
129
130 @Activate
131 protected void activate() {
132 appId = coreService.registerApplication(APP_ID);
133 nodeStore.setDelegate(delegate);
134
135 leadershipService.runForLeadership(appId.name());
136
137 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
138
139 log.info("Started");
140 }
141
142 @Deactivate
143 protected void deactivate() {
144 nodeStore.unsetDelegate(delegate);
145
146 leadershipService.withdraw(appId.name());
147 eventExecutor.shutdown();
148
149 log.info("Stopped");
150 }
151
152 @Modified
153 protected void modified(ComponentContext context) {
154 Dictionary<?, ?> properties = context.getProperties();
155 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
156 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
157 ovsdbPortNum = updatedOvsdbPort;
158 }
159
160 log.info("Modified");
161 }
162
163 @Override
164 public void createNode(KubevirtNode node) {
165 checkNotNull(node, ERR_NULL_NODE);
166
167 KubevirtNode intNode;
168
169 if (node.intgBridge() == null) {
170 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
171 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
172 intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
173 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
174 NOT_DUPLICATED_MSG, intNode.intgBridge());
175 } else {
176 intNode = node;
177 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
178 NOT_DUPLICATED_MSG, intNode.intgBridge());
179 }
180
181 nodeStore.createNode(intNode);
182
183 log.info(String.format(MSG_NODE, intNode.hostname(), MSG_CREATED));
184 }
185
186 @Override
187 public void updateNode(KubevirtNode node) {
188 checkNotNull(node, ERR_NULL_NODE);
189
190 KubevirtNode intNode;
191
192 KubevirtNode existingNode = nodeStore.node(node.hostname());
193 checkNotNull(existingNode, ERR_NULL_NODE);
194
195 DeviceId existIntgBridge = nodeStore.node(node.hostname()).intgBridge();
196
197 if (node.intgBridge() == null) {
198 intNode = node.updateIntgBridge(existIntgBridge);
199 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
200 NOT_DUPLICATED_MSG, intNode.intgBridge());
201 } else {
202 intNode = node;
203 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
204 NOT_DUPLICATED_MSG, intNode.intgBridge());
205 }
206
207 nodeStore.updateNode(intNode);
208
209 log.info(String.format(MSG_NODE, intNode.hostname(), MSG_UPDATED));
210 }
211
212 @Override
213 public KubevirtNode removeNode(String hostname) {
214 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
215 KubevirtNode node = nodeStore.removeNode(hostname);
216 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
217 return node;
218 }
219
220 @Override
221 public Set<KubevirtNode> nodes() {
222 return nodeStore.nodes();
223 }
224
225 @Override
226 public Set<KubevirtNode> nodes(KubevirtNode.Type type) {
227 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
228 .filter(node -> Objects.equals(node.type(), type))
229 .collect(Collectors.toSet());
230 return ImmutableSet.copyOf(nodes);
231 }
232
233 @Override
234 public Set<KubevirtNode> completeNodes() {
235 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
236 .filter(node -> node.state() == KubevirtNodeState.COMPLETE)
237 .collect(Collectors.toSet());
238 return ImmutableSet.copyOf(nodes);
239 }
240
241 @Override
242 public Set<KubevirtNode> completeNodes(KubevirtNode.Type type) {
243 Set<KubevirtNode> nodes = nodeStore.nodes().stream()
244 .filter(node -> node.type() == type &&
245 node.state() == KubevirtNodeState.COMPLETE)
246 .collect(Collectors.toSet());
247 return ImmutableSet.copyOf(nodes);
248 }
249
250 @Override
251 public KubevirtNode node(String hostname) {
252 return nodeStore.node(hostname);
253 }
254
255 @Override
256 public KubevirtNode node(DeviceId deviceId) {
257 return nodeStore.nodes().stream()
258 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
259 Objects.equals(node.ovsdb(), deviceId))
260 .findFirst().orElse(null);
261 }
262
263 @Override
264 public KubevirtNode node(IpAddress mgmtIp) {
265 return nodeStore.nodes().stream()
266 .filter(node -> Objects.equals(node.managementIp(), mgmtIp))
267 .findFirst().orElse(null);
268 }
269
270 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
271 Optional<KubevirtNode> existNode = nodeStore.nodes().stream()
272 .filter(n -> !n.hostname().equals(hostname))
273 .filter(n -> deviceId.equals(n.intgBridge()))
274 .findFirst();
275
276 return existNode.isPresent();
277 }
278
279 private class InternalNodeStoreDelegate implements KubevirtNodeStoreDelegate {
280
281 @Override
282 public void notify(KubevirtNodeEvent event) {
283 if (event != null) {
284 log.trace("send kubevirt node event {}", event);
285 process(event);
286 }
287 }
288 }
289}