blob: e31102748712df13b1db41f40232473ceaa3bb36 [file] [log] [blame]
Jian Li0c632722019-05-08 15:58:04 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onosproject.cfg.ComponentConfigService;
19import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
24import org.onosproject.k8snetworking.api.DefaultK8sNetwork;
25import org.onosproject.k8snetworking.api.K8sNetwork;
26import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
27import org.onosproject.k8snode.api.K8sNode;
28import org.onosproject.k8snode.api.K8sNodeEvent;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.device.DeviceService;
33import org.onosproject.net.driver.DriverService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
41import java.util.Objects;
42import java.util.concurrent.ExecutorService;
43
44import static java.util.concurrent.Executors.newSingleThreadExecutor;
45import static org.onlab.util.Tools.groupedThreads;
46import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
47import static org.slf4j.LoggerFactory.getLogger;
48
49/**
50 * Provides implementation of initializes the kubernetes network.
51 */
52@Component(immediate = true)
53public class K8sNetworkHandler {
54
55 private final Logger log = getLogger(getClass());
56
57 @Reference(cardinality = ReferenceCardinality.MANDATORY)
58 protected CoreService coreService;
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY)
61 protected ComponentConfigService configService;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected DeviceService deviceService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected DriverService driverService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected LeadershipService leadershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected MastershipService mastershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected ClusterService clusterService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected K8sNodeService k8sNodeService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected K8sNetworkAdminService k8sNetworkService;
83
84 private final InternalK8sNodeListener k8sNodeListener =
85 new InternalK8sNodeListener();
86 private final ExecutorService eventExecutor = newSingleThreadExecutor(
87 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
88
89 private ApplicationId appId;
90 private NodeId localNodeId;
91
92 @Activate
93 protected void activate() {
94 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
95
96 localNodeId = clusterService.getLocalNode().id();
97 leadershipService.runForLeadership(appId.name());
98 k8sNodeService.addListener(k8sNodeListener);
99
100 log.info("Started");
101 }
102
103 @Deactivate
104 protected void deactivate() {
105 k8sNodeService.removeListener(k8sNodeListener);
106 leadershipService.withdraw(appId.name());
107 eventExecutor.shutdown();
108
109 log.info("Stopped");
110 }
111
112 private class InternalK8sNodeListener implements K8sNodeListener {
113
114 private boolean isRelevantHelper() {
115 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
116 }
117
118 @Override
119 public void event(K8sNodeEvent event) {
120 switch (event.type()) {
121 case K8S_NODE_CREATED:
122 eventExecutor.execute(() -> processNodeCreation(event.subject()));
123 break;
124 case K8S_NODE_REMOVED:
125 eventExecutor.execute(() -> processNodeRemoval(event.subject()));
126 break;
127 default:
128 break;
129 }
130 }
131
132 private void processNodeCreation(K8sNode k8sNode) {
133 if (!isRelevantHelper()) {
134 return;
135 }
136
137 K8sNetwork network = k8sNetworkService.network(k8sNode.hostname());
138 if (network == null) {
139 K8sNetwork.Builder nBuilder = DefaultK8sNetwork.builder()
140 .networkId(k8sNode.hostname())
141 .name(k8sNode.hostname())
142 .cidr(k8sNode.podCidr());
143 k8sNetworkService.createNetwork(nBuilder.build());
144 }
145 }
146
147 private void processNodeRemoval(K8sNode k8sNode) {
148 if (!isRelevantHelper()) {
149 return;
150 }
151
152 K8sNetwork network = k8sNetworkService.network(k8sNode.hostname());
153 if (network != null) {
154 k8sNetworkService.removeNetwork(k8sNode.hostname());
155 }
156 }
157 }
158}