/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.Ip4Address;
21import org.onlab.packet.IpAddress;
22import org.onlab.packet.MacAddress;
23import org.onlab.packet.VlanId;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.k8snetworking.api.K8sFlowRuleService;
Jian Lieab51352020-09-11 03:29:16 +090030import org.onosproject.k8snode.api.K8sHostAdminService;
Jian Li1b08d652019-05-02 17:28:09 +090031import org.onosproject.k8snode.api.K8sNode;
32import org.onosproject.k8snode.api.K8sNodeAdminService;
33import org.onosproject.k8snode.api.K8sNodeEvent;
34import org.onosproject.k8snode.api.K8sNodeListener;
Jian Lieab51352020-09-11 03:29:16 +090035import org.onosproject.net.DeviceId;
Jian Li1b08d652019-05-02 17:28:09 +090036import org.onosproject.net.flow.DefaultTrafficSelector;
37import org.onosproject.net.flow.DefaultTrafficTreatment;
38import org.onosproject.net.flow.TrafficSelector;
39import org.onosproject.net.flow.TrafficTreatment;
40import org.onosproject.net.packet.DefaultOutboundPacket;
41import org.onosproject.net.packet.InboundPacket;
42import org.onosproject.net.packet.PacketContext;
43import org.onosproject.net.packet.PacketProcessor;
44import org.onosproject.net.packet.PacketService;
45import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Reference;
49import org.osgi.service.component.annotations.ReferenceCardinality;
50import org.slf4j.Logger;
51
52import java.nio.ByteBuffer;
53import java.util.Objects;
54import java.util.Set;
55import java.util.concurrent.ExecutorService;
56import java.util.stream.Collectors;
57
58import static java.lang.Thread.sleep;
59import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
62import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li44c2b122019-05-03 14:46:34 +090063import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_POD_RULE;
Jian Li1b08d652019-05-02 17:28:09 +090064import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_REPLY_RULE;
Jian Lieab51352020-09-11 03:29:16 +090065import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.allK8sDevices;
Jian Li1b08d652019-05-02 17:28:09 +090066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Handles ARP request/reply from external gateway.
70 */
71@Component(immediate = true)
72public class K8sRoutingArpHandler {
73 private final Logger log = getLogger(getClass());
74
75 private static final long SLEEP_MS = 5000;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected CoreService coreService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected PacketService packetService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected ClusterService clusterService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected LeadershipService leadershipService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected K8sNodeAdminService k8sNodeService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lieab51352020-09-11 03:29:16 +090093 protected K8sHostAdminService k8sHostService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li1b08d652019-05-02 17:28:09 +090096 protected K8sFlowRuleService k8sFlowRuleService;
97
98 private ApplicationId appId;
99 private NodeId localNodeId;
100
101 private final InternalK8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
102 private final ExecutorService eventExecutor = newSingleThreadExecutor(
103 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
104
105 private final PacketProcessor packetProcessor = new InternalPacketProcessor();
106
107 @Activate
108 protected void activate() {
109 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
110 localNodeId = clusterService.getLocalNode().id();
111 leadershipService.runForLeadership(appId.name());
112 k8sNodeService.addListener(k8sNodeListener);
113 packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
114 log.info("Started");
115 }
116
117 @Deactivate
118 protected void deactivate() {
119 k8sNodeService.removeListener(k8sNodeListener);
120 packetService.removeProcessor(packetProcessor);
121 leadershipService.withdraw(appId.name());
122 eventExecutor.shutdown();
123 log.info("Stopped");
124 }
125
126 private void processArpPacket(PacketContext context, Ethernet ethernet) {
Jian Lieab51352020-09-11 03:29:16 +0900127
128 DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
129
130 if (!allK8sDevices(k8sNodeService, k8sHostService).contains(deviceId)) {
131 return;
132 }
133
Jian Li1b08d652019-05-02 17:28:09 +0900134 ARP arp = (ARP) ethernet.getPayload();
135
136 if (arp.getOpCode() == ARP.OP_REPLY) {
137 IpAddress spa = Ip4Address.valueOf(arp.getSenderProtocolAddress());
138 MacAddress sha = MacAddress.valueOf(arp.getSenderHardwareAddress());
139
Jian Li1efcb982020-02-04 00:32:21 +0900140 log.info("ARP reply from ip: {}, mac: {}", spa, sha);
Jian Li1b08d652019-05-02 17:28:09 +0900141
142 Set<IpAddress> gatewayIps = k8sNodeService.completeNodes().stream()
143 .map(K8sNode::extGatewayIp).collect(Collectors.toSet());
144
145 if (!gatewayIps.contains(spa)) {
146 return;
147 }
148
Jian Li1efcb982020-02-04 00:32:21 +0900149 log.info("ARP reply from external gateway ip: {}, mac: {}", spa, sha);
150
Jian Li1b08d652019-05-02 17:28:09 +0900151 k8sNodeService.completeNodes().stream()
152 .filter(n -> n.extGatewayMac() == null)
153 .forEach(n -> {
154 K8sNode updated = n.updateExtGatewayMac(sha);
155 k8sNodeService.updateNode(updated);
156 });
157 }
158 }
159
160 private void sendArpRequest(K8sNode k8sNode) {
161 MacAddress bridgeMac = k8sNode.extBridgeMac();
162 IpAddress bridgeIp = k8sNode.extBridgeIp();
163 IpAddress extGatewayIp = k8sNode.extGatewayIp();
164 Ethernet ethRequest = ARP.buildArpRequest(bridgeMac.toBytes(), bridgeIp.toOctets(),
165 extGatewayIp.toOctets(), VlanId.NO_VID);
166
167 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
168 .setOutput(k8sNode.extBridgePortNum())
169 .build();
170
171 packetService.emit(new DefaultOutboundPacket(
172 k8sNode.extBridge(),
173 treatment,
174 ByteBuffer.wrap(ethRequest.serialize())));
175 }
176
177 private void setArpReplyRule(K8sNode k8sNode, boolean install) {
178 TrafficSelector selector = DefaultTrafficSelector.builder()
179 .matchEthType(Ethernet.TYPE_ARP)
180 .matchArpOp(ARP.OP_REPLY)
Jian Li1b08d652019-05-02 17:28:09 +0900181 .build();
182
183 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
184 .punt()
185 .build();
186
187 k8sFlowRuleService.setRule(
188 appId,
189 k8sNode.extBridge(),
190 selector,
191 treatment,
192 PRIORITY_ARP_REPLY_RULE,
193 EXT_ENTRY_TABLE,
194 install
195 );
196 }
197
Jian Li44c2b122019-05-03 14:46:34 +0900198 private void setPodArpRequestRule(K8sNode k8sNode, boolean install) {
Jian Li5abc9f02020-09-04 19:38:37 +0900199 if (k8sNode.extBridgePortNum() == null) {
200 log.warn("External bridge port is disabled.");
201 return;
202 }
203
Jian Li44c2b122019-05-03 14:46:34 +0900204 TrafficSelector selector = DefaultTrafficSelector.builder()
205 .matchInPort(k8sNode.extToIntgPatchPortNum())
206 .matchEthType(Ethernet.TYPE_ARP)
207 .matchArpOp(ARP.OP_REQUEST)
208 .build();
209
210 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
211 .setOutput(k8sNode.extBridgePortNum())
212 .build();
213
214 k8sFlowRuleService.setRule(
215 appId,
216 k8sNode.extBridge(),
217 selector,
218 treatment,
219 PRIORITY_ARP_POD_RULE,
220 EXT_ENTRY_TABLE,
221 install
222 );
223 }
224
225 private void setPodArpReplyRule(K8sNode k8sNode, boolean install) {
226 TrafficSelector selector = DefaultTrafficSelector.builder()
227 .matchInPort(k8sNode.extBridgePortNum())
228 .matchEthType(Ethernet.TYPE_ARP)
229 .matchArpOp(ARP.OP_REPLY)
230 .build();
231
232 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
233 .setOutput(k8sNode.extToIntgPatchPortNum())
234 .build();
235
236 k8sFlowRuleService.setRule(
237 appId,
238 k8sNode.extBridge(),
239 selector,
240 treatment,
241 PRIORITY_ARP_POD_RULE,
242 EXT_ENTRY_TABLE,
243 install
244 );
245 }
246
Jian Li1b08d652019-05-02 17:28:09 +0900247 private class InternalK8sNodeListener implements K8sNodeListener {
248
249 private boolean isRelevantHelper() {
250 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
251 }
252
253 @Override
254 public void event(K8sNodeEvent event) {
255 switch (event.type()) {
256 case K8S_NODE_COMPLETE:
257 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
258 break;
259 case K8S_NODE_INCOMPLETE:
260 default:
261 break;
262 }
263 }
264
265 private void processNodeCompletion(K8sNode k8sNode) {
266 if (!isRelevantHelper()) {
267 return;
268 }
269
270 setArpReplyRule(k8sNode, true);
Jian Li44c2b122019-05-03 14:46:34 +0900271 setPodArpRequestRule(k8sNode, true);
272 setPodArpReplyRule(k8sNode, true);
Jian Li1b08d652019-05-02 17:28:09 +0900273
274 try {
275 sleep(SLEEP_MS);
276 } catch (InterruptedException e) {
277 log.error("Exception caused during ARP requesting...");
278 }
279
280 sendArpRequest(k8sNode);
281 }
282 }
283
284 private class InternalPacketProcessor implements PacketProcessor {
285
286 @Override
287 public void process(PacketContext context) {
288 if (context.isHandled()) {
289 return;
290 }
291
292 InboundPacket pkt = context.inPacket();
293 Ethernet ethernet = pkt.parsed();
294 if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) {
295 eventExecutor.execute(() -> processArpPacket(context, ethernet));
296 }
297 }
298 }
299}