blob: fefb9528f216ad9d1287222c96ae9646bd5e0614 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
Jian Li7970b712019-05-03 20:58:21 +090018import io.fabric8.kubernetes.api.model.Pod;
19import org.apache.commons.lang.StringUtils;
Jian Li9b199162019-02-10 18:00:35 +090020import org.onlab.packet.IpAddress;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.k8snetworking.api.K8sIpamAdminService;
27import org.onosproject.k8snetworking.api.K8sNetworkEvent;
28import org.onosproject.k8snetworking.api.K8sNetworkListener;
29import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li7970b712019-05-03 20:58:21 +090030import org.onosproject.k8snetworking.api.K8sPodEvent;
31import org.onosproject.k8snetworking.api.K8sPodListener;
32import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li9b199162019-02-10 18:00:35 +090033import org.onosproject.mastership.MastershipService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
Jian Li7970b712019-05-03 20:58:21 +090041import java.util.Map;
Jian Li9b199162019-02-10 18:00:35 +090042import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.ExecutorService;
45
Jian Li186fde52019-08-26 14:54:43 +090046import static java.lang.Thread.sleep;
Jian Li9b199162019-02-10 18:00:35 +090047import static java.util.concurrent.Executors.newSingleThreadExecutor;
48import static org.onlab.util.Tools.groupedThreads;
49import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
50import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getSubnetIps;
51import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Initializes and purges the kubernetes IPAM.
55 */
56@Component(immediate = true)
57public class K8sIpamHandler {
58
59 private final Logger log = getLogger(getClass());
60
Jian Li7970b712019-05-03 20:58:21 +090061 private static final String IP_ADDRESS = "ipAddress";
62 private static final String NETWORK_ID = "networkId";
63
Jian Li186fde52019-08-26 14:54:43 +090064 private static final int RETRY_NUM = 5;
65 private static final int RETRY_DELAY_MS = 3000;
66
Jian Li9b199162019-02-10 18:00:35 +090067 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected CoreService coreService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected MastershipService mastershipService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected ClusterService clusterService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected LeadershipService leadershipService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected K8sNetworkService k8sNetworkService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li7970b712019-05-03 20:58:21 +090083 protected K8sPodService k8sPodService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li9b199162019-02-10 18:00:35 +090086 protected K8sIpamAdminService k8sIpamAdminService;
87
88 private final ExecutorService eventExecutor = newSingleThreadExecutor(
89 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
90 private final InternalK8sNetworkListener k8sNetworkListener =
91 new InternalK8sNetworkListener();
Jian Li7970b712019-05-03 20:58:21 +090092 private final InternalK8sPodListener k8sPodListener =
93 new InternalK8sPodListener();
Jian Li9b199162019-02-10 18:00:35 +090094
95 private ApplicationId appId;
96 private NodeId localNodeId;
97
98 @Activate
99 protected void activate() {
100 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
101 localNodeId = clusterService.getLocalNode().id();
102 leadershipService.runForLeadership(appId.name());
103 k8sNetworkService.addListener(k8sNetworkListener);
Jian Li7970b712019-05-03 20:58:21 +0900104 k8sPodService.addListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900105
106 log.info("Started");
107 }
108
109 @Deactivate
110 protected void deactivate() {
Jian Li7970b712019-05-03 20:58:21 +0900111 k8sPodService.removeListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900112 k8sNetworkService.removeListener(k8sNetworkListener);
113 leadershipService.withdraw(appId.name());
114 eventExecutor.shutdown();
115
116 log.info("Stopped");
117 }
118
119 private class InternalK8sNetworkListener implements K8sNetworkListener {
120
121 private boolean isRelevantHelper() {
122 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
123 }
124
125 @Override
126 public void event(K8sNetworkEvent event) {
127 switch (event.type()) {
128 case K8S_NETWORK_CREATED:
129 eventExecutor.execute(() -> processNetworkAddition(event));
130 break;
131 case K8S_NETWORK_REMOVED:
132 eventExecutor.execute(() -> processNetworkRemoval(event));
133 break;
134 default:
135 break;
136 }
137 }
138
139 private void processNetworkAddition(K8sNetworkEvent event) {
140 if (!isRelevantHelper()) {
141 return;
142 }
143
144 Set<IpAddress> ips = getSubnetIps(event.subject().cidr());
145 k8sIpamAdminService.initializeIpPool(event.subject().networkId(), ips);
146 }
147
148 private void processNetworkRemoval(K8sNetworkEvent event) {
149 if (!isRelevantHelper()) {
150 return;
151 }
152
153 k8sIpamAdminService.purgeIpPool(event.subject().networkId());
154 }
155 }
Jian Li7970b712019-05-03 20:58:21 +0900156
157 private class InternalK8sPodListener implements K8sPodListener {
158
159 private boolean isRelevantHelper() {
160 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
161 }
162
163 @Override
164 public void event(K8sPodEvent event) {
165 switch (event.type()) {
166 case K8S_POD_CREATED:
167 eventExecutor.execute(() -> processPodCreation(event.subject()));
168 break;
169 case K8S_POD_REMOVED:
170 default:
171 break;
172 }
173 }
174
175 private void processPodCreation(Pod pod) {
176 if (!isRelevantHelper()) {
177 return;
178 }
179
180 Map<String, String> annots = pod.getMetadata().getAnnotations();
181
182 if (annots == null || annots.isEmpty()) {
183 return;
184 }
185
186 String annotIp = annots.get(IP_ADDRESS);
187 String annotNetwork = annots.get(NETWORK_ID);
188 String podIp = pod.getStatus().getPodIP();
189
190 if (podIp == null && annotIp == null) {
191 return;
192 }
193
194 if (annotNetwork == null) {
195 return;
196 }
197
198 if (!StringUtils.equals(annotIp, podIp)) {
199 return;
200 }
201
Jian Li186fde52019-08-26 14:54:43 +0900202 k8sIpamAdminService.availableIps(annotNetwork);
203
204 int cnt = 0;
205 while ((RETRY_NUM - cnt > 0) && !containIp(annotIp, annotNetwork)) {
206 try {
207 sleep(RETRY_DELAY_MS);
208 } catch (InterruptedException e) {
209 log.error("Exception caused during checking available IP addresses");
210 }
211
212 cnt++;
213 }
214
Jian Li7970b712019-05-03 20:58:21 +0900215 k8sIpamAdminService.reserveIp(annotNetwork, IpAddress.valueOf(podIp));
216 }
Jian Li186fde52019-08-26 14:54:43 +0900217
218 private boolean containIp(String podIp, String networkId) {
219 return k8sIpamAdminService.availableIps(networkId).stream()
220 .anyMatch(i -> i.toString().equals(podIp));
221 }
Jian Li7970b712019-05-03 20:58:21 +0900222 }
Jian Li9b199162019-02-10 18:00:35 +0900223}