blob: 1bf2150ff2e6c6b8bb08a6f6697a60b8f721b966 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.MacAddress;
24import org.onlab.util.Tools;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sPort;
35import org.onosproject.k8snode.api.K8sNode;
36import org.onosproject.k8snode.api.K8sNodeEvent;
37import org.onosproject.k8snode.api.K8sNodeListener;
38import org.onosproject.k8snode.api.K8sNodeService;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.flow.DefaultTrafficSelector;
43import org.onosproject.net.flow.DefaultTrafficTreatment;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.packet.DefaultOutboundPacket;
47import org.onosproject.net.packet.PacketContext;
48import org.onosproject.net.packet.PacketProcessor;
49import org.onosproject.net.packet.PacketService;
50import org.osgi.service.component.ComponentContext;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Modified;
55import org.osgi.service.component.annotations.Reference;
56import org.osgi.service.component.annotations.ReferenceCardinality;
57import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
60import java.nio.ByteBuffer;
61import java.util.Dictionary;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65
66import static java.util.concurrent.Executors.newSingleThreadExecutor;
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
69import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
70import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
71import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
72import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
73import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
74import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
75import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
76import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
77import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
78
79/**
80 * Handles ARP packet from containers.
81 */
82@Component(
83 immediate = true,
84 property = {
85 GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
86 ARP_MODE + "=" + ARP_MODE_DEFAULT
87 }
88)
89public class K8sSwitchingArpHandler {
90
91 private final Logger log = LoggerFactory.getLogger(getClass());
92
Jian Lid89db462019-02-08 18:21:57 +090093 private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
94
Jian Li4aa17642019-01-30 00:01:11 +090095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected CoreService coreService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected PacketService packetService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected ComponentConfigService configService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected LeadershipService leadershipService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected DeviceService deviceService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected MastershipService mastershipService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected K8sNodeService k8sNodeService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
120 protected K8sNetworkService k8sNetworkService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected K8sFlowRuleService k8sFlowRuleService;
124
125 /** Fake MAC address for virtual network subnet gateway. */
126 private String gatewayMac = GATEWAY_MAC_DEFAULT;
127
128 /** ARP processing mode, broadcast | proxy (default). */
129 protected String arpMode = ARP_MODE_DEFAULT;
130
131 private MacAddress gwMacAddress;
132
133 private final ExecutorService eventExecutor = newSingleThreadExecutor(
134 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
135
136 private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
137 private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
138
139 private ApplicationId appId;
140 private NodeId localNodeId;
141
142 @Activate
143 void activate() {
144 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
145 configService.registerProperties(getClass());
146 localNodeId = clusterService.getLocalNode().id();
147 leadershipService.runForLeadership(appId.name());
148 k8sNodeService.addListener(k8sNodeListener);
149 packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
150
151 log.info("Started");
152 }
153
154 @Deactivate
155 void deactivate() {
156 packetService.removeProcessor(packetProcessor);
157 k8sNodeService.removeListener(k8sNodeListener);
158 leadershipService.withdraw(appId.name());
159 configService.unregisterProperties(getClass(), false);
160 eventExecutor.shutdown();
161
162 log.info("Stopped");
163 }
164
165 @Modified
166 void modified(ComponentContext context) {
167 readComponentConfiguration(context);
168
169 log.info("Modified");
170 }
171
172 /**
173 * Processes ARP request packets.
174 *
175 * @param context packet context
176 * @param ethPacket ethernet packet
177 */
178 private void processPacketIn(PacketContext context, Ethernet ethPacket) {
179 // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
180 if (ARP_BROADCAST_MODE.equals(getArpMode())) {
181 return;
182 }
183
184 ARP arpPacket = (ARP) ethPacket.getPayload();
185 if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
186 return;
187 }
188
189 K8sPort srcPort = k8sNetworkService.ports().stream()
190 .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
191 .findAny().orElse(null);
192
193 if (srcPort == null && !context.inPacket().receivedFrom().port()
194 .equals(PortNumber.LOCAL)) {
195 log.warn("Failed to find source port(MAC:{})", ethPacket.getSourceMAC());
196 return;
197 }
198
199 // FIXME: this is a workaround for storing host GW MAC address,
200 // need to find a way to store the MAC address in persistent way
201 if (context.inPacket().receivedFrom().port().equals(PortNumber.LOCAL)) {
202 gwMacAddress = ethPacket.getSourceMAC();
203 }
204
205 IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
206
207 MacAddress replyMac = k8sNetworkService.ports().stream()
208 // .filter(p -> p.networkId().equals(srcPort.networkId()))
209 .filter(p -> p.ipAddress().equals(targetIp))
210 .map(K8sPort::macAddress)
211 .findAny().orElse(null);
212
213 long gwIpCnt = k8sNetworkService.networks().stream()
214 .filter(n -> n.gatewayIp().equals(targetIp))
215 .count();
216
Jian Lid89db462019-02-08 18:21:57 +0900217 if (gwIpCnt > 0 || targetIp.equals(IpAddress.valueOf(API_SERVER_CLUSTER_IP))) {
Jian Li4aa17642019-01-30 00:01:11 +0900218 replyMac = gwMacAddress;
219 }
220
221 if (replyMac == null) {
222 log.debug("Failed to find MAC address for {}", targetIp);
223 return;
224 }
225
226 Ethernet ethReply = ARP.buildArpReply(
227 targetIp.getIp4Address(),
228 replyMac,
229 ethPacket);
230
231 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
232 .setOutput(context.inPacket().receivedFrom().port())
233 .build();
234
235 packetService.emit(new DefaultOutboundPacket(
236 context.inPacket().receivedFrom().deviceId(),
237 treatment,
238 ByteBuffer.wrap(ethReply.serialize())));
239 }
240
241 private String getArpMode() {
242 Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
243 return getPropertyValue(properties, ARP_MODE);
244 }
245
246 /**
247 * Extracts properties from the component configuration context.
248 *
249 * @param context the component context
250 */
251 private void readComponentConfiguration(ComponentContext context) {
252 Dictionary<?, ?> properties = context.getProperties();
253
254 String updatedMac = Tools.get(properties, GATEWAY_MAC);
255 gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
256 log.info("Configured. Gateway MAC is {}", gatewayMac);
257 }
258
259 /**
260 * An internal packet processor which processes ARP request, and results in
261 * packet-out ARP reply.
262 */
263 private class InternalPacketProcessor implements PacketProcessor {
264
265 @Override
266 public void process(PacketContext context) {
267 if (context.isHandled()) {
268 return;
269 }
270
271 Ethernet ethPacket = context.inPacket().parsed();
272 if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
273 return;
274 }
275
276 eventExecutor.execute(() -> processPacketIn(context, ethPacket));
277 }
278 }
279
280 /**
281 * An internal kubernetes node listener which is used for listening kubernetes
282 * node activity. As long as a node is in complete state, we will install
283 * default ARP rule to handle ARP request.
284 */
285 private class InternalNodeEventListener implements K8sNodeListener {
286
287 private boolean isRelevantHelper() {
288 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
289 }
290
291 @Override
292 public void event(K8sNodeEvent event) {
293 K8sNode k8sNode = event.subject();
294 switch (event.type()) {
295 case K8S_NODE_COMPLETE:
296 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
297 break;
298 case K8S_NODE_INCOMPLETE:
299 eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
300 break;
301 default:
302 break;
303 }
304 }
305
306 private void processNodeCompletion(K8sNode node) {
307 if (!isRelevantHelper()) {
308 return;
309 }
310
311 setDefaultArpRule(node, true);
312 }
313
314 private void processNodeIncompletion(K8sNode node) {
315 if (!isRelevantHelper()) {
316 return;
317 }
318
319 setDefaultArpRule(node, false);
320 }
321
322 private void setDefaultArpRule(K8sNode node, boolean install) {
323
324 if (getArpMode() == null) {
325 return;
326 }
327
328 switch (getArpMode()) {
329 case ARP_PROXY_MODE:
330 setDefaultArpRuleForProxyMode(node, install);
331 break;
332 case ARP_BROADCAST_MODE:
333 // TODO: need to implement broadcast mode
334 log.warn("Not implemented yet.");
335 break;
336 default:
337 log.warn("Invalid ARP mode {}. Please use either " +
338 "broadcast or proxy mode.", getArpMode());
339 break;
340 }
341 }
342
343 private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
344 TrafficSelector selector = DefaultTrafficSelector.builder()
345 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
346 .build();
347
348 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
349 .punt()
350 .build();
351
352 k8sFlowRuleService.setRule(
353 appId,
354 node.intgBridge(),
355 selector,
356 treatment,
357 PRIORITY_ARP_CONTROL_RULE,
358 ARP_TABLE,
359 install
360 );
361 }
362 }
363}