/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.openstacktroubleshoot.impl;

import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.DeserializationException;
import org.onlab.packet.Ethernet;
import org.onlab.packet.ICMP;
import org.onlab.packet.ICMPEcho;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.openstacknetworking.api.InstancePort;
import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackFlowRuleService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.openstacktroubleshoot.api.OpenstackTroubleshootService;
import org.onosproject.openstacktroubleshoot.api.Reachability;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.ICMP.TYPE_ECHO_REPLY;
import static org.onlab.packet.ICMP.TYPE_ECHO_REQUEST;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.PortNumber.TABLE;
import static org.onosproject.net.flow.FlowEntry.FlowEntryState.ADDED;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
import static org.onosproject.openstacknetworking.api.Constants.ACL_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_GATEWAY_MAC;
import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ICMP_PROBE_RULE;
import static org.onosproject.openstacknetworking.api.Constants.VTAG_TABLE;
import static org.onosproject.openstacknetworking.api.InstancePort.State.ACTIVE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacktroubleshoot.util.OpenstackTroubleshootUtil.getSegId;

95/**
96 * Implementation of openstack troubleshoot app.
97 */
Jian Li0b93b002018-07-31 13:41:08 +090098@Component(immediate = true)
99@Service
Jian Lia1186772018-07-27 18:06:41 +0900100public class OpenstackTroubleshootManager implements OpenstackTroubleshootService {
Jian Li0b93b002018-07-31 13:41:08 +0900101
102 private final Logger log = LoggerFactory.getLogger(getClass());
103
104 private static final int VID_TAG_RULE_INSTALL_TIMEOUT_MS = 1000;
105 private static final int ICMP_RULE_INSTALL_TIMEOUT_MS = 1000;
106 private static final int ICMP_REPLY_TIMEOUT_MS = 3000;
107 private static final String SERIALIZER_NAME = "openstack-troubleshoot";
108 private static final byte TTL = 64;
109 private static final short INITIAL_SEQ = 1;
110 private static final short MAX_ICMP_GEN = 3;
111 private static final int PREFIX_LENGTH = 32;
112 private static final int ICMP_PROCESSOR_PRIORITY = 99;
113
114 private static final String ICMP_COUNTER_NAME = "icmp-id-counter";
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected CoreService coreService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected PacketService packetService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected FlowRuleService flowRuleService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected LeadershipService leadershipService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected MastershipService mastershipService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterService clusterService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected OpenstackNodeService osNodeService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected OpenstackNetworkService osNetworkService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected OpenstackFlowRuleService osFlowRuleService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 protected InstancePortService instancePortService;
148
149 private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(
150 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
151 private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
152 private ConsistentMap<String, Reachability> icmpReachabilityMap;
153 private AtomicCounter icmpIdCounter;
154
155 private static final KryoNamespace SERIALIZER_DEFAULT_MAP = KryoNamespace.newBuilder()
156 .register(KryoNamespaces.API)
157 .register(Reachability.class)
158 .register(DefaultReachability.class)
159 .build();
160
161 private Set<String> icmpIds = Sets.newConcurrentHashSet();
162
163 private ApplicationId appId;
164 private NodeId localNodeId;
165
166 @Activate
167 protected void activate() {
168
169 appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
170 packetService.addProcessor(packetProcessor,
171 PacketProcessor.director(ICMP_PROCESSOR_PRIORITY));
172
173 localNodeId = clusterService.getLocalNode().id();
174 leadershipService.runForLeadership(appId.name());
175
176 icmpReachabilityMap = storageService.<String, Reachability>consistentMapBuilder()
177 .withSerializer(Serializer.using(SERIALIZER_DEFAULT_MAP))
178 .withName(SERIALIZER_NAME)
179 .withApplicationId(appId)
180 .build();
181
182 icmpIdCounter = storageService.getAtomicCounter(ICMP_COUNTER_NAME);
183
184 log.info("Started");
185 }
186
187 @Deactivate
188 protected void deactivate() {
189
190 packetService.removeProcessor(packetProcessor);
191 leadershipService.withdraw(appId.name());
192 eventExecutor.shutdown();
193
194 log.info("Stopped");
195 }
196
Jian Lia1186772018-07-27 18:06:41 +0900197 @Override
Jian Li0b93b002018-07-31 13:41:08 +0900198 public Map<String, Reachability> probeEastWestBulk() {
199
200 // install flow rules to enforce ICMP_REQUEST to be tagged and direct to ACL table
201 eventExecutor.execute(() -> setAllVidTagRule(true));
202
203 // install flow rules to enforce forwarding ICMP_REPLY to controller
204 eventExecutor.execute(() -> setAllIcmpReplyRule(true));
205
206 icmpReachabilityMap.clear();
207
208 // send ICMP PACKET_OUT to all connect VMs whose instance port state is ACTIVE
209 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
210 .filter(p -> p.state() == ACTIVE)
211 .collect(Collectors.toSet());
212
213 timeoutSupplier(activePorts.size(), VID_TAG_RULE_INSTALL_TIMEOUT_MS, this::checkAllVidTagRules);
214 timeoutSupplier(activePorts.size(), ICMP_RULE_INSTALL_TIMEOUT_MS, this::checkAllIcmpReplyRules);
215
216 for (InstancePort srcPort : activePorts) {
217
218 // we only let the master of the switch where the source host
219 // is attached to send out ICMP request packet
220 if (!mastershipService.isLocalMaster(srcPort.deviceId())) {
221 continue;
222 }
223
224 for (InstancePort dstPort : activePorts) {
225 // if the source and destination ports are identical, we do
226 // not probe the reachability
227 if (srcPort.equals(dstPort)) {
228 continue;
229 }
230
231 // if the two ports are located in different types of networks,
232 // we do not probe the reachability
233 if (!osNetworkService.networkType(srcPort.networkId())
234 .equals(osNetworkService.networkType(dstPort.networkId()))) {
235 continue;
236 }
237
238 sendIcmpEchoRequest(srcPort, dstPort);
239 }
240 }
241
242 long count = icmpReachabilityMap.asJavaMap().values().stream()
243 .filter(r -> !r.isReachable()).count();
244
245 BooleanSupplier checkReachability = () -> icmpReachabilityMap.asJavaMap()
246 .values().stream().allMatch(Reachability::isReachable);
247
248 timeoutSupplier(count, ICMP_REPLY_TIMEOUT_MS, checkReachability);
249
250 // uninstall ICMP_REQUEST VID tagging rules
251 eventExecutor.execute(() -> setAllVidTagRule(false));
252
253 // uninstall ICMP_REPLY enforcing rules
254 eventExecutor.execute(() -> setAllIcmpReplyRule(false));
255
256 return icmpReachabilityMap.asJavaMap();
257 }
258
259 @Override
260 public Reachability probeEastWest(String srcNetId, IpAddress srcIp,
261 String dstNetId, IpAddress dstIp) {
262
263 Reachability.Builder rBuilder = DefaultReachability.builder()
264 .srcIp(srcIp)
265 .dstIp(dstIp);
266
267 if (srcIp.equals(dstIp)) {
268 // self probing should always return true
269 rBuilder.isReachable(true);
270 return rBuilder.build();
271 } else {
272 InstancePort srcPort = instancePortService.instancePort(srcIp, srcNetId);
273 InstancePort dstPort = instancePortService.instancePort(dstIp, dstNetId);
274
275 if (srcPort.state() == ACTIVE && dstPort.state() == ACTIVE) {
276
277 // install flow rules to enforce ICMP_REQUEST to be tagged and direct to ACL table
278 eventExecutor.execute(() -> setVidTagRule(srcPort, true));
279
280 // install flow rules to enforce forwarding ICMP_REPLY to controller
281 eventExecutor.execute(() -> setIcmpReplyRule(srcPort, true));
282
283 timeoutPredicate(1, VID_TAG_RULE_INSTALL_TIMEOUT_MS,
284 this::checkVidTagRule, srcPort.ipAddress().toString());
285
286 timeoutPredicate(1, ICMP_RULE_INSTALL_TIMEOUT_MS,
287 this::checkIcmpReplyRule, srcPort.ipAddress().toString());
288
289 // send out ICMP ECHO request
290 sendIcmpEchoRequest(srcPort, dstPort);
291
292 BooleanSupplier checkReachability = () -> icmpReachabilityMap.asJavaMap()
293 .values().stream().allMatch(Reachability::isReachable);
294
295 timeoutSupplier(1, ICMP_REPLY_TIMEOUT_MS, checkReachability);
296
297 // uninstall ICMP_REQUEST VID tagging rules
298 eventExecutor.execute(() -> setVidTagRule(srcPort, false));
299
300 // uninstall ICMP_REPLY enforcing rules
301 eventExecutor.execute(() -> setIcmpReplyRule(srcPort, false));
302
303 return icmpReachabilityMap.asJavaMap()
304 .get(String.valueOf(icmpIdCounter.get()));
305
306 } else {
307 rBuilder.isReachable(false);
308 return rBuilder.build();
309 }
310 }
311 }
312
313 @Override
314 public Map<String, Reachability> probeNorthSouth() {
315 // TODO: require implementation
316 return null;
317 }
318
319 @Override
320 public Reachability probeNorthSouth(String netId, IpAddress ip) {
321 // TODO: require implementation
322 return null;
323 }
324
325 /**
326 * Checks whether all of ICMP reply rules are added or not.
327 *
328 * @return true if all of ICMP reply rules are added, false otherwise
329 */
330 private boolean checkAllIcmpReplyRules() {
331
332 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
333 .filter(p -> p.state() == ACTIVE).collect(Collectors.toSet());
334
335 for (InstancePort port : activePorts) {
336 if (!checkIcmpReplyRule(port.ipAddress().toString())) {
337 return false;
338 }
339 }
340
341 return true;
342 }
343
344 /**
345 * Checks whether ICMP reply rule is added or not.
346 *
347 * @param dstIp destination IP address
348 * @return true if ICMP reply rule is added, false otherwise
349 */
350 private boolean checkIcmpReplyRule(String dstIp) {
351 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
352 TrafficSelector selector = entry.selector();
353
354 IPCriterion dstIpCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
355
356 if (dstIpCriterion != null &&
357 dstIp.equals(dstIpCriterion.ip().address().toString()) &&
358 entry.state() == ADDED) {
359 return true;
360 }
361 }
362
363 return false;
364 }
365
366 /**
367 * Checks whether all of ICMP request VID tagging rules are added or not.
368 *
369 * @return true if the rule is added, false otherwise
370 */
371 private boolean checkAllVidTagRules() {
372 Set<InstancePort> activePorts = instancePortService.instancePorts().stream()
373 .filter(p -> p.state() == ACTIVE).collect(Collectors.toSet());
374
375 for (InstancePort port : activePorts) {
376 if (!checkVidTagRule(port.ipAddress().toString())) {
377 return false;
378 }
379 }
380
381 return true;
382 }
383
384 /**
385 * Checks whether ICMP request VID tagging rule is added or not.
386 *
387 * @param srcIp source IP address
388 * @return true if the rule is added, false otherwise
389 */
390 private boolean checkVidTagRule(String srcIp) {
391 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
392 TrafficSelector selector = entry.selector();
393
394 IPCriterion srcIpCriterion = (IPCriterion) selector.getCriterion(IPV4_SRC);
395
396 if (srcIpCriterion != null &&
397 srcIp.equals(srcIpCriterion.ip().address().toString()) &&
398 entry.state() == ADDED) {
399 return true;
400 }
401 }
402
403 return false;
404 }
405
406 /**
407 * Installs/uninstalls all of the flow rules to match ingress fake ICMP requests.
408 *
409 * @param install installation flag
410 */
411 private void setAllVidTagRule(boolean install) {
412 osNodeService.nodes(COMPUTE).forEach(n ->
413 instancePortService.instancePorts().stream()
414 .filter(p -> p.deviceId().equals(n.intgBridge()))
415 .forEach(p -> setVidTagRule(p, install))
416 );
417 }
418
419 /**
420 * Installs/uninstalls a flow rule to match ingress fake ICMP request packets,
421 * and tags VNI/VID, direct the tagged packet to ACL table.
422 *
423 * @param port instance port
424 * @param install installation flag
425 */
426 private void setVidTagRule(InstancePort port, boolean install) {
427 TrafficSelector selector = DefaultTrafficSelector.builder()
428 .matchEthType(Ethernet.TYPE_IPV4)
429 .matchIPSrc(IpPrefix.valueOf(port.ipAddress(), PREFIX_LENGTH))
430 .build();
431
432 TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder()
433 .setTunnelId(getSegId(osNetworkService, port))
434 .transition(ACL_TABLE);
435
436 osFlowRuleService.setRule(
437 appId,
438 port.deviceId(),
439 selector,
440 tb.build(),
441 PRIORITY_ICMP_PROBE_RULE,
442 VTAG_TABLE,
443 install);
444 }
445
446 /**
447 * Installs/uninstalls all of the flow rules to match ICMP reply packets.
448 *
449 * @param install installation flag
450 */
451 private void setAllIcmpReplyRule(boolean install) {
452 osNodeService.nodes(COMPUTE).forEach(n ->
453 instancePortService.instancePorts().stream()
454 .filter(p -> p.deviceId().equals(n.intgBridge()))
455 .forEach(p -> setIcmpReplyRule(p, install))
456 );
457 }
458
459 /**
460 * Installs/uninstalls a flow rule to match ICMP reply packets, direct all
461 * ICMP reply packets to the controller.
462 *
463 * @param install installation flag
464 */
465 private void setIcmpReplyRule(InstancePort port, boolean install) {
466 TrafficSelector selector = DefaultTrafficSelector.builder()
467 .matchEthType(Ethernet.TYPE_IPV4)
468 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), PREFIX_LENGTH))
469 .matchIPProtocol(IPv4.PROTOCOL_ICMP)
470 .matchIcmpType(ICMP.TYPE_ECHO_REPLY)
471 .build();
472
473 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
474 .punt()
475 .build();
476
477 osFlowRuleService.setRule(
478 appId,
479 port.deviceId(),
480 selector,
481 treatment,
482 PRIORITY_ICMP_PROBE_RULE,
483 FORWARDING_TABLE,
484 install);
485 }
486
487 /**
488 * Sends out ICMP ECHO REQUEST to destined VM.
489 *
490 * @param srcPort source instance port
491 * @param dstPort destination instance port
492 */
493 private void sendIcmpEchoRequest(InstancePort srcPort, InstancePort dstPort) {
494
495 short icmpSeq = INITIAL_SEQ;
496
497 short icmpId = (short) icmpIdCounter.incrementAndGet();
498
499 for (int i = 0; i < MAX_ICMP_GEN; i++) {
500 packetService.emit(buildIcmpOutputPacket(srcPort, dstPort, icmpId, icmpSeq));
501 icmpSeq++;
502 }
503 }
504
505 /**
506 * Builds ICMP Outbound packet.
507 *
508 * @param srcPort source instance port
509 * @param dstPort destination instance port
510 * @param icmpId ICMP identifier
511 * @param icmpSeq ICMP sequence number
512 */
513 private OutboundPacket buildIcmpOutputPacket(InstancePort srcPort,
514 InstancePort dstPort,
515 short icmpId,
516 short icmpSeq) {
517
518 // TODO: need to encapsulate the frame into VXLAN/VLAN and transit the
519 // packet to TABLE 0 in order to force the packet to go through all pipelines
520 Ethernet ethFrame = constructIcmpPacket(srcPort, dstPort, icmpId, icmpSeq);
521
522 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
523
524 // we send out the packet to ingress table (index is 0) of source OVS
525 // to enforce the Outbound packet to go through the ingress and egress
526 // pipeline
527 tBuilder.setOutput(TABLE);
528
529 Reachability reachability = DefaultReachability.builder()
530 .srcIp(srcPort.ipAddress())
531 .dstIp(dstPort.ipAddress())
532 .isReachable(false)
533 .build();
534
535 icmpReachabilityMap.put(String.valueOf(icmpId), reachability);
536 icmpIds.add(String.valueOf(icmpId));
537
538 return new DefaultOutboundPacket(
539 srcPort.deviceId(),
540 tBuilder.build(),
541 ByteBuffer.wrap(ethFrame.serialize()));
542 }
543
544 /**
545 * Constructs an ICMP packet with given source and destination IP/MAC.
546 *
547 * @param srcPort source instance port
548 * @param dstPort destination instance port
549 * @param icmpId ICMP identifier
550 * @param icmpSeq ICMP sequence number
551 * @return an ethernet frame which contains ICMP payload
552 */
553 private Ethernet constructIcmpPacket(InstancePort srcPort,
554 InstancePort dstPort,
555 short icmpId, short icmpSeq) {
556 // Ethernet frame
557 Ethernet ethFrame = new Ethernet();
558
559 ethFrame.setEtherType(TYPE_IPV4);
560 ethFrame.setSourceMACAddress(srcPort.macAddress());
561
562 boolean isRemote = !srcPort.deviceId().equals(dstPort.deviceId());
563
564 if (isRemote) {
565 // if the source and destination VMs are located in different OVS,
566 // we will assign fake gateway MAC as the destination MAC
567 ethFrame.setDestinationMACAddress(DEFAULT_GATEWAY_MAC);
568 } else {
569 ethFrame.setDestinationMACAddress(dstPort.macAddress());
570 }
571
572 // IP packet
573 IPv4 iPacket = new IPv4();
574 iPacket.setDestinationAddress(dstPort.ipAddress().toString());
575 iPacket.setSourceAddress(srcPort.ipAddress().toString());
576 iPacket.setTtl(TTL);
577 iPacket.setProtocol(IPv4.PROTOCOL_ICMP);
578
579 // ICMP packet
580 ICMP icmp = new ICMP();
581 icmp.setIcmpType(TYPE_ECHO_REQUEST)
582 .setIcmpCode(TYPE_ECHO_REQUEST)
583 .resetChecksum();
584
585 // ICMP ECHO packet
586 ICMPEcho icmpEcho = new ICMPEcho();
587 icmpEcho.setIdentifier(icmpId)
588 .setSequenceNum(icmpSeq);
589
590 ByteBuffer byteBufferIcmpEcho = ByteBuffer.wrap(icmpEcho.serialize());
591
592 try {
593 icmp.setPayload(ICMPEcho.deserializer().deserialize(byteBufferIcmpEcho.array(),
594 0, ICMPEcho.ICMP_ECHO_HEADER_LENGTH));
595 } catch (DeserializationException e) {
596 log.warn("Failed to deserialize ICMP ECHO REQUEST packet");
597 }
598
599 ByteBuffer byteBufferIcmp = ByteBuffer.wrap(icmp.serialize());
600
601 try {
602 iPacket.setPayload(ICMP.deserializer().deserialize(byteBufferIcmp.array(),
603 0,
604 byteBufferIcmp.array().length));
605 } catch (DeserializationException e) {
606 log.warn("Failed to deserialize ICMP packet");
607 }
608
609 ethFrame.setPayload(iPacket);
610
611 return ethFrame;
612 }
613
614 /**
615 * Handles ICMP ECHO REPLY packets.
616 *
617 * @param ipPacket IP packet
618 * @param icmp ICMP packet
619 */
620 private void handleIcmpEchoReply(IPv4 ipPacket, ICMP icmp) {
621
622 String icmpKey = icmpId(icmp);
623
624 String srcIp = IPv4.fromIPv4Address(ipPacket.getDestinationAddress());
625 String dstIp = IPv4.fromIPv4Address(ipPacket.getSourceAddress());
626
627 Reachability reachability = DefaultReachability.builder()
628 .srcIp(IpAddress.valueOf(srcIp))
629 .dstIp(IpAddress.valueOf(dstIp))
630 .isReachable(false)
631 .build();
632
633 icmpReachabilityMap.computeIfPresent(icmpKey, (key, value) -> {
634 if (value.equals(reachability)) {
635
636 log.debug("src: {}, dst: {} is reachable!", value.dstIp(), value.srcIp());
637
638 return DefaultReachability.builder()
639 .srcIp(IpAddress.valueOf(srcIp))
640 .dstIp(IpAddress.valueOf(dstIp))
641 .isReachable(true)
642 .build();
643 }
644 return reachability;
645 });
646 }
647
648 /**
649 * Obtains an unique ICMP key.
650 *
651 * @param icmp ICMP packet
652 * @return ICMP key
653 */
654 private String icmpId(ICMP icmp) {
655 ICMPEcho echo = (ICMPEcho) icmp.getPayload();
656 checkNotNull(echo);
657
658 short icmpId = echo.getIdentifier();
659
660 return String.valueOf(icmpId);
661 }
662
663 /**
664 * Holds the current thread unit the timeout expires, during the hold the
665 * thread periodically execute the given method.
666 *
667 * @param count count of unit
668 * @param unit unit
669 * @param predicate predicate
670 * @param predicateArg predicate argument
671 */
672 private void timeoutPredicate(long count, int unit,
673 Predicate<String> predicate, String predicateArg) {
674 long timeoutExpiredMs = System.currentTimeMillis() + unit * count;
675
676 while (true) {
677
678 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
679
680 if (predicate.test(predicateArg)) {
681 break;
682 }
683
684 if (waitMs <= 0) {
685 break;
686 }
687 }
688 }
689
690 /**
691 * Holds the current thread unit the timeout expires, during the hold the
692 * thread periodically execute the given method.
693 *
694 * @param count count of unit
695 * @param unit unit
696 * @param supplier boolean supplier
697 */
698 private void timeoutSupplier(long count, int unit, BooleanSupplier supplier) {
699 long timeoutExpiredMs = System.currentTimeMillis() + unit * count;
700
701 while (true) {
702
703 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
704
705 if (supplier.getAsBoolean()) {
706 break;
707 }
708
709 if (waitMs <= 0) {
710 break;
711 }
712 }
713 }
714
715 private class InternalPacketProcessor implements PacketProcessor {
716
717 @Override
718 public void process(PacketContext context) {
719 if (context.isHandled()) {
720 return;
721 }
722
723 InboundPacket pkt = context.inPacket();
724 Ethernet ethernet = pkt.parsed();
725 if (ethernet == null || ethernet.getEtherType() == Ethernet.TYPE_ARP) {
726 return;
727 }
728
729 IPv4 iPacket = (IPv4) ethernet.getPayload();
730 if (iPacket.getProtocol() == IPv4.PROTOCOL_ICMP) {
731 eventExecutor.execute(() -> processIcmpPacket(context, ethernet));
732 }
733 }
734
735 /**
736 * Processes the received ICMP packet.
737 *
738 * @param context packet context
739 * @param ethernet ethernet
740 */
741 private void processIcmpPacket(PacketContext context, Ethernet ethernet) {
742 IPv4 ipPacket = (IPv4) ethernet.getPayload();
743 ICMP icmp = (ICMP) ipPacket.getPayload();
744 log.trace("Processing ICMP packet source MAC:{}, source IP:{}," +
745 "dest MAC:{}, dest IP:{}",
746 ethernet.getSourceMAC(),
747 IpAddress.valueOf(ipPacket.getSourceAddress()),
748 ethernet.getDestinationMAC(),
749 IpAddress.valueOf(ipPacket.getDestinationAddress()));
750
751 String icmpId = icmpId(icmp);
752
753 // if the ICMP ID is not contained in ICMP ID set, we do not handle it
754 if (!icmpIds.contains(icmpId)) {
755 return;
756 }
757
758 switch (icmp.getIcmpType()) {
759 case TYPE_ECHO_REPLY:
760 handleIcmpEchoReply(ipPacket, icmp);
761 context.block();
762 icmpIds.remove(icmpId);
763 break;
764 default:
765 break;
766 }
767 }
Jian Lia1186772018-07-27 18:06:41 +0900768 }
769}