blob: abb8ddd364cc506b245e2826d978f81ed42a7bae [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.ReferencePolicy;
29import org.apache.felix.scr.annotations.Service;
30import org.jboss.netty.bootstrap.ServerBootstrap;
31import org.jboss.netty.channel.Channel;
32import org.jboss.netty.channel.ChannelException;
33import org.jboss.netty.channel.ChannelFactory;
34import org.jboss.netty.channel.ChannelPipeline;
35import org.jboss.netty.channel.ChannelPipelineFactory;
36import org.jboss.netty.channel.Channels;
37import org.jboss.netty.channel.group.ChannelGroup;
38import org.jboss.netty.channel.group.DefaultChannelGroup;
39import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
40import org.jboss.netty.handler.timeout.IdleStateHandler;
41import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070044import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000045import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000046import org.onlab.util.KryoNamespace;
47import org.onlab.util.Tools;
48import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080049import org.onosproject.cluster.ClusterEvent;
50import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000051import org.onosproject.cluster.ClusterService;
52import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070054import org.onosproject.core.CoreService;
55import org.onosproject.net.host.InterfaceIpAddress;
56import org.onosproject.net.intf.Interface;
57import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000058import org.onosproject.routeservice.Route;
59import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070060import org.onosproject.routing.fpm.api.FpmPrefixStore;
61import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
62import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.routing.fpm.protocol.FpmHeader;
64import org.onosproject.routing.fpm.protocol.Netlink;
65import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
66import org.onosproject.routing.fpm.protocol.RouteAttribute;
67import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
68import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
69import org.onosproject.routing.fpm.protocol.RtNetlink;
70import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070071import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000072import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080073import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000074import org.onosproject.store.service.ConsistentMap;
75import org.onosproject.store.service.Serializer;
76import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000077import org.osgi.service.component.ComponentContext;
78import org.slf4j.Logger;
79import org.slf4j.LoggerFactory;
80
81import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080082import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
89import java.util.Map;
90import java.util.Set;
91import java.util.concurrent.ConcurrentHashMap;
92import java.util.stream.Collectors;
93
94import static java.util.concurrent.Executors.newCachedThreadPool;
95import static org.onlab.util.Tools.groupedThreads;
Kalhee Kimba366062017-11-07 16:32:09 +000096
97/**
98 * Forwarding Plane Manager (FPM) route source.
99 */
100@Service
101@Component(immediate = true)
102public class FpmManager implements FpmInfoService {
103 private final Logger log = LoggerFactory.getLogger(getClass());
104
105 private static final int FPM_PORT = 2620;
106 private static final String APP_NAME = "org.onosproject.fpm";
107 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800108 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected CoreService coreService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ComponentConfigService componentConfigService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected RouteAdminService routeService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterService clusterService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected StorageService storageService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected InterfaceService interfaceService;
127
128 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
129 bind = "bindRipStore",
130 unbind = "unbindRipStore",
131 policy = ReferencePolicy.DYNAMIC,
132 target = "(fpm_type=RIP)")
133 protected volatile FpmPrefixStore ripStore;
134
135 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
136 bind = "bindDhcpStore",
137 unbind = "unbindDhcpStore",
138 policy = ReferencePolicy.DYNAMIC,
139 target = "(fpm_type=DHCP)")
140 protected volatile FpmPrefixStore dhcpStore;
141
142 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
143 = new FpmPrefixStoreDelegate();
144
145 private ApplicationId appId;
146 private ServerBootstrap serverBootstrap;
147 private Channel serverChannel;
148 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800149 private final InternalClusterListener clusterListener = new InternalClusterListener();
150 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000151
152 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
153
154 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
155
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700156 //Local cache for peers to be used in case of cluster partition.
157 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
158
Kalhee Kimba366062017-11-07 16:32:09 +0000159 @Property(name = "clearRoutes", boolValue = true,
160 label = "Whether to clear routes when the FPM connection goes down")
161 private boolean clearRoutes = true;
162
163 @Property(name = "pdPushEnabled", boolValue = false,
164 label = "Whether to push prefixes to Quagga over fpm connection")
165 private boolean pdPushEnabled = false;
166
167 @Property(name = "pdPushNextHopIPv4", value = "",
168 label = "IPv4 next-hop address for PD Pushing.")
169 private Ip4Address pdPushNextHopIPv4 = null;
170
171 @Property(name = "pdPushNextHopIPv6", value = "",
172 label = "IPv6 next-hop address for PD Pushing.")
173 private Ip6Address pdPushNextHopIPv6 = null;
174
175 protected void bindRipStore(FpmPrefixStore store) {
176 if ((ripStore == null) && (store != null)) {
177 ripStore = store;
178 ripStore.setDelegate(fpmPrefixStoreDelegate);
179 for (Channel ch : allChannels) {
180 processRipStaticRoutes(ch);
181 }
182 }
183 }
184
185 protected void unbindRipStore(FpmPrefixStore store) {
186 if (ripStore == store) {
187 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
188 ripStore = null;
189 }
190 }
191
192 protected void bindDhcpStore(FpmPrefixStore store) {
193 if ((dhcpStore == null) && (store != null)) {
194 dhcpStore = store;
195 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
196 for (Channel ch : allChannels) {
197 processDhcpStaticRoutes(ch);
198 }
199 }
200 }
201
202 protected void unbindDhcpStore(FpmPrefixStore store) {
203 if (dhcpStore == store) {
204 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
205 dhcpStore = null;
206 }
207 }
208
209 @Activate
210 protected void activate(ComponentContext context) {
211 componentConfigService.preSetProperty(
212 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
213 "distributed", "true");
214
215 componentConfigService.registerProperties(getClass());
216
217 KryoNamespace serializer = KryoNamespace.newBuilder()
218 .register(KryoNamespaces.API)
219 .register(FpmPeer.class)
220 .register(FpmConnectionInfo.class)
221 .build();
222 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
223 .withName("fpm-connections")
224 .withSerializer(Serializer.using(serializer))
225 .build();
226
227 modified(context);
228 startServer();
229
230 appId = coreService.registerApplication(APP_NAME, peers::destroy);
231
Charles Chan035ed1f2018-01-30 16:00:32 -0800232 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Saurav Dase7f51012018-02-09 17:26:45 -0800233 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800234
Kalhee Kimba366062017-11-07 16:32:09 +0000235 log.info("Started");
236 }
237
238 @Deactivate
239 protected void deactivate() {
240 componentConfigService.preSetProperty(
241 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
242 "distributed", "false");
243
244 stopServer();
245 fpmRoutes.clear();
246 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800247
248 clusterService.removeListener(clusterListener);
249 asyncLock.unlock();
250
Kalhee Kimba366062017-11-07 16:32:09 +0000251 log.info("Stopped");
252 }
253
254 @Modified
255 protected void modified(ComponentContext context) {
256 Dictionary<?, ?> properties = context.getProperties();
257 if (properties == null) {
258 return;
259 }
260 String strClearRoutes = Tools.get(properties, "clearRoutes");
261 if (strClearRoutes != null) {
262 clearRoutes = Boolean.parseBoolean(strClearRoutes);
263 log.info("clearRoutes is {}", clearRoutes);
264 }
265
266 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
267 if (strPdPushEnabled != null) {
268 boolean oldValue = pdPushEnabled;
269 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
270 if (pdPushEnabled) {
271
272 pdPushNextHopIPv4 = null;
273 pdPushNextHopIPv6 = null;
274
275 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
276 if (strPdPushNextHopIPv4 != null) {
277 pdPushNextHopIPv4 = Ip4Address.valueOf(strPdPushNextHopIPv4);
278 }
279 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
280 if (strPdPushNextHopIPv6 != null) {
281 pdPushNextHopIPv6 = Ip6Address.valueOf(strPdPushNextHopIPv6);
282 }
283
284 if (pdPushNextHopIPv4 == null) {
285 pdPushNextHopIPv4 = interfaceService.getInterfaces()
286 .stream()
287 .filter(iface -> iface.name().contains("RUR"))
288 .map(Interface::ipAddressesList)
289 .flatMap(Collection::stream)
290 .map(InterfaceIpAddress::ipAddress)
291 .filter(IpAddress::isIp4)
292 .map(IpAddress::getIp4Address)
293 .findFirst()
294 .orElse(null);
295 }
296
297 if (pdPushNextHopIPv6 == null) {
298 pdPushNextHopIPv6 = interfaceService.getInterfaces()
299 .stream()
300 .filter(iface -> iface.name().contains("RUR"))
301 .map(Interface::ipAddressesList)
302 .flatMap(Collection::stream)
303 .map(InterfaceIpAddress::ipAddress)
304 .filter(IpAddress::isIp6)
305 .map(IpAddress::getIp6Address)
306 .findFirst()
307 .orElse(null);
308 }
309
310 log.info("PD pushing is enabled.");
311 if (pdPushNextHopIPv4 != null) {
312 log.info("ipv4 next-hop {}", pdPushNextHopIPv4.toString());
313 } else {
314 log.info("ipv4 next-hop is null");
315 }
316 if (pdPushNextHopIPv6 != null) {
317 log.info("ipv6 next-hop={}", pdPushNextHopIPv6.toString());
318 } else {
319 log.info("ipv6 next-hop is null");
320 }
321 if (!oldValue) {
322 processStaticRoutes();
323 }
324 } else {
325 log.info("PD pushing is disabled.");
326 }
327 }
328 }
329
330 private void startServer() {
331 HashedWheelTimer timer = new HashedWheelTimer(
332 groupedThreads("onos/fpm", "fpm-timer-%d", log));
333
334 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
335 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
336 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
337 ChannelPipelineFactory pipelineFactory = () -> {
338 // Allocate a new session per connection
339 IdleStateHandler idleHandler =
340 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
341 FpmSessionHandler fpmSessionHandler =
342 new FpmSessionHandler(this, new InternalFpmListener());
343 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
344
345 // Setup the processing pipeline
346 ChannelPipeline pipeline = Channels.pipeline();
347 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
348 pipeline.addLast("idle", idleHandler);
349 pipeline.addLast("FpmSession", fpmSessionHandler);
350 return pipeline;
351 };
352
353 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
354
355 serverBootstrap = new ServerBootstrap(channelFactory);
356 serverBootstrap.setOption("child.reuseAddr", true);
357 serverBootstrap.setOption("child.keepAlive", true);
358 serverBootstrap.setOption("child.tcpNoDelay", true);
359 serverBootstrap.setPipelineFactory(pipelineFactory);
360 try {
361 serverChannel = serverBootstrap.bind(listenAddress);
362 allChannels.add(serverChannel);
363 } catch (ChannelException e) {
364 log.debug("Exception binding to FPM port {}: ",
365 listenAddress.getPort(), e);
366 stopServer();
367 }
368 }
369
370 private void stopServer() {
371 allChannels.close().awaitUninterruptibly();
372 allChannels.clear();
373 if (serverBootstrap != null) {
374 serverBootstrap.releaseExternalResources();
375 }
376
377 if (clearRoutes) {
378 peers.keySet().forEach(this::clearRoutes);
379 }
380 }
381
Kalhee Kim40beb722018-01-16 20:32:04 +0000382 private boolean routeInDhcpStore(IpPrefix prefix) {
383
384 if (dhcpStore != null) {
385 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
386 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
387 }
388 return false;
389 }
390
391 private boolean routeInRipStore(IpPrefix prefix) {
392
393 if (ripStore != null) {
394 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
395 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
396 }
397 return false;
398 }
399
Kalhee Kimba366062017-11-07 16:32:09 +0000400 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
401 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
402 return;
403 }
404
405 Netlink netlink = fpmMessage.netlink();
406 RtNetlink rtNetlink = netlink.rtNetlink();
407
408 if (log.isTraceEnabled()) {
409 log.trace("Received FPM message: {}", fpmMessage);
410 }
411
412 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
413 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
414 log.trace("Ignoring non-zebra route");
415 return;
416 }
417
418 IpAddress dstAddress = null;
419 IpAddress gateway = null;
420
421 for (RouteAttribute attribute : rtNetlink.attributes()) {
422 if (attribute.type() == RouteAttribute.RTA_DST) {
423 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
424 dstAddress = raDst.dstAddress();
425 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
426 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
427 gateway = raGateway.gateway();
428 }
429 }
430
431 if (dstAddress == null) {
432 log.error("Dst address missing!");
433 return;
434 }
435
436 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
437
Kalhee Kim40beb722018-01-16 20:32:04 +0000438 // Ignore routes that we sent.
Ray Milkeyffe1a332018-01-24 10:41:14 -0800439 if (gateway != null && ((prefix.isIp4() && (gateway.equals(pdPushNextHopIPv4))) ||
440 gateway.equals(pdPushNextHopIPv6))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000441 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
442 return;
443 }
444 }
445
Kalhee Kimba366062017-11-07 16:32:09 +0000446 List<Route> updates = new LinkedList<>();
447 List<Route> withdraws = new LinkedList<>();
448
449 Route route;
450 switch (netlink.type()) {
451 case RTM_NEWROUTE:
452 if (gateway == null) {
453 // We ignore interface routes with no gateway for now.
454 return;
455 }
456 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
457
458
459 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
460
461 if (oldRoute != null) {
462 log.trace("Swapping {} with {}", oldRoute, route);
463 withdraws.add(oldRoute);
464 }
465 updates.add(route);
466 break;
467 case RTM_DELROUTE:
468 Route existing = fpmRoutes.get(peer).remove(prefix);
469 if (existing == null) {
470 log.warn("Got delete for non-existent prefix");
471 return;
472 }
473
474 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
475
476 withdraws.add(route);
477 break;
478 case RTM_GETROUTE:
479 default:
480 break;
481 }
482
Charles Chan035ed1f2018-01-30 16:00:32 -0800483 updateRouteStore(updates, withdraws);
484 }
485
486 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
487 routeService.withdraw(routesToRemove);
488 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000489 }
490
491 private void clearRoutes(FpmPeer peer) {
492 log.info("Clearing all routes for peer {}", peer);
493 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
494 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800495 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000496 }
497 }
498
499 public void processStaticRoutes() {
500 for (Channel ch : allChannels) {
501 processStaticRoutes(ch);
502 }
503 }
504
505 public void processStaticRoutes(Channel ch) {
506 processRipStaticRoutes(ch);
507 processDhcpStaticRoutes(ch);
508 }
509
510 private void processRipStaticRoutes(Channel ch) {
511
512 /* Get RIP static routes. */
513 if (ripStore != null) {
514 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
515 log.info("RIP store size is {}", ripRecords.size());
516
517 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
518 record.ipPrefix(), ch));
519 }
520 }
521
522 private void processDhcpStaticRoutes(Channel ch) {
523
524 /* Get Dhcp static routes. */
525 if (dhcpStore != null) {
526 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
527 log.info("Dhcp store size is {}", dhcpRecords.size());
528
529 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
530 record.ipPrefix(), ch));
531 }
532 }
533
534 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
535
Kalhee Kimba366062017-11-07 16:32:09 +0000536 if (!pdPushEnabled) {
537 return;
538 }
539
540 try {
Kalhee Kim715dd732018-01-23 14:39:56 +0000541 int raLength;
542 short addrFamily;
543 IpAddress pdPushNextHop;
544
545 // Build route attributes.
Kalhee Kimba366062017-11-07 16:32:09 +0000546 if (prefix.isIp4()) {
547 if (pdPushNextHopIPv4 == null) {
548 log.info("Prefix not pushed because ipv4 next-hop is null.");
549 return;
550 }
551 pdPushNextHop = pdPushNextHopIPv4;
Kalhee Kim715dd732018-01-23 14:39:56 +0000552 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
Kalhee Kimba366062017-11-07 16:32:09 +0000553 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
554 } else {
555 if (pdPushNextHopIPv6 == null) {
556 log.info("Prefix not pushed because ipv6 next-hop is null.");
557 return;
558 }
559 pdPushNextHop = pdPushNextHopIPv6;
Kalhee Kim715dd732018-01-23 14:39:56 +0000560 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
Kalhee Kimba366062017-11-07 16:32:09 +0000561 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
562 }
563
Kalhee Kim715dd732018-01-23 14:39:56 +0000564 RouteAttributeDst raDst = RouteAttributeDst.builder()
565 .length(raLength)
566 .type(RouteAttribute.RTA_DST)
567 .dstAddress(prefix.address())
568 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000569
Kalhee Kim715dd732018-01-23 14:39:56 +0000570 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
571 .length(raLength)
572 .type(RouteAttribute.RTA_GATEWAY)
573 .gateway(pdPushNextHop)
574 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000575
Kalhee Kim715dd732018-01-23 14:39:56 +0000576 // Build RtNetlink.
577 RtNetlink rtNetlink = RtNetlink.builder()
578 .addressFamily(addrFamily)
579 .dstLength(prefix.prefixLength())
580 .routeAttribute(raDst)
581 .routeAttribute(raGateway)
582 .build();
583
584 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000585 int messageLength = raDst.length() + raGateway.length() +
Kalhee Kim715dd732018-01-23 14:39:56 +0000586 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
587 Netlink netLink = Netlink.builder()
588 .length(messageLength)
589 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
590 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
591 .rtNetlink(rtNetlink)
592 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000593
Kalhee Kim715dd732018-01-23 14:39:56 +0000594 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000595 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000596 FpmHeader fpmMessage = FpmHeader.builder()
597 .version(FpmHeader.FPM_VERSION_1)
598 .type(FpmHeader.FPM_TYPE_NETLINK)
599 .length(messageLength)
600 .netlink(netLink)
601 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000602
603 // Encode message in a channel buffer and transmit.
604 ch.write(fpmMessage.encode());
605
606 } catch (RuntimeException e) {
607 log.info("Route not sent over fpm connection.");
608 }
609 }
610
611 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
612
613 for (Channel ch : allChannels) {
614 sendRouteUpdateToChannel(isAdd, prefix, ch);
615 }
616 }
617
618 public boolean isPdPushEnabled() {
619 return pdPushEnabled;
620 }
621
622 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
623 return new FpmPeerInfo(connections,
624 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
625 }
626
627 @Override
628 public Map<FpmPeer, FpmPeerInfo> peers() {
629 return peers.asJavaMap().entrySet().stream()
630 .collect(Collectors.toMap(
631 e -> e.getKey(),
632 e -> toFpmInfo(e.getKey(), e.getValue())));
633 }
634
635 private class InternalFpmListener implements FpmListener {
636 @Override
637 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
638 FpmManager.this.fpmMessage(peer, fpmMessage);
639 }
640
641 @Override
642 public boolean peerConnected(FpmPeer peer) {
643 if (peers.keySet().contains(peer)) {
644 return false;
645 }
646
647 NodeId localNode = clusterService.getLocalNode().id();
648 peers.compute(peer, (p, infos) -> {
649 if (infos == null) {
650 infos = new HashSet<>();
651 }
652
653 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700654 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000655 return infos;
656 });
657
658 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
659 return true;
660 }
661
662 @Override
663 public void peerDisconnected(FpmPeer peer) {
664 log.info("FPM connection to {} went down", peer);
665
666 if (clearRoutes) {
667 clearRoutes(peer);
668 }
669
670 peers.compute(peer, (p, infos) -> {
671 if (infos == null) {
672 return null;
673 }
674
675 infos.stream()
676 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
677 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700678 .ifPresent(i -> {
679 infos.remove(i);
680 localPeers.get(peer).remove(i);
681 });
Kalhee Kimba366062017-11-07 16:32:09 +0000682
683 if (infos.isEmpty()) {
684 return null;
685 }
686
687 return infos;
688 });
689 }
690 }
691
692 /**
693 * Adds a channel to the channel group.
694 *
695 * @param channel the channel to add
696 */
697 public void addSessionChannel(Channel channel) {
698 allChannels.add(channel);
699 }
700
701 /**
702 * Removes a channel from the channel group.
703 *
704 * @param channel the channel to remove
705 */
706 public void removeSessionChannel(Channel channel) {
707 allChannels.remove(channel);
708 }
709
710 /**
711 * Store delegate for Fpm Prefix store.
712 * Handles Fpm prefix store event.
713 */
714 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
715
716 @Override
717 public void notify(FpmPrefixStoreEvent e) {
718
719 log.trace("FpmPrefixStoreEvent notify");
720
721 FpmRecord record = e.subject();
722 switch (e.type()) {
723 case ADD:
724 sendRouteUpdate(true, record.ipPrefix());
725 break;
726 case REMOVE:
727 sendRouteUpdate(false, record.ipPrefix());
728 break;
729 default:
730 log.warn("unsupported store event type", e.type());
731 return;
732 }
733 }
734 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800735
736 private class InternalClusterListener implements ClusterEventListener {
737 @Override
738 public void event(ClusterEvent event) {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700739 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Charles Chan035ed1f2018-01-30 16:00:32 -0800740 switch (event.type()) {
741 case INSTANCE_READY:
742 // When current node is healing from a network partition,
743 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
744 // but it is possible that current node still can't write to the cluster at this moment.
745 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
746 // after current node can write.
747 // Adding 15 seconds retry for the current node to be able to write.
748 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
749 if (result != null && result.isPresent()) {
750 log.debug("Lock obtained. Push local FPM routes to route store");
751 // All FPM routes on current node will be pushed again even when current node is not
752 // the one that becomes READY. A better way is to do this only on the minority nodes.
753 pushFpmRoutes();
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700754 localPeers.forEach((key, value) -> peers.put(key, value));
Charles Chan035ed1f2018-01-30 16:00:32 -0800755 asyncLock.unlock();
756 } else {
757 log.debug("Fail to obtain lock. Abort.");
758 }
759 });
760 break;
761 case INSTANCE_DEACTIVATED:
Charles Chan035ed1f2018-01-30 16:00:32 -0800762 case INSTANCE_REMOVED:
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700763 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
764 if (value != null) {
765 value.stream()
766 .filter(i -> i.connectedTo().equals(event.subject().id()))
767 .findAny()
768 .ifPresent(value::remove);
769
770 if (value.isEmpty()) {
771 peers.remove(key);
772 }
773 }
774 });
775 break;
776 case INSTANCE_ADDED:
Charles Chan035ed1f2018-01-30 16:00:32 -0800777 case INSTANCE_ACTIVATED:
778 default:
779 break;
780 }
781 }
782 }
783
Saurav Dase7f51012018-02-09 17:26:45 -0800784 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800785 public void pushFpmRoutes() {
786 Set<Route> routes = fpmRoutes.values().stream()
787 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
788 .collect(Collectors.toSet());
789 updateRouteStore(routes, Lists.newArrayList());
790 log.info("{} FPM routes have been updated to route store", routes.size());
791 }
Kalhee Kimba366062017-11-07 16:32:09 +0000792}