blob: b885f8ab2ae0ec40ca88385aff7bae528757878b [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Modified;
23import org.apache.felix.scr.annotations.Property;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.ReferencePolicy;
27import org.apache.felix.scr.annotations.Service;
28import org.jboss.netty.bootstrap.ServerBootstrap;
29import org.jboss.netty.channel.Channel;
30import org.jboss.netty.channel.ChannelException;
31import org.jboss.netty.channel.ChannelFactory;
32import org.jboss.netty.channel.ChannelPipeline;
33import org.jboss.netty.channel.ChannelPipelineFactory;
34import org.jboss.netty.channel.Channels;
35import org.jboss.netty.channel.group.ChannelGroup;
36import org.jboss.netty.channel.group.DefaultChannelGroup;
37import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
38import org.jboss.netty.handler.timeout.IdleStateHandler;
39import org.jboss.netty.util.HashedWheelTimer;
40import org.onlab.packet.IpAddress;
41import org.onlab.packet.Ip4Address;
42import org.onlab.packet.Ip6Address;
43import org.onlab.packet.IpPrefix;
44import org.onosproject.net.intf.Interface;
45import org.onosproject.net.host.InterfaceIpAddress;
46import org.onosproject.net.intf.InterfaceService;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.Tools;
49import org.onosproject.cfg.ComponentConfigService;
50import org.onosproject.cluster.ClusterService;
51import org.onosproject.cluster.NodeId;
52import org.onosproject.core.CoreService;
53import org.onosproject.core.ApplicationId;
54import org.onosproject.routeservice.Route;
55import org.onosproject.routeservice.RouteAdminService;
56import org.onosproject.routing.fpm.protocol.FpmHeader;
57import org.onosproject.routing.fpm.protocol.Netlink;
58import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
59import org.onosproject.routing.fpm.protocol.RouteAttribute;
60import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
61import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
62import org.onosproject.routing.fpm.protocol.RtNetlink;
63import org.onosproject.routing.fpm.protocol.RtProtocol;
64import org.onosproject.store.serializers.KryoNamespaces;
65import org.onosproject.store.service.ConsistentMap;
66import org.onosproject.store.service.Serializer;
67import org.onosproject.store.service.StorageService;
68import org.onosproject.store.StoreDelegate;
69import org.osgi.service.component.ComponentContext;
70import org.slf4j.Logger;
71import org.slf4j.LoggerFactory;
72
73import java.net.InetSocketAddress;
74import java.util.ArrayList;
75import java.util.Collection;
76import java.util.Collections;
77import java.util.Dictionary;
78import java.util.HashSet;
79import java.util.LinkedList;
80
81import java.util.List;
82import java.util.Map;
83import java.util.Set;
84import java.util.concurrent.ConcurrentHashMap;
85import java.util.stream.Collectors;
86
87import static java.util.concurrent.Executors.newCachedThreadPool;
88import static org.onlab.util.Tools.groupedThreads;
89import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
90import org.onosproject.routing.fpm.api.FpmPrefixStore;
91import org.onosproject.routing.fpm.api.FpmRecord;
92
93/**
94 * Forwarding Plane Manager (FPM) route source.
95 */
96@Service
97@Component(immediate = true)
98public class FpmManager implements FpmInfoService {
99 private final Logger log = LoggerFactory.getLogger(getClass());
100
101 private static final int FPM_PORT = 2620;
102 private static final String APP_NAME = "org.onosproject.fpm";
103 private static final int IDLE_TIMEOUT_SECS = 5;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected CoreService coreService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ComponentConfigService componentConfigService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected RouteAdminService routeService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ClusterService clusterService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected StorageService storageService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected InterfaceService interfaceService;
122
123 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
124 bind = "bindRipStore",
125 unbind = "unbindRipStore",
126 policy = ReferencePolicy.DYNAMIC,
127 target = "(fpm_type=RIP)")
128 protected volatile FpmPrefixStore ripStore;
129
130 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
131 bind = "bindDhcpStore",
132 unbind = "unbindDhcpStore",
133 policy = ReferencePolicy.DYNAMIC,
134 target = "(fpm_type=DHCP)")
135 protected volatile FpmPrefixStore dhcpStore;
136
137 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
138 = new FpmPrefixStoreDelegate();
139
140 private ApplicationId appId;
141 private ServerBootstrap serverBootstrap;
142 private Channel serverChannel;
143 private ChannelGroup allChannels = new DefaultChannelGroup();
144
145 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
146
147 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
148
149 @Property(name = "clearRoutes", boolValue = true,
150 label = "Whether to clear routes when the FPM connection goes down")
151 private boolean clearRoutes = true;
152
153 @Property(name = "pdPushEnabled", boolValue = false,
154 label = "Whether to push prefixes to Quagga over fpm connection")
155 private boolean pdPushEnabled = false;
156
157 @Property(name = "pdPushNextHopIPv4", value = "",
158 label = "IPv4 next-hop address for PD Pushing.")
159 private Ip4Address pdPushNextHopIPv4 = null;
160
161 @Property(name = "pdPushNextHopIPv6", value = "",
162 label = "IPv6 next-hop address for PD Pushing.")
163 private Ip6Address pdPushNextHopIPv6 = null;
164
165 protected void bindRipStore(FpmPrefixStore store) {
166 if ((ripStore == null) && (store != null)) {
167 ripStore = store;
168 ripStore.setDelegate(fpmPrefixStoreDelegate);
169 for (Channel ch : allChannels) {
170 processRipStaticRoutes(ch);
171 }
172 }
173 }
174
175 protected void unbindRipStore(FpmPrefixStore store) {
176 if (ripStore == store) {
177 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
178 ripStore = null;
179 }
180 }
181
182 protected void bindDhcpStore(FpmPrefixStore store) {
183 if ((dhcpStore == null) && (store != null)) {
184 dhcpStore = store;
185 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
186 for (Channel ch : allChannels) {
187 processDhcpStaticRoutes(ch);
188 }
189 }
190 }
191
192 protected void unbindDhcpStore(FpmPrefixStore store) {
193 if (dhcpStore == store) {
194 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
195 dhcpStore = null;
196 }
197 }
198
199 @Activate
200 protected void activate(ComponentContext context) {
201 componentConfigService.preSetProperty(
202 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
203 "distributed", "true");
204
205 componentConfigService.registerProperties(getClass());
206
207 KryoNamespace serializer = KryoNamespace.newBuilder()
208 .register(KryoNamespaces.API)
209 .register(FpmPeer.class)
210 .register(FpmConnectionInfo.class)
211 .build();
212 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
213 .withName("fpm-connections")
214 .withSerializer(Serializer.using(serializer))
215 .build();
216
217 modified(context);
218 startServer();
219
220 appId = coreService.registerApplication(APP_NAME, peers::destroy);
221
222 log.info("Started");
223 }
224
225 @Deactivate
226 protected void deactivate() {
227 componentConfigService.preSetProperty(
228 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
229 "distributed", "false");
230
231 stopServer();
232 fpmRoutes.clear();
233 componentConfigService.unregisterProperties(getClass(), false);
234 log.info("Stopped");
235 }
236
237 @Modified
238 protected void modified(ComponentContext context) {
239 Dictionary<?, ?> properties = context.getProperties();
240 if (properties == null) {
241 return;
242 }
243 String strClearRoutes = Tools.get(properties, "clearRoutes");
244 if (strClearRoutes != null) {
245 clearRoutes = Boolean.parseBoolean(strClearRoutes);
246 log.info("clearRoutes is {}", clearRoutes);
247 }
248
249 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
250 if (strPdPushEnabled != null) {
251 boolean oldValue = pdPushEnabled;
252 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
253 if (pdPushEnabled) {
254
255 pdPushNextHopIPv4 = null;
256 pdPushNextHopIPv6 = null;
257
258 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
259 if (strPdPushNextHopIPv4 != null) {
260 pdPushNextHopIPv4 = Ip4Address.valueOf(strPdPushNextHopIPv4);
261 }
262 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
263 if (strPdPushNextHopIPv6 != null) {
264 pdPushNextHopIPv6 = Ip6Address.valueOf(strPdPushNextHopIPv6);
265 }
266
267 if (pdPushNextHopIPv4 == null) {
268 pdPushNextHopIPv4 = interfaceService.getInterfaces()
269 .stream()
270 .filter(iface -> iface.name().contains("RUR"))
271 .map(Interface::ipAddressesList)
272 .flatMap(Collection::stream)
273 .map(InterfaceIpAddress::ipAddress)
274 .filter(IpAddress::isIp4)
275 .map(IpAddress::getIp4Address)
276 .findFirst()
277 .orElse(null);
278 }
279
280 if (pdPushNextHopIPv6 == null) {
281 pdPushNextHopIPv6 = interfaceService.getInterfaces()
282 .stream()
283 .filter(iface -> iface.name().contains("RUR"))
284 .map(Interface::ipAddressesList)
285 .flatMap(Collection::stream)
286 .map(InterfaceIpAddress::ipAddress)
287 .filter(IpAddress::isIp6)
288 .map(IpAddress::getIp6Address)
289 .findFirst()
290 .orElse(null);
291 }
292
293 log.info("PD pushing is enabled.");
294 if (pdPushNextHopIPv4 != null) {
295 log.info("ipv4 next-hop {}", pdPushNextHopIPv4.toString());
296 } else {
297 log.info("ipv4 next-hop is null");
298 }
299 if (pdPushNextHopIPv6 != null) {
300 log.info("ipv6 next-hop={}", pdPushNextHopIPv6.toString());
301 } else {
302 log.info("ipv6 next-hop is null");
303 }
304 if (!oldValue) {
305 processStaticRoutes();
306 }
307 } else {
308 log.info("PD pushing is disabled.");
309 }
310 }
311 }
312
313 private void startServer() {
314 HashedWheelTimer timer = new HashedWheelTimer(
315 groupedThreads("onos/fpm", "fpm-timer-%d", log));
316
317 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
318 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
319 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
320 ChannelPipelineFactory pipelineFactory = () -> {
321 // Allocate a new session per connection
322 IdleStateHandler idleHandler =
323 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
324 FpmSessionHandler fpmSessionHandler =
325 new FpmSessionHandler(this, new InternalFpmListener());
326 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
327
328 // Setup the processing pipeline
329 ChannelPipeline pipeline = Channels.pipeline();
330 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
331 pipeline.addLast("idle", idleHandler);
332 pipeline.addLast("FpmSession", fpmSessionHandler);
333 return pipeline;
334 };
335
336 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
337
338 serverBootstrap = new ServerBootstrap(channelFactory);
339 serverBootstrap.setOption("child.reuseAddr", true);
340 serverBootstrap.setOption("child.keepAlive", true);
341 serverBootstrap.setOption("child.tcpNoDelay", true);
342 serverBootstrap.setPipelineFactory(pipelineFactory);
343 try {
344 serverChannel = serverBootstrap.bind(listenAddress);
345 allChannels.add(serverChannel);
346 } catch (ChannelException e) {
347 log.debug("Exception binding to FPM port {}: ",
348 listenAddress.getPort(), e);
349 stopServer();
350 }
351 }
352
353 private void stopServer() {
354 allChannels.close().awaitUninterruptibly();
355 allChannels.clear();
356 if (serverBootstrap != null) {
357 serverBootstrap.releaseExternalResources();
358 }
359
360 if (clearRoutes) {
361 peers.keySet().forEach(this::clearRoutes);
362 }
363 }
364
Kalhee Kim40beb722018-01-16 20:32:04 +0000365 private boolean routeInDhcpStore(IpPrefix prefix) {
366
367 if (dhcpStore != null) {
368 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
369 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
370 }
371 return false;
372 }
373
374 private boolean routeInRipStore(IpPrefix prefix) {
375
376 if (ripStore != null) {
377 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
378 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
379 }
380 return false;
381 }
382
Kalhee Kimba366062017-11-07 16:32:09 +0000383 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
384 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
385 return;
386 }
387
388 Netlink netlink = fpmMessage.netlink();
389 RtNetlink rtNetlink = netlink.rtNetlink();
390
391 if (log.isTraceEnabled()) {
392 log.trace("Received FPM message: {}", fpmMessage);
393 }
394
395 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
396 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
397 log.trace("Ignoring non-zebra route");
398 return;
399 }
400
401 IpAddress dstAddress = null;
402 IpAddress gateway = null;
403
404 for (RouteAttribute attribute : rtNetlink.attributes()) {
405 if (attribute.type() == RouteAttribute.RTA_DST) {
406 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
407 dstAddress = raDst.dstAddress();
408 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
409 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
410 gateway = raGateway.gateway();
411 }
412 }
413
414 if (dstAddress == null) {
415 log.error("Dst address missing!");
416 return;
417 }
418
419 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
420
Kalhee Kim40beb722018-01-16 20:32:04 +0000421 // Ignore routes that we sent.
422 if ((prefix.isIp4() && (gateway.equals(pdPushNextHopIPv4))) ||
423 gateway.equals(pdPushNextHopIPv6)) {
424 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
425 return;
426 }
427 }
428
Kalhee Kimba366062017-11-07 16:32:09 +0000429 List<Route> updates = new LinkedList<>();
430 List<Route> withdraws = new LinkedList<>();
431
432 Route route;
433 switch (netlink.type()) {
434 case RTM_NEWROUTE:
435 if (gateway == null) {
436 // We ignore interface routes with no gateway for now.
437 return;
438 }
439 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
440
441
442 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
443
444 if (oldRoute != null) {
445 log.trace("Swapping {} with {}", oldRoute, route);
446 withdraws.add(oldRoute);
447 }
448 updates.add(route);
449 break;
450 case RTM_DELROUTE:
451 Route existing = fpmRoutes.get(peer).remove(prefix);
452 if (existing == null) {
453 log.warn("Got delete for non-existent prefix");
454 return;
455 }
456
457 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
458
459 withdraws.add(route);
460 break;
461 case RTM_GETROUTE:
462 default:
463 break;
464 }
465
466 routeService.withdraw(withdraws);
467 routeService.update(updates);
468 }
469
470 private void clearRoutes(FpmPeer peer) {
471 log.info("Clearing all routes for peer {}", peer);
472 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
473 if (routes != null) {
474 routeService.withdraw(routes.values());
475 }
476 }
477
478 public void processStaticRoutes() {
479 for (Channel ch : allChannels) {
480 processStaticRoutes(ch);
481 }
482 }
483
484 public void processStaticRoutes(Channel ch) {
485 processRipStaticRoutes(ch);
486 processDhcpStaticRoutes(ch);
487 }
488
489 private void processRipStaticRoutes(Channel ch) {
490
491 /* Get RIP static routes. */
492 if (ripStore != null) {
493 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
494 log.info("RIP store size is {}", ripRecords.size());
495
496 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
497 record.ipPrefix(), ch));
498 }
499 }
500
501 private void processDhcpStaticRoutes(Channel ch) {
502
503 /* Get Dhcp static routes. */
504 if (dhcpStore != null) {
505 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
506 log.info("Dhcp store size is {}", dhcpRecords.size());
507
508 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
509 record.ipPrefix(), ch));
510 }
511 }
512
513 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
514
515 int netLinkLength;
516 short addrFamily;
517 IpAddress pdPushNextHop;
518
519 if (!pdPushEnabled) {
520 return;
521 }
522
523 try {
524 // Construct list of route attributes.
525 List<RouteAttribute> attributes = new ArrayList<>();
526 if (prefix.isIp4()) {
527 if (pdPushNextHopIPv4 == null) {
528 log.info("Prefix not pushed because ipv4 next-hop is null.");
529 return;
530 }
531 pdPushNextHop = pdPushNextHopIPv4;
532 netLinkLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
533 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
534 } else {
535 if (pdPushNextHopIPv6 == null) {
536 log.info("Prefix not pushed because ipv6 next-hop is null.");
537 return;
538 }
539 pdPushNextHop = pdPushNextHopIPv6;
540 netLinkLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
541 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
542 }
543
544 RouteAttributeDst raDst = new RouteAttributeDst(
545 netLinkLength,
546 RouteAttribute.RTA_DST,
547 prefix.address());
548 attributes.add(raDst);
549
550 RouteAttributeGateway raGateway = new RouteAttributeGateway(
551 netLinkLength,
552 RouteAttribute.RTA_GATEWAY,
553 pdPushNextHop);
554 attributes.add(raGateway);
555
556 // Add RtNetlink header.
557 int srcLength = 0;
558 short tos = 0;
559 short table = 0;
560 short scope = 0;
561 long rtFlags = 0;
562 int messageLength = raDst.length() + raGateway.length() +
563 RtNetlink.RT_NETLINK_LENGTH;
564
565 RtNetlink rtNetlink = new RtNetlink(
566 addrFamily,
567 prefix.prefixLength(),
568 srcLength,
569 tos,
570 table,
571 RtProtocol.ZEBRA,
572 scope,
573 FpmHeader.FPM_TYPE_NETLINK,
574 rtFlags,
575 attributes);
576
577 // Add Netlink header.
578 NetlinkMessageType nlMsgType;
579 if (isAdd) {
580 nlMsgType = NetlinkMessageType.RTM_NEWROUTE;
581 } else {
582 nlMsgType = NetlinkMessageType.RTM_DELROUTE;
583 }
584 int flags = Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE;
585 long sequence = 0;
586 long processPortId = 0;
587 messageLength += Netlink.NETLINK_HEADER_LENGTH;
588
589 Netlink netLink = new Netlink(messageLength,
590 nlMsgType,
591 flags,
592 sequence,
593 processPortId,
594 rtNetlink);
595
596 messageLength += FpmHeader.FPM_HEADER_LENGTH;
597
598 // Add FpmHeader.
599 FpmHeader fpmMessage = new FpmHeader(
600 FpmHeader.FPM_VERSION_1,
601 FpmHeader.FPM_TYPE_NETLINK,
602 messageLength,
603 netLink);
604
605 // Encode message in a channel buffer and transmit.
606 ch.write(fpmMessage.encode());
607
608 } catch (RuntimeException e) {
609 log.info("Route not sent over fpm connection.");
610 }
611 }
612
613 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
614
615 for (Channel ch : allChannels) {
616 sendRouteUpdateToChannel(isAdd, prefix, ch);
617 }
618 }
619
620 public boolean isPdPushEnabled() {
621 return pdPushEnabled;
622 }
623
624 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
625 return new FpmPeerInfo(connections,
626 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
627 }
628
629 @Override
630 public Map<FpmPeer, FpmPeerInfo> peers() {
631 return peers.asJavaMap().entrySet().stream()
632 .collect(Collectors.toMap(
633 e -> e.getKey(),
634 e -> toFpmInfo(e.getKey(), e.getValue())));
635 }
636
637 private class InternalFpmListener implements FpmListener {
638 @Override
639 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
640 FpmManager.this.fpmMessage(peer, fpmMessage);
641 }
642
643 @Override
644 public boolean peerConnected(FpmPeer peer) {
645 if (peers.keySet().contains(peer)) {
646 return false;
647 }
648
649 NodeId localNode = clusterService.getLocalNode().id();
650 peers.compute(peer, (p, infos) -> {
651 if (infos == null) {
652 infos = new HashSet<>();
653 }
654
655 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
656 return infos;
657 });
658
659 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
660 return true;
661 }
662
663 @Override
664 public void peerDisconnected(FpmPeer peer) {
665 log.info("FPM connection to {} went down", peer);
666
667 if (clearRoutes) {
668 clearRoutes(peer);
669 }
670
671 peers.compute(peer, (p, infos) -> {
672 if (infos == null) {
673 return null;
674 }
675
676 infos.stream()
677 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
678 .findAny()
679 .ifPresent(i -> infos.remove(i));
680
681 if (infos.isEmpty()) {
682 return null;
683 }
684
685 return infos;
686 });
687 }
688 }
689
690 /**
691 * Adds a channel to the channel group.
692 *
693 * @param channel the channel to add
694 */
695 public void addSessionChannel(Channel channel) {
696 allChannels.add(channel);
697 }
698
699 /**
700 * Removes a channel from the channel group.
701 *
702 * @param channel the channel to remove
703 */
704 public void removeSessionChannel(Channel channel) {
705 allChannels.remove(channel);
706 }
707
708 /**
709 * Store delegate for Fpm Prefix store.
710 * Handles Fpm prefix store event.
711 */
712 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
713
714 @Override
715 public void notify(FpmPrefixStoreEvent e) {
716
717 log.trace("FpmPrefixStoreEvent notify");
718
719 FpmRecord record = e.subject();
720 switch (e.type()) {
721 case ADD:
722 sendRouteUpdate(true, record.ipPrefix());
723 break;
724 case REMOVE:
725 sendRouteUpdate(false, record.ipPrefix());
726 break;
727 default:
728 log.warn("unsupported store event type", e.type());
729 return;
730 }
731 }
732 }
733}