blob: 371a34a4503072b84a5202a33aa07bf9f5854249 [file] [log] [blame]
Jian Li85387732019-02-19 23:56:18 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
import static org.slf4j.LoggerFactory.getLogger;
46
47/**
48 * Kubernetes endpoints watcher used for feeding endpoints information.
49 */
50@Component(immediate = true)
51public class K8sEndpointsWatcher {
52
53 private final Logger log = getLogger(getClass());
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY)
56 protected CoreService coreService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY)
59 protected MastershipService mastershipService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY)
62 protected ClusterService clusterService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected LeadershipService leadershipService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected K8sEndpointsAdminService k8sEndpointsAdminService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected K8sApiConfigService k8sApiConfigService;
72
73 private final ExecutorService eventExecutor = newSingleThreadExecutor(
74 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
75 private final Watcher<Endpoints>
76 internalEndpointsWatcher = new InternalK8sEndpointsWatcher();
77
78
79 private ApplicationId appId;
80 private NodeId localNodeId;
81
82 @Activate
83 protected void activate() {
84 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
85 localNodeId = clusterService.getLocalNode().id();
86 leadershipService.runForLeadership(appId.name());
87
88 initWatcher();
89
90 log.info("Started");
91 }
92
93 @Deactivate
94 protected void deactivate() {
95 leadershipService.withdraw(appId.name());
96 eventExecutor.shutdown();
97
98 log.info("Stopped");
99 }
100
101 private void initWatcher() {
102 K8sApiConfig config =
103 k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
104 if (config == null) {
105 log.error("Failed to find valid kubernetes API configuration.");
106 return;
107 }
108
109 KubernetesClient client = k8sClient(config);
110
111 if (client == null) {
112 log.error("Failed to connect to kubernetes API server.");
113 return;
114 }
115
116 client.endpoints().watch(internalEndpointsWatcher);
117 }
118
119 private class InternalK8sEndpointsWatcher implements Watcher<Endpoints> {
120
121 @Override
122 public void eventReceived(Action action, Endpoints endpoints) {
123 switch (action) {
124 case ADDED:
125 eventExecutor.execute(() -> processAddition(endpoints));
126 break;
127 case MODIFIED:
128 eventExecutor.execute(() -> processModification(endpoints));
129 break;
130 case DELETED:
131 eventExecutor.execute(() -> processDeletion(endpoints));
132 break;
133 case ERROR:
134 log.warn("Failures processing endpoints manipulation.");
135 break;
136 default:
137 // do nothing
138 break;
139 }
140 }
141
142 @Override
143 public void onClose(KubernetesClientException e) {
144 log.info("Endpoints watcher OnClose: {}" + e);
145 }
146
147 private void processAddition(Endpoints endpoints) {
148 if (!isMaster()) {
149 return;
150 }
151
152 log.info("Process endpoints {} creating event from API server.",
153 endpoints.getMetadata().getName());
154
155 k8sEndpointsAdminService.createEndpoints(endpoints);
156 }
157
158 private void processModification(Endpoints endpoints) {
159 if (!isMaster()) {
160 return;
161 }
162
163 log.info("Process endpoints {} updating event from API server.",
164 endpoints.getMetadata().getName());
165
166 k8sEndpointsAdminService.updateEndpoints(endpoints);
167 }
168
169 private void processDeletion(Endpoints endpoints) {
170 if (!isMaster()) {
171 return;
172 }
173
174 log.info("Process endpoints {} removal event from API server.",
175 endpoints.getMetadata().getName());
176
177 k8sEndpointsAdminService.removeEndpoints(endpoints.getMetadata().getUid());
178 }
179
180 private boolean isMaster() {
181 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
182 }
183 }
184}