/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpAddress;
22import org.onlab.packet.MacAddress;
23import org.onlab.packet.VlanId;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.k8snetworking.api.K8sFlowRuleService;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
32import org.onosproject.k8snode.api.K8sNodeEvent;
33import org.onosproject.k8snode.api.K8sNodeListener;
34import org.onosproject.net.flow.DefaultTrafficSelector;
35import org.onosproject.net.flow.DefaultTrafficTreatment;
36import org.onosproject.net.flow.TrafficSelector;
37import org.onosproject.net.flow.TrafficTreatment;
38import org.onosproject.net.packet.DefaultOutboundPacket;
39import org.onosproject.net.packet.InboundPacket;
40import org.onosproject.net.packet.PacketContext;
41import org.onosproject.net.packet.PacketProcessor;
42import org.onosproject.net.packet.PacketService;
43import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
48import org.slf4j.Logger;
49
50import java.nio.ByteBuffer;
51import java.util.Objects;
52import java.util.Set;
53import java.util.concurrent.ExecutorService;
54import java.util.stream.Collectors;
55
56import static java.lang.Thread.sleep;
57import static java.util.concurrent.Executors.newSingleThreadExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
60import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li44c2b122019-05-03 14:46:34 +090061import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_POD_RULE;
Jian Li1b08d652019-05-02 17:28:09 +090062import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_REPLY_RULE;
63import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Handles ARP request/reply from external gateway.
67 */
68@Component(immediate = true)
69public class K8sRoutingArpHandler {
70 private final Logger log = getLogger(getClass());
71
72 private static final long SLEEP_MS = 5000;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected CoreService coreService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected PacketService packetService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected ClusterService clusterService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected LeadershipService leadershipService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected K8sNodeAdminService k8sNodeService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected K8sFlowRuleService k8sFlowRuleService;
91
92 private ApplicationId appId;
93 private NodeId localNodeId;
94
95 private final InternalK8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
96 private final ExecutorService eventExecutor = newSingleThreadExecutor(
97 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
98
99 private final PacketProcessor packetProcessor = new InternalPacketProcessor();
100
101 @Activate
102 protected void activate() {
103 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
104 localNodeId = clusterService.getLocalNode().id();
105 leadershipService.runForLeadership(appId.name());
106 k8sNodeService.addListener(k8sNodeListener);
107 packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
108 log.info("Started");
109 }
110
111 @Deactivate
112 protected void deactivate() {
113 k8sNodeService.removeListener(k8sNodeListener);
114 packetService.removeProcessor(packetProcessor);
115 leadershipService.withdraw(appId.name());
116 eventExecutor.shutdown();
117 log.info("Stopped");
118 }
119
120 private void processArpPacket(PacketContext context, Ethernet ethernet) {
121 ARP arp = (ARP) ethernet.getPayload();
122
123 if (arp.getOpCode() == ARP.OP_REPLY) {
124 IpAddress spa = Ip4Address.valueOf(arp.getSenderProtocolAddress());
125 MacAddress sha = MacAddress.valueOf(arp.getSenderHardwareAddress());
126
Jian Li232a32c2020-02-04 00:32:21 +0900127 log.info("ARP reply from ip: {}, mac: {}", spa, sha);
Jian Li1b08d652019-05-02 17:28:09 +0900128
129 Set<IpAddress> gatewayIps = k8sNodeService.completeNodes().stream()
130 .map(K8sNode::extGatewayIp).collect(Collectors.toSet());
131
132 if (!gatewayIps.contains(spa)) {
133 return;
134 }
135
Jian Li232a32c2020-02-04 00:32:21 +0900136 log.info("ARP reply from external gateway ip: {}, mac: {}", spa, sha);
137
Jian Li1b08d652019-05-02 17:28:09 +0900138 k8sNodeService.completeNodes().stream()
139 .filter(n -> n.extGatewayMac() == null)
140 .forEach(n -> {
141 K8sNode updated = n.updateExtGatewayMac(sha);
142 k8sNodeService.updateNode(updated);
143 });
144 }
145 }
146
147 private void sendArpRequest(K8sNode k8sNode) {
148 MacAddress bridgeMac = k8sNode.extBridgeMac();
149 IpAddress bridgeIp = k8sNode.extBridgeIp();
150 IpAddress extGatewayIp = k8sNode.extGatewayIp();
151 Ethernet ethRequest = ARP.buildArpRequest(bridgeMac.toBytes(), bridgeIp.toOctets(),
152 extGatewayIp.toOctets(), VlanId.NO_VID);
153
154 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
155 .setOutput(k8sNode.extBridgePortNum())
156 .build();
157
158 packetService.emit(new DefaultOutboundPacket(
159 k8sNode.extBridge(),
160 treatment,
161 ByteBuffer.wrap(ethRequest.serialize())));
162 }
163
164 private void setArpReplyRule(K8sNode k8sNode, boolean install) {
165 TrafficSelector selector = DefaultTrafficSelector.builder()
166 .matchEthType(Ethernet.TYPE_ARP)
167 .matchArpOp(ARP.OP_REPLY)
168 .matchArpSpa(Ip4Address.valueOf(k8sNode.extGatewayIp().toString()))
169 .build();
170
171 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
172 .punt()
173 .build();
174
175 k8sFlowRuleService.setRule(
176 appId,
177 k8sNode.extBridge(),
178 selector,
179 treatment,
180 PRIORITY_ARP_REPLY_RULE,
181 EXT_ENTRY_TABLE,
182 install
183 );
184 }
185
Jian Li44c2b122019-05-03 14:46:34 +0900186 private void setPodArpRequestRule(K8sNode k8sNode, boolean install) {
Jian Li89164182020-09-04 19:38:37 +0900187 if (k8sNode.extBridgePortNum() == null) {
188 log.warn("External bridge port is disabled.");
189 return;
190 }
191
Jian Li44c2b122019-05-03 14:46:34 +0900192 TrafficSelector selector = DefaultTrafficSelector.builder()
193 .matchInPort(k8sNode.extToIntgPatchPortNum())
194 .matchEthType(Ethernet.TYPE_ARP)
195 .matchArpOp(ARP.OP_REQUEST)
196 .build();
197
198 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
199 .setOutput(k8sNode.extBridgePortNum())
200 .build();
201
202 k8sFlowRuleService.setRule(
203 appId,
204 k8sNode.extBridge(),
205 selector,
206 treatment,
207 PRIORITY_ARP_POD_RULE,
208 EXT_ENTRY_TABLE,
209 install
210 );
211 }
212
213 private void setPodArpReplyRule(K8sNode k8sNode, boolean install) {
214 TrafficSelector selector = DefaultTrafficSelector.builder()
215 .matchInPort(k8sNode.extBridgePortNum())
216 .matchEthType(Ethernet.TYPE_ARP)
217 .matchArpOp(ARP.OP_REPLY)
218 .build();
219
220 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
221 .setOutput(k8sNode.extToIntgPatchPortNum())
222 .build();
223
224 k8sFlowRuleService.setRule(
225 appId,
226 k8sNode.extBridge(),
227 selector,
228 treatment,
229 PRIORITY_ARP_POD_RULE,
230 EXT_ENTRY_TABLE,
231 install
232 );
233 }
234
Jian Li1b08d652019-05-02 17:28:09 +0900235 private class InternalK8sNodeListener implements K8sNodeListener {
236
237 private boolean isRelevantHelper() {
238 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
239 }
240
241 @Override
242 public void event(K8sNodeEvent event) {
243 switch (event.type()) {
244 case K8S_NODE_COMPLETE:
245 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
246 break;
247 case K8S_NODE_INCOMPLETE:
248 default:
249 break;
250 }
251 }
252
253 private void processNodeCompletion(K8sNode k8sNode) {
254 if (!isRelevantHelper()) {
255 return;
256 }
257
258 setArpReplyRule(k8sNode, true);
Jian Li44c2b122019-05-03 14:46:34 +0900259 setPodArpRequestRule(k8sNode, true);
260 setPodArpReplyRule(k8sNode, true);
Jian Li1b08d652019-05-02 17:28:09 +0900261
262 try {
263 sleep(SLEEP_MS);
264 } catch (InterruptedException e) {
265 log.error("Exception caused during ARP requesting...");
266 }
267
268 sendArpRequest(k8sNode);
269 }
270 }
271
272 private class InternalPacketProcessor implements PacketProcessor {
273
274 @Override
275 public void process(PacketContext context) {
276 if (context.isHandled()) {
277 return;
278 }
279
280 InboundPacket pkt = context.inPacket();
281 Ethernet ethernet = pkt.parsed();
282 if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) {
283 eventExecutor.execute(() -> processArpPacket(context, ethernet));
284 }
285 }
286 }
287}