blob: bc789d32002371781a26c475e5a119b7854bf581 [file] [log] [blame]
Jian Li49109b52019-01-22 00:17:28 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.event.ListenerRegistry;
26import org.onosproject.k8snode.api.K8sNode;
27import org.onosproject.k8snode.api.K8sNode.Type;
28import org.onosproject.k8snode.api.K8sNodeAdminService;
29import org.onosproject.k8snode.api.K8sNodeEvent;
30import org.onosproject.k8snode.api.K8sNodeListener;
31import org.onosproject.k8snode.api.K8sNodeService;
32import org.onosproject.k8snode.api.K8sNodeStore;
33import org.onosproject.k8snode.api.K8sNodeStoreDelegate;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.device.DeviceService;
Jian Li1cee9882019-02-13 11:25:25 +090036import org.onosproject.store.service.AtomicCounter;
Jian Li49109b52019-01-22 00:17:28 +090037import org.onosproject.store.service.StorageService;
38import org.osgi.service.component.ComponentContext;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
47import java.util.Dictionary;
48import java.util.Objects;
Jian Li1cee9882019-02-13 11:25:25 +090049import java.util.Optional;
Jian Li49109b52019-01-22 00:17:28 +090050import java.util.Set;
51import java.util.concurrent.ExecutorService;
52import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static com.google.common.base.Preconditions.checkNotNull;
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
Jian Lie2a04ce2020-07-01 19:07:02 +090058import static org.onosproject.k8snode.api.K8sApiConfig.Mode.NORMAL;
Jian Li49109b52019-01-22 00:17:28 +090059import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
60import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
61import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
Jian Li1cee9882019-02-13 11:25:25 +090062import static org.onosproject.k8snode.util.K8sNodeUtil.genDpid;
Jian Li49109b52019-01-22 00:17:28 +090063import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Service administering the inventory of kubernetes nodes.
67 */
68@Component(
69 immediate = true,
70 service = { K8sNodeService.class, K8sNodeAdminService.class },
71 property = {
72 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
73 }
74)
75public class K8sNodeManager
76 extends ListenerRegistry<K8sNodeEvent, K8sNodeListener>
77 implements K8sNodeService, K8sNodeAdminService {
78
79 private final Logger log = getLogger(getClass());
80
81 private static final String MSG_NODE = "Kubernetes node %s %s";
82 private static final String MSG_CREATED = "created";
83 private static final String MSG_UPDATED = "updated";
84 private static final String MSG_REMOVED = "removed";
85
86 private static final String ERR_NULL_NODE = "Kubernetes node cannot be null";
87 private static final String ERR_NULL_HOSTNAME = "Kubernetes node hostname cannot be null";
Jian Li1cee9882019-02-13 11:25:25 +090088 private static final String ERR_NULL_DEVICE_ID = "Kubernetes node device ID cannot be null";
89
90 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
91 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
Jian Li49109b52019-01-22 00:17:28 +090092
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected K8sNodeStore nodeStore;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected ClusterService clusterService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected LeadershipService leadershipService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected StorageService storageService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DeviceService deviceService;
110
111 /** OVSDB server listen port. */
112 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
113
114 private final ExecutorService eventExecutor = newSingleThreadExecutor(
115 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
116
117 private final K8sNodeStoreDelegate delegate = new K8sNodeManager.InternalNodeStoreDelegate();
118
Jian Li1cee9882019-02-13 11:25:25 +0900119 private AtomicCounter deviceIdCounter;
120
Jian Li49109b52019-01-22 00:17:28 +0900121 private ApplicationId appId;
122
123 @Activate
124 protected void activate() {
125 appId = coreService.registerApplication(APP_ID);
126 nodeStore.setDelegate(delegate);
127
128 leadershipService.runForLeadership(appId.name());
129
Jian Li1cee9882019-02-13 11:25:25 +0900130 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
131
Jian Li49109b52019-01-22 00:17:28 +0900132 log.info("Started");
133 }
134
135 @Deactivate
136 protected void deactivate() {
137 nodeStore.unsetDelegate(delegate);
138
139 leadershipService.withdraw(appId.name());
140 eventExecutor.shutdown();
141
142 log.info("Stopped");
143 }
144
145 @Modified
146 protected void modified(ComponentContext context) {
147 Dictionary<?, ?> properties = context.getProperties();
148 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
149 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
150 ovsdbPortNum = updatedOvsdbPort;
151 }
152
153 log.info("Modified");
154 }
155
156 @Override
157 public void createNode(K8sNode node) {
158 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900159
Jian Libf562c22019-04-15 18:07:14 +0900160 K8sNode intNode;
161 K8sNode extNode;
Jian Li1a2eb5d2019-08-27 02:07:05 +0900162 K8sNode localNode;
Jian Lie2a04ce2020-07-01 19:07:02 +0900163 K8sNode tunNode;
Jian Li1cee9882019-02-13 11:25:25 +0900164
165 if (node.intgBridge() == null) {
166 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
167 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
Jian Libf562c22019-04-15 18:07:14 +0900168 intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
169 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
170 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900171 } else {
Jian Libf562c22019-04-15 18:07:14 +0900172 intNode = node;
173 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
174 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900175 }
176
Jian Libf562c22019-04-15 18:07:14 +0900177 if (intNode.extBridge() == null) {
178 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
179 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
180 extNode = intNode.updateExtBridge(DeviceId.deviceId(deviceIdStr));
181 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
182 NOT_DUPLICATED_MSG, extNode.extBridge());
183 } else {
184 extNode = intNode;
185 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
186 NOT_DUPLICATED_MSG, extNode.extBridge());
187 }
188
Jian Li1a2eb5d2019-08-27 02:07:05 +0900189 if (node.localBridge() == null) {
190 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
191 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
192 localNode = extNode.updateLocalBridge(DeviceId.deviceId(deviceIdStr));
193 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
194 NOT_DUPLICATED_MSG, localNode.localBridge());
195 } else {
196 localNode = extNode;
197 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
198 NOT_DUPLICATED_MSG, localNode.localBridge());
199 }
200
Jian Lie2a04ce2020-07-01 19:07:02 +0900201 if (node.mode() == NORMAL) {
202 if (node.tunBridge() == null) {
203 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
204 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
205 tunNode = localNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
206 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
207 NOT_DUPLICATED_MSG, tunNode.tunBridge());
208 } else {
209 tunNode = localNode;
210 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
211 NOT_DUPLICATED_MSG, tunNode.tunBridge());
212 }
213
214 nodeStore.createNode(tunNode);
215 } else {
216 nodeStore.createNode(localNode);
217 }
218
Jian Libf562c22019-04-15 18:07:14 +0900219 log.info(String.format(MSG_NODE, extNode.hostname(), MSG_CREATED));
Jian Li49109b52019-01-22 00:17:28 +0900220 }
221
222 @Override
223 public void updateNode(K8sNode node) {
224 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900225
Jian Libf562c22019-04-15 18:07:14 +0900226 K8sNode intNode;
227 K8sNode extNode;
Jian Li1a2eb5d2019-08-27 02:07:05 +0900228 K8sNode localNode;
Jian Lie2a04ce2020-07-01 19:07:02 +0900229 K8sNode tunNode;
Jian Li1cee9882019-02-13 11:25:25 +0900230
231 K8sNode existingNode = nodeStore.node(node.hostname());
232 checkNotNull(existingNode, ERR_NULL_NODE);
233
Jian Libf562c22019-04-15 18:07:14 +0900234 DeviceId existIntgBridge = nodeStore.node(node.hostname()).intgBridge();
Jian Li1cee9882019-02-13 11:25:25 +0900235
236 if (node.intgBridge() == null) {
Jian Libf562c22019-04-15 18:07:14 +0900237 intNode = node.updateIntgBridge(existIntgBridge);
238 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
239 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900240 } else {
Jian Libf562c22019-04-15 18:07:14 +0900241 intNode = node;
242 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
243 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900244 }
245
Jian Libf562c22019-04-15 18:07:14 +0900246 DeviceId existExtBridge = nodeStore.node(node.hostname()).extBridge();
247
248 if (intNode.extBridge() == null) {
249 extNode = intNode.updateExtBridge(existExtBridge);
250 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
251 NOT_DUPLICATED_MSG, extNode.extBridge());
252 } else {
253 extNode = intNode;
254 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
255 NOT_DUPLICATED_MSG, extNode.extBridge());
256 }
257
Jian Li1a2eb5d2019-08-27 02:07:05 +0900258 DeviceId existLocalBridge = nodeStore.node(node.hostname()).localBridge();
259
260 if (extNode.localBridge() == null) {
261 localNode = extNode.updateLocalBridge(existLocalBridge);
262 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
263 NOT_DUPLICATED_MSG, localNode.localBridge());
264 } else {
265 localNode = extNode;
266 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
267 NOT_DUPLICATED_MSG, localNode.localBridge());
268 }
269
Jian Lie2a04ce2020-07-01 19:07:02 +0900270 if (node.mode() == NORMAL) {
271 DeviceId existTunBridge = nodeStore.node(node.hostname()).tunBridge();
272
273 if (localNode.tunBridge() == null) {
274 tunNode = localNode.updateTunBridge(existTunBridge);
275 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
276 NOT_DUPLICATED_MSG, tunNode.tunBridge());
277 } else {
278 tunNode = localNode;
279 checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
280 NOT_DUPLICATED_MSG, tunNode.tunBridge());
281 }
282 nodeStore.updateNode(tunNode);
283 } else {
284 nodeStore.updateNode(localNode);
285 }
Jian Libf562c22019-04-15 18:07:14 +0900286 log.info(String.format(MSG_NODE, extNode.hostname(), MSG_UPDATED));
Jian Li49109b52019-01-22 00:17:28 +0900287 }
288
289 @Override
290 public K8sNode removeNode(String hostname) {
291 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
292 K8sNode node = nodeStore.removeNode(hostname);
293 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
294 return node;
295 }
296
297 @Override
298 public Set<K8sNode> nodes() {
299 return nodeStore.nodes();
300 }
301
302 @Override
Jian Lie2a04ce2020-07-01 19:07:02 +0900303 public Set<K8sNode> nodes(String clusterName) {
304 return nodeStore.nodes().stream()
305 .filter(n -> n.clusterName().equals(clusterName))
306 .collect(Collectors.toSet());
307 }
308
309 @Override
Jian Li49109b52019-01-22 00:17:28 +0900310 public Set<K8sNode> nodes(Type type) {
311 Set<K8sNode> nodes = nodeStore.nodes().stream()
312 .filter(node -> Objects.equals(node.type(), type))
313 .collect(Collectors.toSet());
314 return ImmutableSet.copyOf(nodes);
315 }
316
317 @Override
318 public Set<K8sNode> completeNodes() {
319 Set<K8sNode> nodes = nodeStore.nodes().stream()
320 .filter(node -> node.state() == COMPLETE)
321 .collect(Collectors.toSet());
322 return ImmutableSet.copyOf(nodes);
323 }
324
325 @Override
326 public Set<K8sNode> completeNodes(Type type) {
327 Set<K8sNode> nodes = nodeStore.nodes().stream()
328 .filter(node -> node.type() == type &&
329 node.state() == COMPLETE)
330 .collect(Collectors.toSet());
331 return ImmutableSet.copyOf(nodes);
332 }
333
334 @Override
335 public K8sNode node(String hostname) {
336 return nodeStore.node(hostname);
337 }
338
Jian Libf562c22019-04-15 18:07:14 +0900339 // TODO: need to differentiate integration bridge and external bridge
Jian Li49109b52019-01-22 00:17:28 +0900340 @Override
341 public K8sNode node(DeviceId deviceId) {
342 return nodeStore.nodes().stream()
343 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
344 Objects.equals(node.ovsdb(), deviceId))
345 .findFirst().orElse(null);
346 }
347
Jian Li1cee9882019-02-13 11:25:25 +0900348 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
349 Optional<K8sNode> existNode = nodeStore.nodes().stream()
350 .filter(n -> !n.hostname().equals(hostname))
Jian Libf562c22019-04-15 18:07:14 +0900351 .filter(n -> deviceId.equals(n.intgBridge()))
352 .findFirst();
353
354 return existNode.isPresent();
355 }
356
357 private boolean hasExtBridge(DeviceId deviceId, String hostname) {
358 Optional<K8sNode> existNode = nodeStore.nodes().stream()
359 .filter(n -> !n.hostname().equals(hostname))
360 .filter(n -> deviceId.equals(n.extBridge()))
Jian Li1cee9882019-02-13 11:25:25 +0900361 .findFirst();
362
363 return existNode.isPresent();
364 }
365
Jian Li1a2eb5d2019-08-27 02:07:05 +0900366 private boolean hasLocalBridge(DeviceId deviceId, String hostname) {
367 Optional<K8sNode> existNode = nodeStore.nodes().stream()
368 .filter(n -> !n.hostname().equals(hostname))
369 .filter(n -> deviceId.equals(n.localBridge()))
370 .findFirst();
371
372 return existNode.isPresent();
373 }
374
Jian Lie2a04ce2020-07-01 19:07:02 +0900375 private boolean hasTunBridge(DeviceId deviceId, String hostname) {
376 Optional<K8sNode> existNode = nodeStore.nodes().stream()
377 .filter(n -> !n.hostname().equals(hostname))
378 .filter(n -> deviceId.equals(n.tunBridge()))
379 .findFirst();
380
381 return existNode.isPresent();
382 }
383
Jian Li49109b52019-01-22 00:17:28 +0900384 private class InternalNodeStoreDelegate implements K8sNodeStoreDelegate {
385
386 @Override
387 public void notify(K8sNodeEvent event) {
388 if (event != null) {
389 log.trace("send kubernetes node event {}", event);
390 process(event);
391 }
392 }
393 }
394}