blob: b5fd93c20bf90a24dacf4fbad1b925a4b7c33332 [file] [log] [blame]
Jian Li077b07e2020-09-01 16:55:25 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onosproject.cluster.ClusterService;
19import org.onosproject.cluster.LeadershipService;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Li019ce6a2020-09-09 10:23:21 +090023import org.onosproject.k8snode.api.K8sBridge;
Jian Li077b07e2020-09-01 16:55:25 +090024import org.onosproject.k8snode.api.K8sHost;
25import org.onosproject.k8snode.api.K8sHostAdminService;
26import org.onosproject.k8snode.api.K8sHostEvent;
27import org.onosproject.k8snode.api.K8sHostHandler;
28import org.onosproject.k8snode.api.K8sHostListener;
29import org.onosproject.k8snode.api.K8sHostState;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
Jian Li019ce6a2020-09-09 10:23:21 +090032import org.onosproject.k8snode.api.K8sRouterBridge;
Jian Li077b07e2020-09-01 16:55:25 +090033import org.onosproject.k8snode.api.K8sTunnelBridge;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Port;
37import org.onosproject.net.behaviour.BridgeConfig;
38import org.onosproject.net.behaviour.BridgeDescription;
39import org.onosproject.net.behaviour.ControllerInfo;
40import org.onosproject.net.behaviour.DefaultBridgeDescription;
41import org.onosproject.net.behaviour.DefaultPatchDescription;
42import org.onosproject.net.behaviour.DefaultTunnelDescription;
43import org.onosproject.net.behaviour.InterfaceConfig;
44import org.onosproject.net.behaviour.PatchDescription;
45import org.onosproject.net.behaviour.TunnelDescription;
46import org.onosproject.net.behaviour.TunnelEndPoints;
47import org.onosproject.net.behaviour.TunnelKey;
48import org.onosproject.net.device.DeviceAdminService;
49import org.onosproject.net.device.DeviceEvent;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.ovsdb.controller.OvsdbClientService;
53import org.onosproject.ovsdb.controller.OvsdbController;
54import org.onosproject.ovsdb.controller.OvsdbNodeId;
55import org.osgi.service.component.annotations.Activate;
56import org.osgi.service.component.annotations.Component;
57import org.osgi.service.component.annotations.Deactivate;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
60import org.slf4j.Logger;
61
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.lang.Thread.sleep;
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.packet.TpPort.tpPort;
70import static org.onlab.util.Tools.groupedThreads;
71import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GRE;
Jian Li019ce6a2020-09-09 10:23:21 +090073import static org.onosproject.k8snode.api.Constants.OS_INTEGRATION_BRIDGE;
Jian Li077b07e2020-09-01 16:55:25 +090074import static org.onosproject.k8snode.api.Constants.VXLAN;
75import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
76import static org.onosproject.k8snode.api.K8sHostState.DEVICE_CREATED;
77import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
78import static org.onosproject.k8snode.api.K8sHostState.INIT;
79import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
80import static org.onosproject.net.AnnotationKeys.PORT_NAME;
81import static org.slf4j.LoggerFactory.getLogger;
82
83/**
84 * Service bootstraps kubernetes host.
85 */
86@Component(immediate = true)
87public class DefaultK8sHostHandler implements K8sHostHandler {
88
89 private final Logger log = getLogger(getClass());
90
91 private static final String DEFAULT_OF_PROTO = "tcp";
92 private static final int DEFAULT_OFPORT = 6653;
93 private static final int DPID_BEGIN = 3;
94 private static final long SLEEP_MS = 3000; // we wait 3s
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected LeadershipService leadershipService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DeviceService deviceService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DeviceAdminService deviceAdminService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected OvsdbController ovsdbController;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected K8sHostAdminService k8sHostAdminService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected K8sNodeAdminService k8sNodeAdminService;
119
120
121 private int ovsdbPortNum = 6640;
122
123 private final ExecutorService eventExecutor = newSingleThreadExecutor(
124 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
125
126 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
127 private final DeviceListener bridgeListener = new InternalBridgeListener();
128 private final K8sHostListener k8sHostListener = new InternalK8sHostListener();
129
130 private ApplicationId appId;
131 private NodeId localNode;
132
133 @Activate
134 protected void activate() {
135 appId = coreService.getAppId(APP_ID);
136 localNode = clusterService.getLocalNode().id();
137
138 leadershipService.runForLeadership(appId.name());
139 deviceService.addListener(ovsdbListener);
140 deviceService.addListener(bridgeListener);
141 k8sHostAdminService.addListener(k8sHostListener);
142
143 log.info("Started");
144 }
145
146 @Deactivate
147 protected void deactivate() {
148 k8sHostAdminService.removeListener(k8sHostListener);
149 deviceService.removeListener(bridgeListener);
150 deviceService.removeListener(ovsdbListener);
151 leadershipService.withdraw(appId.name());
152 eventExecutor.shutdown();
153
154 log.info("Stopped");
155 }
156
157 @Override
158 public void processInitState(K8sHost k8sHost) {
159 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
160 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
161 return;
162 }
163
164 for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
165 if (!deviceService.isAvailable(tunBridge.deviceId())) {
166 createBridge(k8sHost.ovsdb(), tunBridge);
167 }
168 }
Jian Li019ce6a2020-09-09 10:23:21 +0900169
170 for (K8sRouterBridge routerBridge : k8sHost.routerBridges()) {
171 if (!deviceService.isAvailable(routerBridge.deviceId())) {
172 createBridge(k8sHost.ovsdb(), routerBridge);
173 }
174 }
Jian Li077b07e2020-09-01 16:55:25 +0900175 }
176
177 @Override
178 public void processDeviceCreatedState(K8sHost k8sHost) {
179 try {
180 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
181 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
182 return;
183 }
184
185 // create patch ports into tunnel bridge face to integration bridge
186 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
187 for (String node : k8sHost.nodeNames()) {
188 K8sNode k8sNode = k8sNodeAdminService.node(node);
189 if (k8sNode.segmentId() == bridge.tunnelId()) {
Jian Li019ce6a2020-09-09 10:23:21 +0900190 createTunnelPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
191 createInterPatchInterfaces(k8sHost.ovsdb(), k8sNode);
Jian Li077b07e2020-09-01 16:55:25 +0900192 }
193 }
194 }
195
196 // create tunnel ports
197 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
198 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
199 createVxlanTunnelInterface(k8sHost.ovsdb(), bridge);
200 }
201
202 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
203 createGreTunnelInterface(k8sHost.ovsdb(), bridge);
204 }
205
206 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
207 createGeneveTunnelInterface(k8sHost.ovsdb(), bridge);
208 }
209 }
Jian Li019ce6a2020-09-09 10:23:21 +0900210
211 // create patch ports into router bridge face to external bridge
212 for (K8sRouterBridge bridge : k8sHost.routerBridges()) {
213 for (String node : k8sHost.nodeNames()) {
214 K8sNode k8sNode = k8sNodeAdminService.node(node);
215 if (k8sNode.segmentId() == bridge.segmentId()) {
216 createRouterPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
217 }
218 }
219 }
Jian Li077b07e2020-09-01 16:55:25 +0900220 } catch (Exception e) {
221 log.error("Exception occurred because of {}", e);
222 }
223 }
224
225 @Override
226 public void processCompleteState(K8sHost k8sHost) {
227 // do something if needed
228 }
229
230 @Override
231 public void processIncompleteState(K8sHost k8sHost) {
232 // do something if needed
233 }
234
Jian Li019ce6a2020-09-09 10:23:21 +0900235 private void createBridge(DeviceId ovsdb, K8sBridge bridge) {
Jian Li077b07e2020-09-01 16:55:25 +0900236 Device device = deviceService.getDevice(ovsdb);
237
238 List<ControllerInfo> controllers = clusterService.getNodes().stream()
239 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
240 .collect(Collectors.toList());
241
242 String dpid = bridge.dpid().substring(DPID_BEGIN);
243
244 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
245 .name(bridge.name())
246 .failMode(BridgeDescription.FailMode.SECURE)
247 .datapathId(dpid)
248 .disableInBand()
249 .controllers(controllers);
250
251 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
252 bridgeConfig.addBridge(builder.build());
253 }
254
Jian Li019ce6a2020-09-09 10:23:21 +0900255 private void createTunnelPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
Jian Li077b07e2020-09-01 16:55:25 +0900256 Device device = deviceService.getDevice(ovsdb);
257 if (device == null || !device.is(InterfaceConfig.class)) {
258 log.error("Failed to create patch interface on {}", ovsdb);
259 return;
260 }
261
262 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
263
Jian Li019ce6a2020-09-09 10:23:21 +0900264 // tunnel bridge -> k8s integration bridge
Jian Li077b07e2020-09-01 16:55:25 +0900265 PatchDescription brTunIntPatchDesc =
266 DefaultPatchDescription.builder()
267 .deviceId(bridge.name())
268 .ifaceName(k8sNode.tunToIntgPatchPortName())
269 .peer(k8sNode.intgToTunPatchPortName())
270 .build();
271
272 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
273 }
274
Jian Li019ce6a2020-09-09 10:23:21 +0900275 private void createInterPatchInterfaces(DeviceId ovsdb, K8sNode k8sNode) {
276 Device device = deviceService.getDevice(ovsdb);
277 if (device == null || !device.is(InterfaceConfig.class)) {
278 log.error("Failed to create patch interface on {}", ovsdb);
279 return;
280 }
281
282 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
283
284 // openstack integration bridge -> k8s integration bridge
285 PatchDescription osIntK8sIntPatchDesc =
286 DefaultPatchDescription.builder()
287 .deviceId(OS_INTEGRATION_BRIDGE)
288 .ifaceName(k8sNode.osToK8sIntgPatchPortName())
Jian Lia4d8fba2020-09-10 23:16:50 +0900289 .peer(k8sNode.k8sIntgToOsPatchPortName())
Jian Li019ce6a2020-09-09 10:23:21 +0900290 .build();
Jian Lia4d8fba2020-09-10 23:16:50 +0900291 ifaceConfig.addPatchMode(k8sNode.osToK8sIntgPatchPortName(), osIntK8sIntPatchDesc);
Jian Li019ce6a2020-09-09 10:23:21 +0900292
Jian Lia4d8fba2020-09-10 23:16:50 +0900293 // openstack integration bridge -> k8s external bridge
294 PatchDescription osIntK8sExPatchDesc =
295 DefaultPatchDescription.builder()
296 .deviceId(OS_INTEGRATION_BRIDGE)
297 .ifaceName(k8sNode.osToK8sExtPatchPortName())
298 .peer(k8sNode.k8sExtToOsPatchPortName())
299 .build();
300 ifaceConfig.addPatchMode(k8sNode.osToK8sExtPatchPortName(), osIntK8sExPatchDesc);
Jian Li019ce6a2020-09-09 10:23:21 +0900301 }
302
303 private void createRouterPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
304 Device device = deviceService.getDevice(ovsdb);
305 if (device == null || !device.is(InterfaceConfig.class)) {
306 log.error("Failed to create patch interface on {}", ovsdb);
307 return;
308 }
309
310 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
311
312 // router bridge -> external bridge
313 PatchDescription brRouterExtPatchDesc =
314 DefaultPatchDescription.builder()
315 .deviceId(bridge.name())
316 .ifaceName(k8sNode.routerToExtPatchPortName())
317 .peer(k8sNode.extToRouterPatchPortName())
318 .build();
319
320 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brRouterExtPatchDesc);
321 }
322
Jian Li077b07e2020-09-01 16:55:25 +0900323 private void createVxlanTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
324 createTunnelInterface(ovsdb, bridge, VXLAN, bridge.vxlanPortName());
325 }
326
327 private void createGreTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
328 createTunnelInterface(ovsdb, bridge, GRE, bridge.grePortName());
329 }
330
331 private void createGeneveTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
332 createTunnelInterface(ovsdb, bridge, GENEVE, bridge.genevePortName());
333 }
334
335 private void createTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge,
336 String type, String intfName) {
337 if (isTunPortEnabled(bridge, intfName)) {
338 return;
339 }
340
341 Device device = deviceService.getDevice(ovsdb);
342 if (device == null || !device.is(InterfaceConfig.class)) {
343 log.error("Failed to create tunnel interface on {}", ovsdb);
344 return;
345 }
346
347 TunnelDescription tunnelDesc = buildTunnelDesc(bridge, type, intfName);
348
349 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
350 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
351 }
352
353 private TunnelDescription buildTunnelDesc(K8sTunnelBridge bridge, String type, String intfName) {
354 TunnelKey<String> key = new TunnelKey<>(String.valueOf(bridge.tunnelId()));
355
356 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
357 TunnelDescription.Builder tdBuilder =
358 DefaultTunnelDescription.builder()
359 .deviceId(bridge.name())
360 .ifaceName(intfName)
361 .remote(TunnelEndPoints.flowTunnelEndpoint())
362 .key(key);
363
364 switch (type) {
365 case VXLAN:
366 tdBuilder.type(TunnelDescription.Type.VXLAN);
367 break;
368 case GRE:
369 tdBuilder.type(TunnelDescription.Type.GRE);
370 break;
371 case GENEVE:
372 tdBuilder.type(TunnelDescription.Type.GENEVE);
373 break;
374 default:
375 return null;
376 }
377
378 return tdBuilder.build();
379 }
380
381 return null;
382 }
383
384 private boolean isOvsdbConnected(K8sHost host, int ovsdbPort,
385 OvsdbController ovsdbController,
386 DeviceService deviceService) {
387 OvsdbClientService client = getOvsdbClient(host, ovsdbPort, ovsdbController);
388 return deviceService.isAvailable(host.ovsdb()) &&
389 client != null &&
390 client.isConnected();
391 }
392
393 private OvsdbClientService getOvsdbClient(K8sHost host, int ovsdbPort,
394 OvsdbController ovsdbController) {
395 OvsdbNodeId ovsdb = new OvsdbNodeId(host.hostIp(), ovsdbPort);
396 return ovsdbController.getOvsdbClient(ovsdb);
397 }
398
399 private boolean isCurrentStateDone(K8sHost k8sHost) {
400 switch (k8sHost.state()) {
401 case INIT:
402 return isInitStateDone(k8sHost);
403 case DEVICE_CREATED:
404 return isDeviceCreatedStateDone(k8sHost);
405 case COMPLETE:
406 case INCOMPLETE:
407 return false;
408 default:
409 return true;
410 }
411 }
412
413 private boolean isInitStateDone(K8sHost k8sHost) {
414 if (!isOvsdbConnected(k8sHost, ovsdbPortNum,
415 ovsdbController, deviceService)) {
416 return false;
417 }
418
419 try {
420 // we need to wait a while, in case interface and bridge
421 // creation requires some time
422 sleep(SLEEP_MS);
423 } catch (InterruptedException e) {
424 log.error("Exception caused during init state checking...");
425 }
426
Jian Li019ce6a2020-09-09 10:23:21 +0900427 for (K8sBridge tunBridge : k8sHost.tunBridges()) {
Jian Li077b07e2020-09-01 16:55:25 +0900428 if (!deviceService.isAvailable(tunBridge.deviceId())) {
429 return false;
430 }
431 }
432
Jian Li019ce6a2020-09-09 10:23:21 +0900433 for (K8sBridge routerBridge: k8sHost.routerBridges()) {
434 if (!deviceService.isAvailable(routerBridge.deviceId())) {
435 return false;
436 }
437 }
438
Jian Li077b07e2020-09-01 16:55:25 +0900439 return true;
440 }
441
442 private boolean isDeviceCreatedStateDone(K8sHost k8sHost) {
443 try {
444 // we need to wait a while, in case interface and bridge
445 // creation requires some time
446 sleep(SLEEP_MS);
447 } catch (InterruptedException e) {
448 log.error("Exception caused during init state checking...");
449 }
450
Jian Lidc1df642020-11-25 16:49:34 +0900451 // checks whether all tunneling ports exist
Jian Li077b07e2020-09-01 16:55:25 +0900452 for (K8sTunnelBridge bridge: k8sHost.tunBridges()) {
453 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
454 return false;
455 }
456 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
457 return false;
458 }
459 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
460 return false;
461 }
462 }
463
Jian Lidc1df642020-11-25 16:49:34 +0900464 // checks whether all patch ports attached to tunnel bridge exist
465 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
466 for (String node : k8sHost.nodeNames()) {
467 K8sNode k8sNode = k8sNodeAdminService.node(node);
468 if (!isTunPortEnabled(bridge, k8sNode.tunToIntgPatchPortName())) {
469 return false;
470 }
471 }
472 }
473
474 // checks whether all patch ports attached to router bridge exist
475 for (K8sRouterBridge bridge : k8sHost.routerBridges()) {
476 for (String node : k8sHost.nodeNames()) {
477 K8sNode k8sNode = k8sNodeAdminService.node(node);
478 if (!isRouterPortEnabled(bridge, k8sNode.routerToExtPatchPortName())) {
479 return false;
480 }
481 }
482 }
483
Jian Li077b07e2020-09-01 16:55:25 +0900484 return true;
485 }
486
487 private boolean isTunPortEnabled(K8sTunnelBridge tunBridge, String intf) {
488 return deviceService.isAvailable(tunBridge.deviceId()) &&
489 deviceService.getPorts(tunBridge.deviceId()).stream()
490 .anyMatch(port -> Objects.equals(
491 port.annotations().value(PORT_NAME), intf) &&
492 port.isEnabled());
493 }
494
Jian Lidc1df642020-11-25 16:49:34 +0900495 private boolean isRouterPortEnabled(K8sRouterBridge routerBridge, String intf) {
496 return deviceService.isAvailable(routerBridge.deviceId()) &&
497 deviceService.getPorts(routerBridge.deviceId()).stream()
498 .anyMatch(port -> Objects.equals(
499 port.annotations().value(PORT_NAME), intf) &&
500 port.isEnabled());
501 }
502
Jian Li077b07e2020-09-01 16:55:25 +0900503 /**
504 * Configures the kubernetes host with new state.
505 *
506 * @param k8sHost kubernetes host
507 * @param newState a new state
508 */
509 private void setState(K8sHost k8sHost, K8sHostState newState) {
510 if (k8sHost.state() == newState) {
511 return;
512 }
513 K8sHost updated = k8sHost.updateState(newState);
514 k8sHostAdminService.updateHost(updated);
515 log.info("Changed {} state: {}", k8sHost.hostIp(), newState);
516 }
517
518 /**
519 * Bootstraps a new kubernetes host.
520 *
521 * @param k8sHost kubernetes host
522 */
523 private void bootstrapHost(K8sHost k8sHost) {
524 if (isCurrentStateDone(k8sHost)) {
525 setState(k8sHost, k8sHost.state().nextState());
526 } else {
527 log.trace("Processing {} state for {}", k8sHost.state(),
528 k8sHost.hostIp());
529 k8sHost.state().process(this, k8sHost);
530 }
531 }
532
Jian Li32a28ad2020-12-01 00:35:50 +0900533 private void processHostRemoval(K8sHost k8sHost) {
534 OvsdbClientService client = getOvsdbClient(k8sHost, ovsdbPortNum, ovsdbController);
535 if (client == null) {
536 log.info("Failed to get ovsdb client");
537 return;
538 }
539
540 // delete tunnel bridge from the host
541 k8sHost.tunBridges().forEach(br -> client.dropBridge(br.name()));
542
543 // delete router bridge from the host
544 k8sHost.routerBridges().forEach(br -> client.dropBridge(br.name()));
545 }
546
Jian Li077b07e2020-09-01 16:55:25 +0900547 private class InternalOvsdbListener implements DeviceListener {
548
549 @Override
550 public boolean isRelevant(DeviceEvent event) {
551 return event.subject().type() == Device.Type.CONTROLLER;
552 }
553
554 private boolean isRelevantHelper() {
555 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
556 }
557
558 @Override
559 public void event(DeviceEvent event) {
560 Device device = event.subject();
561
562 switch (event.type()) {
563 case DEVICE_AVAILABILITY_CHANGED:
564 case DEVICE_ADDED:
565 eventExecutor.execute(() -> {
566 if (!isRelevantHelper()) {
567 return;
568 }
569
570 K8sHost k8sHost = k8sHostAdminService.host(device.id());
571
572 if (k8sHost == null) {
573 return;
574 }
575
576 if (deviceService.isAvailable(device.id())) {
577 log.debug("OVSDB {} detected", device.id());
578 bootstrapHost(k8sHost);
579 }
580 });
581 break;
582 case PORT_ADDED:
583 case PORT_REMOVED:
584 case DEVICE_REMOVED:
585 default:
586 // do nothing
587 break;
588 }
589 }
590 }
591
592 private class InternalBridgeListener implements DeviceListener {
593
594 @Override
595 public boolean isRelevant(DeviceEvent event) {
596 return event.subject().type() == Device.Type.SWITCH;
597 }
598
599 private boolean isRelevantHelper() {
600 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
601 }
602
603 @Override
604 public void event(DeviceEvent event) {
605 Device device = event.subject();
606
607 switch (event.type()) {
608 case DEVICE_AVAILABILITY_CHANGED:
609 case DEVICE_ADDED:
610 eventExecutor.execute(() -> {
611 if (!isRelevantHelper()) {
612 return;
613 }
614
615 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
616
617 if (k8sHost == null) {
618 return;
619 }
620
621 if (deviceService.isAvailable(device.id())) {
622 log.debug("Tunnel bridge created on {}",
623 k8sHost.hostIp());
624 log.debug("OVSDB {} detected", device.id());
625 bootstrapHost(k8sHost);
626 } else if (k8sHost.state() == COMPLETE) {
627 log.info("Device {} disconnected", device.id());
628 setState(k8sHost, INCOMPLETE);
629 }
630
631 if (k8sHost.state() == INCOMPLETE ||
632 k8sHost.state() == DEVICE_CREATED) {
633 log.info("Device {} is reconnected", device.id());
634 k8sHostAdminService.updateHost(
635 k8sHost.updateState(INIT));
636 }
637 });
638 break;
639 case PORT_UPDATED:
640 case PORT_ADDED:
641 eventExecutor.execute(() -> {
642 if (!isRelevantHelper()) {
643 return;
644 }
645
Jian Lidc1df642020-11-25 16:49:34 +0900646 K8sHost tunnelHost = k8sHostAdminService.hostByTunBridge(device.id());
Jian Li077b07e2020-09-01 16:55:25 +0900647
Jian Lidc1df642020-11-25 16:49:34 +0900648 if (tunnelHost == null) {
Jian Li077b07e2020-09-01 16:55:25 +0900649 return;
650 }
651
Jian Lidc1df642020-11-25 16:49:34 +0900652 if (tunnelHost.state() == DEVICE_CREATED) {
653 // we bootstrap the host whenever any ports added to the tunnel bridge
654 tunnelHost.tunBridges().stream().filter(
Jian Li077b07e2020-09-01 16:55:25 +0900655 br -> br.deviceId().equals(device.id())
Jian Lidc1df642020-11-25 16:49:34 +0900656 ).findAny().ifPresent(tunBridge -> bootstrapHost(tunnelHost));
657 }
Jian Li077b07e2020-09-01 16:55:25 +0900658
Jian Lidc1df642020-11-25 16:49:34 +0900659 K8sHost routerHost = k8sHostAdminService.hostByRouterBridge(device.id());
660
661 if (routerHost == null) {
662 return;
663 }
664
665 if (routerHost.state() == DEVICE_CREATED) {
666 // we bootstrap the host whenever any ports added to the router bridge
667 routerHost.routerBridges().stream().filter(
668 br -> br.deviceId().equals(device.id())
669 ).findAny().ifPresent(routerBridge -> bootstrapHost(routerHost));
Jian Li077b07e2020-09-01 16:55:25 +0900670 }
671 });
672 break;
673 case PORT_REMOVED:
674 eventExecutor.execute(() -> {
675 if (!isRelevantHelper()) {
676 return;
677 }
678
679 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
680
681 if (k8sHost == null) {
682 return;
683 }
684
685 Port port = event.port();
686 String portName = port.annotations().value(PORT_NAME);
687 if (k8sHost.state() == COMPLETE) {
688 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
689 br -> br.deviceId().equals(device.id())
690 ).findAny().orElse(null);
691
692 if (tunBridge != null) {
693 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
694 Objects.equals(portName, tunBridge.grePortName()) ||
695 Objects.equals(portName, tunBridge.genevePortName())) {
696 log.warn("Interface {} removed from {}",
697 portName, event.subject().id());
698 setState(k8sHost, INCOMPLETE);
699 }
700 }
701 }
702 });
703 break;
704 case DEVICE_REMOVED:
705 default:
706 // do nothing
707 break;
708 }
709 }
710 }
711
712 private class InternalK8sHostListener implements K8sHostListener {
713
714 private boolean isRelevantHelper() {
715 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
716 }
717
718 @Override
719 public void event(K8sHostEvent event) {
720 switch (event.type()) {
721 case K8S_HOST_CREATED:
722 case K8S_HOST_UPDATED:
723 eventExecutor.execute(() -> {
724 if (!isRelevantHelper()) {
725 return;
726 }
727
728 bootstrapHost(event.subject());
729 });
730 break;
731 case K8S_HOST_REMOVED:
Jian Li32a28ad2020-12-01 00:35:50 +0900732 eventExecutor.execute(() -> {
733 if (!isRelevantHelper()) {
734 return;
735 }
736
737 processHostRemoval(event.subject());
738 });
739 break;
Jian Li077b07e2020-09-01 16:55:25 +0900740 case K8S_HOST_INCOMPLETE:
741 default:
742 break;
743 }
744 }
745 }
746}