blob: f7f78c94b89f6127e74f4a08973d80f033c39140 [file] [log] [blame]
Jian Libde20bf2019-01-25 17:34:43 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.base.Strings;
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.packet.VlanId;
22import org.onlab.util.Tools;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.K8sNetwork;
25import org.onosproject.k8snetworking.api.K8sNetworkService;
26import org.onosproject.k8snetworking.api.K8sPort;
27import org.onosproject.k8snode.api.K8sNode;
28import org.onosproject.k8snode.api.K8sNodeEvent;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DefaultAnnotations;
34import org.onosproject.net.Device;
35import org.onosproject.net.Host;
36import org.onosproject.net.HostId;
37import org.onosproject.net.HostLocation;
38import org.onosproject.net.Port;
39import org.onosproject.net.device.DeviceEvent;
40import org.onosproject.net.device.DeviceListener;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.host.DefaultHostDescription;
43import org.onosproject.net.host.HostDescription;
44import org.onosproject.net.host.HostProvider;
45import org.onosproject.net.host.HostProviderRegistry;
46import org.onosproject.net.host.HostProviderService;
47import org.onosproject.net.host.HostService;
48import org.onosproject.net.provider.AbstractProvider;
49import org.onosproject.net.provider.ProviderId;
50import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Set;
59import java.util.concurrent.ExecutorService;
60import java.util.concurrent.Executors;
61import java.util.stream.Collectors;
62
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_CREATE_TIME;
65import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_NETWORK_ID;
66import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_PORT_ID;
67import static org.onosproject.k8snetworking.api.Constants.ANNOTATION_SEGMENT_ID;
68import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
69import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
70import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
71import static org.onosproject.net.AnnotationKeys.PORT_NAME;
72
73/**
74 * A provider used to feed host information for kubernetes.
75 */
76@Component(immediate = true, service = HostProvider.class)
77public class K8sSwitchingHostProvider extends AbstractProvider implements HostProvider {
78
79 private final Logger log = LoggerFactory.getLogger(getClass());
80
81 private static final String ERR_ADD_HOST = "Failed to add host: ";
82 private static final String SONA_HOST_SCHEME = "sona";
83 private static final int PORT_PREFIX_LENGTH = 3;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected DeviceService deviceService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected HostProviderRegistry hostProviderRegistry;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected HostService hostService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected MastershipService mastershipService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected K8sNetworkService k8sNetworkService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected K8sNodeService k8sNodeService;
105
106 private HostProviderService hostProviderService;
107
108 private final ExecutorService executor = Executors.newSingleThreadExecutor(
109 groupedThreads(this.getClass().getSimpleName(), "device-event"));
110 private final InternalDeviceListener internalDeviceListener =
111 new InternalDeviceListener();
112 private final InternalK8sNodeListener internalK8sNodeListener =
113 new InternalK8sNodeListener();
114
115 /**
116 * Creates kubernetes switching host provider.
117 */
118 public K8sSwitchingHostProvider() {
119 super(new ProviderId(SONA_HOST_SCHEME, K8S_NETWORKING_APP_ID));
120 }
121
122 @Activate
123 protected void activate() {
124 coreService.registerApplication(K8S_NETWORKING_APP_ID);
125 deviceService.addListener(internalDeviceListener);
126 k8sNodeService.addListener(internalK8sNodeListener);
127 hostProviderService = hostProviderRegistry.register(this);
128
129 log.info("Started");
130 }
131
132 @Deactivate
133 protected void deactivate() {
134 hostProviderRegistry.unregister(this);
135 k8sNodeService.removeListener(internalK8sNodeListener);
136 deviceService.removeListener(internalDeviceListener);
137
138 executor.shutdown();
139
140 log.info("Stopped");
141 }
142
143
144 @Override
145 public void triggerProbe(Host host) {
146 // no probe is required
147 }
148
149 /**
150 * Processes port addition event.
151 *
152 * @param port port object used in ONOS
153 */
154 private void processPortAdded(Port port) {
155 K8sPort k8sPort = portToK8sPort(port);
156 if (k8sPort == null) {
157 log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
158 return;
159 }
160
161 K8sNetwork k8sNet = k8sNetworkService.network(k8sPort.networkId());
162 if (k8sNet == null) {
163 log.warn(ERR_ADD_HOST + "Kubernetes network {} not found",
164 k8sPort.networkId());
165 return;
166 }
167
168 MacAddress mac = k8sPort.macAddress();
169 HostId hostId = HostId.hostId(mac);
170
171 // connect point is the combination of switch ID with port number where
172 // the host is attached to
173 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
174
175 long createTime = System.currentTimeMillis();
176
177 // we check whether the host already attached to same locations
178 Host host = hostService.getHost(hostId);
179
180 // build host annotations to include a set of meta info from neutron
181 DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
182 .set(ANNOTATION_NETWORK_ID, k8sPort.networkId())
183 .set(ANNOTATION_PORT_ID, k8sPort.portId())
184 .set(ANNOTATION_CREATE_TIME, String.valueOf(createTime))
185 .set(ANNOTATION_SEGMENT_ID, k8sNet.segmentId());
186
187 HostDescription hostDesc = new DefaultHostDescription(
188 mac,
189 VlanId.NONE,
190 new HostLocation(connectPoint, createTime),
191 ImmutableSet.of(k8sPort.ipAddress()),
192 annotations.build());
193
194 if (host != null) {
195 Set<HostLocation> locations = host.locations().stream()
196 .filter(l -> l.deviceId().equals(connectPoint.deviceId()))
197 .filter(l -> l.port().equals(connectPoint.port()))
198 .collect(Collectors.toSet());
199
200 // newly added location is not in the existing location list,
201 // therefore, we simply add this into the location list
202 if (locations.isEmpty()) {
203 hostProviderService.addLocationToHost(hostId,
204 new HostLocation(connectPoint, createTime));
205 }
206
207 // newly added location is in the existing location list,
208 // the hostDetected method invocation in turn triggers host Update event
209 if (locations.size() == 1) {
210 hostProviderService.hostDetected(hostId, hostDesc, false);
211 }
212 } else {
213 hostProviderService.hostDetected(hostId, hostDesc, false);
214 }
215 }
216
217 /**
218 * Processes port removal event.
219 *
220 * @param port ONOS port
221 */
222 private void processPortRemoved(Port port) {
223 ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
224
225 Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
226
227 hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
228 }
229
230 /**
231 * Converts ONOS port to kubernetes port.
232 *
233 * @param port ONOS port
234 * @return mapped kubernetes port
235 */
236 private K8sPort portToK8sPort(Port port) {
237 String portName = port.annotations().value(PORT_NAME);
238 if (Strings.isNullOrEmpty(portName)) {
239 return null;
240 }
241
242 if (isContainer(portName)) {
243 return k8sNetworkService.ports().stream()
244 .filter(p -> p.portId().contains(portName.substring(PORT_PREFIX_LENGTH)))
245 .findAny().orElse(null);
246 } else {
247 return null;
248 }
249 }
250
251 private class InternalDeviceListener implements DeviceListener {
252
253 @Override
254 public boolean isRelevant(DeviceEvent event) {
255 Port port = event.port();
256 if (port == null) {
257 return false;
258 }
259
260 String portName = port.annotations().value(PORT_NAME);
261
262 return !Strings.isNullOrEmpty(portName) &&
263 portName.startsWith(PORT_NAME_PREFIX_CONTAINER);
264 }
265
266 private boolean isRelevantHelper(DeviceEvent event) {
267 return mastershipService.isLocalMaster(event.subject().id());
268 }
269
270 @Override
271 public void event(DeviceEvent event) {
272 log.info("Device event occurred with type {}", event.type());
273
274 switch (event.type()) {
275 case PORT_UPDATED:
276 executor.execute(() -> processPortUpdate(event));
277 break;
278 case PORT_ADDED:
279 executor.execute(() -> processPortAddition(event));
280 break;
281 case PORT_REMOVED:
282 executor.execute(() -> processPortRemoval(event));
283 break;
284 default:
285 break;
286 }
287 }
288
289 private void processPortUpdate(DeviceEvent event) {
290 if (!isRelevantHelper(event)) {
291 return;
292 }
293
294 if (!event.port().isEnabled()) {
295 processPortRemoval(event);
296 } else if (event.port().isEnabled()) {
297 processPortAddition(event);
298 }
299 }
300
301 private void processPortAddition(DeviceEvent event) {
302 if (!isRelevantHelper(event)) {
303 return;
304 }
305
306 processPortAddition(event);
307 }
308
309 private void processPortRemoval(DeviceEvent event) {
310 if (!isRelevantHelper(event)) {
311 return;
312 }
313
314 processPortRemoval(event);
315 }
316 }
317
318 private class InternalK8sNodeListener implements K8sNodeListener {
319
320 private boolean isRelevantHelper(K8sNodeEvent event) {
321 // do not allow to proceed without mastership
322 Device device = deviceService.getDevice(event.subject().intgBridge());
323 if (device == null) {
324 return false;
325 }
326 return mastershipService.isLocalMaster(device.id());
327 }
328
329 @Override
330 public void event(K8sNodeEvent event) {
331 K8sNode k8sNode = event.subject();
332
333 switch (event.type()) {
334 case K8S_NODE_COMPLETE:
335 executor.execute(() -> processCompleteNode(event, event.subject()));
336 break;
337 case K8S_NODE_INCOMPLETE:
338 log.warn("{} is changed to INCOMPLETE state", k8sNode);
339 break;
340 case K8S_NODE_CREATED:
341 case K8S_NODE_UPDATED:
342 case K8S_NODE_REMOVED:
343 default:
344 break;
345 }
346 }
347
348 private void processCompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
349 if (!isRelevantHelper(event)) {
350 return;
351 }
352
353 log.info("COMPLETE node {} is detected", k8sNode.hostname());
354
355 deviceService.getPorts(k8sNode.intgBridge()).stream()
356 .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
357 .filter(Port::isEnabled)
358 .forEach(port -> {
359 log.debug("Container port {} is detected from {}",
360 port.annotations().value(PORT_NAME),
361 k8sNode.hostname());
362 processPortAdded(port);
363 });
364
365 Tools.stream(hostService.getHosts())
366 .filter(host -> deviceService.getPort(
367 host.location().deviceId(),
368 host.location().port()) == null)
369 .forEach(host -> {
370 log.info("Remove stale host {}", host.id());
371 hostProviderService.hostVanished(host.id());
372 });
373 }
374 }
375}