/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.openstacktroubleshoot.impl;

import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.DeserializationException;
import org.onlab.packet.Ethernet;
import org.onlab.packet.ICMP;
import org.onlab.packet.ICMPEcho;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.openstacknetworking.api.InstancePort;
import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackFlowRuleService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.openstacktroubleshoot.api.OpenstackTroubleshootService;
import org.onosproject.openstacktroubleshoot.api.Reachability;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.ICMP.TYPE_ECHO_REPLY;
import static org.onlab.packet.ICMP.TYPE_ECHO_REQUEST;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.PortNumber.TABLE;
import static org.onosproject.net.flow.FlowEntry.FlowEntryState.ADDED;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
import static org.onosproject.openstacknetworking.api.Constants.ACL_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_GATEWAY_MAC;
import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ICMP_PROBE_RULE;
import static org.onosproject.openstacknetworking.api.Constants.VTAG_TABLE;
import static org.onosproject.openstacknetworking.api.InstancePort.State.ACTIVE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacktroubleshoot.util.OpenstackTroubleshootUtil.getSegId;

95/**
96 * Implementation of openstack troubleshoot app.
97 */
Jian Li0b93b002018-07-31 13:41:08 +090098@Component(immediate = true)
99@Service
Jian Lia1186772018-07-27 18:06:41 +0900100public class OpenstackTroubleshootManager implements OpenstackTroubleshootService {
Jian Li0b93b002018-07-31 13:41:08 +0900101
102 private final Logger log = LoggerFactory.getLogger(getClass());
103
104 private static final int VID_TAG_RULE_INSTALL_TIMEOUT_MS = 1000;
105 private static final int ICMP_RULE_INSTALL_TIMEOUT_MS = 1000;
106 private static final int ICMP_REPLY_TIMEOUT_MS = 3000;
107 private static final String SERIALIZER_NAME = "openstack-troubleshoot";
108 private static final byte TTL = 64;
109 private static final short INITIAL_SEQ = 1;
110 private static final short MAX_ICMP_GEN = 3;
111 private static final int PREFIX_LENGTH = 32;
112 private static final int ICMP_PROCESSOR_PRIORITY = 99;
113
114 private static final String ICMP_COUNTER_NAME = "icmp-id-counter";
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected CoreService coreService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected PacketService packetService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected FlowRuleService flowRuleService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected LeadershipService leadershipService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected MastershipService mastershipService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterService clusterService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected OpenstackNodeService osNodeService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected OpenstackNetworkService osNetworkService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected OpenstackFlowRuleService osFlowRuleService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 protected InstancePortService instancePortService;
148
149 private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(
150 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
151 private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
152 private ConsistentMap<String, Reachability> icmpReachabilityMap;
153 private AtomicCounter icmpIdCounter;
154
155 private static final KryoNamespace SERIALIZER_DEFAULT_MAP = KryoNamespace.newBuilder()
156 .register(KryoNamespaces.API)
157 .register(Reachability.class)
158 .register(DefaultReachability.class)
159 .build();
160
161 private Set<String> icmpIds = Sets.newConcurrentHashSet();
162
163 private ApplicationId appId;
164 private NodeId localNodeId;
165
166 @Activate
167 protected void activate() {
168
169 appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
170 packetService.addProcessor(packetProcessor,
171 PacketProcessor.director(ICMP_PROCESSOR_PRIORITY));
172
173 localNodeId = clusterService.getLocalNode().id();
174 leadershipService.runForLeadership(appId.name());
175
176 icmpReachabilityMap = storageService.<String, Reachability>consistentMapBuilder()
177 .withSerializer(Serializer.using(SERIALIZER_DEFAULT_MAP))
178 .withName(SERIALIZER_NAME)
179 .withApplicationId(appId)
180 .build();
181
182 icmpIdCounter = storageService.getAtomicCounter(ICMP_COUNTER_NAME);
183
184 log.info("Started");
185 }
186
187 @Deactivate
188 protected void deactivate() {
189
190 packetService.removeProcessor(packetProcessor);
191 leadershipService.withdraw(appId.name());
192 eventExecutor.shutdown();
193
194 log.info("Stopped");
195 }
196
Jian Lia1186772018-07-27 18:06:41 +0900197 @Override
Jian Li0b93b002018-07-31 13:41:08 +0900198 public Map<String, Reachability> probeEastWestBulk() {
199
200 // install flow rules to enforce ICMP_REQUEST to be tagged and direct to ACL table
201 eventExecutor.execute(() -> setAllVidTagRule(true));
202
203 // install flow rules to enforce forwarding ICMP_REPLY to controller
204 eventExecutor.execute(() -> setAllIcmpReplyRule(true));
205
206 icmpReachabilityMap.clear();
207
208 // send ICMP PACKET_OUT to all connect VMs whose instance port state is ACTIVE
209 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
210 .filter(p -> p.state() == ACTIVE)
211 .collect(Collectors.toSet());
212
213 timeoutSupplier(activePorts.size(), VID_TAG_RULE_INSTALL_TIMEOUT_MS, this::checkAllVidTagRules);
214 timeoutSupplier(activePorts.size(), ICMP_RULE_INSTALL_TIMEOUT_MS, this::checkAllIcmpReplyRules);
215
216 for (InstancePort srcPort : activePorts) {
217
218 // we only let the master of the switch where the source host
219 // is attached to send out ICMP request packet
220 if (!mastershipService.isLocalMaster(srcPort.deviceId())) {
221 continue;
222 }
223
224 for (InstancePort dstPort : activePorts) {
225 // if the source and destination ports are identical, we do
226 // not probe the reachability
227 if (srcPort.equals(dstPort)) {
228 continue;
229 }
230
231 // if the two ports are located in different types of networks,
232 // we do not probe the reachability
233 if (!osNetworkService.networkType(srcPort.networkId())
234 .equals(osNetworkService.networkType(dstPort.networkId()))) {
235 continue;
236 }
237
238 sendIcmpEchoRequest(srcPort, dstPort);
239 }
240 }
241
242 long count = icmpReachabilityMap.asJavaMap().values().stream()
243 .filter(r -> !r.isReachable()).count();
244
245 BooleanSupplier checkReachability = () -> icmpReachabilityMap.asJavaMap()
246 .values().stream().allMatch(Reachability::isReachable);
247
248 timeoutSupplier(count, ICMP_REPLY_TIMEOUT_MS, checkReachability);
249
250 // uninstall ICMP_REQUEST VID tagging rules
251 eventExecutor.execute(() -> setAllVidTagRule(false));
252
253 // uninstall ICMP_REPLY enforcing rules
254 eventExecutor.execute(() -> setAllIcmpReplyRule(false));
255
256 return icmpReachabilityMap.asJavaMap();
257 }
258
259 @Override
Jian Lie189c1c2018-08-08 15:55:08 +0900260 public Reachability probeEastWest(InstancePort srcPort, InstancePort dstPort) {
Jian Li0b93b002018-07-31 13:41:08 +0900261
262 Reachability.Builder rBuilder = DefaultReachability.builder()
Jian Lie189c1c2018-08-08 15:55:08 +0900263 .srcIp(srcPort.ipAddress())
264 .dstIp(dstPort.ipAddress());
Jian Li0b93b002018-07-31 13:41:08 +0900265
Jian Lie189c1c2018-08-08 15:55:08 +0900266 if (srcPort.equals(dstPort)) {
Jian Li0b93b002018-07-31 13:41:08 +0900267 // self probing should always return true
268 rBuilder.isReachable(true);
269 return rBuilder.build();
270 } else {
Jian Li0b93b002018-07-31 13:41:08 +0900271 if (srcPort.state() == ACTIVE && dstPort.state() == ACTIVE) {
272
273 // install flow rules to enforce ICMP_REQUEST to be tagged and direct to ACL table
274 eventExecutor.execute(() -> setVidTagRule(srcPort, true));
275
276 // install flow rules to enforce forwarding ICMP_REPLY to controller
277 eventExecutor.execute(() -> setIcmpReplyRule(srcPort, true));
278
279 timeoutPredicate(1, VID_TAG_RULE_INSTALL_TIMEOUT_MS,
280 this::checkVidTagRule, srcPort.ipAddress().toString());
281
282 timeoutPredicate(1, ICMP_RULE_INSTALL_TIMEOUT_MS,
283 this::checkIcmpReplyRule, srcPort.ipAddress().toString());
284
285 // send out ICMP ECHO request
286 sendIcmpEchoRequest(srcPort, dstPort);
287
288 BooleanSupplier checkReachability = () -> icmpReachabilityMap.asJavaMap()
289 .values().stream().allMatch(Reachability::isReachable);
290
291 timeoutSupplier(1, ICMP_REPLY_TIMEOUT_MS, checkReachability);
292
293 // uninstall ICMP_REQUEST VID tagging rules
294 eventExecutor.execute(() -> setVidTagRule(srcPort, false));
295
296 // uninstall ICMP_REPLY enforcing rules
297 eventExecutor.execute(() -> setIcmpReplyRule(srcPort, false));
298
299 return icmpReachabilityMap.asJavaMap()
300 .get(String.valueOf(icmpIdCounter.get()));
301
302 } else {
303 rBuilder.isReachable(false);
304 return rBuilder.build();
305 }
306 }
307 }
308
309 @Override
310 public Map<String, Reachability> probeNorthSouth() {
311 // TODO: require implementation
312 return null;
313 }
314
315 @Override
316 public Reachability probeNorthSouth(String netId, IpAddress ip) {
317 // TODO: require implementation
318 return null;
319 }
320
321 /**
322 * Checks whether all of ICMP reply rules are added or not.
323 *
324 * @return true if all of ICMP reply rules are added, false otherwise
325 */
326 private boolean checkAllIcmpReplyRules() {
327
328 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
329 .filter(p -> p.state() == ACTIVE).collect(Collectors.toSet());
330
331 for (InstancePort port : activePorts) {
332 if (!checkIcmpReplyRule(port.ipAddress().toString())) {
333 return false;
334 }
335 }
336
337 return true;
338 }
339
340 /**
341 * Checks whether ICMP reply rule is added or not.
342 *
343 * @param dstIp destination IP address
344 * @return true if ICMP reply rule is added, false otherwise
345 */
346 private boolean checkIcmpReplyRule(String dstIp) {
347 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
348 TrafficSelector selector = entry.selector();
349
350 IPCriterion dstIpCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
351
352 if (dstIpCriterion != null &&
353 dstIp.equals(dstIpCriterion.ip().address().toString()) &&
354 entry.state() == ADDED) {
355 return true;
356 }
357 }
358
359 return false;
360 }
361
362 /**
363 * Checks whether all of ICMP request VID tagging rules are added or not.
364 *
365 * @return true if the rule is added, false otherwise
366 */
367 private boolean checkAllVidTagRules() {
368 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
369 .filter(p -> p.state() == ACTIVE).collect(Collectors.toSet());
370
371 for (InstancePort port : activePorts) {
372 if (!checkVidTagRule(port.ipAddress().toString())) {
373 return false;
374 }
375 }
376
377 return true;
378 }
379
380 /**
381 * Checks whether ICMP request VID tagging rule is added or not.
382 *
383 * @param srcIp source IP address
384 * @return true if the rule is added, false otherwise
385 */
386 private boolean checkVidTagRule(String srcIp) {
387 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
388 TrafficSelector selector = entry.selector();
389
390 IPCriterion srcIpCriterion = (IPCriterion) selector.getCriterion(IPV4_SRC);
391
392 if (srcIpCriterion != null &&
393 srcIp.equals(srcIpCriterion.ip().address().toString()) &&
394 entry.state() == ADDED) {
395 return true;
396 }
397 }
398
399 return false;
400 }
401
402 /**
403 * Installs/uninstalls all of the flow rules to match ingress fake ICMP requests.
404 *
405 * @param install installation flag
406 */
407 private void setAllVidTagRule(boolean install) {
408 osNodeService.nodes(COMPUTE).forEach(n ->
409 instancePortService.instancePorts().stream()
410 .filter(p -> p.deviceId().equals(n.intgBridge()))
411 .forEach(p -> setVidTagRule(p, install))
412 );
413 }
414
415 /**
416 * Installs/uninstalls a flow rule to match ingress fake ICMP request packets,
417 * and tags VNI/VID, direct the tagged packet to ACL table.
418 *
419 * @param port instance port
420 * @param install installation flag
421 */
422 private void setVidTagRule(InstancePort port, boolean install) {
423 TrafficSelector selector = DefaultTrafficSelector.builder()
424 .matchEthType(Ethernet.TYPE_IPV4)
425 .matchIPSrc(IpPrefix.valueOf(port.ipAddress(), PREFIX_LENGTH))
426 .build();
427
428 TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder()
429 .setTunnelId(getSegId(osNetworkService, port))
430 .transition(ACL_TABLE);
431
432 osFlowRuleService.setRule(
433 appId,
434 port.deviceId(),
435 selector,
436 tb.build(),
437 PRIORITY_ICMP_PROBE_RULE,
438 VTAG_TABLE,
439 install);
440 }
441
442 /**
443 * Installs/uninstalls all of the flow rules to match ICMP reply packets.
444 *
445 * @param install installation flag
446 */
447 private void setAllIcmpReplyRule(boolean install) {
448 osNodeService.nodes(COMPUTE).forEach(n ->
449 instancePortService.instancePorts().stream()
450 .filter(p -> p.deviceId().equals(n.intgBridge()))
451 .forEach(p -> setIcmpReplyRule(p, install))
452 );
453 }
454
455 /**
456 * Installs/uninstalls a flow rule to match ICMP reply packets, direct all
457 * ICMP reply packets to the controller.
458 *
459 * @param install installation flag
460 */
461 private void setIcmpReplyRule(InstancePort port, boolean install) {
462 TrafficSelector selector = DefaultTrafficSelector.builder()
463 .matchEthType(Ethernet.TYPE_IPV4)
464 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), PREFIX_LENGTH))
465 .matchIPProtocol(IPv4.PROTOCOL_ICMP)
466 .matchIcmpType(ICMP.TYPE_ECHO_REPLY)
467 .build();
468
469 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
470 .punt()
471 .build();
472
473 osFlowRuleService.setRule(
474 appId,
475 port.deviceId(),
476 selector,
477 treatment,
478 PRIORITY_ICMP_PROBE_RULE,
479 FORWARDING_TABLE,
480 install);
481 }
482
483 /**
484 * Sends out ICMP ECHO REQUEST to destined VM.
485 *
486 * @param srcPort source instance port
487 * @param dstPort destination instance port
488 */
489 private void sendIcmpEchoRequest(InstancePort srcPort, InstancePort dstPort) {
490
491 short icmpSeq = INITIAL_SEQ;
492
493 short icmpId = (short) icmpIdCounter.incrementAndGet();
494
495 for (int i = 0; i < MAX_ICMP_GEN; i++) {
496 packetService.emit(buildIcmpOutputPacket(srcPort, dstPort, icmpId, icmpSeq));
497 icmpSeq++;
498 }
499 }
500
501 /**
502 * Builds ICMP Outbound packet.
503 *
504 * @param srcPort source instance port
505 * @param dstPort destination instance port
506 * @param icmpId ICMP identifier
507 * @param icmpSeq ICMP sequence number
508 */
509 private OutboundPacket buildIcmpOutputPacket(InstancePort srcPort,
510 InstancePort dstPort,
511 short icmpId,
512 short icmpSeq) {
513
514 // TODO: need to encapsulate the frame into VXLAN/VLAN and transit the
515 // packet to TABLE 0 in order to force the packet to go through all pipelines
516 Ethernet ethFrame = constructIcmpPacket(srcPort, dstPort, icmpId, icmpSeq);
517
518 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
519
520 // we send out the packet to ingress table (index is 0) of source OVS
521 // to enforce the Outbound packet to go through the ingress and egress
522 // pipeline
523 tBuilder.setOutput(TABLE);
524
525 Reachability reachability = DefaultReachability.builder()
526 .srcIp(srcPort.ipAddress())
527 .dstIp(dstPort.ipAddress())
528 .isReachable(false)
529 .build();
530
531 icmpReachabilityMap.put(String.valueOf(icmpId), reachability);
532 icmpIds.add(String.valueOf(icmpId));
533
534 return new DefaultOutboundPacket(
535 srcPort.deviceId(),
536 tBuilder.build(),
537 ByteBuffer.wrap(ethFrame.serialize()));
538 }
539
540 /**
541 * Constructs an ICMP packet with given source and destination IP/MAC.
542 *
543 * @param srcPort source instance port
544 * @param dstPort destination instance port
545 * @param icmpId ICMP identifier
546 * @param icmpSeq ICMP sequence number
547 * @return an ethernet frame which contains ICMP payload
548 */
549 private Ethernet constructIcmpPacket(InstancePort srcPort,
550 InstancePort dstPort,
551 short icmpId, short icmpSeq) {
552 // Ethernet frame
553 Ethernet ethFrame = new Ethernet();
554
555 ethFrame.setEtherType(TYPE_IPV4);
556 ethFrame.setSourceMACAddress(srcPort.macAddress());
557
558 boolean isRemote = !srcPort.deviceId().equals(dstPort.deviceId());
559
560 if (isRemote) {
561 // if the source and destination VMs are located in different OVS,
562 // we will assign fake gateway MAC as the destination MAC
563 ethFrame.setDestinationMACAddress(DEFAULT_GATEWAY_MAC);
564 } else {
565 ethFrame.setDestinationMACAddress(dstPort.macAddress());
566 }
567
568 // IP packet
569 IPv4 iPacket = new IPv4();
570 iPacket.setDestinationAddress(dstPort.ipAddress().toString());
571 iPacket.setSourceAddress(srcPort.ipAddress().toString());
572 iPacket.setTtl(TTL);
573 iPacket.setProtocol(IPv4.PROTOCOL_ICMP);
574
575 // ICMP packet
576 ICMP icmp = new ICMP();
577 icmp.setIcmpType(TYPE_ECHO_REQUEST)
578 .setIcmpCode(TYPE_ECHO_REQUEST)
579 .resetChecksum();
580
581 // ICMP ECHO packet
582 ICMPEcho icmpEcho = new ICMPEcho();
583 icmpEcho.setIdentifier(icmpId)
584 .setSequenceNum(icmpSeq);
585
586 ByteBuffer byteBufferIcmpEcho = ByteBuffer.wrap(icmpEcho.serialize());
587
588 try {
589 icmp.setPayload(ICMPEcho.deserializer().deserialize(byteBufferIcmpEcho.array(),
590 0, ICMPEcho.ICMP_ECHO_HEADER_LENGTH));
591 } catch (DeserializationException e) {
592 log.warn("Failed to deserialize ICMP ECHO REQUEST packet");
593 }
594
595 ByteBuffer byteBufferIcmp = ByteBuffer.wrap(icmp.serialize());
596
597 try {
598 iPacket.setPayload(ICMP.deserializer().deserialize(byteBufferIcmp.array(),
599 0,
600 byteBufferIcmp.array().length));
601 } catch (DeserializationException e) {
602 log.warn("Failed to deserialize ICMP packet");
603 }
604
605 ethFrame.setPayload(iPacket);
606
607 return ethFrame;
608 }
609
610 /**
611 * Handles ICMP ECHO REPLY packets.
612 *
613 * @param ipPacket IP packet
614 * @param icmp ICMP packet
615 */
616 private void handleIcmpEchoReply(IPv4 ipPacket, ICMP icmp) {
617
618 String icmpKey = icmpId(icmp);
619
620 String srcIp = IPv4.fromIPv4Address(ipPacket.getDestinationAddress());
621 String dstIp = IPv4.fromIPv4Address(ipPacket.getSourceAddress());
622
623 Reachability reachability = DefaultReachability.builder()
624 .srcIp(IpAddress.valueOf(srcIp))
625 .dstIp(IpAddress.valueOf(dstIp))
626 .isReachable(false)
627 .build();
628
629 icmpReachabilityMap.computeIfPresent(icmpKey, (key, value) -> {
630 if (value.equals(reachability)) {
631
632 log.debug("src: {}, dst: {} is reachable!", value.dstIp(), value.srcIp());
633
634 return DefaultReachability.builder()
635 .srcIp(IpAddress.valueOf(srcIp))
636 .dstIp(IpAddress.valueOf(dstIp))
637 .isReachable(true)
638 .build();
639 }
640 return reachability;
641 });
642 }
643
644 /**
645 * Obtains an unique ICMP key.
646 *
647 * @param icmp ICMP packet
648 * @return ICMP key
649 */
650 private String icmpId(ICMP icmp) {
651 ICMPEcho echo = (ICMPEcho) icmp.getPayload();
652 checkNotNull(echo);
653
654 short icmpId = echo.getIdentifier();
655
656 return String.valueOf(icmpId);
657 }
658
659 /**
660 * Holds the current thread unit the timeout expires, during the hold the
661 * thread periodically execute the given method.
662 *
663 * @param count count of unit
664 * @param unit unit
665 * @param predicate predicate
666 * @param predicateArg predicate argument
667 */
668 private void timeoutPredicate(long count, int unit,
669 Predicate<String> predicate, String predicateArg) {
670 long timeoutExpiredMs = System.currentTimeMillis() + unit * count;
671
672 while (true) {
673
674 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
675
676 if (predicate.test(predicateArg)) {
677 break;
678 }
679
680 if (waitMs <= 0) {
681 break;
682 }
683 }
684 }
685
686 /**
687 * Holds the current thread unit the timeout expires, during the hold the
688 * thread periodically execute the given method.
689 *
690 * @param count count of unit
691 * @param unit unit
692 * @param supplier boolean supplier
693 */
694 private void timeoutSupplier(long count, int unit, BooleanSupplier supplier) {
695 long timeoutExpiredMs = System.currentTimeMillis() + unit * count;
696
697 while (true) {
698
699 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
700
701 if (supplier.getAsBoolean()) {
702 break;
703 }
704
705 if (waitMs <= 0) {
706 break;
707 }
708 }
709 }
710
711 private class InternalPacketProcessor implements PacketProcessor {
712
713 @Override
714 public void process(PacketContext context) {
715 if (context.isHandled()) {
716 return;
717 }
718
719 InboundPacket pkt = context.inPacket();
720 Ethernet ethernet = pkt.parsed();
721 if (ethernet == null || ethernet.getEtherType() == Ethernet.TYPE_ARP) {
722 return;
723 }
724
725 IPv4 iPacket = (IPv4) ethernet.getPayload();
726 if (iPacket.getProtocol() == IPv4.PROTOCOL_ICMP) {
727 eventExecutor.execute(() -> processIcmpPacket(context, ethernet));
728 }
729 }
730
731 /**
732 * Processes the received ICMP packet.
733 *
734 * @param context packet context
735 * @param ethernet ethernet
736 */
737 private void processIcmpPacket(PacketContext context, Ethernet ethernet) {
738 IPv4 ipPacket = (IPv4) ethernet.getPayload();
739 ICMP icmp = (ICMP) ipPacket.getPayload();
740 log.trace("Processing ICMP packet source MAC:{}, source IP:{}," +
741 "dest MAC:{}, dest IP:{}",
742 ethernet.getSourceMAC(),
743 IpAddress.valueOf(ipPacket.getSourceAddress()),
744 ethernet.getDestinationMAC(),
745 IpAddress.valueOf(ipPacket.getDestinationAddress()));
746
747 String icmpId = icmpId(icmp);
748
749 // if the ICMP ID is not contained in ICMP ID set, we do not handle it
750 if (!icmpIds.contains(icmpId)) {
751 return;
752 }
753
754 switch (icmp.getIcmpType()) {
755 case TYPE_ECHO_REPLY:
756 handleIcmpEchoReply(ipPacket, icmp);
757 context.block();
758 icmpIds.remove(icmpId);
759 break;
760 default:
761 break;
762 }
763 }
Jian Lia1186772018-07-27 18:06:41 +0900764 }
765}