blob: 85f64bf98ed096edca1ef4925ef575b28847b784 [file] [log] [blame]
Jian Libde20bf2019-01-25 17:34:43 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090025import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
Jian Li3cb86e32020-09-07 17:01:11 +090026import org.onosproject.k8snetworking.api.K8sNetworkEvent;
27import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Libde20bf2019-01-25 17:34:43 +090028import org.onosproject.k8snetworking.api.K8sPort;
Jian Lib5089a52020-10-06 02:21:01 +090029import org.onosproject.k8snode.api.K8sHostService;
Jian Libde20bf2019-01-25 17:34:43 +090030import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeEvent;
32import org.onosproject.k8snode.api.K8sNodeListener;
33import org.onosproject.k8snode.api.K8sNodeService;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.net.ConnectPoint;
36import org.onosproject.net.DefaultAnnotations;
37import org.onosproject.net.Device;
Jian Lib5089a52020-10-06 02:21:01 +090038import org.onosproject.net.DeviceId;
Jian Libde20bf2019-01-25 17:34:43 +090039import org.onosproject.net.Host;
40import org.onosproject.net.HostId;
41import org.onosproject.net.HostLocation;
42import org.onosproject.net.Port;
43import org.onosproject.net.device.DeviceEvent;
44import org.onosproject.net.device.DeviceListener;
45import org.onosproject.net.device.DeviceService;
46import org.onosproject.net.host.DefaultHostDescription;
47import org.onosproject.net.host.HostDescription;
48import org.onosproject.net.host.HostProvider;
49import org.onosproject.net.host.HostProviderRegistry;
50import org.onosproject.net.host.HostProviderService;
51import org.onosproject.net.host.HostService;
52import org.onosproject.net.provider.AbstractProvider;
53import org.onosproject.net.provider.ProviderId;
54import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60import org.slf4j.LoggerFactory;
61
62import java.util.Set;
63import java.util.concurrent.ExecutorService;
64import java.util.concurrent.Executors;
65import java.util.stream.Collectors;
66
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
69import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
70import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
71import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
Jian Li3cb86e32020-09-07 17:01:11 +090072import static org.onosproject.k8snetworking.api.Constants.GENEVE;
73import static org.onosproject.k8snetworking.api.Constants.GRE;
Jian Libde20bf2019-01-25 17:34:43 +090074import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li3cb86e32020-09-07 17:01:11 +090075import static org.onosproject.k8snetworking.api.Constants.VXLAN;
Jian Lib5089a52020-10-06 02:21:01 +090076import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.allK8sDevices;
Jian Li682c5e02020-09-23 16:46:12 +090077import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.existingContainerPortByMac;
78import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.existingContainerPortByName;
Jian Libde20bf2019-01-25 17:34:43 +090079import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
Jian Li4a7ce672019-04-09 15:20:25 +090080import static org.onosproject.k8snode.api.K8sNodeState.INIT;
Jian Li682c5e02020-09-23 16:46:12 +090081import static org.onosproject.net.AnnotationKeys.PORT_MAC;
Jian Libde20bf2019-01-25 17:34:43 +090082import static org.onosproject.net.AnnotationKeys.PORT_NAME;
83
84/**
85 * A provider used to feed host information for kubernetes.
86 */
87@Component(immediate = true, service = HostProvider.class)
88public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
89
90 private final Logger log = LoggerFactory.getLogger(getClass());
91
92 private static final String ERR_ADD_HOST = "Failed to add host: ";
Jian Li4aa17642019-01-30 00:01:11 +090093 private static final String SONA_HOST_SCHEME = "sona-k8s";
Jian Libde20bf2019-01-25 17:34:43 +090094
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected CoreService coreService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected DeviceService deviceService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected HostProviderRegistry hostProviderRegistry;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected HostService hostService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected MastershipService mastershipService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900111 protected K8sNetworkAdminService k8sNetworkService;
Jian Libde20bf2019-01-25 17:34:43 +0900112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected K8sNodeService k8sNodeService;
115
Jian Lib5089a52020-10-06 02:21:01 +0900116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected K8sHostService k8sHostService;
118
Jian Libde20bf2019-01-25 17:34:43 +0900119 private HostProviderService hostProviderService;
120
121 private final ExecutorService executor = Executors.newSingleThreadExecutor(
122 groupedThreads(this.getClass().getSimpleName(), "device-event"));
123 private final InternalDeviceListener internalDeviceListener =
124 new InternalDeviceListener();
125 private final InternalK8sNodeListener internalK8sNodeListener =
126 new InternalK8sNodeListener();
Jian Li3cb86e32020-09-07 17:01:11 +0900127 private final InternalK8sNetworkListener internalK8sNetworkListener =
128 new InternalK8sNetworkListener();
Jian Libde20bf2019-01-25 17:34:43 +0900129
130 /**
131 * Creates kubernetes switching host provider.
132 */
133 public K8sSwitchingHostProvider() {
134 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
135 }
136
137 @Activate
138 protected void activate() {
139 coreService.registerApplication(K8S_NETWORKING_APP_ID);
140 deviceService.addListener(internalDeviceListener);
141 k8sNodeService.addListener(internalK8sNodeListener);
Jian Li3cb86e32020-09-07 17:01:11 +0900142 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900143 hostProviderService = hostProviderRegistry.register(this);
144
145 log.info("Started");
146 }
147
148 @Deactivate
149 protected void deactivate() {
150 hostProviderRegistry.unregister(this);
Jian Li3cb86e32020-09-07 17:01:11 +0900151 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900152 k8sNodeService.removeListener(internalK8sNodeListener);
153 deviceService.removeListener(internalDeviceListener);
154
155 executor.shutdown();
156
157 log.info("Stopped");
158 }
159
160
161 @Override
162 public void triggerProbe(Host host) {
163 // no probe is required
164 }
165
166 /**
167 * Processes port addition event.
168 *
169 * @param port port object used in ONOS
170 */
171 private void processPortAdded(Port port) {
Jian Li682c5e02020-09-23 16:46:12 +0900172 K8sPort k8sPort = portToK8sPortByName(port);
Jian Libde20bf2019-01-25 17:34:43 +0900173 if (k8sPort == null) {
Jian Li682c5e02020-09-23 16:46:12 +0900174 k8sPort = portToK8sPortByMac(port);
175 if (k8sPort == null) {
176 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
177 return;
178 }
Jian Libde20bf2019-01-25 17:34:43 +0900179 }
180
181 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
182 if (k8sNet == null) {
183 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
184 k8sPort.networkId());
185 return;
186 }
187
188 MacAddress mac = k8sPort.macAddress();
189 HostId hostId = HostId.hostId(mac);
190
191 // connect point is the combination of switch ID with port number where
192 // the host is attached to
193 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
194
195 long createTime = System.currentTimeMillis();
196
Jian Li4aa17642019-01-30 00:01:11 +0900197 // update k8s port number by referring to ONOS port number
198
199 k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
200 .updateState(K8sPort.State.ACTIVE));
201
Jian Libde20bf2019-01-25 17:34:43 +0900202 // we check whether the host already attached to same locations
203 Host host = hostService.getHost(hostId);
204
205 // build host annotations to include a set of meta info from neutron
206 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
207 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
208 .set(ANNOTATION_PORT_ID, k8sPort.portId())
209 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
210 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
211
212 HostDescription hostDesc = new DefaultHostDescription(
213 mac,
214 VlanId.NONE,
215 new HostLocation(connectPoint, createTime),
216 ImmutableSet.of(k8sPort.ipAddress()),
217 annotations.build());
218
219 if (host != null) {
220 Set<HostLocation> locations = host.locations().stream()
221 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
222 .filter(l -> l.port().equals(connectPoint.port()))
223 .collect(Collectors.toSet());
224
225 // newly added location is not in the existing location list,
226 // therefore, we simply add this into the location list
227 if (locations.isEmpty()) {
228 hostProviderService.addLocationToHost(hostId,
229 new HostLocation(connectPoint, createTime));
230 }
231
232 // newly added location is in the existing location list,
233 // the hostDetected method invocation in turn triggers host Update event
234 if (locations.size() == 1) {
235 hostProviderService.hostDetected(hostId, hostDesc, false);
236 }
237 } else {
238 hostProviderService.hostDetected(hostId, hostDesc, false);
239 }
240 }
241
242 /**
243 * Processes port removal event.
244 *
245 * @param port ONOS port
246 */
247 private void processPortRemoved(Port port) {
248 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
249
250 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
251
252 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
Jian Li4a7ce672019-04-09 15:20:25 +0900253
Jian Li682c5e02020-09-23 16:46:12 +0900254 K8sPort k8sPort = portToK8sPortByName(port);
Jian Li4a7ce672019-04-09 15:20:25 +0900255
256 if (k8sPort == null) {
Jian Li682c5e02020-09-23 16:46:12 +0900257 k8sPort = portToK8sPortByMac(port);
258 if (k8sPort == null) {
259 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
260 return;
261 }
Jian Li4a7ce672019-04-09 15:20:25 +0900262 }
263
264 k8sNetworkService.removePort(k8sPort.portId());
265 }
266
267 /**
268 * Process port inactivate event.
269 *
270 * @param port ONOS port
271 */
272 private void processPortInactivated(Port port) {
Jian Li682c5e02020-09-23 16:46:12 +0900273 K8sPort k8sPort = portToK8sPortByName(port);
Jian Li4a7ce672019-04-09 15:20:25 +0900274
275 if (k8sPort == null) {
Jian Li682c5e02020-09-23 16:46:12 +0900276 k8sPort = portToK8sPortByMac(port);
277 if (k8sPort == null) {
278 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
279 return;
280 }
Jian Li4a7ce672019-04-09 15:20:25 +0900281 }
282
283 k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
Jian Libde20bf2019-01-25 17:34:43 +0900284 }
285
286 /**
287 * Converts ONOS port to kubernetes port.
288 *
289 * @param port ONOS port
290 * @return mapped kubernetes port
291 */
Jian Li682c5e02020-09-23 16:46:12 +0900292 private K8sPort portToK8sPortByName(Port port) {
Jian Libde20bf2019-01-25 17:34:43 +0900293 String portName = port.annotations().value(PORT_NAME);
294 if (Strings.isNullOrEmpty(portName)) {
295 return null;
296 }
297
298 if (isContainer(portName)) {
299 return k8sNetworkService.ports().stream()
Jian Li682c5e02020-09-23 16:46:12 +0900300 .filter(p -> existingContainerPortByName(p.portId(), portName))
301 .findAny().orElse(null);
302 } else {
303 return null;
304 }
305 }
306
307 /**
308 * Converts ONOS port to kubernetes port.
309 *
310 * @param port ONOS port
311 * @return mapped kubernetes port
312 */
313 private K8sPort portToK8sPortByMac(Port port) {
314 String portName = port.annotations().value(PORT_NAME);
315 String portMac = port.annotations().value(PORT_MAC);
316 if (Strings.isNullOrEmpty(portMac) || Strings.isNullOrEmpty(portName)) {
317 return null;
318 }
319
320 if (isContainer(portName)) {
321 return k8sNetworkService.ports().stream()
322 .filter(p -> existingContainerPortByMac(p.macAddress().toString(), portMac))
Jian Libde20bf2019-01-25 17:34:43 +0900323 .findAny().orElse(null);
324 } else {
325 return null;
326 }
327 }
328
329 private class InternalDeviceListener implements DeviceListener {
330
331 @Override
332 public boolean isRelevant(DeviceEvent event) {
333 Port port = event.port();
334 if (port == null) {
335 return false;
336 }
337
338 String portName = port.annotations().value(PORT_NAME);
Jian Lib5089a52020-10-06 02:21:01 +0900339 DeviceId deviceId = event.subject().id();
Jian Libde20bf2019-01-25 17:34:43 +0900340
Jian Lib5089a52020-10-06 02:21:01 +0900341 return !Strings.isNullOrEmpty(portName) && isContainer(portName) &&
342 allK8sDevices(k8sNodeService, k8sHostService).contains(deviceId);
Jian Libde20bf2019-01-25 17:34:43 +0900343 }
344
345 private boolean isRelevantHelper(DeviceEvent event) {
346 return mastershipService.isLocalMaster(event.subject().id());
347 }
348
349 @Override
350 public void event(DeviceEvent event) {
351 log.info("Device event occurred with type {}", event.type());
352
353 switch (event.type()) {
354 case PORT_UPDATED:
355 executor.execute(() -> processPortUpdate(event));
356 break;
357 case PORT_ADDED:
358 executor.execute(() -> processPortAddition(event));
359 break;
360 case PORT_REMOVED:
361 executor.execute(() -> processPortRemoval(event));
362 break;
363 default:
364 break;
365 }
366 }
367
368 private void processPortUpdate(DeviceEvent event) {
369 if (!isRelevantHelper(event)) {
370 return;
371 }
372
Jian Li4aa17642019-01-30 00:01:11 +0900373 log.debug("K8s port {} is updated at {}",
374 event.port().annotations().value(PORT_NAME),
375 event.subject().id());
376
Jian Libde20bf2019-01-25 17:34:43 +0900377 if (!event.port().isEnabled()) {
378 processPortRemoval(event);
379 } else if (event.port().isEnabled()) {
380 processPortAddition(event);
381 }
382 }
383
384 private void processPortAddition(DeviceEvent event) {
385 if (!isRelevantHelper(event)) {
386 return;
387 }
388
Jian Li4aa17642019-01-30 00:01:11 +0900389 log.debug("K8s port {} is detected from {}",
390 event.port().annotations().value(PORT_NAME),
391 event.subject().id());
392
393 processPortAdded(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900394 }
395
396 private void processPortRemoval(DeviceEvent event) {
397 if (!isRelevantHelper(event)) {
398 return;
399 }
400
Jian Li4aa17642019-01-30 00:01:11 +0900401 log.debug("K8s port {} is removed from {}",
402 event.port().annotations().value(PORT_NAME),
403 event.subject().id());
404
405 processPortRemoved(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900406 }
407 }
408
409 private class InternalK8sNodeListener implements K8sNodeListener {
410
411 private boolean isRelevantHelper(K8sNodeEvent event) {
412 // do not allow to proceed without mastership
413 Device device = deviceService.getDevice(event.subject().intgBridge());
414 if (device == null) {
415 return false;
416 }
417 return mastershipService.isLocalMaster(device.id());
418 }
419
420 @Override
421 public void event(K8sNodeEvent event) {
422 K8sNode k8sNode = event.subject();
423
424 switch (event.type()) {
425 case K8S_NODE_COMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900426 executor.execute(() -> processCompleteNode(event, k8sNode));
Jian Libde20bf2019-01-25 17:34:43 +0900427 break;
Jian Li4a7ce672019-04-09 15:20:25 +0900428 case K8S_NODE_UPDATED:
429 if (k8sNode.state() == INIT) {
430 executor.execute(() -> processIncompleteNode(event, k8sNode));
431 }
Jian Libde20bf2019-01-25 17:34:43 +0900432 break;
433 case K8S_NODE_CREATED:
Jian Libde20bf2019-01-25 17:34:43 +0900434 case K8S_NODE_REMOVED:
Jian Li4a7ce672019-04-09 15:20:25 +0900435 case K8S_NODE_INCOMPLETE:
Jian Libde20bf2019-01-25 17:34:43 +0900436 default:
437 break;
438 }
439 }
440
441 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
442 if (!isRelevantHelper(event)) {
443 return;
444 }
445
446 log.info("COMPLETE node {} is detected", k8sNode.hostname());
447
448 deviceService.getPorts(k8sNode.intgBridge()).stream()
449 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
450 .filter(Port::isEnabled)
451 .forEach(port -> {
452 log.debug("Container port {} is detected from {}",
453 port.annotations().value(PORT_NAME),
454 k8sNode.hostname());
455 processPortAdded(port);
456 });
457
458 Tools.stream(hostService.getHosts())
459 .filter(host -> deviceService.getPort(
460 host.location().deviceId(),
461 host.location().port()) == null)
462 .forEach(host -> {
463 log.info("Remove stale host {}", host.id());
464 hostProviderService.hostVanished(host.id());
465 });
466 }
Jian Li4a7ce672019-04-09 15:20:25 +0900467
468 private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
469 if (!isRelevantHelper(event)) {
470 return;
471 }
472
473 log.info("INIT node {} is detected", k8sNode.hostname());
474
475 deviceService.getPorts(k8sNode.intgBridge()).stream()
476 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
477 .filter(Port::isEnabled)
478 .forEach(port -> {
479 log.debug("Container port {} is detected from {}",
480 port.annotations().value(PORT_NAME),
481 k8sNode.hostname());
482 processPortInactivated(port);
483 });
484 }
Jian Libde20bf2019-01-25 17:34:43 +0900485 }
Jian Li3cb86e32020-09-07 17:01:11 +0900486
487 private class InternalK8sNetworkListener implements K8sNetworkListener {
488
489 @Override
490 public void event(K8sNetworkEvent event) {
491 switch (event.type()) {
492 case K8S_PORT_CREATED:
493 executor.execute(() -> processK8sPortAddition(event));
494 break;
495 default:
496 break;
497 }
498 }
499
500 private void processK8sPortAddition(K8sNetworkEvent event) {
Jian Li682c5e02020-09-23 16:46:12 +0900501 String mac = event.port().macAddress().toString();
Jian Li3cb86e32020-09-07 17:01:11 +0900502 for (Device device : deviceService.getDevices()) {
503 Port port = deviceService.getPorts(device.id()).stream()
504 .filter(Port::isEnabled)
Jian Li682c5e02020-09-23 16:46:12 +0900505 .filter(p -> p.annotations().value(PORT_MAC) != null)
Jian Li3cb86e32020-09-07 17:01:11 +0900506 .filter(p -> p.annotations().value(PORT_NAME) != null)
Jian Li682c5e02020-09-23 16:46:12 +0900507 .filter(p -> existingContainerPortByMac(mac, p.annotations().value(PORT_MAC)))
Jian Li3cb86e32020-09-07 17:01:11 +0900508 .findAny().orElse(null);
509
510 if (port != null) {
511 String upperPortName = port.annotations().value(PORT_NAME).toUpperCase();
512 // we do not handle tunnel typed port
513 if (upperPortName.contains(VXLAN) || upperPortName.contains(GRE) ||
514 upperPortName.contains(GENEVE)) {
515 continue;
516 }
517
518 // if we have null device ID, we simply update the device ID on the k8s port
519 if (event.port().deviceId() == null) {
520 K8sPort updated = event.port().updateDeviceId(device.id());
521 k8sNetworkService.updatePort(updated);
522 }
523
524 processPortAdded(port);
525 }
526 }
527 }
528 }
Jian Libde20bf2019-01-25 17:34:43 +0900529}