blob: f00de400de4e041cc2ee45154c1a4f4e83c64b15 [file] [log] [blame]
Jian Li85387732019-02-19 23:56:18 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.client.KubernetesClient;
20import io.fabric8.kubernetes.client.KubernetesClientException;
21import io.fabric8.kubernetes.client.Watcher;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.k8snetworking.api.K8sPodAdminService;
28import org.onosproject.k8snode.api.K8sApiConfig;
29import org.onosproject.k8snode.api.K8sApiConfigService;
30import org.onosproject.mastership.MastershipService;
31import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
36import org.slf4j.Logger;
37
38import java.util.Objects;
39import java.util.concurrent.ExecutorService;
40
41import static java.util.concurrent.Executors.newSingleThreadExecutor;
42import static org.onlab.util.Tools.groupedThreads;
43import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
44import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
45import static org.slf4j.LoggerFactory.getLogger;
46
47/**
48 * Kubernetes pod watcher used for feeding pod information.
49 */
50@Component(immediate = true)
51public class K8sPodWatcher {
52
53 private final Logger log = getLogger(getClass());
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY)
56 protected CoreService coreService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY)
59 protected MastershipService mastershipService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY)
62 protected ClusterService clusterService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected LeadershipService leadershipService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected K8sPodAdminService k8sPodAdminService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected K8sApiConfigService k8sApiConfigService;
72
73 private final ExecutorService eventExecutor = newSingleThreadExecutor(
74 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
75 private final Watcher<Pod> internalK8sPodWatcher = new InternalK8sPodWatcher();
76
77 private ApplicationId appId;
78 private NodeId localNodeId;
79
80 @Activate
81 protected void activate() {
82 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
83 localNodeId = clusterService.getLocalNode().id();
84 leadershipService.runForLeadership(appId.name());
85
86 initWatcher();
87
88 log.info("Started");
89 }
90
91 @Deactivate
92 protected void deactivate() {
93 leadershipService.withdraw(appId.name());
94 eventExecutor.shutdown();
95
96 log.info("Stopped");
97 }
98
99 private void initWatcher() {
100 K8sApiConfig config =
101 k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
102 if (config == null) {
103 log.error("Failed to find valid kubernetes API configuration.");
104 return;
105 }
106
107 KubernetesClient client = k8sClient(config);
108
109 if (client == null) {
110 log.error("Failed to connect to kubernetes API server.");
111 return;
112 }
113
114 client.pods().watch(internalK8sPodWatcher);
115 }
116
117 private class InternalK8sPodWatcher implements Watcher<Pod> {
118
119 @Override
120 public void eventReceived(Action action, Pod pod) {
121 switch (action) {
122 case ADDED:
123 eventExecutor.execute(() -> processAddition(pod));
124 break;
125 case MODIFIED:
126 eventExecutor.execute(() -> processModification(pod));
127 break;
128 case DELETED:
129 eventExecutor.execute(() -> processDeletion(pod));
130 break;
131 case ERROR:
132 log.warn("Failures processing pod manipulation.");
133 break;
134 default:
135 break;
136 }
137 }
138
139 @Override
140 public void onClose(KubernetesClientException e) {
141 log.info("Pod watcher OnClose: {}" + e);
142 }
143
144 private void processAddition(Pod pod) {
145 if (!isMaster()) {
146 return;
147 }
148
149 log.info("Process pod {} creating event from API server.",
150 pod.getMetadata().getName());
151
152 k8sPodAdminService.createPod(pod);
153 }
154
155 private void processModification(Pod pod) {
156 if (!isMaster()) {
157 return;
158 }
159
160 log.info("Process pod {} updating event from API server.",
161 pod.getMetadata().getName());
162
163 k8sPodAdminService.updatePod(pod);
164 }
165
166 private void processDeletion(Pod pod) {
167 if (!isMaster()) {
168 return;
169 }
170
171 log.info("Process pod {} removal event from API server.",
172 pod.getMetadata().getName());
173
174 k8sPodAdminService.removePod(pod.getMetadata().getUid());
175 }
176
177 private boolean isMaster() {
178 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
179 }
180 }
181}