blob: 7510aaf9305b4b5852c2e11c16c1e5d7ef84ec49 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090025import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
Jian Libde20bf2019-01-25 17:34:43 +090026import org.onosproject.k8snetworking.api.K8sPort;
27import org.onosproject.k8snode.api.K8sNode;
28import org.onosproject.k8snode.api.K8sNodeEvent;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DefaultAnnotations;
34import org.onosproject.net.Device;
35import org.onosproject.net.Host;
36import org.onosproject.net.HostId;
37import org.onosproject.net.HostLocation;
38import org.onosproject.net.Port;
39import org.onosproject.net.device.DeviceEvent;
40import org.onosproject.net.device.DeviceListener;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.host.DefaultHostDescription;
43import org.onosproject.net.host.HostDescription;
44import org.onosproject.net.host.HostProvider;
45import org.onosproject.net.host.HostProviderRegistry;
46import org.onosproject.net.host.HostProviderService;
47import org.onosproject.net.host.HostService;
48import org.onosproject.net.provider.AbstractProvider;
49import org.onosproject.net.provider.ProviderId;
50import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Set;
59import java.util.concurrent.ExecutorService;
60import java.util.concurrent.Executors;
61import java.util.stream.Collectors;
62
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
65import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
66import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
67import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
68import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
69import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
70import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
Jian Li4a7ce672019-04-09 15:20:25 +090071import static org.onosproject.k8snode.api.K8sNodeState.INIT;
Jian Libde20bf2019-01-25 17:34:43 +090072import static org.onosproject.net.AnnotationKeys.PORT_NAME;
73
74/**
75 * A provider used to feed host information for kubernetes.
76 */
77@Component(immediate = true, service = HostProvider.class)
78public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
79
80 private final Logger log = LoggerFactory.getLogger(getClass());
81
82 private static final String ERR_ADD_HOST = "Failed to add host: ";
Jian Li4aa17642019-01-30 00:01:11 +090083 private static final String SONA_HOST_SCHEME = "sona-k8s";
84 private static final int PORT_PREFIX_LENGTH = 4;
Jian Libde20bf2019-01-25 17:34:43 +090085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected DeviceService deviceService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected HostProviderRegistry hostProviderRegistry;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected HostService hostService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected MastershipService mastershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900102 protected K8sNetworkAdminService k8sNetworkService;
Jian Libde20bf2019-01-25 17:34:43 +0900103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected K8sNodeService k8sNodeService;
106
107 private HostProviderService hostProviderService;
108
109 private final ExecutorService executor = Executors.newSingleThreadExecutor(
110 groupedThreads(this.getClass().getSimpleName(), "device-event"));
111 private final InternalDeviceListener internalDeviceListener =
112 new InternalDeviceListener();
113 private final InternalK8sNodeListener internalK8sNodeListener =
114 new InternalK8sNodeListener();
115
116 /**
117 * Creates kubernetes switching host provider.
118 */
119 public K8sSwitchingHostProvider() {
120 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
121 }
122
123 @Activate
124 protected void activate() {
125 coreService.registerApplication(K8S_NETWORKING_APP_ID);
126 deviceService.addListener(internalDeviceListener);
127 k8sNodeService.addListener(internalK8sNodeListener);
128 hostProviderService = hostProviderRegistry.register(this);
129
130 log.info("Started");
131 }
132
133 @Deactivate
134 protected void deactivate() {
135 hostProviderRegistry.unregister(this);
136 k8sNodeService.removeListener(internalK8sNodeListener);
137 deviceService.removeListener(internalDeviceListener);
138
139 executor.shutdown();
140
141 log.info("Stopped");
142 }
143
144
145 @Override
146 public void triggerProbe(Host host) {
147 // no probe is required
148 }
149
150 /**
151 * Processes port addition event.
152 *
153 * @param port port object used in ONOS
154 */
155 private void processPortAdded(Port port) {
156 K8sPort k8sPort = portToK8sPort(port);
157 if (k8sPort == null) {
158 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
159 return;
160 }
161
162 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
163 if (k8sNet == null) {
164 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
165 k8sPort.networkId());
166 return;
167 }
168
169 MacAddress mac = k8sPort.macAddress();
170 HostId hostId = HostId.hostId(mac);
171
172 // connect point is the combination of switch ID with port number where
173 // the host is attached to
174 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
175
176 long createTime = System.currentTimeMillis();
177
Jian Li4aa17642019-01-30 00:01:11 +0900178 // update k8s port number by referring to ONOS port number
179
180 k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
181 .updateState(K8sPort.State.ACTIVE));
182
Jian Libde20bf2019-01-25 17:34:43 +0900183 // we check whether the host already attached to same locations
184 Host host = hostService.getHost(hostId);
185
186 // build host annotations to include a set of meta info from neutron
187 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
188 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
189 .set(ANNOTATION_PORT_ID, k8sPort.portId())
190 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
191 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
192
193 HostDescription hostDesc = new DefaultHostDescription(
194 mac,
195 VlanId.NONE,
196 new HostLocation(connectPoint, createTime),
197 ImmutableSet.of(k8sPort.ipAddress()),
198 annotations.build());
199
200 if (host != null) {
201 Set<HostLocation> locations = host.locations().stream()
202 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
203 .filter(l -> l.port().equals(connectPoint.port()))
204 .collect(Collectors.toSet());
205
206 // newly added location is not in the existing location list,
207 // therefore, we simply add this into the location list
208 if (locations.isEmpty()) {
209 hostProviderService.addLocationToHost(hostId,
210 new HostLocation(connectPoint, createTime));
211 }
212
213 // newly added location is in the existing location list,
214 // the hostDetected method invocation in turn triggers host Update event
215 if (locations.size() == 1) {
216 hostProviderService.hostDetected(hostId, hostDesc, false);
217 }
218 } else {
219 hostProviderService.hostDetected(hostId, hostDesc, false);
220 }
221 }
222
223 /**
224 * Processes port removal event.
225 *
226 * @param port ONOS port
227 */
228 private void processPortRemoved(Port port) {
229 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
230
231 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
232
233 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
Jian Li4a7ce672019-04-09 15:20:25 +0900234
235 K8sPort k8sPort = portToK8sPort(port);
236
237 if (k8sPort == null) {
238 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
239 return;
240 }
241
242 k8sNetworkService.removePort(k8sPort.portId());
243 }
244
245 /**
246 * Process port inactivate event.
247 *
248 * @param port ONOS port
249 */
250 private void processPortInactivated(Port port) {
251 K8sPort k8sPort = portToK8sPort(port);
252
253 if (k8sPort == null) {
254 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
255 return;
256 }
257
258 k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
Jian Libde20bf2019-01-25 17:34:43 +0900259 }
260
261 /**
262 * Converts ONOS port to kubernetes port.
263 *
264 * @param port ONOS port
265 * @return mapped kubernetes port
266 */
267 private K8sPort portToK8sPort(Port port) {
268 String portName = port.annotations().value(PORT_NAME);
269 if (Strings.isNullOrEmpty(portName)) {
270 return null;
271 }
272
273 if (isContainer(portName)) {
274 return k8sNetworkService.ports().stream()
275 .filter(p -> p.portId().contains(portName.substring(PORT_PREFIX_LENGTH)))
276 .findAny().orElse(null);
277 } else {
278 return null;
279 }
280 }
281
282 private class InternalDeviceListener implements DeviceListener {
283
284 @Override
285 public boolean isRelevant(DeviceEvent event) {
286 Port port = event.port();
287 if (port == null) {
288 return false;
289 }
290
291 String portName = port.annotations().value(PORT_NAME);
292
293 return !Strings.isNullOrEmpty(portName) &&
294 portName.startsWith(PORT_NAME_PREFIX_CONTAINER);
295 }
296
297 private boolean isRelevantHelper(DeviceEvent event) {
298 return mastershipService.isLocalMaster(event.subject().id());
299 }
300
301 @Override
302 public void event(DeviceEvent event) {
303 log.info("Device event occurred with type {}", event.type());
304
305 switch (event.type()) {
306 case PORT_UPDATED:
307 executor.execute(() -> processPortUpdate(event));
308 break;
309 case PORT_ADDED:
310 executor.execute(() -> processPortAddition(event));
311 break;
312 case PORT_REMOVED:
313 executor.execute(() -> processPortRemoval(event));
314 break;
315 default:
316 break;
317 }
318 }
319
320 private void processPortUpdate(DeviceEvent event) {
321 if (!isRelevantHelper(event)) {
322 return;
323 }
324
Jian Li4aa17642019-01-30 00:01:11 +0900325 log.debug("K8s port {} is updated at {}",
326 event.port().annotations().value(PORT_NAME),
327 event.subject().id());
328
Jian Libde20bf2019-01-25 17:34:43 +0900329 if (!event.port().isEnabled()) {
330 processPortRemoval(event);
331 } else if (event.port().isEnabled()) {
332 processPortAddition(event);
333 }
334 }
335
336 private void processPortAddition(DeviceEvent event) {
337 if (!isRelevantHelper(event)) {
338 return;
339 }
340
Jian Li4aa17642019-01-30 00:01:11 +0900341 log.debug("K8s port {} is detected from {}",
342 event.port().annotations().value(PORT_NAME),
343 event.subject().id());
344
345 processPortAdded(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900346 }
347
348 private void processPortRemoval(DeviceEvent event) {
349 if (!isRelevantHelper(event)) {
350 return;
351 }
352
Jian Li4aa17642019-01-30 00:01:11 +0900353 log.debug("K8s port {} is removed from {}",
354 event.port().annotations().value(PORT_NAME),
355 event.subject().id());
356
357 processPortRemoved(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900358 }
359 }
360
361 private class InternalK8sNodeListener implements K8sNodeListener {
362
363 private boolean isRelevantHelper(K8sNodeEvent event) {
364 // do not allow to proceed without mastership
365 Device device = deviceService.getDevice(event.subject().intgBridge());
366 if (device == null) {
367 return false;
368 }
369 return mastershipService.isLocalMaster(device.id());
370 }
371
372 @Override
373 public void event(K8sNodeEvent event) {
374 K8sNode k8sNode = event.subject();
375
376 switch (event.type()) {
377 case K8S_NODE_COMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900378 executor.execute(() -> processCompleteNode(event, k8sNode));
Jian Libde20bf2019-01-25 17:34:43 +0900379 break;
Jian Li4a7ce672019-04-09 15:20:25 +0900380 case K8S_NODE_UPDATED:
381 if (k8sNode.state() == INIT) {
382 executor.execute(() -> processIncompleteNode(event, k8sNode));
383 }
Jian Libde20bf2019-01-25 17:34:43 +0900384 break;
385 case K8S_NODE_CREATED:
Jian Libde20bf2019-01-25 17:34:43 +0900386 case K8S_NODE_REMOVED:
Jian Li4a7ce672019-04-09 15:20:25 +0900387 case K8S_NODE_INCOMPLETE:
Jian Libde20bf2019-01-25 17:34:43 +0900388 default:
389 break;
390 }
391 }
392
393 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
394 if (!isRelevantHelper(event)) {
395 return;
396 }
397
398 log.info("COMPLETE node {} is detected", k8sNode.hostname());
399
400 deviceService.getPorts(k8sNode.intgBridge()).stream()
401 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
402 .filter(Port::isEnabled)
403 .forEach(port -> {
404 log.debug("Container port {} is detected from {}",
405 port.annotations().value(PORT_NAME),
406 k8sNode.hostname());
407 processPortAdded(port);
408 });
409
410 Tools.stream(hostService.getHosts())
411 .filter(host -> deviceService.getPort(
412 host.location().deviceId(),
413 host.location().port()) == null)
414 .forEach(host -> {
415 log.info("Remove stale host {}", host.id());
416 hostProviderService.hostVanished(host.id());
417 });
418 }
Jian Li4a7ce672019-04-09 15:20:25 +0900419
420 private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
421 if (!isRelevantHelper(event)) {
422 return;
423 }
424
425 log.info("INIT node {} is detected", k8sNode.hostname());
426
427 deviceService.getPorts(k8sNode.intgBridge()).stream()
428 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
429 .filter(Port::isEnabled)
430 .forEach(port -> {
431 log.debug("Container port {} is detected from {}",
432 port.annotations().value(PORT_NAME),
433 k8sNode.hostname());
434 processPortInactivated(port);
435 });
436 }
Jian Libde20bf2019-01-25 17:34:43 +0900437 }
438}