blob: c8888eefe6d884e48dc50352991104502b1099ef [file] [log] [blame]
Jian Libde20bf2019-01-25 17:34:43 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090025import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
Jian Li732c3422020-09-07 17:01:11 +090026import org.onosproject.k8snetworking.api.K8sNetworkEvent;
27import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Libde20bf2019-01-25 17:34:43 +090028import org.onosproject.k8snetworking.api.K8sPort;
29import org.onosproject.k8snode.api.K8sNode;
30import org.onosproject.k8snode.api.K8sNodeEvent;
31import org.onosproject.k8snode.api.K8sNodeListener;
32import org.onosproject.k8snode.api.K8sNodeService;
33import org.onosproject.mastership.MastershipService;
34import org.onosproject.net.ConnectPoint;
35import org.onosproject.net.DefaultAnnotations;
36import org.onosproject.net.Device;
37import org.onosproject.net.Host;
38import org.onosproject.net.HostId;
39import org.onosproject.net.HostLocation;
40import org.onosproject.net.Port;
41import org.onosproject.net.device.DeviceEvent;
42import org.onosproject.net.device.DeviceListener;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.host.DefaultHostDescription;
45import org.onosproject.net.host.HostDescription;
46import org.onosproject.net.host.HostProvider;
47import org.onosproject.net.host.HostProviderRegistry;
48import org.onosproject.net.host.HostProviderService;
49import org.onosproject.net.host.HostService;
50import org.onosproject.net.provider.AbstractProvider;
51import org.onosproject.net.provider.ProviderId;
52import org.osgi.service.component.annotations.Activate;
53import org.osgi.service.component.annotations.Component;
54import org.osgi.service.component.annotations.Deactivate;
55import org.osgi.service.component.annotations.Reference;
56import org.osgi.service.component.annotations.ReferenceCardinality;
57import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
60import java.util.Set;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.Executors;
63import java.util.stream.Collectors;
64
65import static org.onlab.util.Tools.groupedThreads;
66import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
67import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
68import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
69import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
Jian Li732c3422020-09-07 17:01:11 +090070import static org.onosproject.k8snetworking.api.Constants.GENEVE;
71import static org.onosproject.k8snetworking.api.Constants.GRE;
Jian Libde20bf2019-01-25 17:34:43 +090072import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li732c3422020-09-07 17:01:11 +090073import static org.onosproject.k8snetworking.api.Constants.VXLAN;
Jian Li3bc3d5b2020-09-23 16:46:12 +090074import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.existingContainerPortByMac;
75import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.existingContainerPortByName;
Jian Libde20bf2019-01-25 17:34:43 +090076import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
Jian Li4a7ce672019-04-09 15:20:25 +090077import static org.onosproject.k8snode.api.K8sNodeState.INIT;
Jian Li3bc3d5b2020-09-23 16:46:12 +090078import static org.onosproject.net.AnnotationKeys.PORT_MAC;
Jian Libde20bf2019-01-25 17:34:43 +090079import static org.onosproject.net.AnnotationKeys.PORT_NAME;
80
81/**
82 * A provider used to feed host information for kubernetes.
83 */
84@Component(immediate = true, service = HostProvider.class)
85public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
86
87 private final Logger log = LoggerFactory.getLogger(getClass());
88
89 private static final String ERR_ADD_HOST = "Failed to add host: ";
Jian Li4aa17642019-01-30 00:01:11 +090090 private static final String SONA_HOST_SCHEME = "sona-k8s";
Jian Libde20bf2019-01-25 17:34:43 +090091
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected DeviceService deviceService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected HostProviderRegistry hostProviderRegistry;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected HostService hostService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected MastershipService mastershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900108 protected K8sNetworkAdminService k8sNetworkService;
Jian Libde20bf2019-01-25 17:34:43 +0900109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected K8sNodeService k8sNodeService;
112
113 private HostProviderService hostProviderService;
114
115 private final ExecutorService executor = Executors.newSingleThreadExecutor(
116 groupedThreads(this.getClass().getSimpleName(), "device-event"));
117 private final InternalDeviceListener internalDeviceListener =
118 new InternalDeviceListener();
119 private final InternalK8sNodeListener internalK8sNodeListener =
120 new InternalK8sNodeListener();
Jian Li732c3422020-09-07 17:01:11 +0900121 private final InternalK8sNetworkListener internalK8sNetworkListener =
122 new InternalK8sNetworkListener();
Jian Libde20bf2019-01-25 17:34:43 +0900123
124 /**
125 * Creates kubernetes switching host provider.
126 */
127 public K8sSwitchingHostProvider() {
128 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
129 }
130
131 @Activate
132 protected void activate() {
133 coreService.registerApplication(K8S_NETWORKING_APP_ID);
134 deviceService.addListener(internalDeviceListener);
135 k8sNodeService.addListener(internalK8sNodeListener);
Jian Li732c3422020-09-07 17:01:11 +0900136 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900137 hostProviderService = hostProviderRegistry.register(this);
138
139 log.info("Started");
140 }
141
142 @Deactivate
143 protected void deactivate() {
144 hostProviderRegistry.unregister(this);
Jian Li732c3422020-09-07 17:01:11 +0900145 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900146 k8sNodeService.removeListener(internalK8sNodeListener);
147 deviceService.removeListener(internalDeviceListener);
148
149 executor.shutdown();
150
151 log.info("Stopped");
152 }
153
154
155 @Override
156 public void triggerProbe(Host host) {
157 // no probe is required
158 }
159
160 /**
161 * Processes port addition event.
162 *
163 * @param port port object used in ONOS
164 */
165 private void processPortAdded(Port port) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900166 K8sPort k8sPort = portToK8sPortByName(port);
Jian Libde20bf2019-01-25 17:34:43 +0900167 if (k8sPort == null) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900168 k8sPort = portToK8sPortByMac(port);
169 if (k8sPort == null) {
170 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
171 return;
172 }
Jian Libde20bf2019-01-25 17:34:43 +0900173 }
174
175 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
176 if (k8sNet == null) {
177 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
178 k8sPort.networkId());
179 return;
180 }
181
182 MacAddress mac = k8sPort.macAddress();
183 HostId hostId = HostId.hostId(mac);
184
185 // connect point is the combination of switch ID with port number where
186 // the host is attached to
187 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
188
189 long createTime = System.currentTimeMillis();
190
Jian Li4aa17642019-01-30 00:01:11 +0900191 // update k8s port number by referring to ONOS port number
192
193 k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
194 .updateState(K8sPort.State.ACTIVE));
195
Jian Libde20bf2019-01-25 17:34:43 +0900196 // we check whether the host already attached to same locations
197 Host host = hostService.getHost(hostId);
198
199 // build host annotations to include a set of meta info from neutron
200 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
201 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
202 .set(ANNOTATION_PORT_ID, k8sPort.portId())
203 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
204 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
205
206 HostDescription hostDesc = new DefaultHostDescription(
207 mac,
208 VlanId.NONE,
209 new HostLocation(connectPoint, createTime),
210 ImmutableSet.of(k8sPort.ipAddress()),
211 annotations.build());
212
213 if (host != null) {
214 Set<HostLocation> locations = host.locations().stream()
215 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
216 .filter(l -> l.port().equals(connectPoint.port()))
217 .collect(Collectors.toSet());
218
219 // newly added location is not in the existing location list,
220 // therefore, we simply add this into the location list
221 if (locations.isEmpty()) {
222 hostProviderService.addLocationToHost(hostId,
223 new HostLocation(connectPoint, createTime));
224 }
225
226 // newly added location is in the existing location list,
227 // the hostDetected method invocation in turn triggers host Update event
228 if (locations.size() == 1) {
229 hostProviderService.hostDetected(hostId, hostDesc, false);
230 }
231 } else {
232 hostProviderService.hostDetected(hostId, hostDesc, false);
233 }
234 }
235
236 /**
237 * Processes port removal event.
238 *
239 * @param port ONOS port
240 */
241 private void processPortRemoved(Port port) {
242 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
243
244 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
245
246 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
Jian Li4a7ce672019-04-09 15:20:25 +0900247
Jian Li3bc3d5b2020-09-23 16:46:12 +0900248 K8sPort k8sPort = portToK8sPortByName(port);
Jian Li4a7ce672019-04-09 15:20:25 +0900249
250 if (k8sPort == null) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900251 k8sPort = portToK8sPortByMac(port);
252 if (k8sPort == null) {
253 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
254 return;
255 }
Jian Li4a7ce672019-04-09 15:20:25 +0900256 }
257
258 k8sNetworkService.removePort(k8sPort.portId());
259 }
260
261 /**
262 * Process port inactivate event.
263 *
264 * @param port ONOS port
265 */
266 private void processPortInactivated(Port port) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900267 K8sPort k8sPort = portToK8sPortByName(port);
Jian Li4a7ce672019-04-09 15:20:25 +0900268
269 if (k8sPort == null) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900270 k8sPort = portToK8sPortByMac(port);
271 if (k8sPort == null) {
272 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
273 return;
274 }
Jian Li4a7ce672019-04-09 15:20:25 +0900275 }
276
277 k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
Jian Libde20bf2019-01-25 17:34:43 +0900278 }
279
280 /**
281 * Converts ONOS port to kubernetes port.
282 *
283 * @param port ONOS port
284 * @return mapped kubernetes port
285 */
Jian Li3bc3d5b2020-09-23 16:46:12 +0900286 private K8sPort portToK8sPortByName(Port port) {
Jian Libde20bf2019-01-25 17:34:43 +0900287 String portName = port.annotations().value(PORT_NAME);
288 if (Strings.isNullOrEmpty(portName)) {
289 return null;
290 }
291
292 if (isContainer(portName)) {
293 return k8sNetworkService.ports().stream()
Jian Li3bc3d5b2020-09-23 16:46:12 +0900294 .filter(p -> existingContainerPortByName(p.portId(), portName))
295 .findAny().orElse(null);
296 } else {
297 return null;
298 }
299 }
300
301 /**
302 * Converts ONOS port to kubernetes port.
303 *
304 * @param port ONOS port
305 * @return mapped kubernetes port
306 */
307 private K8sPort portToK8sPortByMac(Port port) {
308 String portName = port.annotations().value(PORT_NAME);
309 String portMac = port.annotations().value(PORT_MAC);
310 if (Strings.isNullOrEmpty(portMac) || Strings.isNullOrEmpty(portName)) {
311 return null;
312 }
313
314 if (isContainer(portName)) {
315 return k8sNetworkService.ports().stream()
316 .filter(p -> existingContainerPortByMac(p.macAddress().toString(), portMac))
Jian Libde20bf2019-01-25 17:34:43 +0900317 .findAny().orElse(null);
318 } else {
319 return null;
320 }
321 }
322
323 private class InternalDeviceListener implements DeviceListener {
324
325 @Override
326 public boolean isRelevant(DeviceEvent event) {
327 Port port = event.port();
328 if (port == null) {
329 return false;
330 }
331
332 String portName = port.annotations().value(PORT_NAME);
333
Jian Li732c3422020-09-07 17:01:11 +0900334 return !Strings.isNullOrEmpty(portName) && isContainer(portName);
Jian Libde20bf2019-01-25 17:34:43 +0900335 }
336
337 private boolean isRelevantHelper(DeviceEvent event) {
338 return mastershipService.isLocalMaster(event.subject().id());
339 }
340
341 @Override
342 public void event(DeviceEvent event) {
343 log.info("Device event occurred with type {}", event.type());
344
345 switch (event.type()) {
346 case PORT_UPDATED:
347 executor.execute(() -> processPortUpdate(event));
348 break;
349 case PORT_ADDED:
350 executor.execute(() -> processPortAddition(event));
351 break;
352 case PORT_REMOVED:
353 executor.execute(() -> processPortRemoval(event));
354 break;
355 default:
356 break;
357 }
358 }
359
360 private void processPortUpdate(DeviceEvent event) {
361 if (!isRelevantHelper(event)) {
362 return;
363 }
364
Jian Li4aa17642019-01-30 00:01:11 +0900365 log.debug("K8s port {} is updated at {}",
366 event.port().annotations().value(PORT_NAME),
367 event.subject().id());
368
Jian Libde20bf2019-01-25 17:34:43 +0900369 if (!event.port().isEnabled()) {
370 processPortRemoval(event);
371 } else if (event.port().isEnabled()) {
372 processPortAddition(event);
373 }
374 }
375
376 private void processPortAddition(DeviceEvent event) {
377 if (!isRelevantHelper(event)) {
378 return;
379 }
380
Jian Li4aa17642019-01-30 00:01:11 +0900381 log.debug("K8s port {} is detected from {}",
382 event.port().annotations().value(PORT_NAME),
383 event.subject().id());
384
385 processPortAdded(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900386 }
387
388 private void processPortRemoval(DeviceEvent event) {
389 if (!isRelevantHelper(event)) {
390 return;
391 }
392
Jian Li4aa17642019-01-30 00:01:11 +0900393 log.debug("K8s port {} is removed from {}",
394 event.port().annotations().value(PORT_NAME),
395 event.subject().id());
396
397 processPortRemoved(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900398 }
399 }
400
401 private class InternalK8sNodeListener implements K8sNodeListener {
402
403 private boolean isRelevantHelper(K8sNodeEvent event) {
404 // do not allow to proceed without mastership
405 Device device = deviceService.getDevice(event.subject().intgBridge());
406 if (device == null) {
407 return false;
408 }
409 return mastershipService.isLocalMaster(device.id());
410 }
411
412 @Override
413 public void event(K8sNodeEvent event) {
414 K8sNode k8sNode = event.subject();
415
416 switch (event.type()) {
417 case K8S_NODE_COMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900418 executor.execute(() -> processCompleteNode(event, k8sNode));
Jian Libde20bf2019-01-25 17:34:43 +0900419 break;
Jian Li4a7ce672019-04-09 15:20:25 +0900420 case K8S_NODE_UPDATED:
421 if (k8sNode.state() == INIT) {
422 executor.execute(() -> processIncompleteNode(event, k8sNode));
423 }
Jian Libde20bf2019-01-25 17:34:43 +0900424 break;
425 case K8S_NODE_CREATED:
Jian Libde20bf2019-01-25 17:34:43 +0900426 case K8S_NODE_REMOVED:
Jian Li4a7ce672019-04-09 15:20:25 +0900427 case K8S_NODE_INCOMPLETE:
Jian Libde20bf2019-01-25 17:34:43 +0900428 default:
429 break;
430 }
431 }
432
433 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
434 if (!isRelevantHelper(event)) {
435 return;
436 }
437
438 log.info("COMPLETE node {} is detected", k8sNode.hostname());
439
440 deviceService.getPorts(k8sNode.intgBridge()).stream()
441 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
442 .filter(Port::isEnabled)
443 .forEach(port -> {
444 log.debug("Container port {} is detected from {}",
445 port.annotations().value(PORT_NAME),
446 k8sNode.hostname());
447 processPortAdded(port);
448 });
449
450 Tools.stream(hostService.getHosts())
451 .filter(host -> deviceService.getPort(
452 host.location().deviceId(),
453 host.location().port()) == null)
454 .forEach(host -> {
455 log.info("Remove stale host {}", host.id());
456 hostProviderService.hostVanished(host.id());
457 });
458 }
Jian Li4a7ce672019-04-09 15:20:25 +0900459
460 private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
461 if (!isRelevantHelper(event)) {
462 return;
463 }
464
465 log.info("INIT node {} is detected", k8sNode.hostname());
466
467 deviceService.getPorts(k8sNode.intgBridge()).stream()
468 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
469 .filter(Port::isEnabled)
470 .forEach(port -> {
471 log.debug("Container port {} is detected from {}",
472 port.annotations().value(PORT_NAME),
473 k8sNode.hostname());
474 processPortInactivated(port);
475 });
476 }
Jian Libde20bf2019-01-25 17:34:43 +0900477 }
Jian Li732c3422020-09-07 17:01:11 +0900478
479 private class InternalK8sNetworkListener implements K8sNetworkListener {
480
481 @Override
482 public void event(K8sNetworkEvent event) {
483 switch (event.type()) {
484 case K8S_PORT_CREATED:
485 executor.execute(() -> processK8sPortAddition(event));
486 break;
487 default:
488 break;
489 }
490 }
491
492 private void processK8sPortAddition(K8sNetworkEvent event) {
Jian Li3bc3d5b2020-09-23 16:46:12 +0900493 String mac = event.port().macAddress().toString();
Jian Li732c3422020-09-07 17:01:11 +0900494 for (Device device : deviceService.getDevices()) {
495 Port port = deviceService.getPorts(device.id()).stream()
496 .filter(Port::isEnabled)
Jian Li3bc3d5b2020-09-23 16:46:12 +0900497 .filter(p -> p.annotations().value(PORT_MAC) != null)
Jian Li732c3422020-09-07 17:01:11 +0900498 .filter(p -> p.annotations().value(PORT_NAME) != null)
Jian Li3bc3d5b2020-09-23 16:46:12 +0900499 .filter(p -> existingContainerPortByMac(mac, p.annotations().value(PORT_MAC)))
Jian Li732c3422020-09-07 17:01:11 +0900500 .findAny().orElse(null);
501
502 if (port != null) {
503 String upperPortName = port.annotations().value(PORT_NAME).toUpperCase();
504 // we do not handle tunnel typed port
505 if (upperPortName.contains(VXLAN) || upperPortName.contains(GRE) ||
506 upperPortName.contains(GENEVE)) {
507 continue;
508 }
509
510 // if we have null device ID, we simply update the device ID on the k8s port
511 if (event.port().deviceId() == null) {
512 K8sPort updated = event.port().updateDeviceId(device.id());
513 k8sNetworkService.updatePort(updated);
514 }
515
516 processPortAdded(port);
517 }
518 }
519 }
520 }
Jian Libde20bf2019-01-25 17:34:43 +0900521}