blob: 1c68f3d78c70ee23d12d9f528738f3a6a529c039 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snode.api.K8sNode;
26import org.onosproject.k8snode.api.K8sNodeAdminService;
27import org.onosproject.k8snode.api.K8sNodeEvent;
28import org.onosproject.k8snode.api.K8sNodeHandler;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.k8snode.api.K8sNodeState;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
Jian Libf562c22019-04-15 18:07:14 +090039import org.onosproject.net.behaviour.DefaultPatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090040import org.onosproject.net.behaviour.DefaultTunnelDescription;
41import org.onosproject.net.behaviour.InterfaceConfig;
Jian Libf562c22019-04-15 18:07:14 +090042import org.onosproject.net.behaviour.PatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090043import org.onosproject.net.behaviour.TunnelDescription;
44import org.onosproject.net.behaviour.TunnelEndPoints;
45import org.onosproject.net.behaviour.TunnelKeys;
46import org.onosproject.net.device.DeviceAdminService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.ovsdb.controller.OvsdbClientService;
51import org.onosproject.ovsdb.controller.OvsdbController;
52import org.osgi.service.component.ComponentContext;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Modified;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60
61import java.util.Dictionary;
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.util.concurrent.Executors.newSingleThreadExecutor;
68import static org.onlab.packet.TpPort.tpPort;
69import static org.onlab.util.Tools.groupedThreads;
Jian Libf562c22019-04-15 18:07:14 +090070import static org.onosproject.k8snode.api.Constants.EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090071import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
73import static org.onosproject.k8snode.api.Constants.GRE;
74import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
75import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090076import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_EXTERNAL_BRIDGE;
Jian Li1a2eb5d2019-08-27 02:07:05 +090077import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_LOCAL_BRIDGE;
78import static org.onosproject.k8snode.api.Constants.LOCAL_BRIDGE;
79import static org.onosproject.k8snode.api.Constants.LOCAL_TO_INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090080import static org.onosproject.k8snode.api.Constants.PHYSICAL_EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090081import static org.onosproject.k8snode.api.Constants.VXLAN;
82import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
83import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
84import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
85import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
86import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
87import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
88import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
89import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
90import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
91import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
92import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
93import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
94import static org.onosproject.net.AnnotationKeys.PORT_NAME;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Service bootstraps kubernetes node based on its type.
99 */
100@Component(immediate = true,
101 property = {
102 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
103 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
104 }
105)
106public class DefaultK8sNodeHandler implements K8sNodeHandler {
107
108 private final Logger log = getLogger(getClass());
109
110 private static final String DEFAULT_OF_PROTO = "tcp";
111 private static final int DEFAULT_OFPORT = 6653;
112 private static final int DPID_BEGIN = 3;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected CoreService coreService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected LeadershipService leadershipService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected DeviceService deviceService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected DeviceAdminService deviceAdminService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected OvsdbController ovsdbController;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected K8sNodeService k8sNodeService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected K8sNodeAdminService k8sNodeAdminService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 protected ComponentConfigService componentConfigService;
140
141 /** OVSDB server listen port. */
142 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
143
144 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
145 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
146
147 private final ExecutorService eventExecutor = newSingleThreadExecutor(
148 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
149
150 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
151 private final DeviceListener bridgeListener = new InternalBridgeListener();
152 private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
153
154 private ApplicationId appId;
155 private NodeId localNode;
156
157 @Activate
158 protected void activate() {
159 appId = coreService.getAppId(APP_ID);
160 localNode = clusterService.getLocalNode().id();
161
162 componentConfigService.registerProperties(getClass());
163 leadershipService.runForLeadership(appId.name());
164 deviceService.addListener(ovsdbListener);
165 deviceService.addListener(bridgeListener);
166 k8sNodeService.addListener(k8sNodeListener);
167
168 log.info("Started");
169 }
170
171 @Deactivate
172 protected void deactivate() {
173 k8sNodeService.removeListener(k8sNodeListener);
174 deviceService.removeListener(bridgeListener);
175 deviceService.removeListener(ovsdbListener);
176 componentConfigService.unregisterProperties(getClass(), false);
177 leadershipService.withdraw(appId.name());
178 eventExecutor.shutdown();
179
180 log.info("Stopped");
181 }
182
183 @Modified
184 protected void modified(ComponentContext context) {
185 readComponentConfiguration(context);
186
187 log.info("Modified");
188 }
189
190 @Override
191 public void processInitState(K8sNode k8sNode) {
192 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
193 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
194 return;
195 }
196 if (!deviceService.isAvailable(k8sNode.intgBridge())) {
197 createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
198 }
Jian Libf562c22019-04-15 18:07:14 +0900199 if (!deviceService.isAvailable(k8sNode.extBridge())) {
200 createBridge(k8sNode, EXTERNAL_BRIDGE, k8sNode.extBridge());
201 }
Jian Li1a2eb5d2019-08-27 02:07:05 +0900202 if (!deviceService.isAvailable(k8sNode.localBridge())) {
203 createBridge(k8sNode, LOCAL_BRIDGE, k8sNode.localBridge());
204 }
Jian Lif16e8852019-01-22 22:55:31 +0900205 }
206
207 @Override
208 public void processDeviceCreatedState(K8sNode k8sNode) {
209 try {
210 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
211 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
212 return;
213 }
214
Jian Libf562c22019-04-15 18:07:14 +0900215 // create patch ports between integration and external bridges
216 createPatchInterfaces(k8sNode);
217
Jian Lif16e8852019-01-22 22:55:31 +0900218 if (k8sNode.dataIp() != null &&
219 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
220 createVxlanTunnelInterface(k8sNode);
221 }
222
223 if (k8sNode.dataIp() != null &&
224 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
225 createGreTunnelInterface(k8sNode);
226 }
227
228 if (k8sNode.dataIp() != null &&
229 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
230 createGeneveTunnelInterface(k8sNode);
231 }
232 } catch (Exception e) {
233 log.error("Exception occurred because of {}", e);
234 }
235 }
236
237 @Override
238 public void processCompleteState(K8sNode k8sNode) {
239 // do something if needed
240 }
241
242 @Override
243 public void processIncompleteState(K8sNode k8sNode) {
244 // do something if needed
245 }
246
Jian Li0ee8d0e2019-12-18 11:35:05 +0900247 @Override
248 public void processPreOnBoardState(K8sNode k8sNode) {
249 processInitState(k8sNode);
250 processDeviceCreatedState(k8sNode);
251 }
252
253 @Override
254 public void processOnBoardedState(K8sNode k8sNode) {
255 // do something if needed
256 }
257
Jian Lif16e8852019-01-22 22:55:31 +0900258 /**
259 * Extracts properties from the component configuration context.
260 *
261 * @param context the component context
262 */
263 private void readComponentConfiguration(ComponentContext context) {
264 Dictionary<?, ?> properties = context.getProperties();
265
266 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
267 if (ovsdbPortConfigured == null) {
268 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
269 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
270 } else {
271 ovsdbPortNum = ovsdbPortConfigured;
272 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
273 }
274
275 Boolean autoRecoveryConfigured =
276 getBooleanProperty(properties, AUTO_RECOVERY);
277 if (autoRecoveryConfigured == null) {
278 autoRecovery = AUTO_RECOVERY_DEFAULT;
279 log.info("Auto recovery flag is NOT " +
280 "configured, default value is {}", autoRecovery);
281 } else {
282 autoRecovery = autoRecoveryConfigured;
283 log.info("Configured. Auto recovery flag is {}", autoRecovery);
284 }
285 }
286
287 /**
288 * Creates a bridge with a given name on a given kubernetes node.
289 *
290 * @param k8sNode kubernetes node
291 * @param bridgeName bridge name
292 * @param devId device identifier
293 */
294 private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
295 Device device = deviceService.getDevice(k8sNode.ovsdb());
296
297 List<ControllerInfo> controllers = clusterService.getNodes().stream()
298 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
299 .collect(Collectors.toList());
300
301 String dpid = devId.toString().substring(DPID_BEGIN);
302
303 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
304 .name(bridgeName)
305 .failMode(BridgeDescription.FailMode.SECURE)
306 .datapathId(dpid)
307 .disableInBand()
308 .controllers(controllers);
309
310 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
311 bridgeConfig.addBridge(builder.build());
312 }
313
314 /**
315 * Creates a VXLAN tunnel interface in a given kubernetes node.
316 *
317 * @param k8sNode kubernetes node
318 */
319 private void createVxlanTunnelInterface(K8sNode k8sNode) {
320 createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
321 }
322
323 /**
324 * Creates a GRE tunnel interface in a given kubernetes node.
325 *
326 * @param k8sNode kubernetes node
327 */
328 private void createGreTunnelInterface(K8sNode k8sNode) {
329 createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
330 }
331
332 /**
333 * Creates a GENEVE tunnel interface in a given kubernetes node.
334 *
335 * @param k8sNode kubernetes node
336 */
337 private void createGeneveTunnelInterface(K8sNode k8sNode) {
338 createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
339 }
340
Jian Libf562c22019-04-15 18:07:14 +0900341 private void createPatchInterfaces(K8sNode k8sNode) {
342 Device device = deviceService.getDevice(k8sNode.ovsdb());
343 if (device == null || !device.is(InterfaceConfig.class)) {
344 log.error("Failed to create patch interface on {}", k8sNode.ovsdb());
345 return;
346 }
347
Jian Li1a2eb5d2019-08-27 02:07:05 +0900348 // integration bridge -> external bridge
349 PatchDescription brIntExtPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900350 DefaultPatchDescription.builder()
351 .deviceId(INTEGRATION_BRIDGE)
352 .ifaceName(INTEGRATION_TO_EXTERNAL_BRIDGE)
353 .peer(PHYSICAL_EXTERNAL_BRIDGE)
354 .build();
355
Jian Li1a2eb5d2019-08-27 02:07:05 +0900356 // external bridge -> integration bridge
357 PatchDescription brExtIntPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900358 DefaultPatchDescription.builder()
359 .deviceId(EXTERNAL_BRIDGE)
360 .ifaceName(PHYSICAL_EXTERNAL_BRIDGE)
361 .peer(INTEGRATION_TO_EXTERNAL_BRIDGE)
362 .build();
363
Jian Li1a2eb5d2019-08-27 02:07:05 +0900364 // integration bridge -> local bridge
365 PatchDescription brIntLocalPatchDesc =
366 DefaultPatchDescription.builder()
367 .deviceId(INTEGRATION_BRIDGE)
368 .ifaceName(INTEGRATION_TO_LOCAL_BRIDGE)
369 .peer(LOCAL_TO_INTEGRATION_BRIDGE)
370 .build();
371
372 // local bridge -> integration bridge
373 PatchDescription brLocalIntPatchDesc =
374 DefaultPatchDescription.builder()
375 .deviceId(LOCAL_BRIDGE)
376 .ifaceName(LOCAL_TO_INTEGRATION_BRIDGE)
377 .peer(INTEGRATION_TO_LOCAL_BRIDGE)
378 .build();
379
Jian Libf562c22019-04-15 18:07:14 +0900380 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
Jian Li1a2eb5d2019-08-27 02:07:05 +0900381 ifaceConfig.addPatchMode(INTEGRATION_TO_EXTERNAL_BRIDGE, brIntExtPatchDesc);
382 ifaceConfig.addPatchMode(PHYSICAL_EXTERNAL_BRIDGE, brExtIntPatchDesc);
383 ifaceConfig.addPatchMode(INTEGRATION_TO_LOCAL_BRIDGE, brIntLocalPatchDesc);
384 ifaceConfig.addPatchMode(LOCAL_TO_INTEGRATION_BRIDGE, brLocalIntPatchDesc);
Jian Libf562c22019-04-15 18:07:14 +0900385 }
386
Jian Lif16e8852019-01-22 22:55:31 +0900387 /**
388 * Creates a tunnel interface in a given kubernetes node.
389 *
390 * @param k8sNode kubernetes node
391 */
392 private void createTunnelInterface(K8sNode k8sNode,
393 String type, String intfName) {
394 if (isIntfEnabled(k8sNode, intfName)) {
395 return;
396 }
397
398 Device device = deviceService.getDevice(k8sNode.ovsdb());
399 if (device == null || !device.is(InterfaceConfig.class)) {
400 log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
401 return;
402 }
403
404 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
405
406 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
407 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
408 }
409
410 /**
411 * Builds tunnel description according to the network type.
412 *
413 * @param type network type
414 * @return tunnel description
415 */
416 private TunnelDescription buildTunnelDesc(String type, String intfName) {
417 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
418 TunnelDescription.Builder tdBuilder =
419 DefaultTunnelDescription.builder()
420 .deviceId(INTEGRATION_BRIDGE)
421 .ifaceName(intfName)
422 .remote(TunnelEndPoints.flowTunnelEndpoint())
423 .key(TunnelKeys.flowTunnelKey());
424
425 switch (type) {
426 case VXLAN:
427 tdBuilder.type(TunnelDescription.Type.VXLAN);
428 break;
429 case GRE:
430 tdBuilder.type(TunnelDescription.Type.GRE);
431 break;
432 case GENEVE:
433 tdBuilder.type(TunnelDescription.Type.GENEVE);
434 break;
435 default:
436 return null;
437 }
438
439 return tdBuilder.build();
440 }
441 return null;
442 }
443
444 /**
445 * Checks whether a given network interface in a given kubernetes node
446 * is enabled or not.
447 *
448 * @param k8sNode kubernetes node
449 * @param intf network interface name
450 * @return true if the given interface is enabled, false otherwise
451 */
452 private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
453 return deviceService.isAvailable(k8sNode.intgBridge()) &&
454 deviceService.getPorts(k8sNode.intgBridge()).stream()
455 .anyMatch(port -> Objects.equals(
456 port.annotations().value(PORT_NAME), intf) &&
457 port.isEnabled());
458 }
459
460 /**
461 * Checks whether all requirements for this state are fulfilled or not.
462 *
463 * @param k8sNode kubernetes node
464 * @return true if all requirements are fulfilled, false otherwise
465 */
466 private boolean isCurrentStateDone(K8sNode k8sNode) {
467 switch (k8sNode.state()) {
468 case INIT:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900469 return isInitStateDone(k8sNode);
Jian Lif16e8852019-01-22 22:55:31 +0900470 case DEVICE_CREATED:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900471 return isDeviceCreatedStateDone(k8sNode);
472 case PRE_ON_BOARD:
473 return isInitStateDone(k8sNode) && isDeviceCreatedStateDone(k8sNode);
Jian Lif16e8852019-01-22 22:55:31 +0900474 case COMPLETE:
475 case INCOMPLETE:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900476 case ON_BOARDED:
Jian Lif16e8852019-01-22 22:55:31 +0900477 // always return false
478 // run init CLI to re-trigger node bootstrap
479 return false;
480 default:
481 return true;
482 }
483 }
484
Jian Li0ee8d0e2019-12-18 11:35:05 +0900485 private boolean isInitStateDone(K8sNode k8sNode) {
486 if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
487 ovsdbController, deviceService)) {
488 return false;
489 }
490
491 return k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
492 deviceService.isAvailable(k8sNode.intgBridge()) &&
493 deviceService.isAvailable(k8sNode.extBridge()) &&
494 deviceService.isAvailable(k8sNode.localBridge());
495 }
496
497 private boolean isDeviceCreatedStateDone(K8sNode k8sNode) {
498 if (k8sNode.dataIp() != null &&
499 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
500 return false;
501 }
502 if (k8sNode.dataIp() != null &&
503 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
504 return false;
505 }
506 if (k8sNode.dataIp() != null &&
507 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
508 return false;
509 }
510
511 return true;
512 }
513
Jian Lif16e8852019-01-22 22:55:31 +0900514 /**
515 * Configures the kubernetes node with new state.
516 *
517 * @param k8sNode kubernetes node
518 * @param newState a new state
519 */
520 private void setState(K8sNode k8sNode, K8sNodeState newState) {
521 if (k8sNode.state() == newState) {
522 return;
523 }
524 K8sNode updated = k8sNode.updateState(newState);
525 k8sNodeAdminService.updateNode(updated);
526 log.info("Changed {} state: {}", k8sNode.hostname(), newState);
527 }
528
529 /**
530 * Bootstraps a new kubernetes node.
531 *
532 * @param k8sNode kubernetes node
533 */
534 private void bootstrapNode(K8sNode k8sNode) {
535 if (isCurrentStateDone(k8sNode)) {
536 setState(k8sNode, k8sNode.state().nextState());
537 } else {
538 log.trace("Processing {} state for {}", k8sNode.state(),
539 k8sNode.hostname());
540 k8sNode.state().process(this, k8sNode);
541 }
542 }
543
544 private void processK8sNodeRemoved(K8sNode k8sNode) {
545 OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
546 if (client == null) {
547 log.info("Failed to get ovsdb client");
548 return;
549 }
550
551 // delete integration bridge from the node
552 client.dropBridge(INTEGRATION_BRIDGE);
553
Jian Libf562c22019-04-15 18:07:14 +0900554 // delete external bridge from the node
555 client.dropBridge(EXTERNAL_BRIDGE);
556
Jian Li1a2eb5d2019-08-27 02:07:05 +0900557 // delete local bridge from the node
558 client.dropBridge(LOCAL_BRIDGE);
559
Jian Lif16e8852019-01-22 22:55:31 +0900560 // disconnect ovsdb
561 client.disconnect();
562 }
563
564 /**
565 * An internal OVSDB listener. This listener is used for listening the
566 * network facing events from OVSDB device. If a new OVSDB device is detected,
567 * ONOS tries to bootstrap the kubernetes node.
568 */
569 private class InternalOvsdbListener implements DeviceListener {
570
571 @Override
572 public boolean isRelevant(DeviceEvent event) {
573 return event.subject().type() == Device.Type.CONTROLLER;
574 }
575
576 private boolean isRelevantHelper() {
577 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
578 }
579
580 @Override
581 public void event(DeviceEvent event) {
582 Device device = event.subject();
583
584 switch (event.type()) {
585 case DEVICE_AVAILABILITY_CHANGED:
586 case DEVICE_ADDED:
587 eventExecutor.execute(() -> {
588
589 if (!isRelevantHelper()) {
590 return;
591 }
592
593 K8sNode k8sNode = k8sNodeService.node(device.id());
594
595 if (k8sNode == null) {
596 return;
597 }
598
599 if (deviceService.isAvailable(device.id())) {
600 log.debug("OVSDB {} detected", device.id());
601 bootstrapNode(k8sNode);
602 }
603 });
604 break;
605 case PORT_ADDED:
606 case PORT_REMOVED:
607 case DEVICE_REMOVED:
608 default:
609 // do nothing
610 break;
611 }
612 }
613 }
614
615 /**
616 * An internal integration bridge listener. This listener is used for
617 * listening the events from integration bridge. To listen the events from
618 * other types of bridge such as provider bridge or tunnel bridge, we need
619 * to augment K8sNodeService.node() method.
620 */
621 private class InternalBridgeListener implements DeviceListener {
622
623 @Override
624 public boolean isRelevant(DeviceEvent event) {
625 return event.subject().type() == Device.Type.SWITCH;
626 }
627
628 private boolean isRelevantHelper() {
629 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
630 }
631
632 @Override
633 public void event(DeviceEvent event) {
634 Device device = event.subject();
635
636 switch (event.type()) {
637 case DEVICE_AVAILABILITY_CHANGED:
638 case DEVICE_ADDED:
639 eventExecutor.execute(() -> {
640
641 if (!isRelevantHelper()) {
642 return;
643 }
644
645 K8sNode k8sNode = k8sNodeService.node(device.id());
646
647 if (k8sNode == null) {
648 return;
649 }
650
Jian Libf562c22019-04-15 18:07:14 +0900651 // TODO: also need to check the external bridge's availability
Jian Li1a2eb5d2019-08-27 02:07:05 +0900652 // TODO: also need to check the local bridge's availability
Jian Lif16e8852019-01-22 22:55:31 +0900653 if (deviceService.isAvailable(device.id())) {
Jian Libf562c22019-04-15 18:07:14 +0900654 log.debug("Integration bridge created on {}",
655 k8sNode.hostname());
Jian Lif16e8852019-01-22 22:55:31 +0900656 bootstrapNode(k8sNode);
657 } else if (k8sNode.state() == COMPLETE) {
658 log.info("Device {} disconnected", device.id());
659 setState(k8sNode, INCOMPLETE);
660 }
661
662 if (autoRecovery) {
663 if (k8sNode.state() == INCOMPLETE ||
664 k8sNode.state() == DEVICE_CREATED) {
665 log.info("Device {} is reconnected", device.id());
666 k8sNodeAdminService.updateNode(
667 k8sNode.updateState(K8sNodeState.INIT));
668 }
669 }
670 });
671 break;
672 case PORT_UPDATED:
673 case PORT_ADDED:
674 eventExecutor.execute(() -> {
675
676 if (!isRelevantHelper()) {
677 return;
678 }
679
680 K8sNode k8sNode = k8sNodeService.node(device.id());
681
682 if (k8sNode == null) {
683 return;
684 }
685
686 Port port = event.port();
687 String portName = port.annotations().value(PORT_NAME);
688 if (k8sNode.state() == DEVICE_CREATED && (
689 Objects.equals(portName, VXLAN_TUNNEL) ||
690 Objects.equals(portName, GRE_TUNNEL) ||
691 Objects.equals(portName, GENEVE_TUNNEL))) {
692 log.info("Interface {} added or updated to {}",
693 portName, device.id());
694 bootstrapNode(k8sNode);
695 }
696 });
697 break;
698 case PORT_REMOVED:
699 eventExecutor.execute(() -> {
700
701 if (!isRelevantHelper()) {
702 return;
703 }
704
705 K8sNode k8sNode = k8sNodeService.node(device.id());
706
707 if (k8sNode == null) {
708 return;
709 }
710
711 Port port = event.port();
712 String portName = port.annotations().value(PORT_NAME);
713 if (k8sNode.state() == COMPLETE && (
714 Objects.equals(portName, VXLAN_TUNNEL) ||
715 Objects.equals(portName, GRE_TUNNEL) ||
716 Objects.equals(portName, GENEVE_TUNNEL))) {
717 log.warn("Interface {} removed from {}",
718 portName, event.subject().id());
719 setState(k8sNode, INCOMPLETE);
720 }
721 });
722 break;
723 case DEVICE_REMOVED:
724 default:
725 // do nothing
726 break;
727 }
728 }
729 }
730
731 /**
732 * An internal kubernetes node listener.
733 * The notification is triggered by KubernetesNodeStore.
734 */
735 private class InternalK8sNodeListener implements K8sNodeListener {
736
737 private boolean isRelevantHelper() {
738 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
739 }
740
741 @Override
742 public void event(K8sNodeEvent event) {
743 switch (event.type()) {
744 case K8S_NODE_CREATED:
745 case K8S_NODE_UPDATED:
746 eventExecutor.execute(() -> {
747
748 if (!isRelevantHelper()) {
749 return;
750 }
751
752 bootstrapNode(event.subject());
753 });
754 break;
755 case K8S_NODE_REMOVED:
756 eventExecutor.execute(() -> {
757
758 if (!isRelevantHelper()) {
759 return;
760 }
761
762 processK8sNodeRemoved(event.subject());
763 });
764 break;
765 case K8S_NODE_INCOMPLETE:
766 default:
767 break;
768 }
769 }
770 }
771}