blob: 5239ee4022e424ad5f72e632e4a30408b4bcd42d [file] [log] [blame]
Jian Li3d1111e2019-02-22 02:02:13 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import com.google.common.collect.Maps;
19import io.fabric8.kubernetes.api.model.Pod;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
27import org.onosproject.k8snetworking.api.K8sNetworkEvent;
28import org.onosproject.k8snetworking.api.K8sNetworkListener;
29import org.onosproject.k8snetworking.api.K8sPodAdminService;
30import org.onosproject.k8snetworking.api.K8sPodEvent;
31import org.onosproject.k8snetworking.api.K8sPodListener;
32import org.onosproject.k8snetworking.api.K8sPort;
33import org.onosproject.k8snode.api.K8sApiConfigService;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.DriverService;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.util.Map;
45import java.util.Objects;
46import java.util.Set;
47import java.util.concurrent.ExecutorService;
48
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
52import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Associates the kubernetes container port and pod.
57 */
58@Component(immediate = true)
59public class K8sPodPortMapper {
60
61 private final Logger log = getLogger(getClass());
62
63 private static final String PORT_ID = "portId";
64 private static final String DEVICE_ID = "deviceId";
65 private static final String PORT_NUMBER = "portNumber";
66 private static final String IP_ADDRESS = "ipAddress";
67 private static final String MAC_ADDRESS = "macAddress";
68 private static final String NETWORK_ID = "networkId";
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected CoreService coreService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected MastershipService mastershipService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected ClusterService clusterService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected LeadershipService leadershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected DeviceService deviceService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected DriverService driverService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected K8sNetworkAdminService k8sNetworkService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected K8sPodAdminService k8sPodService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected K8sApiConfigService k8sApiConfigService;
96
97 private final ExecutorService eventExecutor = newSingleThreadExecutor(
98 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
99 private final InternalK8sNetworkListener k8sNetworkListener =
100 new InternalK8sNetworkListener();
101 private final InternalK8sPodListener k8sPodListener =
102 new InternalK8sPodListener();
103
104 private ApplicationId appId;
105 private NodeId localNodeId;
106
107 @Activate
108 protected void activate() {
109 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
110 localNodeId = clusterService.getLocalNode().id();
111 leadershipService.runForLeadership(appId.name());
112 k8sNetworkService.addListener(k8sNetworkListener);
113 k8sPodService.addListener(k8sPodListener);
114 log.info("Started");
115 }
116
117 @Deactivate
118 protected void deactivate() {
119 k8sNetworkService.removeListener(k8sNetworkListener);
120 k8sPodService.removeListener(k8sPodListener);
121 leadershipService.withdraw(appId.name());
122 eventExecutor.shutdown();
123
124 log.info("Stopped");
125 }
126
127 private class InternalK8sPodListener implements K8sPodListener {
128
129 private boolean isRelevantHelper() {
130 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
131 }
132
133 @Override
134 public void event(K8sPodEvent event) {
135 switch (event.type()) {
136 case K8S_POD_CREATED:
137 case K8S_POD_UPDATED:
138 eventExecutor.execute(() -> processPodCreation(event.subject()));
139 break;
140 case K8S_POD_REMOVED:
141 default:
142 break;
143 }
144 }
145
146 private void processPodCreation(Pod pod) {
147 if (!isRelevantHelper()) {
148 return;
149 }
150
151 KubernetesClient client = k8sClient(k8sApiConfigService);
152
153 if (client == null) {
154 return;
155 }
156
157 // if the annotations were configured, we will not update it
158 if (pod.getMetadata().getAnnotations() != null) {
159 return;
160 }
161
162 Set<K8sPort> ports = k8sNetworkService.ports();
163
164 // TODO: we assume that POD IP is unique, there might be other
165 // variable which preserves better uniqueness
166 ports.stream().filter(p -> p.ipAddress().toString()
167 .equals(pod.getStatus().getPodIP()))
168 .forEach(p -> {
169 Map<String, String> annotations = Maps.newConcurrentMap();
170 annotations.put(PORT_ID, p.portId());
171 annotations.put(NETWORK_ID, p.networkId());
172 annotations.put(DEVICE_ID, p.deviceId().toString());
173 annotations.put(PORT_NUMBER, p.portNumber().toString());
174 annotations.put(IP_ADDRESS, p.ipAddress().toString());
175 annotations.put(MAC_ADDRESS, p.macAddress().toString());
176
177 client.pods().inNamespace(pod.getMetadata().getNamespace())
178 .withName(pod.getMetadata().getName())
179 .edit()
180 .editMetadata()
181 .addToAnnotations(annotations)
182 .endMetadata().done();
183 });
184 }
185 }
186
187 private class InternalK8sNetworkListener implements K8sNetworkListener {
188
189 private boolean isRelevantHelper(K8sNetworkEvent event) {
190 return mastershipService.isLocalMaster(event.port().deviceId());
191 }
192
193 @Override
194 public void event(K8sNetworkEvent event) {
195 switch (event.type()) {
196 case K8S_PORT_UPDATED:
197 case K8S_PORT_REMOVED:
198 // no need to process port removal event...
199 default:
200 break;
201 }
202 }
203 }
204}