blob: 56d193eeb1065bf38afd26fcd0a543c44e0ce56a [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Charles Chan035ed1f2018-01-30 16:00:32 -080019import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.ReferencePolicy;
28import org.apache.felix.scr.annotations.Service;
29import org.jboss.netty.bootstrap.ServerBootstrap;
30import org.jboss.netty.channel.Channel;
31import org.jboss.netty.channel.ChannelException;
32import org.jboss.netty.channel.ChannelFactory;
33import org.jboss.netty.channel.ChannelPipeline;
34import org.jboss.netty.channel.ChannelPipelineFactory;
35import org.jboss.netty.channel.Channels;
36import org.jboss.netty.channel.group.ChannelGroup;
37import org.jboss.netty.channel.group.DefaultChannelGroup;
38import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
39import org.jboss.netty.handler.timeout.IdleStateHandler;
40import org.jboss.netty.util.HashedWheelTimer;
41import org.onlab.packet.IpAddress;
42import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
44import org.onlab.packet.IpPrefix;
45import org.onosproject.net.intf.Interface;
46import org.onosproject.net.host.InterfaceIpAddress;
47import org.onosproject.net.intf.InterfaceService;
48import org.onlab.util.KryoNamespace;
49import org.onlab.util.Tools;
50import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080051import org.onosproject.cluster.ClusterEvent;
52import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.cluster.ClusterService;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.core.CoreService;
56import org.onosproject.core.ApplicationId;
57import org.onosproject.routeservice.Route;
58import org.onosproject.routeservice.RouteAdminService;
59import org.onosproject.routing.fpm.protocol.FpmHeader;
60import org.onosproject.routing.fpm.protocol.Netlink;
61import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
62import org.onosproject.routing.fpm.protocol.RouteAttribute;
63import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
64import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
65import org.onosproject.routing.fpm.protocol.RtNetlink;
66import org.onosproject.routing.fpm.protocol.RtProtocol;
67import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080068import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000069import org.onosproject.store.service.ConsistentMap;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.StoreDelegate;
73import org.osgi.service.component.ComponentContext;
74import org.slf4j.Logger;
75import org.slf4j.LoggerFactory;
76
77import java.net.InetSocketAddress;
78import java.util.ArrayList;
Charles Chan035ed1f2018-01-30 16:00:32 -080079import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000080import java.util.Collection;
81import java.util.Collections;
82import java.util.Dictionary;
83import java.util.HashSet;
84import java.util.LinkedList;
85
86import java.util.List;
87import java.util.Map;
88import java.util.Set;
89import java.util.concurrent.ConcurrentHashMap;
90import java.util.stream.Collectors;
91
92import static java.util.concurrent.Executors.newCachedThreadPool;
93import static org.onlab.util.Tools.groupedThreads;
94import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
95import org.onosproject.routing.fpm.api.FpmPrefixStore;
96import org.onosproject.routing.fpm.api.FpmRecord;
97
98/**
99 * Forwarding Plane Manager (FPM) route source.
100 */
101@Service
102@Component(immediate = true)
103public class FpmManager implements FpmInfoService {
104 private final Logger log = LoggerFactory.getLogger(getClass());
105
106 private static final int FPM_PORT = 2620;
107 private static final String APP_NAME = "org.onosproject.fpm";
108 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800109 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected CoreService coreService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ComponentConfigService componentConfigService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected RouteAdminService routeService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected StorageService storageService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected InterfaceService interfaceService;
128
129 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
130 bind = "bindRipStore",
131 unbind = "unbindRipStore",
132 policy = ReferencePolicy.DYNAMIC,
133 target = "(fpm_type=RIP)")
134 protected volatile FpmPrefixStore ripStore;
135
136 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
137 bind = "bindDhcpStore",
138 unbind = "unbindDhcpStore",
139 policy = ReferencePolicy.DYNAMIC,
140 target = "(fpm_type=DHCP)")
141 protected volatile FpmPrefixStore dhcpStore;
142
143 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
144 = new FpmPrefixStoreDelegate();
145
146 private ApplicationId appId;
147 private ServerBootstrap serverBootstrap;
148 private Channel serverChannel;
149 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800150 private final InternalClusterListener clusterListener = new InternalClusterListener();
151 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000152
153 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
154
155 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
156
157 @Property(name = "clearRoutes", boolValue = true,
158 label = "Whether to clear routes when the FPM connection goes down")
159 private boolean clearRoutes = true;
160
161 @Property(name = "pdPushEnabled", boolValue = false,
162 label = "Whether to push prefixes to Quagga over fpm connection")
163 private boolean pdPushEnabled = false;
164
165 @Property(name = "pdPushNextHopIPv4", value = "",
166 label = "IPv4 next-hop address for PD Pushing.")
167 private Ip4Address pdPushNextHopIPv4 = null;
168
169 @Property(name = "pdPushNextHopIPv6", value = "",
170 label = "IPv6 next-hop address for PD Pushing.")
171 private Ip6Address pdPushNextHopIPv6 = null;
172
173 protected void bindRipStore(FpmPrefixStore store) {
174 if ((ripStore == null) && (store != null)) {
175 ripStore = store;
176 ripStore.setDelegate(fpmPrefixStoreDelegate);
177 for (Channel ch : allChannels) {
178 processRipStaticRoutes(ch);
179 }
180 }
181 }
182
183 protected void unbindRipStore(FpmPrefixStore store) {
184 if (ripStore == store) {
185 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
186 ripStore = null;
187 }
188 }
189
190 protected void bindDhcpStore(FpmPrefixStore store) {
191 if ((dhcpStore == null) && (store != null)) {
192 dhcpStore = store;
193 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
194 for (Channel ch : allChannels) {
195 processDhcpStaticRoutes(ch);
196 }
197 }
198 }
199
200 protected void unbindDhcpStore(FpmPrefixStore store) {
201 if (dhcpStore == store) {
202 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
203 dhcpStore = null;
204 }
205 }
206
207 @Activate
208 protected void activate(ComponentContext context) {
209 componentConfigService.preSetProperty(
210 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
211 "distributed", "true");
212
213 componentConfigService.registerProperties(getClass());
214
215 KryoNamespace serializer = KryoNamespace.newBuilder()
216 .register(KryoNamespaces.API)
217 .register(FpmPeer.class)
218 .register(FpmConnectionInfo.class)
219 .build();
220 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
221 .withName("fpm-connections")
222 .withSerializer(Serializer.using(serializer))
223 .build();
224
225 modified(context);
226 startServer();
227
228 appId = coreService.registerApplication(APP_NAME, peers::destroy);
229
Charles Chan035ed1f2018-01-30 16:00:32 -0800230 clusterService.addListener(clusterListener);
231 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
232
Kalhee Kimba366062017-11-07 16:32:09 +0000233 log.info("Started");
234 }
235
236 @Deactivate
237 protected void deactivate() {
238 componentConfigService.preSetProperty(
239 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
240 "distributed", "false");
241
242 stopServer();
243 fpmRoutes.clear();
244 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800245
246 clusterService.removeListener(clusterListener);
247 asyncLock.unlock();
248
Kalhee Kimba366062017-11-07 16:32:09 +0000249 log.info("Stopped");
250 }
251
252 @Modified
253 protected void modified(ComponentContext context) {
254 Dictionary<?, ?> properties = context.getProperties();
255 if (properties == null) {
256 return;
257 }
258 String strClearRoutes = Tools.get(properties, "clearRoutes");
259 if (strClearRoutes != null) {
260 clearRoutes = Boolean.parseBoolean(strClearRoutes);
261 log.info("clearRoutes is {}", clearRoutes);
262 }
263
264 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
265 if (strPdPushEnabled != null) {
266 boolean oldValue = pdPushEnabled;
267 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
268 if (pdPushEnabled) {
269
270 pdPushNextHopIPv4 = null;
271 pdPushNextHopIPv6 = null;
272
273 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
274 if (strPdPushNextHopIPv4 != null) {
275 pdPushNextHopIPv4 = Ip4Address.valueOf(strPdPushNextHopIPv4);
276 }
277 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
278 if (strPdPushNextHopIPv6 != null) {
279 pdPushNextHopIPv6 = Ip6Address.valueOf(strPdPushNextHopIPv6);
280 }
281
282 if (pdPushNextHopIPv4 == null) {
283 pdPushNextHopIPv4 = interfaceService.getInterfaces()
284 .stream()
285 .filter(iface -> iface.name().contains("RUR"))
286 .map(Interface::ipAddressesList)
287 .flatMap(Collection::stream)
288 .map(InterfaceIpAddress::ipAddress)
289 .filter(IpAddress::isIp4)
290 .map(IpAddress::getIp4Address)
291 .findFirst()
292 .orElse(null);
293 }
294
295 if (pdPushNextHopIPv6 == null) {
296 pdPushNextHopIPv6 = interfaceService.getInterfaces()
297 .stream()
298 .filter(iface -> iface.name().contains("RUR"))
299 .map(Interface::ipAddressesList)
300 .flatMap(Collection::stream)
301 .map(InterfaceIpAddress::ipAddress)
302 .filter(IpAddress::isIp6)
303 .map(IpAddress::getIp6Address)
304 .findFirst()
305 .orElse(null);
306 }
307
308 log.info("PD pushing is enabled.");
309 if (pdPushNextHopIPv4 != null) {
310 log.info("ipv4 next-hop {}", pdPushNextHopIPv4.toString());
311 } else {
312 log.info("ipv4 next-hop is null");
313 }
314 if (pdPushNextHopIPv6 != null) {
315 log.info("ipv6 next-hop={}", pdPushNextHopIPv6.toString());
316 } else {
317 log.info("ipv6 next-hop is null");
318 }
319 if (!oldValue) {
320 processStaticRoutes();
321 }
322 } else {
323 log.info("PD pushing is disabled.");
324 }
325 }
326 }
327
328 private void startServer() {
329 HashedWheelTimer timer = new HashedWheelTimer(
330 groupedThreads("onos/fpm", "fpm-timer-%d", log));
331
332 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
333 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
334 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
335 ChannelPipelineFactory pipelineFactory = () -> {
336 // Allocate a new session per connection
337 IdleStateHandler idleHandler =
338 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
339 FpmSessionHandler fpmSessionHandler =
340 new FpmSessionHandler(this, new InternalFpmListener());
341 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
342
343 // Setup the processing pipeline
344 ChannelPipeline pipeline = Channels.pipeline();
345 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
346 pipeline.addLast("idle", idleHandler);
347 pipeline.addLast("FpmSession", fpmSessionHandler);
348 return pipeline;
349 };
350
351 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
352
353 serverBootstrap = new ServerBootstrap(channelFactory);
354 serverBootstrap.setOption("child.reuseAddr", true);
355 serverBootstrap.setOption("child.keepAlive", true);
356 serverBootstrap.setOption("child.tcpNoDelay", true);
357 serverBootstrap.setPipelineFactory(pipelineFactory);
358 try {
359 serverChannel = serverBootstrap.bind(listenAddress);
360 allChannels.add(serverChannel);
361 } catch (ChannelException e) {
362 log.debug("Exception binding to FPM port {}: ",
363 listenAddress.getPort(), e);
364 stopServer();
365 }
366 }
367
368 private void stopServer() {
369 allChannels.close().awaitUninterruptibly();
370 allChannels.clear();
371 if (serverBootstrap != null) {
372 serverBootstrap.releaseExternalResources();
373 }
374
375 if (clearRoutes) {
376 peers.keySet().forEach(this::clearRoutes);
377 }
378 }
379
Kalhee Kim40beb722018-01-16 20:32:04 +0000380 private boolean routeInDhcpStore(IpPrefix prefix) {
381
382 if (dhcpStore != null) {
383 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
384 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
385 }
386 return false;
387 }
388
389 private boolean routeInRipStore(IpPrefix prefix) {
390
391 if (ripStore != null) {
392 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
393 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
394 }
395 return false;
396 }
397
Kalhee Kimba366062017-11-07 16:32:09 +0000398 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
399 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
400 return;
401 }
402
403 Netlink netlink = fpmMessage.netlink();
404 RtNetlink rtNetlink = netlink.rtNetlink();
405
406 if (log.isTraceEnabled()) {
407 log.trace("Received FPM message: {}", fpmMessage);
408 }
409
410 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
411 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
412 log.trace("Ignoring non-zebra route");
413 return;
414 }
415
416 IpAddress dstAddress = null;
417 IpAddress gateway = null;
418
419 for (RouteAttribute attribute : rtNetlink.attributes()) {
420 if (attribute.type() == RouteAttribute.RTA_DST) {
421 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
422 dstAddress = raDst.dstAddress();
423 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
424 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
425 gateway = raGateway.gateway();
426 }
427 }
428
429 if (dstAddress == null) {
430 log.error("Dst address missing!");
431 return;
432 }
433
434 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
435
Kalhee Kim40beb722018-01-16 20:32:04 +0000436 // Ignore routes that we sent.
Ray Milkeyffe1a332018-01-24 10:41:14 -0800437 if (gateway != null && ((prefix.isIp4() && (gateway.equals(pdPushNextHopIPv4))) ||
438 gateway.equals(pdPushNextHopIPv6))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000439 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
440 return;
441 }
442 }
443
Kalhee Kimba366062017-11-07 16:32:09 +0000444 List<Route> updates = new LinkedList<>();
445 List<Route> withdraws = new LinkedList<>();
446
447 Route route;
448 switch (netlink.type()) {
449 case RTM_NEWROUTE:
450 if (gateway == null) {
451 // We ignore interface routes with no gateway for now.
452 return;
453 }
454 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
455
456
457 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
458
459 if (oldRoute != null) {
460 log.trace("Swapping {} with {}", oldRoute, route);
461 withdraws.add(oldRoute);
462 }
463 updates.add(route);
464 break;
465 case RTM_DELROUTE:
466 Route existing = fpmRoutes.get(peer).remove(prefix);
467 if (existing == null) {
468 log.warn("Got delete for non-existent prefix");
469 return;
470 }
471
472 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
473
474 withdraws.add(route);
475 break;
476 case RTM_GETROUTE:
477 default:
478 break;
479 }
480
Charles Chan035ed1f2018-01-30 16:00:32 -0800481 updateRouteStore(updates, withdraws);
482 }
483
484 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
485 routeService.withdraw(routesToRemove);
486 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000487 }
488
489 private void clearRoutes(FpmPeer peer) {
490 log.info("Clearing all routes for peer {}", peer);
491 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
492 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800493 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000494 }
495 }
496
497 public void processStaticRoutes() {
498 for (Channel ch : allChannels) {
499 processStaticRoutes(ch);
500 }
501 }
502
503 public void processStaticRoutes(Channel ch) {
504 processRipStaticRoutes(ch);
505 processDhcpStaticRoutes(ch);
506 }
507
508 private void processRipStaticRoutes(Channel ch) {
509
510 /* Get RIP static routes. */
511 if (ripStore != null) {
512 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
513 log.info("RIP store size is {}", ripRecords.size());
514
515 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
516 record.ipPrefix(), ch));
517 }
518 }
519
520 private void processDhcpStaticRoutes(Channel ch) {
521
522 /* Get Dhcp static routes. */
523 if (dhcpStore != null) {
524 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
525 log.info("Dhcp store size is {}", dhcpRecords.size());
526
527 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
528 record.ipPrefix(), ch));
529 }
530 }
531
532 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
533
534 int netLinkLength;
535 short addrFamily;
536 IpAddress pdPushNextHop;
537
538 if (!pdPushEnabled) {
539 return;
540 }
541
542 try {
543 // Construct list of route attributes.
544 List<RouteAttribute> attributes = new ArrayList<>();
545 if (prefix.isIp4()) {
546 if (pdPushNextHopIPv4 == null) {
547 log.info("Prefix not pushed because ipv4 next-hop is null.");
548 return;
549 }
550 pdPushNextHop = pdPushNextHopIPv4;
551 netLinkLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
552 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
553 } else {
554 if (pdPushNextHopIPv6 == null) {
555 log.info("Prefix not pushed because ipv6 next-hop is null.");
556 return;
557 }
558 pdPushNextHop = pdPushNextHopIPv6;
559 netLinkLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
560 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
561 }
562
563 RouteAttributeDst raDst = new RouteAttributeDst(
564 netLinkLength,
565 RouteAttribute.RTA_DST,
566 prefix.address());
567 attributes.add(raDst);
568
569 RouteAttributeGateway raGateway = new RouteAttributeGateway(
570 netLinkLength,
571 RouteAttribute.RTA_GATEWAY,
572 pdPushNextHop);
573 attributes.add(raGateway);
574
575 // Add RtNetlink header.
576 int srcLength = 0;
577 short tos = 0;
578 short table = 0;
579 short scope = 0;
580 long rtFlags = 0;
581 int messageLength = raDst.length() + raGateway.length() +
582 RtNetlink.RT_NETLINK_LENGTH;
583
584 RtNetlink rtNetlink = new RtNetlink(
585 addrFamily,
586 prefix.prefixLength(),
587 srcLength,
588 tos,
589 table,
590 RtProtocol.ZEBRA,
591 scope,
592 FpmHeader.FPM_TYPE_NETLINK,
593 rtFlags,
594 attributes);
595
596 // Add Netlink header.
597 NetlinkMessageType nlMsgType;
598 if (isAdd) {
599 nlMsgType = NetlinkMessageType.RTM_NEWROUTE;
600 } else {
601 nlMsgType = NetlinkMessageType.RTM_DELROUTE;
602 }
603 int flags = Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE;
604 long sequence = 0;
605 long processPortId = 0;
606 messageLength += Netlink.NETLINK_HEADER_LENGTH;
607
608 Netlink netLink = new Netlink(messageLength,
609 nlMsgType,
610 flags,
611 sequence,
612 processPortId,
613 rtNetlink);
614
615 messageLength += FpmHeader.FPM_HEADER_LENGTH;
616
617 // Add FpmHeader.
618 FpmHeader fpmMessage = new FpmHeader(
619 FpmHeader.FPM_VERSION_1,
620 FpmHeader.FPM_TYPE_NETLINK,
621 messageLength,
622 netLink);
623
624 // Encode message in a channel buffer and transmit.
625 ch.write(fpmMessage.encode());
626
627 } catch (RuntimeException e) {
628 log.info("Route not sent over fpm connection.");
629 }
630 }
631
632 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
633
634 for (Channel ch : allChannels) {
635 sendRouteUpdateToChannel(isAdd, prefix, ch);
636 }
637 }
638
639 public boolean isPdPushEnabled() {
640 return pdPushEnabled;
641 }
642
643 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
644 return new FpmPeerInfo(connections,
645 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
646 }
647
648 @Override
649 public Map<FpmPeer, FpmPeerInfo> peers() {
650 return peers.asJavaMap().entrySet().stream()
651 .collect(Collectors.toMap(
652 e -> e.getKey(),
653 e -> toFpmInfo(e.getKey(), e.getValue())));
654 }
655
656 private class InternalFpmListener implements FpmListener {
657 @Override
658 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
659 FpmManager.this.fpmMessage(peer, fpmMessage);
660 }
661
662 @Override
663 public boolean peerConnected(FpmPeer peer) {
664 if (peers.keySet().contains(peer)) {
665 return false;
666 }
667
668 NodeId localNode = clusterService.getLocalNode().id();
669 peers.compute(peer, (p, infos) -> {
670 if (infos == null) {
671 infos = new HashSet<>();
672 }
673
674 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
675 return infos;
676 });
677
678 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
679 return true;
680 }
681
682 @Override
683 public void peerDisconnected(FpmPeer peer) {
684 log.info("FPM connection to {} went down", peer);
685
686 if (clearRoutes) {
687 clearRoutes(peer);
688 }
689
690 peers.compute(peer, (p, infos) -> {
691 if (infos == null) {
692 return null;
693 }
694
695 infos.stream()
696 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
697 .findAny()
698 .ifPresent(i -> infos.remove(i));
699
700 if (infos.isEmpty()) {
701 return null;
702 }
703
704 return infos;
705 });
706 }
707 }
708
709 /**
710 * Adds a channel to the channel group.
711 *
712 * @param channel the channel to add
713 */
714 public void addSessionChannel(Channel channel) {
715 allChannels.add(channel);
716 }
717
718 /**
719 * Removes a channel from the channel group.
720 *
721 * @param channel the channel to remove
722 */
723 public void removeSessionChannel(Channel channel) {
724 allChannels.remove(channel);
725 }
726
727 /**
728 * Store delegate for Fpm Prefix store.
729 * Handles Fpm prefix store event.
730 */
731 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
732
733 @Override
734 public void notify(FpmPrefixStoreEvent e) {
735
736 log.trace("FpmPrefixStoreEvent notify");
737
738 FpmRecord record = e.subject();
739 switch (e.type()) {
740 case ADD:
741 sendRouteUpdate(true, record.ipPrefix());
742 break;
743 case REMOVE:
744 sendRouteUpdate(false, record.ipPrefix());
745 break;
746 default:
747 log.warn("unsupported store event type", e.type());
748 return;
749 }
750 }
751 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800752
753 private class InternalClusterListener implements ClusterEventListener {
754 @Override
755 public void event(ClusterEvent event) {
756 log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
757 switch (event.type()) {
758 case INSTANCE_READY:
759 // When current node is healing from a network partition,
760 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
761 // but it is possible that current node still can't write to the cluster at this moment.
762 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
763 // after current node can write.
764 // Adding 15 seconds retry for the current node to be able to write.
765 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
766 if (result != null && result.isPresent()) {
767 log.debug("Lock obtained. Push local FPM routes to route store");
768 // All FPM routes on current node will be pushed again even when current node is not
769 // the one that becomes READY. A better way is to do this only on the minority nodes.
770 pushFpmRoutes();
771 asyncLock.unlock();
772 } else {
773 log.debug("Fail to obtain lock. Abort.");
774 }
775 });
776 break;
777 case INSTANCE_DEACTIVATED:
778 case INSTANCE_ADDED:
779 case INSTANCE_REMOVED:
780 case INSTANCE_ACTIVATED:
781 default:
782 break;
783 }
784 }
785 }
786
787 public void pushFpmRoutes() {
788 Set<Route> routes = fpmRoutes.values().stream()
789 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
790 .collect(Collectors.toSet());
791 updateRouteStore(routes, Lists.newArrayList());
792 log.info("{} FPM routes have been updated to route store", routes.size());
793 }
Kalhee Kimba366062017-11-07 16:32:09 +0000794}