blob: a518971a8e25b9a02cf0a7a753dbf8edbaf8f1bc [file] [log] [blame]
Jian Li4aa17642019-01-30 00:01:11 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.MacAddress;
24import org.onlab.util.Tools;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sPort;
35import org.onosproject.k8snode.api.K8sNode;
36import org.onosproject.k8snode.api.K8sNodeEvent;
37import org.onosproject.k8snode.api.K8sNodeListener;
38import org.onosproject.k8snode.api.K8sNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.flow.DefaultTrafficSelector;
43import org.onosproject.net.flow.DefaultTrafficTreatment;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.packet.DefaultOutboundPacket;
47import org.onosproject.net.packet.PacketContext;
48import org.onosproject.net.packet.PacketProcessor;
49import org.onosproject.net.packet.PacketService;
50import org.osgi.service.component.ComponentContext;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Modified;
55import org.osgi.service.component.annotations.Reference;
56import org.osgi.service.component.annotations.ReferenceCardinality;
57import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
60import java.nio.ByteBuffer;
61import java.util.Dictionary;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65
66import static java.util.concurrent.Executors.newSingleThreadExecutor;
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
69import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
70import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
71import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
72import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
73import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
74import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
75import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
76import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
77import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
Jian Li004526d2019-02-25 16:26:27 +090078import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.unshiftIpDomain;
Jian Li4aa17642019-01-30 00:01:11 +090079
80/**
81 * Handles ARP packet from containers.
82 */
83@Component(
84 immediate = true,
85 property = {
86 GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
87 ARP_MODE + "=" + ARP_MODE_DEFAULT
88 }
89)
90public class K8sSwitchingArpHandler {
91
92 private final Logger log = LoggerFactory.getLogger(getClass());
93
Jian Lid89db462019-02-08 18:21:57 +090094 private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
95
Jian Li4aa17642019-01-30 00:01:11 +090096 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected PacketService packetService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService configService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected ClusterService clusterService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected LeadershipService leadershipService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected DeviceService deviceService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected MastershipService mastershipService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected K8sNodeService k8sNodeService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected K8sNetworkService k8sNetworkService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected K8sFlowRuleService k8sFlowRuleService;
125
126 /** Fake MAC address for virtual network subnet gateway. */
127 private String gatewayMac = GATEWAY_MAC_DEFAULT;
128
129 /** ARP processing mode, broadcast | proxy (default). */
130 protected String arpMode = ARP_MODE_DEFAULT;
131
132 private MacAddress gwMacAddress;
133
134 private final ExecutorService eventExecutor = newSingleThreadExecutor(
135 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
136
137 private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
138 private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
139
140 private ApplicationId appId;
141 private NodeId localNodeId;
142
143 @Activate
144 void activate() {
145 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
146 configService.registerProperties(getClass());
147 localNodeId = clusterService.getLocalNode().id();
148 leadershipService.runForLeadership(appId.name());
149 k8sNodeService.addListener(k8sNodeListener);
150 packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
151
152 log.info("Started");
153 }
154
155 @Deactivate
156 void deactivate() {
157 packetService.removeProcessor(packetProcessor);
158 k8sNodeService.removeListener(k8sNodeListener);
159 leadershipService.withdraw(appId.name());
160 configService.unregisterProperties(getClass(), false);
161 eventExecutor.shutdown();
162
163 log.info("Stopped");
164 }
165
166 @Modified
167 void modified(ComponentContext context) {
168 readComponentConfiguration(context);
169
170 log.info("Modified");
171 }
172
173 /**
174 * Processes ARP request packets.
175 *
176 * @param context packet context
177 * @param ethPacket ethernet packet
178 */
179 private void processPacketIn(PacketContext context, Ethernet ethPacket) {
180 // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
181 if (ARP_BROADCAST_MODE.equals(getArpMode())) {
182 return;
183 }
184
185 ARP arpPacket = (ARP) ethPacket.getPayload();
186 if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
187 return;
188 }
189
190 K8sPort srcPort = k8sNetworkService.ports().stream()
191 .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
192 .findAny().orElse(null);
193
194 if (srcPort == null && !context.inPacket().receivedFrom().port()
195 .equals(PortNumber.LOCAL)) {
196 log.warn("Failed to find source port(MAC:{})", ethPacket.getSourceMAC());
197 return;
198 }
199
200 // FIXME: this is a workaround for storing host GW MAC address,
201 // need to find a way to store the MAC address in persistent way
202 if (context.inPacket().receivedFrom().port().equals(PortNumber.LOCAL)) {
203 gwMacAddress = ethPacket.getSourceMAC();
204 }
205
206 IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
207
208 MacAddress replyMac = k8sNetworkService.ports().stream()
209 // .filter(p -> p.networkId().equals(srcPort.networkId()))
210 .filter(p -> p.ipAddress().equals(targetIp))
211 .map(K8sPort::macAddress)
212 .findAny().orElse(null);
213
214 long gwIpCnt = k8sNetworkService.networks().stream()
215 .filter(n -> n.gatewayIp().equals(targetIp))
216 .count();
217
Jian Lid89db462019-02-08 18:21:57 +0900218 if (gwIpCnt > 0 || targetIp.equals(IpAddress.valueOf(API_SERVER_CLUSTER_IP))) {
Jian Li4aa17642019-01-30 00:01:11 +0900219 replyMac = gwMacAddress;
220 }
221
222 if (replyMac == null) {
Jian Li004526d2019-02-25 16:26:27 +0900223 Set<String> unshiftedIps = unshiftIpDomain(targetIp.toString(), k8sNetworkService);
224 for (String ip : unshiftedIps) {
225 replyMac = k8sNetworkService.ports().stream()
226 .filter(p -> p.ipAddress().equals(IpAddress.valueOf(ip)))
227 .map(K8sPort::macAddress)
228 .findAny().orElse(null);
229
230 if (replyMac != null) {
231 break;
232 }
233 }
234 }
235
236 if (replyMac == null) {
Jian Li4aa17642019-01-30 00:01:11 +0900237 log.debug("Failed to find MAC address for {}", targetIp);
238 return;
239 }
240
241 Ethernet ethReply = ARP.buildArpReply(
242 targetIp.getIp4Address(),
243 replyMac,
244 ethPacket);
245
246 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
247 .setOutput(context.inPacket().receivedFrom().port())
248 .build();
249
250 packetService.emit(new DefaultOutboundPacket(
251 context.inPacket().receivedFrom().deviceId(),
252 treatment,
253 ByteBuffer.wrap(ethReply.serialize())));
254 }
255
256 private String getArpMode() {
257 Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
258 return getPropertyValue(properties, ARP_MODE);
259 }
260
261 /**
262 * Extracts properties from the component configuration context.
263 *
264 * @param context the component context
265 */
266 private void readComponentConfiguration(ComponentContext context) {
267 Dictionary<?, ?> properties = context.getProperties();
268
269 String updatedMac = Tools.get(properties, GATEWAY_MAC);
270 gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
271 log.info("Configured. Gateway MAC is {}", gatewayMac);
272 }
273
274 /**
275 * An internal packet processor which processes ARP request, and results in
276 * packet-out ARP reply.
277 */
278 private class InternalPacketProcessor implements PacketProcessor {
279
280 @Override
281 public void process(PacketContext context) {
282 if (context.isHandled()) {
283 return;
284 }
285
286 Ethernet ethPacket = context.inPacket().parsed();
287 if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
288 return;
289 }
290
291 eventExecutor.execute(() -> processPacketIn(context, ethPacket));
292 }
293 }
294
295 /**
296 * An internal kubernetes node listener which is used for listening kubernetes
297 * node activity. As long as a node is in complete state, we will install
298 * default ARP rule to handle ARP request.
299 */
300 private class InternalNodeEventListener implements K8sNodeListener {
301
302 private boolean isRelevantHelper() {
303 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
304 }
305
306 @Override
307 public void event(K8sNodeEvent event) {
308 K8sNode k8sNode = event.subject();
309 switch (event.type()) {
310 case K8S_NODE_COMPLETE:
311 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
312 break;
313 case K8S_NODE_INCOMPLETE:
314 eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
315 break;
316 default:
317 break;
318 }
319 }
320
321 private void processNodeCompletion(K8sNode node) {
322 if (!isRelevantHelper()) {
323 return;
324 }
325
326 setDefaultArpRule(node, true);
327 }
328
329 private void processNodeIncompletion(K8sNode node) {
330 if (!isRelevantHelper()) {
331 return;
332 }
333
334 setDefaultArpRule(node, false);
335 }
336
337 private void setDefaultArpRule(K8sNode node, boolean install) {
338
339 if (getArpMode() == null) {
340 return;
341 }
342
343 switch (getArpMode()) {
344 case ARP_PROXY_MODE:
345 setDefaultArpRuleForProxyMode(node, install);
346 break;
347 case ARP_BROADCAST_MODE:
348 // TODO: need to implement broadcast mode
349 log.warn("Not implemented yet.");
350 break;
351 default:
352 log.warn("Invalid ARP mode {}. Please use either " +
353 "broadcast or proxy mode.", getArpMode());
354 break;
355 }
356 }
357
358 private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
359 TrafficSelector selector = DefaultTrafficSelector.builder()
360 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
361 .build();
362
363 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
364 .punt()
365 .build();
366
367 k8sFlowRuleService.setRule(
368 appId,
369 node.intgBridge(),
370 selector,
371 treatment,
372 PRIORITY_ARP_CONTROL_RULE,
373 ARP_TABLE,
374 install
375 );
376 }
377 }
378}