blob: e7d0c452d52989daee981f703ad6ec5cac4da65f [file] [log] [blame]
Jian Li9b199162019-02-10 18:00:35 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
Jian Li7970b712019-05-03 20:58:21 +090018import io.fabric8.kubernetes.api.model.Pod;
19import org.apache.commons.lang.StringUtils;
Jian Li9b199162019-02-10 18:00:35 +090020import org.onlab.packet.IpAddress;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.k8snetworking.api.K8sIpamAdminService;
27import org.onosproject.k8snetworking.api.K8sNetworkEvent;
28import org.onosproject.k8snetworking.api.K8sNetworkListener;
29import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li7970b712019-05-03 20:58:21 +090030import org.onosproject.k8snetworking.api.K8sPodEvent;
31import org.onosproject.k8snetworking.api.K8sPodListener;
32import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li9b199162019-02-10 18:00:35 +090033import org.onosproject.mastership.MastershipService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
Jian Li7970b712019-05-03 20:58:21 +090041import java.util.Map;
Jian Li9b199162019-02-10 18:00:35 +090042import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.ExecutorService;
45
46import static java.util.concurrent.Executors.newSingleThreadExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
49import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getSubnetIps;
50import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Initializes and purges the kubernetes IPAM.
54 */
55@Component(immediate = true)
56public class K8sIpamHandler {
57
58 private final Logger log = getLogger(getClass());
59
Jian Li7970b712019-05-03 20:58:21 +090060 private static final String IP_ADDRESS = "ipAddress";
61 private static final String NETWORK_ID = "networkId";
62
Jian Liee039592019-08-26 14:54:43 +090063
Jian Li9b199162019-02-10 18:00:35 +090064 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected CoreService coreService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected MastershipService mastershipService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected ClusterService clusterService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected LeadershipService leadershipService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected K8sNetworkService k8sNetworkService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li7970b712019-05-03 20:58:21 +090080 protected K8sPodService k8sPodService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li9b199162019-02-10 18:00:35 +090083 protected K8sIpamAdminService k8sIpamAdminService;
84
85 private final ExecutorService eventExecutor = newSingleThreadExecutor(
86 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
87 private final InternalK8sNetworkListener k8sNetworkListener =
88 new InternalK8sNetworkListener();
Jian Li7970b712019-05-03 20:58:21 +090089 private final InternalK8sPodListener k8sPodListener =
90 new InternalK8sPodListener();
Jian Li9b199162019-02-10 18:00:35 +090091
92 private ApplicationId appId;
93 private NodeId localNodeId;
94
95 @Activate
96 protected void activate() {
97 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
98 localNodeId = clusterService.getLocalNode().id();
99 leadershipService.runForLeadership(appId.name());
100 k8sNetworkService.addListener(k8sNetworkListener);
Jian Li7970b712019-05-03 20:58:21 +0900101 k8sPodService.addListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900102
103 log.info("Started");
104 }
105
106 @Deactivate
107 protected void deactivate() {
Jian Li7970b712019-05-03 20:58:21 +0900108 k8sPodService.removeListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900109 k8sNetworkService.removeListener(k8sNetworkListener);
110 leadershipService.withdraw(appId.name());
111 eventExecutor.shutdown();
112
113 log.info("Stopped");
114 }
115
116 private class InternalK8sNetworkListener implements K8sNetworkListener {
117
118 private boolean isRelevantHelper() {
119 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
120 }
121
122 @Override
123 public void event(K8sNetworkEvent event) {
124 switch (event.type()) {
125 case K8S_NETWORK_CREATED:
126 eventExecutor.execute(() -> processNetworkAddition(event));
127 break;
128 case K8S_NETWORK_REMOVED:
129 eventExecutor.execute(() -> processNetworkRemoval(event));
130 break;
131 default:
132 break;
133 }
134 }
135
136 private void processNetworkAddition(K8sNetworkEvent event) {
137 if (!isRelevantHelper()) {
138 return;
139 }
140
141 Set<IpAddress> ips = getSubnetIps(event.subject().cidr());
Jian Li5cf3b002019-08-30 17:57:53 +0900142 String networkId = event.subject().networkId();
143 k8sIpamAdminService.initializeIpPool(networkId, ips);
144
145 k8sPodService.pods().stream()
146 .filter(p -> p.getStatus().getPodIP() != null)
147 .filter(p -> p.getMetadata().getAnnotations() != null)
148 .filter(p -> networkId.equals(p.getMetadata()
149 .getAnnotations().get(NETWORK_ID)))
150 .forEach(p -> {
151 String podIp = p.getStatus().getPodIP();
152
153 // if the POD with valid IP address has not yet been
154 // added into IPAM IP pool, we will reserve that IP address
155 // for the POD
156 if (!k8sIpamAdminService.allocatedIps(networkId)
157 .contains(IpAddress.valueOf(podIp))) {
158 k8sIpamAdminService.reserveIp(networkId, IpAddress.valueOf(podIp));
159 }
160 });
Jian Li9b199162019-02-10 18:00:35 +0900161 }
162
163 private void processNetworkRemoval(K8sNetworkEvent event) {
164 if (!isRelevantHelper()) {
165 return;
166 }
167
168 k8sIpamAdminService.purgeIpPool(event.subject().networkId());
169 }
170 }
Jian Li7970b712019-05-03 20:58:21 +0900171
172 private class InternalK8sPodListener implements K8sPodListener {
173
174 private boolean isRelevantHelper() {
175 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
176 }
177
178 @Override
179 public void event(K8sPodEvent event) {
180 switch (event.type()) {
181 case K8S_POD_CREATED:
182 eventExecutor.execute(() -> processPodCreation(event.subject()));
183 break;
184 case K8S_POD_REMOVED:
185 default:
186 break;
187 }
188 }
189
190 private void processPodCreation(Pod pod) {
191 if (!isRelevantHelper()) {
192 return;
193 }
194
195 Map<String, String> annots = pod.getMetadata().getAnnotations();
196
197 if (annots == null || annots.isEmpty()) {
198 return;
199 }
200
201 String annotIp = annots.get(IP_ADDRESS);
202 String annotNetwork = annots.get(NETWORK_ID);
203 String podIp = pod.getStatus().getPodIP();
204
205 if (podIp == null && annotIp == null) {
206 return;
207 }
208
209 if (annotNetwork == null) {
210 return;
211 }
212
213 if (!StringUtils.equals(annotIp, podIp)) {
214 return;
215 }
216
Jian Liee039592019-08-26 14:54:43 +0900217 k8sIpamAdminService.availableIps(annotNetwork);
218
Jian Li5cf3b002019-08-30 17:57:53 +0900219 // if the kubernetes network has been initialized, we may have
220 // empty available IP pool, in this case, we will postpone IP reserve
221 // process until finishing kubernetes network initialization
222 if (!containIp(annotIp, annotNetwork)) {
223 return;
Jian Liee039592019-08-26 14:54:43 +0900224 }
225
Jian Li7970b712019-05-03 20:58:21 +0900226 k8sIpamAdminService.reserveIp(annotNetwork, IpAddress.valueOf(podIp));
227 }
Jian Liee039592019-08-26 14:54:43 +0900228
229 private boolean containIp(String podIp, String networkId) {
230 return k8sIpamAdminService.availableIps(networkId).stream()
231 .anyMatch(i -> i.toString().equals(podIp));
232 }
Jian Li7970b712019-05-03 20:58:21 +0900233 }
Jian Li9b199162019-02-10 18:00:35 +0900234}