blob: 167d4ca251f1b9314b8f035cc9043683ee32a2dc [file] [log] [blame]
Jian Libde20bf2019-01-25 17:34:43 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090025import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
Jian Li732c3422020-09-07 17:01:11 +090026import org.onosproject.k8snetworking.api.K8sNetworkEvent;
27import org.onosproject.k8snetworking.api.K8sNetworkListener;
Jian Libde20bf2019-01-25 17:34:43 +090028import org.onosproject.k8snetworking.api.K8sPort;
29import org.onosproject.k8snode.api.K8sNode;
30import org.onosproject.k8snode.api.K8sNodeEvent;
31import org.onosproject.k8snode.api.K8sNodeListener;
32import org.onosproject.k8snode.api.K8sNodeService;
33import org.onosproject.mastership.MastershipService;
34import org.onosproject.net.ConnectPoint;
35import org.onosproject.net.DefaultAnnotations;
36import org.onosproject.net.Device;
37import org.onosproject.net.Host;
38import org.onosproject.net.HostId;
39import org.onosproject.net.HostLocation;
40import org.onosproject.net.Port;
41import org.onosproject.net.device.DeviceEvent;
42import org.onosproject.net.device.DeviceListener;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.host.DefaultHostDescription;
45import org.onosproject.net.host.HostDescription;
46import org.onosproject.net.host.HostProvider;
47import org.onosproject.net.host.HostProviderRegistry;
48import org.onosproject.net.host.HostProviderService;
49import org.onosproject.net.host.HostService;
50import org.onosproject.net.provider.AbstractProvider;
51import org.onosproject.net.provider.ProviderId;
52import org.osgi.service.component.annotations.Activate;
53import org.osgi.service.component.annotations.Component;
54import org.osgi.service.component.annotations.Deactivate;
55import org.osgi.service.component.annotations.Reference;
56import org.osgi.service.component.annotations.ReferenceCardinality;
57import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
60import java.util.Set;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.Executors;
63import java.util.stream.Collectors;
64
65import static org.onlab.util.Tools.groupedThreads;
66import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
67import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
68import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
69import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
Jian Li732c3422020-09-07 17:01:11 +090070import static org.onosproject.k8snetworking.api.Constants.GENEVE;
71import static org.onosproject.k8snetworking.api.Constants.GRE;
Jian Libde20bf2019-01-25 17:34:43 +090072import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li732c3422020-09-07 17:01:11 +090073import static org.onosproject.k8snetworking.api.Constants.VXLAN;
74import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.existingContainerPort;
Jian Libde20bf2019-01-25 17:34:43 +090075import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
Jian Li4a7ce672019-04-09 15:20:25 +090076import static org.onosproject.k8snode.api.K8sNodeState.INIT;
Jian Libde20bf2019-01-25 17:34:43 +090077import static org.onosproject.net.AnnotationKeys.PORT_NAME;
78
79/**
80 * A provider used to feed host information for kubernetes.
81 */
82@Component(immediate = true, service = HostProvider.class)
83public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
84
85 private final Logger log = LoggerFactory.getLogger(getClass());
86
87 private static final String ERR_ADD_HOST = "Failed to add host: ";
Jian Li4aa17642019-01-30 00:01:11 +090088 private static final String SONA_HOST_SCHEME = "sona-k8s";
Jian Libde20bf2019-01-25 17:34:43 +090089
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected CoreService coreService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected DeviceService deviceService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected HostProviderRegistry hostProviderRegistry;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected HostService hostService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected MastershipService mastershipService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900106 protected K8sNetworkAdminService k8sNetworkService;
Jian Libde20bf2019-01-25 17:34:43 +0900107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected K8sNodeService k8sNodeService;
110
111 private HostProviderService hostProviderService;
112
113 private final ExecutorService executor = Executors.newSingleThreadExecutor(
114 groupedThreads(this.getClass().getSimpleName(), "device-event"));
115 private final InternalDeviceListener internalDeviceListener =
116 new InternalDeviceListener();
117 private final InternalK8sNodeListener internalK8sNodeListener =
118 new InternalK8sNodeListener();
Jian Li732c3422020-09-07 17:01:11 +0900119 private final InternalK8sNetworkListener internalK8sNetworkListener =
120 new InternalK8sNetworkListener();
Jian Libde20bf2019-01-25 17:34:43 +0900121
122 /**
123 * Creates kubernetes switching host provider.
124 */
125 public K8sSwitchingHostProvider() {
126 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
127 }
128
129 @Activate
130 protected void activate() {
131 coreService.registerApplication(K8S_NETWORKING_APP_ID);
132 deviceService.addListener(internalDeviceListener);
133 k8sNodeService.addListener(internalK8sNodeListener);
Jian Li732c3422020-09-07 17:01:11 +0900134 k8sNetworkService.addListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900135 hostProviderService = hostProviderRegistry.register(this);
136
137 log.info("Started");
138 }
139
140 @Deactivate
141 protected void deactivate() {
142 hostProviderRegistry.unregister(this);
Jian Li732c3422020-09-07 17:01:11 +0900143 k8sNetworkService.removeListener(internalK8sNetworkListener);
Jian Libde20bf2019-01-25 17:34:43 +0900144 k8sNodeService.removeListener(internalK8sNodeListener);
145 deviceService.removeListener(internalDeviceListener);
146
147 executor.shutdown();
148
149 log.info("Stopped");
150 }
151
152
153 @Override
154 public void triggerProbe(Host host) {
155 // no probe is required
156 }
157
158 /**
159 * Processes port addition event.
160 *
161 * @param port port object used in ONOS
162 */
163 private void processPortAdded(Port port) {
164 K8sPort k8sPort = portToK8sPort(port);
165 if (k8sPort == null) {
166 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
167 return;
168 }
169
170 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
171 if (k8sNet == null) {
172 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
173 k8sPort.networkId());
174 return;
175 }
176
177 MacAddress mac = k8sPort.macAddress();
178 HostId hostId = HostId.hostId(mac);
179
180 // connect point is the combination of switch ID with port number where
181 // the host is attached to
182 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
183
184 long createTime = System.currentTimeMillis();
185
Jian Li4aa17642019-01-30 00:01:11 +0900186 // update k8s port number by referring to ONOS port number
187
188 k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
189 .updateState(K8sPort.State.ACTIVE));
190
Jian Libde20bf2019-01-25 17:34:43 +0900191 // we check whether the host already attached to same locations
192 Host host = hostService.getHost(hostId);
193
194 // build host annotations to include a set of meta info from neutron
195 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
196 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
197 .set(ANNOTATION_PORT_ID, k8sPort.portId())
198 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
199 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
200
201 HostDescription hostDesc = new DefaultHostDescription(
202 mac,
203 VlanId.NONE,
204 new HostLocation(connectPoint, createTime),
205 ImmutableSet.of(k8sPort.ipAddress()),
206 annotations.build());
207
208 if (host != null) {
209 Set<HostLocation> locations = host.locations().stream()
210 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
211 .filter(l -> l.port().equals(connectPoint.port()))
212 .collect(Collectors.toSet());
213
214 // newly added location is not in the existing location list,
215 // therefore, we simply add this into the location list
216 if (locations.isEmpty()) {
217 hostProviderService.addLocationToHost(hostId,
218 new HostLocation(connectPoint, createTime));
219 }
220
221 // newly added location is in the existing location list,
222 // the hostDetected method invocation in turn triggers host Update event
223 if (locations.size() == 1) {
224 hostProviderService.hostDetected(hostId, hostDesc, false);
225 }
226 } else {
227 hostProviderService.hostDetected(hostId, hostDesc, false);
228 }
229 }
230
231 /**
232 * Processes port removal event.
233 *
234 * @param port ONOS port
235 */
236 private void processPortRemoved(Port port) {
237 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
238
239 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
240
241 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
Jian Li4a7ce672019-04-09 15:20:25 +0900242
243 K8sPort k8sPort = portToK8sPort(port);
244
245 if (k8sPort == null) {
246 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
247 return;
248 }
249
250 k8sNetworkService.removePort(k8sPort.portId());
251 }
252
253 /**
254 * Process port inactivate event.
255 *
256 * @param port ONOS port
257 */
258 private void processPortInactivated(Port port) {
259 K8sPort k8sPort = portToK8sPort(port);
260
261 if (k8sPort == null) {
262 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
263 return;
264 }
265
266 k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
Jian Libde20bf2019-01-25 17:34:43 +0900267 }
268
269 /**
270 * Converts ONOS port to kubernetes port.
271 *
272 * @param port ONOS port
273 * @return mapped kubernetes port
274 */
275 private K8sPort portToK8sPort(Port port) {
276 String portName = port.annotations().value(PORT_NAME);
277 if (Strings.isNullOrEmpty(portName)) {
278 return null;
279 }
280
281 if (isContainer(portName)) {
282 return k8sNetworkService.ports().stream()
Jian Li732c3422020-09-07 17:01:11 +0900283 .filter(p -> existingContainerPort(p.portId(), portName))
Jian Libde20bf2019-01-25 17:34:43 +0900284 .findAny().orElse(null);
285 } else {
286 return null;
287 }
288 }
289
290 private class InternalDeviceListener implements DeviceListener {
291
292 @Override
293 public boolean isRelevant(DeviceEvent event) {
294 Port port = event.port();
295 if (port == null) {
296 return false;
297 }
298
299 String portName = port.annotations().value(PORT_NAME);
300
Jian Li732c3422020-09-07 17:01:11 +0900301 return !Strings.isNullOrEmpty(portName) && isContainer(portName);
Jian Libde20bf2019-01-25 17:34:43 +0900302 }
303
304 private boolean isRelevantHelper(DeviceEvent event) {
305 return mastershipService.isLocalMaster(event.subject().id());
306 }
307
308 @Override
309 public void event(DeviceEvent event) {
310 log.info("Device event occurred with type {}", event.type());
311
312 switch (event.type()) {
313 case PORT_UPDATED:
314 executor.execute(() -> processPortUpdate(event));
315 break;
316 case PORT_ADDED:
317 executor.execute(() -> processPortAddition(event));
318 break;
319 case PORT_REMOVED:
320 executor.execute(() -> processPortRemoval(event));
321 break;
322 default:
323 break;
324 }
325 }
326
327 private void processPortUpdate(DeviceEvent event) {
328 if (!isRelevantHelper(event)) {
329 return;
330 }
331
Jian Li4aa17642019-01-30 00:01:11 +0900332 log.debug("K8s port {} is updated at {}",
333 event.port().annotations().value(PORT_NAME),
334 event.subject().id());
335
Jian Libde20bf2019-01-25 17:34:43 +0900336 if (!event.port().isEnabled()) {
337 processPortRemoval(event);
338 } else if (event.port().isEnabled()) {
339 processPortAddition(event);
340 }
341 }
342
343 private void processPortAddition(DeviceEvent event) {
344 if (!isRelevantHelper(event)) {
345 return;
346 }
347
Jian Li4aa17642019-01-30 00:01:11 +0900348 log.debug("K8s port {} is detected from {}",
349 event.port().annotations().value(PORT_NAME),
350 event.subject().id());
351
352 processPortAdded(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900353 }
354
355 private void processPortRemoval(DeviceEvent event) {
356 if (!isRelevantHelper(event)) {
357 return;
358 }
359
Jian Li4aa17642019-01-30 00:01:11 +0900360 log.debug("K8s port {} is removed from {}",
361 event.port().annotations().value(PORT_NAME),
362 event.subject().id());
363
364 processPortRemoved(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900365 }
366 }
367
368 private class InternalK8sNodeListener implements K8sNodeListener {
369
370 private boolean isRelevantHelper(K8sNodeEvent event) {
371 // do not allow to proceed without mastership
372 Device device = deviceService.getDevice(event.subject().intgBridge());
373 if (device == null) {
374 return false;
375 }
376 return mastershipService.isLocalMaster(device.id());
377 }
378
379 @Override
380 public void event(K8sNodeEvent event) {
381 K8sNode k8sNode = event.subject();
382
383 switch (event.type()) {
384 case K8S_NODE_COMPLETE:
Jian Li4a7ce672019-04-09 15:20:25 +0900385 executor.execute(() -> processCompleteNode(event, k8sNode));
Jian Libde20bf2019-01-25 17:34:43 +0900386 break;
Jian Li4a7ce672019-04-09 15:20:25 +0900387 case K8S_NODE_UPDATED:
388 if (k8sNode.state() == INIT) {
389 executor.execute(() -> processIncompleteNode(event, k8sNode));
390 }
Jian Libde20bf2019-01-25 17:34:43 +0900391 break;
392 case K8S_NODE_CREATED:
Jian Libde20bf2019-01-25 17:34:43 +0900393 case K8S_NODE_REMOVED:
Jian Li4a7ce672019-04-09 15:20:25 +0900394 case K8S_NODE_INCOMPLETE:
Jian Libde20bf2019-01-25 17:34:43 +0900395 default:
396 break;
397 }
398 }
399
400 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
401 if (!isRelevantHelper(event)) {
402 return;
403 }
404
405 log.info("COMPLETE node {} is detected", k8sNode.hostname());
406
407 deviceService.getPorts(k8sNode.intgBridge()).stream()
408 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
409 .filter(Port::isEnabled)
410 .forEach(port -> {
411 log.debug("Container port {} is detected from {}",
412 port.annotations().value(PORT_NAME),
413 k8sNode.hostname());
414 processPortAdded(port);
415 });
416
417 Tools.stream(hostService.getHosts())
418 .filter(host -> deviceService.getPort(
419 host.location().deviceId(),
420 host.location().port()) == null)
421 .forEach(host -> {
422 log.info("Remove stale host {}", host.id());
423 hostProviderService.hostVanished(host.id());
424 });
425 }
Jian Li4a7ce672019-04-09 15:20:25 +0900426
427 private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
428 if (!isRelevantHelper(event)) {
429 return;
430 }
431
432 log.info("INIT node {} is detected", k8sNode.hostname());
433
434 deviceService.getPorts(k8sNode.intgBridge()).stream()
435 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
436 .filter(Port::isEnabled)
437 .forEach(port -> {
438 log.debug("Container port {} is detected from {}",
439 port.annotations().value(PORT_NAME),
440 k8sNode.hostname());
441 processPortInactivated(port);
442 });
443 }
Jian Libde20bf2019-01-25 17:34:43 +0900444 }
Jian Li732c3422020-09-07 17:01:11 +0900445
446 private class InternalK8sNetworkListener implements K8sNetworkListener {
447
448 @Override
449 public void event(K8sNetworkEvent event) {
450 switch (event.type()) {
451 case K8S_PORT_CREATED:
452 executor.execute(() -> processK8sPortAddition(event));
453 break;
454 default:
455 break;
456 }
457 }
458
459 private void processK8sPortAddition(K8sNetworkEvent event) {
460 String portId = event.port().portId();
461 for (Device device : deviceService.getDevices()) {
462 Port port = deviceService.getPorts(device.id()).stream()
463 .filter(Port::isEnabled)
464 .filter(p -> p.annotations().value(PORT_NAME) != null)
465 .filter(p -> existingContainerPort(portId, p.annotations().value(PORT_NAME)))
466 .findAny().orElse(null);
467
468 if (port != null) {
469 String upperPortName = port.annotations().value(PORT_NAME).toUpperCase();
470 // we do not handle tunnel typed port
471 if (upperPortName.contains(VXLAN) || upperPortName.contains(GRE) ||
472 upperPortName.contains(GENEVE)) {
473 continue;
474 }
475
476 // if we have null device ID, we simply update the device ID on the k8s port
477 if (event.port().deviceId() == null) {
478 K8sPort updated = event.port().updateDeviceId(device.id());
479 k8sNetworkService.updatePort(updated);
480 }
481
482 processPortAdded(port);
483 }
484 }
485 }
486 }
Jian Libde20bf2019-01-25 17:34:43 +0900487}