blob: f31301f6379981735ca0d15a63ff27f625e2455e [file] [log] [blame]
Jian Lif16e8852019-01-22 22:55:31 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snode.api.K8sNode;
26import org.onosproject.k8snode.api.K8sNodeAdminService;
27import org.onosproject.k8snode.api.K8sNodeEvent;
28import org.onosproject.k8snode.api.K8sNodeHandler;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.k8snode.api.K8sNodeState;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
Jian Libf562c22019-04-15 18:07:14 +090039import org.onosproject.net.behaviour.DefaultPatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090040import org.onosproject.net.behaviour.DefaultTunnelDescription;
41import org.onosproject.net.behaviour.InterfaceConfig;
Jian Libf562c22019-04-15 18:07:14 +090042import org.onosproject.net.behaviour.PatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090043import org.onosproject.net.behaviour.TunnelDescription;
44import org.onosproject.net.behaviour.TunnelEndPoints;
45import org.onosproject.net.behaviour.TunnelKeys;
46import org.onosproject.net.device.DeviceAdminService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.ovsdb.controller.OvsdbClientService;
51import org.onosproject.ovsdb.controller.OvsdbController;
52import org.osgi.service.component.ComponentContext;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Modified;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60
61import java.util.Dictionary;
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.util.concurrent.Executors.newSingleThreadExecutor;
68import static org.onlab.packet.TpPort.tpPort;
69import static org.onlab.util.Tools.groupedThreads;
Jian Libf562c22019-04-15 18:07:14 +090070import static org.onosproject.k8snode.api.Constants.EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090071import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
73import static org.onosproject.k8snode.api.Constants.GRE;
74import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
75import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090076import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_EXTERNAL_BRIDGE;
Jian Li1a2eb5d2019-08-27 02:07:05 +090077import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_LOCAL_BRIDGE;
78import static org.onosproject.k8snode.api.Constants.LOCAL_BRIDGE;
79import static org.onosproject.k8snode.api.Constants.LOCAL_TO_INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090080import static org.onosproject.k8snode.api.Constants.PHYSICAL_EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090081import static org.onosproject.k8snode.api.Constants.VXLAN;
82import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
83import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
84import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
85import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
86import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
87import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
88import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
89import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
90import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
91import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
92import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
93import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
94import static org.onosproject.net.AnnotationKeys.PORT_NAME;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Service bootstraps kubernetes node based on its type.
99 */
100@Component(immediate = true,
101 property = {
102 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
103 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
104 }
105)
106public class DefaultK8sNodeHandler implements K8sNodeHandler {
107
108 private final Logger log = getLogger(getClass());
109
110 private static final String DEFAULT_OF_PROTO = "tcp";
111 private static final int DEFAULT_OFPORT = 6653;
112 private static final int DPID_BEGIN = 3;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected CoreService coreService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected LeadershipService leadershipService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected DeviceService deviceService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected DeviceAdminService deviceAdminService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected OvsdbController ovsdbController;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected K8sNodeService k8sNodeService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected K8sNodeAdminService k8sNodeAdminService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 protected ComponentConfigService componentConfigService;
140
141 /** OVSDB server listen port. */
142 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
143
144 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
145 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
146
147 private final ExecutorService eventExecutor = newSingleThreadExecutor(
148 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
149
150 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
151 private final DeviceListener bridgeListener = new InternalBridgeListener();
152 private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
153
154 private ApplicationId appId;
155 private NodeId localNode;
156
157 @Activate
158 protected void activate() {
159 appId = coreService.getAppId(APP_ID);
160 localNode = clusterService.getLocalNode().id();
161
162 componentConfigService.registerProperties(getClass());
163 leadershipService.runForLeadership(appId.name());
164 deviceService.addListener(ovsdbListener);
165 deviceService.addListener(bridgeListener);
166 k8sNodeService.addListener(k8sNodeListener);
167
168 log.info("Started");
169 }
170
171 @Deactivate
172 protected void deactivate() {
173 k8sNodeService.removeListener(k8sNodeListener);
174 deviceService.removeListener(bridgeListener);
175 deviceService.removeListener(ovsdbListener);
176 componentConfigService.unregisterProperties(getClass(), false);
177 leadershipService.withdraw(appId.name());
178 eventExecutor.shutdown();
179
180 log.info("Stopped");
181 }
182
183 @Modified
184 protected void modified(ComponentContext context) {
185 readComponentConfiguration(context);
186
187 log.info("Modified");
188 }
189
190 @Override
191 public void processInitState(K8sNode k8sNode) {
192 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
193 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
194 return;
195 }
196 if (!deviceService.isAvailable(k8sNode.intgBridge())) {
197 createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
198 }
Jian Libf562c22019-04-15 18:07:14 +0900199 if (!deviceService.isAvailable(k8sNode.extBridge())) {
200 createBridge(k8sNode, EXTERNAL_BRIDGE, k8sNode.extBridge());
201 }
Jian Li1a2eb5d2019-08-27 02:07:05 +0900202 if (!deviceService.isAvailable(k8sNode.localBridge())) {
203 createBridge(k8sNode, LOCAL_BRIDGE, k8sNode.localBridge());
204 }
Jian Lif16e8852019-01-22 22:55:31 +0900205 }
206
207 @Override
208 public void processDeviceCreatedState(K8sNode k8sNode) {
209 try {
210 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
211 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
212 return;
213 }
214
Jian Libf562c22019-04-15 18:07:14 +0900215 // create patch ports between integration and external bridges
216 createPatchInterfaces(k8sNode);
217
Jian Lif16e8852019-01-22 22:55:31 +0900218 if (k8sNode.dataIp() != null &&
219 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
220 createVxlanTunnelInterface(k8sNode);
221 }
222
223 if (k8sNode.dataIp() != null &&
224 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
225 createGreTunnelInterface(k8sNode);
226 }
227
228 if (k8sNode.dataIp() != null &&
229 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
230 createGeneveTunnelInterface(k8sNode);
231 }
232 } catch (Exception e) {
233 log.error("Exception occurred because of {}", e);
234 }
235 }
236
237 @Override
238 public void processCompleteState(K8sNode k8sNode) {
239 // do something if needed
240 }
241
242 @Override
243 public void processIncompleteState(K8sNode k8sNode) {
244 // do something if needed
245 }
246
247 /**
248 * Extracts properties from the component configuration context.
249 *
250 * @param context the component context
251 */
252 private void readComponentConfiguration(ComponentContext context) {
253 Dictionary<?, ?> properties = context.getProperties();
254
255 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
256 if (ovsdbPortConfigured == null) {
257 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
258 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
259 } else {
260 ovsdbPortNum = ovsdbPortConfigured;
261 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
262 }
263
264 Boolean autoRecoveryConfigured =
265 getBooleanProperty(properties, AUTO_RECOVERY);
266 if (autoRecoveryConfigured == null) {
267 autoRecovery = AUTO_RECOVERY_DEFAULT;
268 log.info("Auto recovery flag is NOT " +
269 "configured, default value is {}", autoRecovery);
270 } else {
271 autoRecovery = autoRecoveryConfigured;
272 log.info("Configured. Auto recovery flag is {}", autoRecovery);
273 }
274 }
275
276 /**
277 * Creates a bridge with a given name on a given kubernetes node.
278 *
279 * @param k8sNode kubernetes node
280 * @param bridgeName bridge name
281 * @param devId device identifier
282 */
283 private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
284 Device device = deviceService.getDevice(k8sNode.ovsdb());
285
286 List<ControllerInfo> controllers = clusterService.getNodes().stream()
287 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
288 .collect(Collectors.toList());
289
290 String dpid = devId.toString().substring(DPID_BEGIN);
291
292 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
293 .name(bridgeName)
294 .failMode(BridgeDescription.FailMode.SECURE)
295 .datapathId(dpid)
296 .disableInBand()
297 .controllers(controllers);
298
299 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
300 bridgeConfig.addBridge(builder.build());
301 }
302
303 /**
304 * Creates a VXLAN tunnel interface in a given kubernetes node.
305 *
306 * @param k8sNode kubernetes node
307 */
308 private void createVxlanTunnelInterface(K8sNode k8sNode) {
309 createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
310 }
311
312 /**
313 * Creates a GRE tunnel interface in a given kubernetes node.
314 *
315 * @param k8sNode kubernetes node
316 */
317 private void createGreTunnelInterface(K8sNode k8sNode) {
318 createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
319 }
320
321 /**
322 * Creates a GENEVE tunnel interface in a given kubernetes node.
323 *
324 * @param k8sNode kubernetes node
325 */
326 private void createGeneveTunnelInterface(K8sNode k8sNode) {
327 createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
328 }
329
Jian Libf562c22019-04-15 18:07:14 +0900330 private void createPatchInterfaces(K8sNode k8sNode) {
331 Device device = deviceService.getDevice(k8sNode.ovsdb());
332 if (device == null || !device.is(InterfaceConfig.class)) {
333 log.error("Failed to create patch interface on {}", k8sNode.ovsdb());
334 return;
335 }
336
Jian Li1a2eb5d2019-08-27 02:07:05 +0900337 // integration bridge -> external bridge
338 PatchDescription brIntExtPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900339 DefaultPatchDescription.builder()
340 .deviceId(INTEGRATION_BRIDGE)
341 .ifaceName(INTEGRATION_TO_EXTERNAL_BRIDGE)
342 .peer(PHYSICAL_EXTERNAL_BRIDGE)
343 .build();
344
Jian Li1a2eb5d2019-08-27 02:07:05 +0900345 // external bridge -> integration bridge
346 PatchDescription brExtIntPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900347 DefaultPatchDescription.builder()
348 .deviceId(EXTERNAL_BRIDGE)
349 .ifaceName(PHYSICAL_EXTERNAL_BRIDGE)
350 .peer(INTEGRATION_TO_EXTERNAL_BRIDGE)
351 .build();
352
Jian Li1a2eb5d2019-08-27 02:07:05 +0900353 // integration bridge -> local bridge
354 PatchDescription brIntLocalPatchDesc =
355 DefaultPatchDescription.builder()
356 .deviceId(INTEGRATION_BRIDGE)
357 .ifaceName(INTEGRATION_TO_LOCAL_BRIDGE)
358 .peer(LOCAL_TO_INTEGRATION_BRIDGE)
359 .build();
360
361 // local bridge -> integration bridge
362 PatchDescription brLocalIntPatchDesc =
363 DefaultPatchDescription.builder()
364 .deviceId(LOCAL_BRIDGE)
365 .ifaceName(LOCAL_TO_INTEGRATION_BRIDGE)
366 .peer(INTEGRATION_TO_LOCAL_BRIDGE)
367 .build();
368
Jian Libf562c22019-04-15 18:07:14 +0900369 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
Jian Li1a2eb5d2019-08-27 02:07:05 +0900370 ifaceConfig.addPatchMode(INTEGRATION_TO_EXTERNAL_BRIDGE, brIntExtPatchDesc);
371 ifaceConfig.addPatchMode(PHYSICAL_EXTERNAL_BRIDGE, brExtIntPatchDesc);
372 ifaceConfig.addPatchMode(INTEGRATION_TO_LOCAL_BRIDGE, brIntLocalPatchDesc);
373 ifaceConfig.addPatchMode(LOCAL_TO_INTEGRATION_BRIDGE, brLocalIntPatchDesc);
Jian Libf562c22019-04-15 18:07:14 +0900374 }
375
Jian Lif16e8852019-01-22 22:55:31 +0900376 /**
377 * Creates a tunnel interface in a given kubernetes node.
378 *
379 * @param k8sNode kubernetes node
380 */
381 private void createTunnelInterface(K8sNode k8sNode,
382 String type, String intfName) {
383 if (isIntfEnabled(k8sNode, intfName)) {
384 return;
385 }
386
387 Device device = deviceService.getDevice(k8sNode.ovsdb());
388 if (device == null || !device.is(InterfaceConfig.class)) {
389 log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
390 return;
391 }
392
393 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
394
395 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
396 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
397 }
398
399 /**
400 * Builds tunnel description according to the network type.
401 *
402 * @param type network type
403 * @return tunnel description
404 */
405 private TunnelDescription buildTunnelDesc(String type, String intfName) {
406 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
407 TunnelDescription.Builder tdBuilder =
408 DefaultTunnelDescription.builder()
409 .deviceId(INTEGRATION_BRIDGE)
410 .ifaceName(intfName)
411 .remote(TunnelEndPoints.flowTunnelEndpoint())
412 .key(TunnelKeys.flowTunnelKey());
413
414 switch (type) {
415 case VXLAN:
416 tdBuilder.type(TunnelDescription.Type.VXLAN);
417 break;
418 case GRE:
419 tdBuilder.type(TunnelDescription.Type.GRE);
420 break;
421 case GENEVE:
422 tdBuilder.type(TunnelDescription.Type.GENEVE);
423 break;
424 default:
425 return null;
426 }
427
428 return tdBuilder.build();
429 }
430 return null;
431 }
432
433 /**
434 * Checks whether a given network interface in a given kubernetes node
435 * is enabled or not.
436 *
437 * @param k8sNode kubernetes node
438 * @param intf network interface name
439 * @return true if the given interface is enabled, false otherwise
440 */
441 private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
442 return deviceService.isAvailable(k8sNode.intgBridge()) &&
443 deviceService.getPorts(k8sNode.intgBridge()).stream()
444 .anyMatch(port -> Objects.equals(
445 port.annotations().value(PORT_NAME), intf) &&
446 port.isEnabled());
447 }
448
449 /**
450 * Checks whether all requirements for this state are fulfilled or not.
451 *
452 * @param k8sNode kubernetes node
453 * @return true if all requirements are fulfilled, false otherwise
454 */
455 private boolean isCurrentStateDone(K8sNode k8sNode) {
456 switch (k8sNode.state()) {
457 case INIT:
458 if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
459 ovsdbController, deviceService)) {
460 return false;
461 }
462
Jian Libf562c22019-04-15 18:07:14 +0900463 return k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
464 deviceService.isAvailable(k8sNode.intgBridge()) &&
Jian Li1a2eb5d2019-08-27 02:07:05 +0900465 deviceService.isAvailable(k8sNode.extBridge()) &&
466 deviceService.isAvailable(k8sNode.localBridge());
Jian Lif16e8852019-01-22 22:55:31 +0900467 case DEVICE_CREATED:
468 if (k8sNode.dataIp() != null &&
469 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
470 return false;
471 }
472 if (k8sNode.dataIp() != null &&
473 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
474 return false;
475 }
476 if (k8sNode.dataIp() != null &&
477 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
478 return false;
479 }
480
481 return true;
482 case COMPLETE:
483 case INCOMPLETE:
484 // always return false
485 // run init CLI to re-trigger node bootstrap
486 return false;
487 default:
488 return true;
489 }
490 }
491
492 /**
493 * Configures the kubernetes node with new state.
494 *
495 * @param k8sNode kubernetes node
496 * @param newState a new state
497 */
498 private void setState(K8sNode k8sNode, K8sNodeState newState) {
499 if (k8sNode.state() == newState) {
500 return;
501 }
502 K8sNode updated = k8sNode.updateState(newState);
503 k8sNodeAdminService.updateNode(updated);
504 log.info("Changed {} state: {}", k8sNode.hostname(), newState);
505 }
506
507 /**
508 * Bootstraps a new kubernetes node.
509 *
510 * @param k8sNode kubernetes node
511 */
512 private void bootstrapNode(K8sNode k8sNode) {
513 if (isCurrentStateDone(k8sNode)) {
514 setState(k8sNode, k8sNode.state().nextState());
515 } else {
516 log.trace("Processing {} state for {}", k8sNode.state(),
517 k8sNode.hostname());
518 k8sNode.state().process(this, k8sNode);
519 }
520 }
521
522 private void processK8sNodeRemoved(K8sNode k8sNode) {
523 OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
524 if (client == null) {
525 log.info("Failed to get ovsdb client");
526 return;
527 }
528
529 // delete integration bridge from the node
530 client.dropBridge(INTEGRATION_BRIDGE);
531
Jian Libf562c22019-04-15 18:07:14 +0900532 // delete external bridge from the node
533 client.dropBridge(EXTERNAL_BRIDGE);
534
Jian Li1a2eb5d2019-08-27 02:07:05 +0900535 // delete local bridge from the node
536 client.dropBridge(LOCAL_BRIDGE);
537
Jian Lif16e8852019-01-22 22:55:31 +0900538 // disconnect ovsdb
539 client.disconnect();
540 }
541
542 /**
543 * An internal OVSDB listener. This listener is used for listening the
544 * network facing events from OVSDB device. If a new OVSDB device is detected,
545 * ONOS tries to bootstrap the kubernetes node.
546 */
547 private class InternalOvsdbListener implements DeviceListener {
548
549 @Override
550 public boolean isRelevant(DeviceEvent event) {
551 return event.subject().type() == Device.Type.CONTROLLER;
552 }
553
554 private boolean isRelevantHelper() {
555 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
556 }
557
558 @Override
559 public void event(DeviceEvent event) {
560 Device device = event.subject();
561
562 switch (event.type()) {
563 case DEVICE_AVAILABILITY_CHANGED:
564 case DEVICE_ADDED:
565 eventExecutor.execute(() -> {
566
567 if (!isRelevantHelper()) {
568 return;
569 }
570
571 K8sNode k8sNode = k8sNodeService.node(device.id());
572
573 if (k8sNode == null) {
574 return;
575 }
576
577 if (deviceService.isAvailable(device.id())) {
578 log.debug("OVSDB {} detected", device.id());
579 bootstrapNode(k8sNode);
580 }
581 });
582 break;
583 case PORT_ADDED:
584 case PORT_REMOVED:
585 case DEVICE_REMOVED:
586 default:
587 // do nothing
588 break;
589 }
590 }
591 }
592
593 /**
594 * An internal integration bridge listener. This listener is used for
595 * listening the events from integration bridge. To listen the events from
596 * other types of bridge such as provider bridge or tunnel bridge, we need
597 * to augment K8sNodeService.node() method.
598 */
599 private class InternalBridgeListener implements DeviceListener {
600
601 @Override
602 public boolean isRelevant(DeviceEvent event) {
603 return event.subject().type() == Device.Type.SWITCH;
604 }
605
606 private boolean isRelevantHelper() {
607 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
608 }
609
610 @Override
611 public void event(DeviceEvent event) {
612 Device device = event.subject();
613
614 switch (event.type()) {
615 case DEVICE_AVAILABILITY_CHANGED:
616 case DEVICE_ADDED:
617 eventExecutor.execute(() -> {
618
619 if (!isRelevantHelper()) {
620 return;
621 }
622
623 K8sNode k8sNode = k8sNodeService.node(device.id());
624
625 if (k8sNode == null) {
626 return;
627 }
628
Jian Libf562c22019-04-15 18:07:14 +0900629 // TODO: also need to check the external bridge's availability
Jian Li1a2eb5d2019-08-27 02:07:05 +0900630 // TODO: also need to check the local bridge's availability
Jian Lif16e8852019-01-22 22:55:31 +0900631 if (deviceService.isAvailable(device.id())) {
Jian Libf562c22019-04-15 18:07:14 +0900632 log.debug("Integration bridge created on {}",
633 k8sNode.hostname());
Jian Lif16e8852019-01-22 22:55:31 +0900634 bootstrapNode(k8sNode);
635 } else if (k8sNode.state() == COMPLETE) {
636 log.info("Device {} disconnected", device.id());
637 setState(k8sNode, INCOMPLETE);
638 }
639
640 if (autoRecovery) {
641 if (k8sNode.state() == INCOMPLETE ||
642 k8sNode.state() == DEVICE_CREATED) {
643 log.info("Device {} is reconnected", device.id());
644 k8sNodeAdminService.updateNode(
645 k8sNode.updateState(K8sNodeState.INIT));
646 }
647 }
648 });
649 break;
650 case PORT_UPDATED:
651 case PORT_ADDED:
652 eventExecutor.execute(() -> {
653
654 if (!isRelevantHelper()) {
655 return;
656 }
657
658 K8sNode k8sNode = k8sNodeService.node(device.id());
659
660 if (k8sNode == null) {
661 return;
662 }
663
664 Port port = event.port();
665 String portName = port.annotations().value(PORT_NAME);
666 if (k8sNode.state() == DEVICE_CREATED && (
667 Objects.equals(portName, VXLAN_TUNNEL) ||
668 Objects.equals(portName, GRE_TUNNEL) ||
669 Objects.equals(portName, GENEVE_TUNNEL))) {
670 log.info("Interface {} added or updated to {}",
671 portName, device.id());
672 bootstrapNode(k8sNode);
673 }
674 });
675 break;
676 case PORT_REMOVED:
677 eventExecutor.execute(() -> {
678
679 if (!isRelevantHelper()) {
680 return;
681 }
682
683 K8sNode k8sNode = k8sNodeService.node(device.id());
684
685 if (k8sNode == null) {
686 return;
687 }
688
689 Port port = event.port();
690 String portName = port.annotations().value(PORT_NAME);
691 if (k8sNode.state() == COMPLETE && (
692 Objects.equals(portName, VXLAN_TUNNEL) ||
693 Objects.equals(portName, GRE_TUNNEL) ||
694 Objects.equals(portName, GENEVE_TUNNEL))) {
695 log.warn("Interface {} removed from {}",
696 portName, event.subject().id());
697 setState(k8sNode, INCOMPLETE);
698 }
699 });
700 break;
701 case DEVICE_REMOVED:
702 default:
703 // do nothing
704 break;
705 }
706 }
707 }
708
709 /**
710 * An internal kubernetes node listener.
711 * The notification is triggered by KubernetesNodeStore.
712 */
713 private class InternalK8sNodeListener implements K8sNodeListener {
714
715 private boolean isRelevantHelper() {
716 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
717 }
718
719 @Override
720 public void event(K8sNodeEvent event) {
721 switch (event.type()) {
722 case K8S_NODE_CREATED:
723 case K8S_NODE_UPDATED:
724 eventExecutor.execute(() -> {
725
726 if (!isRelevantHelper()) {
727 return;
728 }
729
730 bootstrapNode(event.subject());
731 });
732 break;
733 case K8S_NODE_REMOVED:
734 eventExecutor.execute(() -> {
735
736 if (!isRelevantHelper()) {
737 return;
738 }
739
740 processK8sNodeRemoved(event.subject());
741 });
742 break;
743 case K8S_NODE_INCOMPLETE:
744 default:
745 break;
746 }
747 }
748 }
749}