blob: 1dec9d11633e8c028a1ffdc0c77b0e60ad288459 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090025import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
Jian Libde20bf2019-01-25 17:34:43 +090026import org.onosproject.k8snetworking.api.K8sPort;
27import org.onosproject.k8snode.api.K8sNode;
28import org.onosproject.k8snode.api.K8sNodeEvent;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DefaultAnnotations;
34import org.onosproject.net.Device;
35import org.onosproject.net.Host;
36import org.onosproject.net.HostId;
37import org.onosproject.net.HostLocation;
38import org.onosproject.net.Port;
39import org.onosproject.net.device.DeviceEvent;
40import org.onosproject.net.device.DeviceListener;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.host.DefaultHostDescription;
43import org.onosproject.net.host.HostDescription;
44import org.onosproject.net.host.HostProvider;
45import org.onosproject.net.host.HostProviderRegistry;
46import org.onosproject.net.host.HostProviderService;
47import org.onosproject.net.host.HostService;
48import org.onosproject.net.provider.AbstractProvider;
49import org.onosproject.net.provider.ProviderId;
50import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Set;
59import java.util.concurrent.ExecutorService;
60import java.util.concurrent.Executors;
61import java.util.stream.Collectors;
62
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
65import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
66import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
67import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
68import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
69import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
70import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
71import static org.onosproject.net.AnnotationKeys.PORT_NAME;
72
73/**
74 * A provider used to feed host information for kubernetes.
75 */
76@Component(immediate = true, service = HostProvider.class)
77public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
78
79 private final Logger log = LoggerFactory.getLogger(getClass());
80
81 private static final String ERR_ADD_HOST = "Failed to add host: ";
Jian Li4aa17642019-01-30 00:01:11 +090082 private static final String SONA_HOST_SCHEME = "sona-k8s";
83 private static final int PORT_PREFIX_LENGTH = 4;
Jian Libde20bf2019-01-25 17:34:43 +090084
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected DeviceService deviceService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected HostProviderRegistry hostProviderRegistry;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected HostService hostService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected MastershipService mastershipService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900101 protected K8sNetworkAdminService k8sNetworkService;
Jian Libde20bf2019-01-25 17:34:43 +0900102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected K8sNodeService k8sNodeService;
105
106 private HostProviderService hostProviderService;
107
108 private final ExecutorService executor = Executors.newSingleThreadExecutor(
109 groupedThreads(this.getClass().getSimpleName(), "device-event"));
110 private final InternalDeviceListener internalDeviceListener =
111 new InternalDeviceListener();
112 private final InternalK8sNodeListener internalK8sNodeListener =
113 new InternalK8sNodeListener();
114
115 /**
116 * Creates kubernetes switching host provider.
117 */
118 public K8sSwitchingHostProvider() {
119 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
120 }
121
122 @Activate
123 protected void activate() {
124 coreService.registerApplication(K8S_NETWORKING_APP_ID);
125 deviceService.addListener(internalDeviceListener);
126 k8sNodeService.addListener(internalK8sNodeListener);
127 hostProviderService = hostProviderRegistry.register(this);
128
129 log.info("Started");
130 }
131
132 @Deactivate
133 protected void deactivate() {
134 hostProviderRegistry.unregister(this);
135 k8sNodeService.removeListener(internalK8sNodeListener);
136 deviceService.removeListener(internalDeviceListener);
137
138 executor.shutdown();
139
140 log.info("Stopped");
141 }
142
143
144 @Override
145 public void triggerProbe(Host host) {
146 // no probe is required
147 }
148
149 /**
150 * Processes port addition event.
151 *
152 * @param port port object used in ONOS
153 */
154 private void processPortAdded(Port port) {
155 K8sPort k8sPort = portToK8sPort(port);
156 if (k8sPort == null) {
157 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
158 return;
159 }
160
161 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
162 if (k8sNet == null) {
163 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
164 k8sPort.networkId());
165 return;
166 }
167
168 MacAddress mac = k8sPort.macAddress();
169 HostId hostId = HostId.hostId(mac);
170
171 // connect point is the combination of switch ID with port number where
172 // the host is attached to
173 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
174
175 long createTime = System.currentTimeMillis();
176
Jian Li4aa17642019-01-30 00:01:11 +0900177 // update k8s port number by referring to ONOS port number
178
179 k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
180 .updateState(K8sPort.State.ACTIVE));
181
Jian Libde20bf2019-01-25 17:34:43 +0900182 // we check whether the host already attached to same locations
183 Host host = hostService.getHost(hostId);
184
185 // build host annotations to include a set of meta info from neutron
186 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
187 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
188 .set(ANNOTATION_PORT_ID, k8sPort.portId())
189 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
190 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
191
192 HostDescription hostDesc = new DefaultHostDescription(
193 mac,
194 VlanId.NONE,
195 new HostLocation(connectPoint, createTime),
196 ImmutableSet.of(k8sPort.ipAddress()),
197 annotations.build());
198
199 if (host != null) {
200 Set<HostLocation> locations = host.locations().stream()
201 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
202 .filter(l -> l.port().equals(connectPoint.port()))
203 .collect(Collectors.toSet());
204
205 // newly added location is not in the existing location list,
206 // therefore, we simply add this into the location list
207 if (locations.isEmpty()) {
208 hostProviderService.addLocationToHost(hostId,
209 new HostLocation(connectPoint, createTime));
210 }
211
212 // newly added location is in the existing location list,
213 // the hostDetected method invocation in turn triggers host Update event
214 if (locations.size() == 1) {
215 hostProviderService.hostDetected(hostId, hostDesc, false);
216 }
217 } else {
218 hostProviderService.hostDetected(hostId, hostDesc, false);
219 }
220 }
221
222 /**
223 * Processes port removal event.
224 *
225 * @param port ONOS port
226 */
227 private void processPortRemoved(Port port) {
228 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
229
230 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
231
232 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
233 }
234
235 /**
236 * Converts ONOS port to kubernetes port.
237 *
238 * @param port ONOS port
239 * @return mapped kubernetes port
240 */
241 private K8sPort portToK8sPort(Port port) {
242 String portName = port.annotations().value(PORT_NAME);
243 if (Strings.isNullOrEmpty(portName)) {
244 return null;
245 }
246
247 if (isContainer(portName)) {
248 return k8sNetworkService.ports().stream()
249 .filter(p -> p.portId().contains(portName.substring(PORT_PREFIX_LENGTH)))
250 .findAny().orElse(null);
251 } else {
252 return null;
253 }
254 }
255
256 private class InternalDeviceListener implements DeviceListener {
257
258 @Override
259 public boolean isRelevant(DeviceEvent event) {
260 Port port = event.port();
261 if (port == null) {
262 return false;
263 }
264
265 String portName = port.annotations().value(PORT_NAME);
266
267 return !Strings.isNullOrEmpty(portName) &&
268 portName.startsWith(PORT_NAME_PREFIX_CONTAINER);
269 }
270
271 private boolean isRelevantHelper(DeviceEvent event) {
272 return mastershipService.isLocalMaster(event.subject().id());
273 }
274
275 @Override
276 public void event(DeviceEvent event) {
277 log.info("Device event occurred with type {}", event.type());
278
279 switch (event.type()) {
280 case PORT_UPDATED:
281 executor.execute(() -> processPortUpdate(event));
282 break;
283 case PORT_ADDED:
284 executor.execute(() -> processPortAddition(event));
285 break;
286 case PORT_REMOVED:
287 executor.execute(() -> processPortRemoval(event));
288 break;
289 default:
290 break;
291 }
292 }
293
294 private void processPortUpdate(DeviceEvent event) {
295 if (!isRelevantHelper(event)) {
296 return;
297 }
298
Jian Li4aa17642019-01-30 00:01:11 +0900299 log.debug("K8s port {} is updated at {}",
300 event.port().annotations().value(PORT_NAME),
301 event.subject().id());
302
Jian Libde20bf2019-01-25 17:34:43 +0900303 if (!event.port().isEnabled()) {
304 processPortRemoval(event);
305 } else if (event.port().isEnabled()) {
306 processPortAddition(event);
307 }
308 }
309
310 private void processPortAddition(DeviceEvent event) {
311 if (!isRelevantHelper(event)) {
312 return;
313 }
314
Jian Li4aa17642019-01-30 00:01:11 +0900315 log.debug("K8s port {} is detected from {}",
316 event.port().annotations().value(PORT_NAME),
317 event.subject().id());
318
319 processPortAdded(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900320 }
321
322 private void processPortRemoval(DeviceEvent event) {
323 if (!isRelevantHelper(event)) {
324 return;
325 }
326
Jian Li4aa17642019-01-30 00:01:11 +0900327 log.debug("K8s port {} is removed from {}",
328 event.port().annotations().value(PORT_NAME),
329 event.subject().id());
330
331 processPortRemoved(event.port());
Jian Libde20bf2019-01-25 17:34:43 +0900332 }
333 }
334
335 private class InternalK8sNodeListener implements K8sNodeListener {
336
337 private boolean isRelevantHelper(K8sNodeEvent event) {
338 // do not allow to proceed without mastership
339 Device device = deviceService.getDevice(event.subject().intgBridge());
340 if (device == null) {
341 return false;
342 }
343 return mastershipService.isLocalMaster(device.id());
344 }
345
346 @Override
347 public void event(K8sNodeEvent event) {
348 K8sNode k8sNode = event.subject();
349
350 switch (event.type()) {
351 case K8S_NODE_COMPLETE:
352 executor.execute(() -> processCompleteNode(event, event.subject()));
353 break;
354 case K8S_NODE_INCOMPLETE:
355 log.warn("{} is changed to INCOMPLETE state", k8sNode);
356 break;
357 case K8S_NODE_CREATED:
358 case K8S_NODE_UPDATED:
359 case K8S_NODE_REMOVED:
360 default:
361 break;
362 }
363 }
364
365 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
366 if (!isRelevantHelper(event)) {
367 return;
368 }
369
370 log.info("COMPLETE node {} is detected", k8sNode.hostname());
371
372 deviceService.getPorts(k8sNode.intgBridge()).stream()
373 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
374 .filter(Port::isEnabled)
375 .forEach(port -> {
376 log.debug("Container port {} is detected from {}",
377 port.annotations().value(PORT_NAME),
378 k8sNode.hostname());
379 processPortAdded(port);
380 });
381
382 Tools.stream(hostService.getHosts())
383 .filter(host -> deviceService.getPort(
384 host.location().deviceId(),
385 host.location().port()) == null)
386 .forEach(host -> {
387 log.info("Remove stale host {}", host.id());
388 hostProviderService.hostVanished(host.id());
389 });
390 }
391 }
392}