blob: 4bc6f7c9f0bed87d7bb3bbb00e6ae2c154b43a91 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
Jian Li7970b712019-05-03 20:58:21 +090018import io.fabric8.kubernetes.api.model.Pod;
19import org.apache.commons.lang.StringUtils;
Jian Li9b199162019-02-10 18:00:35 +090020import org.onlab.packet.IpAddress;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.k8snetworking.api.K8sIpamAdminService;
27import org.onosproject.k8snetworking.api.K8sNetworkEvent;
28import org.onosproject.k8snetworking.api.K8sNetworkListener;
29import org.onosproject.k8snetworking.api.K8sNetworkService;
Jian Li7970b712019-05-03 20:58:21 +090030import org.onosproject.k8snetworking.api.K8sPodEvent;
31import org.onosproject.k8snetworking.api.K8sPodListener;
32import org.onosproject.k8snetworking.api.K8sPodService;
Jian Li9b199162019-02-10 18:00:35 +090033import org.onosproject.mastership.MastershipService;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
39import org.slf4j.Logger;
40
Jian Li7970b712019-05-03 20:58:21 +090041import java.util.Map;
Jian Li9b199162019-02-10 18:00:35 +090042import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.ExecutorService;
45
46import static java.util.concurrent.Executors.newSingleThreadExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
49import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getSubnetIps;
50import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Initializes and purges the kubernetes IPAM.
54 */
55@Component(immediate = true)
56public class K8sIpamHandler {
57
58 private final Logger log = getLogger(getClass());
59
Jian Li7970b712019-05-03 20:58:21 +090060 private static final String IP_ADDRESS = "ipAddress";
61 private static final String NETWORK_ID = "networkId";
62
Jian Li9b199162019-02-10 18:00:35 +090063 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected LeadershipService leadershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected K8sNetworkService k8sNetworkService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li7970b712019-05-03 20:58:21 +090079 protected K8sPodService k8sPodService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li9b199162019-02-10 18:00:35 +090082 protected K8sIpamAdminService k8sIpamAdminService;
83
84 private final ExecutorService eventExecutor = newSingleThreadExecutor(
85 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
86 private final InternalK8sNetworkListener k8sNetworkListener =
87 new InternalK8sNetworkListener();
Jian Li7970b712019-05-03 20:58:21 +090088 private final InternalK8sPodListener k8sPodListener =
89 new InternalK8sPodListener();
Jian Li9b199162019-02-10 18:00:35 +090090
91 private ApplicationId appId;
92 private NodeId localNodeId;
93
94 @Activate
95 protected void activate() {
96 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
97 localNodeId = clusterService.getLocalNode().id();
98 leadershipService.runForLeadership(appId.name());
99 k8sNetworkService.addListener(k8sNetworkListener);
Jian Li7970b712019-05-03 20:58:21 +0900100 k8sPodService.addListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900101
102 log.info("Started");
103 }
104
105 @Deactivate
106 protected void deactivate() {
Jian Li7970b712019-05-03 20:58:21 +0900107 k8sPodService.removeListener(k8sPodListener);
Jian Li9b199162019-02-10 18:00:35 +0900108 k8sNetworkService.removeListener(k8sNetworkListener);
109 leadershipService.withdraw(appId.name());
110 eventExecutor.shutdown();
111
112 log.info("Stopped");
113 }
114
115 private class InternalK8sNetworkListener implements K8sNetworkListener {
116
117 private boolean isRelevantHelper() {
118 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
119 }
120
121 @Override
122 public void event(K8sNetworkEvent event) {
123 switch (event.type()) {
124 case K8S_NETWORK_CREATED:
125 eventExecutor.execute(() -> processNetworkAddition(event));
126 break;
127 case K8S_NETWORK_REMOVED:
128 eventExecutor.execute(() -> processNetworkRemoval(event));
129 break;
130 default:
131 break;
132 }
133 }
134
135 private void processNetworkAddition(K8sNetworkEvent event) {
136 if (!isRelevantHelper()) {
137 return;
138 }
139
140 Set<IpAddress> ips = getSubnetIps(event.subject().cidr());
141 k8sIpamAdminService.initializeIpPool(event.subject().networkId(), ips);
142 }
143
144 private void processNetworkRemoval(K8sNetworkEvent event) {
145 if (!isRelevantHelper()) {
146 return;
147 }
148
149 k8sIpamAdminService.purgeIpPool(event.subject().networkId());
150 }
151 }
Jian Li7970b712019-05-03 20:58:21 +0900152
153 private class InternalK8sPodListener implements K8sPodListener {
154
155 private boolean isRelevantHelper() {
156 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
157 }
158
159 @Override
160 public void event(K8sPodEvent event) {
161 switch (event.type()) {
162 case K8S_POD_CREATED:
163 eventExecutor.execute(() -> processPodCreation(event.subject()));
164 break;
165 case K8S_POD_REMOVED:
166 default:
167 break;
168 }
169 }
170
171 private void processPodCreation(Pod pod) {
172 if (!isRelevantHelper()) {
173 return;
174 }
175
176 Map<String, String> annots = pod.getMetadata().getAnnotations();
177
178 if (annots == null || annots.isEmpty()) {
179 return;
180 }
181
182 String annotIp = annots.get(IP_ADDRESS);
183 String annotNetwork = annots.get(NETWORK_ID);
184 String podIp = pod.getStatus().getPodIP();
185
186 if (podIp == null && annotIp == null) {
187 return;
188 }
189
190 if (annotNetwork == null) {
191 return;
192 }
193
194 if (!StringUtils.equals(annotIp, podIp)) {
195 return;
196 }
197
198 k8sIpamAdminService.reserveIp(annotNetwork, IpAddress.valueOf(podIp));
199 }
200 }
Jian Li9b199162019-02-10 18:00:35 +0900201}