blob: e290aea874fb47cd59042137ff401953f1273b99 [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Charles Chan035ed1f2018-01-30 16:00:32 -080019import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.ReferencePolicy;
28import org.apache.felix.scr.annotations.Service;
29import org.jboss.netty.bootstrap.ServerBootstrap;
30import org.jboss.netty.channel.Channel;
31import org.jboss.netty.channel.ChannelException;
32import org.jboss.netty.channel.ChannelFactory;
33import org.jboss.netty.channel.ChannelPipeline;
34import org.jboss.netty.channel.ChannelPipelineFactory;
35import org.jboss.netty.channel.Channels;
36import org.jboss.netty.channel.group.ChannelGroup;
37import org.jboss.netty.channel.group.DefaultChannelGroup;
38import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
39import org.jboss.netty.handler.timeout.IdleStateHandler;
40import org.jboss.netty.util.HashedWheelTimer;
41import org.onlab.packet.IpAddress;
42import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
44import org.onlab.packet.IpPrefix;
45import org.onosproject.net.intf.Interface;
46import org.onosproject.net.host.InterfaceIpAddress;
47import org.onosproject.net.intf.InterfaceService;
48import org.onlab.util.KryoNamespace;
49import org.onlab.util.Tools;
50import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080051import org.onosproject.cluster.ClusterEvent;
52import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.cluster.ClusterService;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.core.CoreService;
56import org.onosproject.core.ApplicationId;
57import org.onosproject.routeservice.Route;
58import org.onosproject.routeservice.RouteAdminService;
59import org.onosproject.routing.fpm.protocol.FpmHeader;
60import org.onosproject.routing.fpm.protocol.Netlink;
61import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
62import org.onosproject.routing.fpm.protocol.RouteAttribute;
63import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
64import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
65import org.onosproject.routing.fpm.protocol.RtNetlink;
66import org.onosproject.routing.fpm.protocol.RtProtocol;
67import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080068import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000069import org.onosproject.store.service.ConsistentMap;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.StoreDelegate;
73import org.osgi.service.component.ComponentContext;
74import org.slf4j.Logger;
75import org.slf4j.LoggerFactory;
76
77import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080078import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000079import java.util.Collection;
80import java.util.Collections;
81import java.util.Dictionary;
82import java.util.HashSet;
83import java.util.LinkedList;
84
85import java.util.List;
86import java.util.Map;
87import java.util.Set;
88import java.util.concurrent.ConcurrentHashMap;
89import java.util.stream.Collectors;
90
91import static java.util.concurrent.Executors.newCachedThreadPool;
92import static org.onlab.util.Tools.groupedThreads;
93import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
94import org.onosproject.routing.fpm.api.FpmPrefixStore;
95import org.onosproject.routing.fpm.api.FpmRecord;
96
97/**
98 * Forwarding Plane Manager (FPM) route source.
99 */
100@Service
101@Component(immediate = true)
102public class FpmManager implements FpmInfoService {
103 private final Logger log = LoggerFactory.getLogger(getClass());
104
105 private static final int FPM_PORT = 2620;
106 private static final String APP_NAME = "org.onosproject.fpm";
107 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800108 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected CoreService coreService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ComponentConfigService componentConfigService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected RouteAdminService routeService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterService clusterService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected StorageService storageService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected InterfaceService interfaceService;
127
128 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
129 bind = "bindRipStore",
130 unbind = "unbindRipStore",
131 policy = ReferencePolicy.DYNAMIC,
132 target = "(fpm_type=RIP)")
133 protected volatile FpmPrefixStore ripStore;
134
135 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
136 bind = "bindDhcpStore",
137 unbind = "unbindDhcpStore",
138 policy = ReferencePolicy.DYNAMIC,
139 target = "(fpm_type=DHCP)")
140 protected volatile FpmPrefixStore dhcpStore;
141
142 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
143 = new FpmPrefixStoreDelegate();
144
145 private ApplicationId appId;
146 private ServerBootstrap serverBootstrap;
147 private Channel serverChannel;
148 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800149 private final InternalClusterListener clusterListener = new InternalClusterListener();
150 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000151
152 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
153
154 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
155
156 @Property(name = "clearRoutes", boolValue = true,
157 label = "Whether to clear routes when the FPM connection goes down")
158 private boolean clearRoutes = true;
159
160 @Property(name = "pdPushEnabled", boolValue = false,
161 label = "Whether to push prefixes to Quagga over fpm connection")
162 private boolean pdPushEnabled = false;
163
164 @Property(name = "pdPushNextHopIPv4", value = "",
165 label = "IPv4 next-hop address for PD Pushing.")
166 private Ip4Address pdPushNextHopIPv4 = null;
167
168 @Property(name = "pdPushNextHopIPv6", value = "",
169 label = "IPv6 next-hop address for PD Pushing.")
170 private Ip6Address pdPushNextHopIPv6 = null;
171
172 protected void bindRipStore(FpmPrefixStore store) {
173 if ((ripStore == null) && (store != null)) {
174 ripStore = store;
175 ripStore.setDelegate(fpmPrefixStoreDelegate);
176 for (Channel ch : allChannels) {
177 processRipStaticRoutes(ch);
178 }
179 }
180 }
181
182 protected void unbindRipStore(FpmPrefixStore store) {
183 if (ripStore == store) {
184 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
185 ripStore = null;
186 }
187 }
188
189 protected void bindDhcpStore(FpmPrefixStore store) {
190 if ((dhcpStore == null) && (store != null)) {
191 dhcpStore = store;
192 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
193 for (Channel ch : allChannels) {
194 processDhcpStaticRoutes(ch);
195 }
196 }
197 }
198
199 protected void unbindDhcpStore(FpmPrefixStore store) {
200 if (dhcpStore == store) {
201 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
202 dhcpStore = null;
203 }
204 }
205
206 @Activate
207 protected void activate(ComponentContext context) {
208 componentConfigService.preSetProperty(
209 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
210 "distributed", "true");
211
212 componentConfigService.registerProperties(getClass());
213
214 KryoNamespace serializer = KryoNamespace.newBuilder()
215 .register(KryoNamespaces.API)
216 .register(FpmPeer.class)
217 .register(FpmConnectionInfo.class)
218 .build();
219 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
220 .withName("fpm-connections")
221 .withSerializer(Serializer.using(serializer))
222 .build();
223
224 modified(context);
225 startServer();
226
227 appId = coreService.registerApplication(APP_NAME, peers::destroy);
228
Charles Chan035ed1f2018-01-30 16:00:32 -0800229 clusterService.addListener(clusterListener);
230 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
231
Kalhee Kimba366062017-11-07 16:32:09 +0000232 log.info("Started");
233 }
234
235 @Deactivate
236 protected void deactivate() {
237 componentConfigService.preSetProperty(
238 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
239 "distributed", "false");
240
241 stopServer();
242 fpmRoutes.clear();
243 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800244
245 clusterService.removeListener(clusterListener);
246 asyncLock.unlock();
247
Kalhee Kimba366062017-11-07 16:32:09 +0000248 log.info("Stopped");
249 }
250
251 @Modified
252 protected void modified(ComponentContext context) {
253 Dictionary<?, ?> properties = context.getProperties();
254 if (properties == null) {
255 return;
256 }
257 String strClearRoutes = Tools.get(properties, "clearRoutes");
258 if (strClearRoutes != null) {
259 clearRoutes = Boolean.parseBoolean(strClearRoutes);
260 log.info("clearRoutes is {}", clearRoutes);
261 }
262
263 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
264 if (strPdPushEnabled != null) {
265 boolean oldValue = pdPushEnabled;
266 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
267 if (pdPushEnabled) {
268
269 pdPushNextHopIPv4 = null;
270 pdPushNextHopIPv6 = null;
271
272 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
273 if (strPdPushNextHopIPv4 != null) {
274 pdPushNextHopIPv4 = Ip4Address.valueOf(strPdPushNextHopIPv4);
275 }
276 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
277 if (strPdPushNextHopIPv6 != null) {
278 pdPushNextHopIPv6 = Ip6Address.valueOf(strPdPushNextHopIPv6);
279 }
280
281 if (pdPushNextHopIPv4 == null) {
282 pdPushNextHopIPv4 = interfaceService.getInterfaces()
283 .stream()
284 .filter(iface -> iface.name().contains("RUR"))
285 .map(Interface::ipAddressesList)
286 .flatMap(Collection::stream)
287 .map(InterfaceIpAddress::ipAddress)
288 .filter(IpAddress::isIp4)
289 .map(IpAddress::getIp4Address)
290 .findFirst()
291 .orElse(null);
292 }
293
294 if (pdPushNextHopIPv6 == null) {
295 pdPushNextHopIPv6 = interfaceService.getInterfaces()
296 .stream()
297 .filter(iface -> iface.name().contains("RUR"))
298 .map(Interface::ipAddressesList)
299 .flatMap(Collection::stream)
300 .map(InterfaceIpAddress::ipAddress)
301 .filter(IpAddress::isIp6)
302 .map(IpAddress::getIp6Address)
303 .findFirst()
304 .orElse(null);
305 }
306
307 log.info("PD pushing is enabled.");
308 if (pdPushNextHopIPv4 != null) {
309 log.info("ipv4 next-hop {}", pdPushNextHopIPv4.toString());
310 } else {
311 log.info("ipv4 next-hop is null");
312 }
313 if (pdPushNextHopIPv6 != null) {
314 log.info("ipv6 next-hop={}", pdPushNextHopIPv6.toString());
315 } else {
316 log.info("ipv6 next-hop is null");
317 }
318 if (!oldValue) {
319 processStaticRoutes();
320 }
321 } else {
322 log.info("PD pushing is disabled.");
323 }
324 }
325 }
326
327 private void startServer() {
328 HashedWheelTimer timer = new HashedWheelTimer(
329 groupedThreads("onos/fpm", "fpm-timer-%d", log));
330
331 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
332 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
333 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
334 ChannelPipelineFactory pipelineFactory = () -> {
335 // Allocate a new session per connection
336 IdleStateHandler idleHandler =
337 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
338 FpmSessionHandler fpmSessionHandler =
339 new FpmSessionHandler(this, new InternalFpmListener());
340 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
341
342 // Setup the processing pipeline
343 ChannelPipeline pipeline = Channels.pipeline();
344 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
345 pipeline.addLast("idle", idleHandler);
346 pipeline.addLast("FpmSession", fpmSessionHandler);
347 return pipeline;
348 };
349
350 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
351
352 serverBootstrap = new ServerBootstrap(channelFactory);
353 serverBootstrap.setOption("child.reuseAddr", true);
354 serverBootstrap.setOption("child.keepAlive", true);
355 serverBootstrap.setOption("child.tcpNoDelay", true);
356 serverBootstrap.setPipelineFactory(pipelineFactory);
357 try {
358 serverChannel = serverBootstrap.bind(listenAddress);
359 allChannels.add(serverChannel);
360 } catch (ChannelException e) {
361 log.debug("Exception binding to FPM port {}: ",
362 listenAddress.getPort(), e);
363 stopServer();
364 }
365 }
366
367 private void stopServer() {
368 allChannels.close().awaitUninterruptibly();
369 allChannels.clear();
370 if (serverBootstrap != null) {
371 serverBootstrap.releaseExternalResources();
372 }
373
374 if (clearRoutes) {
375 peers.keySet().forEach(this::clearRoutes);
376 }
377 }
378
Kalhee Kim40beb722018-01-16 20:32:04 +0000379 private boolean routeInDhcpStore(IpPrefix prefix) {
380
381 if (dhcpStore != null) {
382 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
383 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
384 }
385 return false;
386 }
387
388 private boolean routeInRipStore(IpPrefix prefix) {
389
390 if (ripStore != null) {
391 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
392 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
393 }
394 return false;
395 }
396
Kalhee Kimba366062017-11-07 16:32:09 +0000397 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
398 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
399 return;
400 }
401
402 Netlink netlink = fpmMessage.netlink();
403 RtNetlink rtNetlink = netlink.rtNetlink();
404
405 if (log.isTraceEnabled()) {
406 log.trace("Received FPM message: {}", fpmMessage);
407 }
408
409 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
410 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
411 log.trace("Ignoring non-zebra route");
412 return;
413 }
414
415 IpAddress dstAddress = null;
416 IpAddress gateway = null;
417
418 for (RouteAttribute attribute : rtNetlink.attributes()) {
419 if (attribute.type() == RouteAttribute.RTA_DST) {
420 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
421 dstAddress = raDst.dstAddress();
422 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
423 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
424 gateway = raGateway.gateway();
425 }
426 }
427
428 if (dstAddress == null) {
429 log.error("Dst address missing!");
430 return;
431 }
432
433 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
434
Kalhee Kim40beb722018-01-16 20:32:04 +0000435 // Ignore routes that we sent.
Ray Milkeyffe1a332018-01-24 10:41:14 -0800436 if (gateway != null && ((prefix.isIp4() && (gateway.equals(pdPushNextHopIPv4))) ||
437 gateway.equals(pdPushNextHopIPv6))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000438 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
439 return;
440 }
441 }
442
Kalhee Kimba366062017-11-07 16:32:09 +0000443 List<Route> updates = new LinkedList<>();
444 List<Route> withdraws = new LinkedList<>();
445
446 Route route;
447 switch (netlink.type()) {
448 case RTM_NEWROUTE:
449 if (gateway == null) {
450 // We ignore interface routes with no gateway for now.
451 return;
452 }
453 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
454
455
456 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
457
458 if (oldRoute != null) {
459 log.trace("Swapping {} with {}", oldRoute, route);
460 withdraws.add(oldRoute);
461 }
462 updates.add(route);
463 break;
464 case RTM_DELROUTE:
465 Route existing = fpmRoutes.get(peer).remove(prefix);
466 if (existing == null) {
467 log.warn("Got delete for non-existent prefix");
468 return;
469 }
470
471 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
472
473 withdraws.add(route);
474 break;
475 case RTM_GETROUTE:
476 default:
477 break;
478 }
479
Charles Chan035ed1f2018-01-30 16:00:32 -0800480 updateRouteStore(updates, withdraws);
481 }
482
483 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
484 routeService.withdraw(routesToRemove);
485 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000486 }
487
488 private void clearRoutes(FpmPeer peer) {
489 log.info("Clearing all routes for peer {}", peer);
490 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
491 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800492 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000493 }
494 }
495
496 public void processStaticRoutes() {
497 for (Channel ch : allChannels) {
498 processStaticRoutes(ch);
499 }
500 }
501
502 public void processStaticRoutes(Channel ch) {
503 processRipStaticRoutes(ch);
504 processDhcpStaticRoutes(ch);
505 }
506
507 private void processRipStaticRoutes(Channel ch) {
508
509 /* Get RIP static routes. */
510 if (ripStore != null) {
511 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
512 log.info("RIP store size is {}", ripRecords.size());
513
514 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
515 record.ipPrefix(), ch));
516 }
517 }
518
519 private void processDhcpStaticRoutes(Channel ch) {
520
521 /* Get Dhcp static routes. */
522 if (dhcpStore != null) {
523 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
524 log.info("Dhcp store size is {}", dhcpRecords.size());
525
526 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
527 record.ipPrefix(), ch));
528 }
529 }
530
531 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
532
Kalhee Kimba366062017-11-07 16:32:09 +0000533 if (!pdPushEnabled) {
534 return;
535 }
536
537 try {
Kalhee Kim715dd732018-01-23 14:39:56 +0000538 int raLength;
539 short addrFamily;
540 IpAddress pdPushNextHop;
541
542 // Build route attributes.
Kalhee Kimba366062017-11-07 16:32:09 +0000543 if (prefix.isIp4()) {
544 if (pdPushNextHopIPv4 == null) {
545 log.info("Prefix not pushed because ipv4 next-hop is null.");
546 return;
547 }
548 pdPushNextHop = pdPushNextHopIPv4;
Kalhee Kim715dd732018-01-23 14:39:56 +0000549 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
Kalhee Kimba366062017-11-07 16:32:09 +0000550 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
551 } else {
552 if (pdPushNextHopIPv6 == null) {
553 log.info("Prefix not pushed because ipv6 next-hop is null.");
554 return;
555 }
556 pdPushNextHop = pdPushNextHopIPv6;
Kalhee Kim715dd732018-01-23 14:39:56 +0000557 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
Kalhee Kimba366062017-11-07 16:32:09 +0000558 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
559 }
560
Kalhee Kim715dd732018-01-23 14:39:56 +0000561 RouteAttributeDst raDst = RouteAttributeDst.builder()
562 .length(raLength)
563 .type(RouteAttribute.RTA_DST)
564 .dstAddress(prefix.address())
565 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000566
Kalhee Kim715dd732018-01-23 14:39:56 +0000567 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
568 .length(raLength)
569 .type(RouteAttribute.RTA_GATEWAY)
570 .gateway(pdPushNextHop)
571 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000572
Kalhee Kim715dd732018-01-23 14:39:56 +0000573 // Build RtNetlink.
574 RtNetlink rtNetlink = RtNetlink.builder()
575 .addressFamily(addrFamily)
576 .dstLength(prefix.prefixLength())
577 .routeAttribute(raDst)
578 .routeAttribute(raGateway)
579 .build();
580
581 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000582 int messageLength = raDst.length() + raGateway.length() +
Kalhee Kim715dd732018-01-23 14:39:56 +0000583 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
584 Netlink netLink = Netlink.builder()
585 .length(messageLength)
586 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
587 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
588 .rtNetlink(rtNetlink)
589 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000590
Kalhee Kim715dd732018-01-23 14:39:56 +0000591 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000592 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000593 FpmHeader fpmMessage = FpmHeader.builder()
594 .version(FpmHeader.FPM_VERSION_1)
595 .type(FpmHeader.FPM_TYPE_NETLINK)
596 .length(messageLength)
597 .netlink(netLink)
598 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000599
600 // Encode message in a channel buffer and transmit.
601 ch.write(fpmMessage.encode());
602
603 } catch (RuntimeException e) {
604 log.info("Route not sent over fpm connection.");
605 }
606 }
607
608 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
609
610 for (Channel ch : allChannels) {
611 sendRouteUpdateToChannel(isAdd, prefix, ch);
612 }
613 }
614
615 public boolean isPdPushEnabled() {
616 return pdPushEnabled;
617 }
618
619 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
620 return new FpmPeerInfo(connections,
621 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
622 }
623
624 @Override
625 public Map<FpmPeer, FpmPeerInfo> peers() {
626 return peers.asJavaMap().entrySet().stream()
627 .collect(Collectors.toMap(
628 e -> e.getKey(),
629 e -> toFpmInfo(e.getKey(), e.getValue())));
630 }
631
632 private class InternalFpmListener implements FpmListener {
633 @Override
634 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
635 FpmManager.this.fpmMessage(peer, fpmMessage);
636 }
637
638 @Override
639 public boolean peerConnected(FpmPeer peer) {
640 if (peers.keySet().contains(peer)) {
641 return false;
642 }
643
644 NodeId localNode = clusterService.getLocalNode().id();
645 peers.compute(peer, (p, infos) -> {
646 if (infos == null) {
647 infos = new HashSet<>();
648 }
649
650 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
651 return infos;
652 });
653
654 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
655 return true;
656 }
657
658 @Override
659 public void peerDisconnected(FpmPeer peer) {
660 log.info("FPM connection to {} went down", peer);
661
662 if (clearRoutes) {
663 clearRoutes(peer);
664 }
665
666 peers.compute(peer, (p, infos) -> {
667 if (infos == null) {
668 return null;
669 }
670
671 infos.stream()
672 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
673 .findAny()
674 .ifPresent(i -> infos.remove(i));
675
676 if (infos.isEmpty()) {
677 return null;
678 }
679
680 return infos;
681 });
682 }
683 }
684
685 /**
686 * Adds a channel to the channel group.
687 *
688 * @param channel the channel to add
689 */
690 public void addSessionChannel(Channel channel) {
691 allChannels.add(channel);
692 }
693
694 /**
695 * Removes a channel from the channel group.
696 *
697 * @param channel the channel to remove
698 */
699 public void removeSessionChannel(Channel channel) {
700 allChannels.remove(channel);
701 }
702
703 /**
704 * Store delegate for Fpm Prefix store.
705 * Handles Fpm prefix store event.
706 */
707 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
708
709 @Override
710 public void notify(FpmPrefixStoreEvent e) {
711
712 log.trace("FpmPrefixStoreEvent notify");
713
714 FpmRecord record = e.subject();
715 switch (e.type()) {
716 case ADD:
717 sendRouteUpdate(true, record.ipPrefix());
718 break;
719 case REMOVE:
720 sendRouteUpdate(false, record.ipPrefix());
721 break;
722 default:
723 log.warn("unsupported store event type", e.type());
724 return;
725 }
726 }
727 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800728
729 private class InternalClusterListener implements ClusterEventListener {
730 @Override
731 public void event(ClusterEvent event) {
732 log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
733 switch (event.type()) {
734 case INSTANCE_READY:
735 // When current node is healing from a network partition,
736 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
737 // but it is possible that current node still can't write to the cluster at this moment.
738 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
739 // after current node can write.
740 // Adding 15 seconds retry for the current node to be able to write.
741 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
742 if (result != null && result.isPresent()) {
743 log.debug("Lock obtained. Push local FPM routes to route store");
744 // All FPM routes on current node will be pushed again even when current node is not
745 // the one that becomes READY. A better way is to do this only on the minority nodes.
746 pushFpmRoutes();
747 asyncLock.unlock();
748 } else {
749 log.debug("Fail to obtain lock. Abort.");
750 }
751 });
752 break;
753 case INSTANCE_DEACTIVATED:
754 case INSTANCE_ADDED:
755 case INSTANCE_REMOVED:
756 case INSTANCE_ACTIVATED:
757 default:
758 break;
759 }
760 }
761 }
762
763 public void pushFpmRoutes() {
764 Set<Route> routes = fpmRoutes.values().stream()
765 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
766 .collect(Collectors.toSet());
767 updateRouteStore(routes, Lists.newArrayList());
768 log.info("{} FPM routes have been updated to route store", routes.size());
769 }
Kalhee Kimba366062017-11-07 16:32:09 +0000770}