/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.event.ListenerRegistry;
26import org.onosproject.k8snode.api.K8sNode;
27import org.onosproject.k8snode.api.K8sNode.Type;
28import org.onosproject.k8snode.api.K8sNodeAdminService;
29import org.onosproject.k8snode.api.K8sNodeEvent;
30import org.onosproject.k8snode.api.K8sNodeListener;
31import org.onosproject.k8snode.api.K8sNodeService;
32import org.onosproject.k8snode.api.K8sNodeStore;
33import org.onosproject.k8snode.api.K8sNodeStoreDelegate;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.device.DeviceService;
Jian Li1cee9882019-02-13 11:25:25 +090036import org.onosproject.store.service.AtomicCounter;
Jian Li49109b52019-01-22 00:17:28 +090037import org.onosproject.store.service.StorageService;
38import org.osgi.service.component.ComponentContext;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
47import java.util.Dictionary;
48import java.util.Objects;
Jian Li1cee9882019-02-13 11:25:25 +090049import java.util.Optional;
Jian Li49109b52019-01-22 00:17:28 +090050import java.util.Set;
51import java.util.concurrent.ExecutorService;
52import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static com.google.common.base.Preconditions.checkNotNull;
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
59import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
60import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
Jian Li1cee9882019-02-13 11:25:25 +090061import static org.onosproject.k8snode.util.K8sNodeUtil.genDpid;
Jian Li49109b52019-01-22 00:17:28 +090062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Service administering the inventory of kubernetes nodes.
66 */
67@Component(
68 immediate = true,
69 service = { K8sNodeService.class, K8sNodeAdminService.class },
70 property = {
71 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
72 }
73)
74public class K8sNodeManager
75 extends ListenerRegistry<K8sNodeEvent, K8sNodeListener>
76 implements K8sNodeService, K8sNodeAdminService {
77
78 private final Logger log = getLogger(getClass());
79
80 private static final String MSG_NODE = "Kubernetes node %s %s";
81 private static final String MSG_CREATED = "created";
82 private static final String MSG_UPDATED = "updated";
83 private static final String MSG_REMOVED = "removed";
84
85 private static final String ERR_NULL_NODE = "Kubernetes node cannot be null";
86 private static final String ERR_NULL_HOSTNAME = "Kubernetes node hostname cannot be null";
Jian Li1cee9882019-02-13 11:25:25 +090087 private static final String ERR_NULL_DEVICE_ID = "Kubernetes node device ID cannot be null";
88
89 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
90 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
Jian Li49109b52019-01-22 00:17:28 +090091
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected K8sNodeStore nodeStore;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected CoreService coreService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ClusterService clusterService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected LeadershipService leadershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected DeviceService deviceService;
109
110 /** OVSDB server listen port. */
111 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
112
113 private final ExecutorService eventExecutor = newSingleThreadExecutor(
114 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
115
116 private final K8sNodeStoreDelegate delegate = new K8sNodeManager.InternalNodeStoreDelegate();
117
Jian Li1cee9882019-02-13 11:25:25 +0900118 private AtomicCounter deviceIdCounter;
119
Jian Li49109b52019-01-22 00:17:28 +0900120 private ApplicationId appId;
121
122 @Activate
123 protected void activate() {
124 appId = coreService.registerApplication(APP_ID);
125 nodeStore.setDelegate(delegate);
126
127 leadershipService.runForLeadership(appId.name());
128
Jian Li1cee9882019-02-13 11:25:25 +0900129 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
130
Jian Li49109b52019-01-22 00:17:28 +0900131 log.info("Started");
132 }
133
134 @Deactivate
135 protected void deactivate() {
136 nodeStore.unsetDelegate(delegate);
137
138 leadershipService.withdraw(appId.name());
139 eventExecutor.shutdown();
140
141 log.info("Stopped");
142 }
143
144 @Modified
145 protected void modified(ComponentContext context) {
146 Dictionary<?, ?> properties = context.getProperties();
147 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
148 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
149 ovsdbPortNum = updatedOvsdbPort;
150 }
151
152 log.info("Modified");
153 }
154
155 @Override
156 public void createNode(K8sNode node) {
157 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900158
Jian Libf562c22019-04-15 18:07:14 +0900159 K8sNode intNode;
160 K8sNode extNode;
Jian Li1a2eb5d2019-08-27 02:07:05 +0900161 K8sNode localNode;
Jian Li1cee9882019-02-13 11:25:25 +0900162
163 if (node.intgBridge() == null) {
164 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
165 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
Jian Libf562c22019-04-15 18:07:14 +0900166 intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
167 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
168 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900169 } else {
Jian Libf562c22019-04-15 18:07:14 +0900170 intNode = node;
171 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
172 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900173 }
174
Jian Libf562c22019-04-15 18:07:14 +0900175 if (intNode.extBridge() == null) {
176 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
177 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
178 extNode = intNode.updateExtBridge(DeviceId.deviceId(deviceIdStr));
179 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
180 NOT_DUPLICATED_MSG, extNode.extBridge());
181 } else {
182 extNode = intNode;
183 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
184 NOT_DUPLICATED_MSG, extNode.extBridge());
185 }
186
Jian Li1a2eb5d2019-08-27 02:07:05 +0900187 if (node.localBridge() == null) {
188 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
189 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
190 localNode = extNode.updateLocalBridge(DeviceId.deviceId(deviceIdStr));
191 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
192 NOT_DUPLICATED_MSG, localNode.localBridge());
193 } else {
194 localNode = extNode;
195 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
196 NOT_DUPLICATED_MSG, localNode.localBridge());
197 }
198
199 nodeStore.createNode(localNode);
Jian Libf562c22019-04-15 18:07:14 +0900200 log.info(String.format(MSG_NODE, extNode.hostname(), MSG_CREATED));
Jian Li49109b52019-01-22 00:17:28 +0900201 }
202
203 @Override
204 public void updateNode(K8sNode node) {
205 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900206
Jian Libf562c22019-04-15 18:07:14 +0900207 K8sNode intNode;
208 K8sNode extNode;
Jian Li1a2eb5d2019-08-27 02:07:05 +0900209 K8sNode localNode;
Jian Li1cee9882019-02-13 11:25:25 +0900210
211 K8sNode existingNode = nodeStore.node(node.hostname());
212 checkNotNull(existingNode, ERR_NULL_NODE);
213
Jian Libf562c22019-04-15 18:07:14 +0900214 DeviceId existIntgBridge = nodeStore.node(node.hostname()).intgBridge();
Jian Li1cee9882019-02-13 11:25:25 +0900215
216 if (node.intgBridge() == null) {
Jian Libf562c22019-04-15 18:07:14 +0900217 intNode = node.updateIntgBridge(existIntgBridge);
218 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
219 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900220 } else {
Jian Libf562c22019-04-15 18:07:14 +0900221 intNode = node;
222 checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
223 NOT_DUPLICATED_MSG, intNode.intgBridge());
Jian Li1cee9882019-02-13 11:25:25 +0900224 }
225
Jian Libf562c22019-04-15 18:07:14 +0900226 DeviceId existExtBridge = nodeStore.node(node.hostname()).extBridge();
227
228 if (intNode.extBridge() == null) {
229 extNode = intNode.updateExtBridge(existExtBridge);
230 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
231 NOT_DUPLICATED_MSG, extNode.extBridge());
232 } else {
233 extNode = intNode;
234 checkArgument(!hasExtBridge(extNode.extBridge(), extNode.hostname()),
235 NOT_DUPLICATED_MSG, extNode.extBridge());
236 }
237
Jian Li1a2eb5d2019-08-27 02:07:05 +0900238 DeviceId existLocalBridge = nodeStore.node(node.hostname()).localBridge();
239
240 if (extNode.localBridge() == null) {
241 localNode = extNode.updateLocalBridge(existLocalBridge);
242 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
243 NOT_DUPLICATED_MSG, localNode.localBridge());
244 } else {
245 localNode = extNode;
246 checkArgument(!hasLocalBridge(localNode.localBridge(), localNode.hostname()),
247 NOT_DUPLICATED_MSG, localNode.localBridge());
248 }
249
250 nodeStore.updateNode(localNode);
Jian Libf562c22019-04-15 18:07:14 +0900251 log.info(String.format(MSG_NODE, extNode.hostname(), MSG_UPDATED));
Jian Li49109b52019-01-22 00:17:28 +0900252 }
253
254 @Override
255 public K8sNode removeNode(String hostname) {
256 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
257 K8sNode node = nodeStore.removeNode(hostname);
258 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
259 return node;
260 }
261
262 @Override
263 public Set<K8sNode> nodes() {
264 return nodeStore.nodes();
265 }
266
267 @Override
268 public Set<K8sNode> nodes(Type type) {
269 Set<K8sNode> nodes = nodeStore.nodes().stream()
270 .filter(node -> Objects.equals(node.type(), type))
271 .collect(Collectors.toSet());
272 return ImmutableSet.copyOf(nodes);
273 }
274
275 @Override
276 public Set<K8sNode> completeNodes() {
277 Set<K8sNode> nodes = nodeStore.nodes().stream()
278 .filter(node -> node.state() == COMPLETE)
279 .collect(Collectors.toSet());
280 return ImmutableSet.copyOf(nodes);
281 }
282
283 @Override
284 public Set<K8sNode> completeNodes(Type type) {
285 Set<K8sNode> nodes = nodeStore.nodes().stream()
286 .filter(node -> node.type() == type &&
287 node.state() == COMPLETE)
288 .collect(Collectors.toSet());
289 return ImmutableSet.copyOf(nodes);
290 }
291
292 @Override
293 public K8sNode node(String hostname) {
294 return nodeStore.node(hostname);
295 }
296
Jian Libf562c22019-04-15 18:07:14 +0900297 // TODO: need to differentiate integration bridge and external bridge
Jian Li49109b52019-01-22 00:17:28 +0900298 @Override
299 public K8sNode node(DeviceId deviceId) {
300 return nodeStore.nodes().stream()
301 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
302 Objects.equals(node.ovsdb(), deviceId))
303 .findFirst().orElse(null);
304 }
305
Jian Li1cee9882019-02-13 11:25:25 +0900306 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
307 Optional<K8sNode> existNode = nodeStore.nodes().stream()
308 .filter(n -> !n.hostname().equals(hostname))
Jian Libf562c22019-04-15 18:07:14 +0900309 .filter(n -> deviceId.equals(n.intgBridge()))
310 .findFirst();
311
312 return existNode.isPresent();
313 }
314
315 private boolean hasExtBridge(DeviceId deviceId, String hostname) {
316 Optional<K8sNode> existNode = nodeStore.nodes().stream()
317 .filter(n -> !n.hostname().equals(hostname))
318 .filter(n -> deviceId.equals(n.extBridge()))
Jian Li1cee9882019-02-13 11:25:25 +0900319 .findFirst();
320
321 return existNode.isPresent();
322 }
323
Jian Li1a2eb5d2019-08-27 02:07:05 +0900324 private boolean hasLocalBridge(DeviceId deviceId, String hostname) {
325 Optional<K8sNode> existNode = nodeStore.nodes().stream()
326 .filter(n -> !n.hostname().equals(hostname))
327 .filter(n -> deviceId.equals(n.localBridge()))
328 .findFirst();
329
330 return existNode.isPresent();
331 }
332
Jian Li49109b52019-01-22 00:17:28 +0900333 private class InternalNodeStoreDelegate implements K8sNodeStoreDelegate {
334
335 @Override
336 public void notify(K8sNodeEvent event) {
337 if (event != null) {
338 log.trace("send kubernetes node event {}", event);
339 process(event);
340 }
341 }
342 }
343}