blob: 998071d20f1d58607057b2248d7d039dfd669765 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Maps;
19import io.fabric8.kubernetes.api.model.Pod;
Jian Li9ac73952021-01-15 14:53:22 +090020import io.fabric8.kubernetes.api.model.PodBuilder;
Jian Li3d1111e2019-02-22 02:02:13 +090021import io.fabric8.kubernetes.client.KubernetesClient;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
28import org.onosproject.k8snetworking.api.K8sNetworkEvent;
29import org.onosproject.k8snetworking.api.K8sNetworkListener;
30import org.onosproject.k8snetworking.api.K8sPodAdminService;
31import org.onosproject.k8snetworking.api.K8sPodEvent;
32import org.onosproject.k8snetworking.api.K8sPodListener;
33import org.onosproject.k8snetworking.api.K8sPort;
34import org.onosproject.k8snode.api.K8sApiConfigService;
35import org.onosproject.mastership.MastershipService;
36import org.onosproject.net.device.DeviceService;
37import org.onosproject.net.driver.DriverService;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.util.Map;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.ExecutorService;
49
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
53import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
54import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * Associates the kubernetes container port and pod.
58 */
59@Component(immediate = true)
60public class K8sPodPortMapper {
61
62 private final Logger log = getLogger(getClass());
63
64 private static final String PORT_ID = "portId";
65 private static final String DEVICE_ID = "deviceId";
66 private static final String PORT_NUMBER = "portNumber";
67 private static final String IP_ADDRESS = "ipAddress";
68 private static final String MAC_ADDRESS = "macAddress";
69 private static final String NETWORK_ID = "networkId";
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected CoreService coreService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected MastershipService mastershipService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected ClusterService clusterService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected DeviceService deviceService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected DriverService driverService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected K8sNetworkAdminService k8sNetworkService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected K8sPodAdminService k8sPodService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected K8sApiConfigService k8sApiConfigService;
97
98 private final ExecutorService eventExecutor = newSingleThreadExecutor(
99 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
100 private final InternalK8sNetworkListener k8sNetworkListener =
101 new InternalK8sNetworkListener();
102 private final InternalK8sPodListener k8sPodListener =
103 new InternalK8sPodListener();
104
105 private ApplicationId appId;
106 private NodeId localNodeId;
107
108 @Activate
109 protected void activate() {
110 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
111 localNodeId = clusterService.getLocalNode().id();
112 leadershipService.runForLeadership(appId.name());
113 k8sNetworkService.addListener(k8sNetworkListener);
114 k8sPodService.addListener(k8sPodListener);
115 log.info("Started");
116 }
117
118 @Deactivate
119 protected void deactivate() {
120 k8sNetworkService.removeListener(k8sNetworkListener);
121 k8sPodService.removeListener(k8sPodListener);
122 leadershipService.withdraw(appId.name());
123 eventExecutor.shutdown();
124
125 log.info("Stopped");
126 }
127
128 private class InternalK8sPodListener implements K8sPodListener {
129
130 private boolean isRelevantHelper() {
131 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
132 }
133
134 @Override
135 public void event(K8sPodEvent event) {
136 switch (event.type()) {
137 case K8S_POD_CREATED:
138 case K8S_POD_UPDATED:
139 eventExecutor.execute(() -> processPodCreation(event.subject()));
140 break;
141 case K8S_POD_REMOVED:
142 default:
143 break;
144 }
145 }
146
147 private void processPodCreation(Pod pod) {
148 if (!isRelevantHelper()) {
149 return;
150 }
151
152 KubernetesClient client = k8sClient(k8sApiConfigService);
153
154 if (client == null) {
155 return;
156 }
157
158 // if the annotations were configured, we will not update it
159 if (pod.getMetadata().getAnnotations() != null) {
160 return;
161 }
162
163 Set<K8sPort> ports = k8sNetworkService.ports();
164
165 // TODO: we assume that POD IP is unique, there might be other
166 // variable which preserves better uniqueness
167 ports.stream().filter(p -> p.ipAddress().toString()
168 .equals(pod.getStatus().getPodIP()))
169 .forEach(p -> {
170 Map<String, String> annotations = Maps.newConcurrentMap();
171 annotations.put(PORT_ID, p.portId());
172 annotations.put(NETWORK_ID, p.networkId());
173 annotations.put(DEVICE_ID, p.deviceId().toString());
Jian Li3d1111e2019-02-22 02:02:13 +0900174 annotations.put(IP_ADDRESS, p.ipAddress().toString());
175 annotations.put(MAC_ADDRESS, p.macAddress().toString());
176
Jian Li07c27f32020-10-08 02:57:45 +0900177 if (p.portNumber() != null) {
178 annotations.put(PORT_NUMBER, p.portNumber().toString());
179 }
180
Jian Li3d1111e2019-02-22 02:02:13 +0900181 client.pods().inNamespace(pod.getMetadata().getNamespace())
182 .withName(pod.getMetadata().getName())
Jian Li9ac73952021-01-15 14:53:22 +0900183 .edit(r -> new PodBuilder(r)
184 .editMetadata()
185 .addToAnnotations(annotations)
186 .endMetadata().build()
187 );
Jian Li3d1111e2019-02-22 02:02:13 +0900188 });
189 }
190 }
191
192 private class InternalK8sNetworkListener implements K8sNetworkListener {
193
194 private boolean isRelevantHelper(K8sNetworkEvent event) {
195 return mastershipService.isLocalMaster(event.port().deviceId());
196 }
197
198 @Override
199 public void event(K8sNetworkEvent event) {
200 switch (event.type()) {
201 case K8S_PORT_UPDATED:
202 case K8S_PORT_REMOVED:
203 // no need to process port removal event...
204 default:
205 break;
206 }
207 }
208 }
209}