blob: ef1acaac70228a10ecbf2e2bb2e1829a7ad25f58 [file] [log] [blame]
Jian Lid89db462019-02-08 18:21:57 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.Ethernet;
19import org.onlab.packet.IPv4;
20import org.onlab.packet.IpAddress;
21import org.onlab.packet.IpPrefix;
22import org.onlab.packet.TpPort;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.LeadershipService;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.core.ApplicationId;
27import org.onosproject.core.CoreService;
28import org.onosproject.k8snetworking.api.K8sFlowRuleService;
29import org.onosproject.k8snode.api.K8sNode;
30import org.onosproject.k8snode.api.K8sNodeEvent;
31import org.onosproject.k8snode.api.K8sNodeListener;
32import org.onosproject.k8snode.api.K8sNodeService;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.flow.DefaultTrafficSelector;
35import org.onosproject.net.flow.DefaultTrafficTreatment;
36import org.onosproject.net.flow.TrafficSelector;
37import org.onosproject.net.flow.TrafficTreatment;
38import org.onosproject.net.packet.PacketService;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
52import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
53import static org.onosproject.k8snetworking.api.Constants.PRIORITY_TRANSLATION_RULE;
54import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
55import static org.slf4j.LoggerFactory.getLogger;
56
57/**
58 * Handles kubernetes API server requests from pods.
59 */
60@Component(immediate = true)
61public class K8sApiServerProxyHandler {
62 protected final Logger log = getLogger(getClass());
63
64 private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
65 private static final int API_SERVER_CLUSTER_PORT = 443;
66 private static final String API_SERVER_IP = "10.10.10.1";
67 private static final int API_SERVER_PORT = 6443;
68 private static final int PREFIX_LENGTH = 32;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected CoreService coreService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected PacketService packetService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected ClusterService clusterService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected LeadershipService leadershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected K8sNodeService k8sNodeService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected K8sFlowRuleService k8sFlowRuleService;
87
88 private final ExecutorService eventExecutor = newSingleThreadExecutor(
89 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
90 private final K8sNodeListener k8sNodeListener = new InternalNodeEventListener();
91
92 private ApplicationId appId;
93 private NodeId localNodeId;
94
95 @Activate
96 protected void activate() {
97 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
98 localNodeId = clusterService.getLocalNode().id();
99 k8sNodeService.addListener(k8sNodeListener);
100 leadershipService.runForLeadership(appId.name());
101
102 log.info("Started");
103 }
104
105 @Deactivate
106 protected void deactivate() {
107 k8sNodeService.removeListener(k8sNodeListener);
108 leadershipService.withdraw(appId.name());
109 eventExecutor.shutdown();
110
111 log.info("Stopped");
112 }
113
114 private class InternalNodeEventListener implements K8sNodeListener {
115
116 private boolean isRelevantHelper() {
117 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
118 }
119
120 @Override
121 public void event(K8sNodeEvent event) {
122 K8sNode k8sNode = event.subject();
123 switch (event.type()) {
124 case K8S_NODE_COMPLETE:
125 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
126 break;
127 case K8S_NODE_INCOMPLETE:
128 eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
129 break;
130 default:
131 break;
132 }
133 }
134
135 private void processNodeCompletion(K8sNode k8sNode) {
136 if (!isRelevantHelper()) {
137 return;
138 }
139
140 setRequestTranslationRule(k8sNode, true);
141 setResponseTranslationRule(k8sNode, true);
142 }
143
144 private void processNodeIncompletion(K8sNode k8sNode) {
145 if (!isRelevantHelper()) {
146 return;
147 }
148
149 setRequestTranslationRule(k8sNode, false);
150 setResponseTranslationRule(k8sNode, false);
151 }
152
153 /**
154 * Installs k8s API server rule for receiving all API request packets.
155 *
156 * @param k8sNode kubernetes node
157 * @param install installation flag
158 */
159 private void setRequestTranslationRule(K8sNode k8sNode, boolean install) {
160 TrafficSelector selector = DefaultTrafficSelector.builder()
161 .matchEthType(Ethernet.TYPE_IPV4)
162 .matchIPProtocol(IPv4.PROTOCOL_TCP)
163 .matchIPDst(IpPrefix.valueOf(
164 IpAddress.valueOf(API_SERVER_CLUSTER_IP), PREFIX_LENGTH))
165 .matchTcpDst(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
166 .build();
167
168 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
169 .setIpDst(IpAddress.valueOf(API_SERVER_IP))
170 .setTcpDst(TpPort.tpPort(API_SERVER_PORT))
171 .setOutput(PortNumber.LOCAL)
172 .build();
173
174 k8sFlowRuleService.setRule(
175 appId,
176 k8sNode.intgBridge(),
177 selector,
178 treatment,
179 PRIORITY_TRANSLATION_RULE,
180 STAT_OUTBOUND_TABLE,
181 install
182 );
183 }
184
185 /**
186 * Installs k8s API server rule for receiving all API response packets.
187 *
188 * @param k8sNode kubernetes node
189 * @param install installation flag
190 */
191 private void setResponseTranslationRule(K8sNode k8sNode, boolean install) {
192 TrafficSelector selector = DefaultTrafficSelector.builder()
193 .matchEthType(Ethernet.TYPE_IPV4)
194 .matchIPProtocol(IPv4.PROTOCOL_TCP)
195 .matchIPSrc(IpPrefix.valueOf(
196 IpAddress.valueOf(API_SERVER_IP), PREFIX_LENGTH))
197 .matchTcpSrc(TpPort.tpPort(API_SERVER_PORT))
198 .build();
199
200 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
201 .setIpSrc(IpAddress.valueOf(API_SERVER_CLUSTER_IP))
202 .setTcpSrc(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
203 .transition(FORWARDING_TABLE)
204 .build();
205
206 k8sFlowRuleService.setRule(
207 appId,
208 k8sNode.intgBridge(),
209 selector,
210 treatment,
211 PRIORITY_TRANSLATION_RULE,
212 STAT_OUTBOUND_TABLE,
213 install
214 );
215 }
216 }
217}