blob: 02894229dc11036b096bdc642286bc489c77186d [file] [log] [blame]
Jian Li49109b52019-01-22 00:17:28 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.event.ListenerRegistry;
26import org.onosproject.k8snode.api.K8sNode;
27import org.onosproject.k8snode.api.K8sNode.Type;
28import org.onosproject.k8snode.api.K8sNodeAdminService;
29import org.onosproject.k8snode.api.K8sNodeEvent;
30import org.onosproject.k8snode.api.K8sNodeListener;
31import org.onosproject.k8snode.api.K8sNodeService;
32import org.onosproject.k8snode.api.K8sNodeStore;
33import org.onosproject.k8snode.api.K8sNodeStoreDelegate;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.device.DeviceService;
Jian Li1cee9882019-02-13 11:25:25 +090036import org.onosproject.store.service.AtomicCounter;
Jian Li49109b52019-01-22 00:17:28 +090037import org.onosproject.store.service.StorageService;
38import org.osgi.service.component.ComponentContext;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
47import java.util.Dictionary;
48import java.util.Objects;
Jian Li1cee9882019-02-13 11:25:25 +090049import java.util.Optional;
Jian Li49109b52019-01-22 00:17:28 +090050import java.util.Set;
51import java.util.concurrent.ExecutorService;
52import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static com.google.common.base.Preconditions.checkNotNull;
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
59import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
60import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
Jian Li1cee9882019-02-13 11:25:25 +090061import static org.onosproject.k8snode.util.K8sNodeUtil.genDpid;
Jian Li49109b52019-01-22 00:17:28 +090062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Service administering the inventory of kubernetes nodes.
66 */
67@Component(
68 immediate = true,
69 service = { K8sNodeService.class, K8sNodeAdminService.class },
70 property = {
71 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT
72 }
73)
74public class K8sNodeManager
75 extends ListenerRegistry<K8sNodeEvent, K8sNodeListener>
76 implements K8sNodeService, K8sNodeAdminService {
77
78 private final Logger log = getLogger(getClass());
79
80 private static final String MSG_NODE = "Kubernetes node %s %s";
81 private static final String MSG_CREATED = "created";
82 private static final String MSG_UPDATED = "updated";
83 private static final String MSG_REMOVED = "removed";
84
85 private static final String ERR_NULL_NODE = "Kubernetes node cannot be null";
86 private static final String ERR_NULL_HOSTNAME = "Kubernetes node hostname cannot be null";
Jian Li1cee9882019-02-13 11:25:25 +090087 private static final String ERR_NULL_DEVICE_ID = "Kubernetes node device ID cannot be null";
88
89 private static final String DEVICE_ID_COUNTER_NAME = "device-id-counter";
90 private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
Jian Li49109b52019-01-22 00:17:28 +090091
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected K8sNodeStore nodeStore;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected CoreService coreService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ClusterService clusterService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected LeadershipService leadershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected DeviceService deviceService;
109
110 /** OVSDB server listen port. */
111 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
112
113 private final ExecutorService eventExecutor = newSingleThreadExecutor(
114 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
115
116 private final K8sNodeStoreDelegate delegate = new K8sNodeManager.InternalNodeStoreDelegate();
117
Jian Li1cee9882019-02-13 11:25:25 +0900118 private AtomicCounter deviceIdCounter;
119
Jian Li49109b52019-01-22 00:17:28 +0900120 private ApplicationId appId;
121
122 @Activate
123 protected void activate() {
124 appId = coreService.registerApplication(APP_ID);
125 nodeStore.setDelegate(delegate);
126
127 leadershipService.runForLeadership(appId.name());
128
Jian Li1cee9882019-02-13 11:25:25 +0900129 deviceIdCounter = storageService.getAtomicCounter(DEVICE_ID_COUNTER_NAME);
130
Jian Li49109b52019-01-22 00:17:28 +0900131 log.info("Started");
132 }
133
134 @Deactivate
135 protected void deactivate() {
136 nodeStore.unsetDelegate(delegate);
137
138 leadershipService.withdraw(appId.name());
139 eventExecutor.shutdown();
140
141 log.info("Stopped");
142 }
143
144 @Modified
145 protected void modified(ComponentContext context) {
146 Dictionary<?, ?> properties = context.getProperties();
147 int updatedOvsdbPort = Tools.getIntegerProperty(properties, OVSDB_PORT);
148 if (!Objects.equals(updatedOvsdbPort, ovsdbPortNum)) {
149 ovsdbPortNum = updatedOvsdbPort;
150 }
151
152 log.info("Modified");
153 }
154
155 @Override
156 public void createNode(K8sNode node) {
157 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900158
159 K8sNode updatedNode;
160
161 if (node.intgBridge() == null) {
162 String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
163 checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
164 updatedNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
165 checkArgument(!hasIntgBridge(updatedNode.intgBridge(), updatedNode.hostname()),
166 NOT_DUPLICATED_MSG, updatedNode.intgBridge());
167 } else {
168 updatedNode = node;
169 checkArgument(!hasIntgBridge(updatedNode.intgBridge(), updatedNode.hostname()),
170 NOT_DUPLICATED_MSG, updatedNode.intgBridge());
171 }
172
173 nodeStore.createNode(updatedNode);
174 log.info(String.format(MSG_NODE, updatedNode.hostname(), MSG_CREATED));
Jian Li49109b52019-01-22 00:17:28 +0900175 }
176
177 @Override
178 public void updateNode(K8sNode node) {
179 checkNotNull(node, ERR_NULL_NODE);
Jian Li1cee9882019-02-13 11:25:25 +0900180
181 K8sNode updatedNode;
182
183 K8sNode existingNode = nodeStore.node(node.hostname());
184 checkNotNull(existingNode, ERR_NULL_NODE);
185
186 DeviceId existDeviceId = nodeStore.node(node.hostname()).intgBridge();
187
188 if (node.intgBridge() == null) {
189 updatedNode = node.updateIntgBridge(existDeviceId);
190 checkArgument(!hasIntgBridge(updatedNode.intgBridge(), updatedNode.hostname()),
191 NOT_DUPLICATED_MSG, updatedNode.intgBridge());
192 } else {
193 updatedNode = node;
194 checkArgument(!hasIntgBridge(updatedNode.intgBridge(), updatedNode.hostname()),
195 NOT_DUPLICATED_MSG, updatedNode.intgBridge());
196 }
197
198 nodeStore.updateNode(updatedNode);
199 log.info(String.format(MSG_NODE, updatedNode.hostname(), MSG_UPDATED));
Jian Li49109b52019-01-22 00:17:28 +0900200 }
201
202 @Override
203 public K8sNode removeNode(String hostname) {
204 checkArgument(!Strings.isNullOrEmpty(hostname), ERR_NULL_HOSTNAME);
205 K8sNode node = nodeStore.removeNode(hostname);
206 log.info(String.format(MSG_NODE, hostname, MSG_REMOVED));
207 return node;
208 }
209
210 @Override
211 public Set<K8sNode> nodes() {
212 return nodeStore.nodes();
213 }
214
215 @Override
216 public Set<K8sNode> nodes(Type type) {
217 Set<K8sNode> nodes = nodeStore.nodes().stream()
218 .filter(node -> Objects.equals(node.type(), type))
219 .collect(Collectors.toSet());
220 return ImmutableSet.copyOf(nodes);
221 }
222
223 @Override
224 public Set<K8sNode> completeNodes() {
225 Set<K8sNode> nodes = nodeStore.nodes().stream()
226 .filter(node -> node.state() == COMPLETE)
227 .collect(Collectors.toSet());
228 return ImmutableSet.copyOf(nodes);
229 }
230
231 @Override
232 public Set<K8sNode> completeNodes(Type type) {
233 Set<K8sNode> nodes = nodeStore.nodes().stream()
234 .filter(node -> node.type() == type &&
235 node.state() == COMPLETE)
236 .collect(Collectors.toSet());
237 return ImmutableSet.copyOf(nodes);
238 }
239
240 @Override
241 public K8sNode node(String hostname) {
242 return nodeStore.node(hostname);
243 }
244
245 @Override
246 public K8sNode node(DeviceId deviceId) {
247 return nodeStore.nodes().stream()
248 .filter(node -> Objects.equals(node.intgBridge(), deviceId) ||
249 Objects.equals(node.ovsdb(), deviceId))
250 .findFirst().orElse(null);
251 }
252
Jian Li1cee9882019-02-13 11:25:25 +0900253 private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
254 Optional<K8sNode> existNode = nodeStore.nodes().stream()
255 .filter(n -> !n.hostname().equals(hostname))
256 .filter(n -> n.intgBridge().equals(deviceId))
257 .findFirst();
258
259 return existNode.isPresent();
260 }
261
Jian Li49109b52019-01-22 00:17:28 +0900262 private class InternalNodeStoreDelegate implements K8sNodeStoreDelegate {
263
264 @Override
265 public void notify(K8sNodeEvent event) {
266 if (event != null) {
267 log.trace("send kubernetes node event {}", event);
268 process(event);
269 }
270 }
271 }
272}