blob: 42ce4e2e791c2e597c64febdf529cc0d55477116 [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.ReferencePolicy;
29import org.apache.felix.scr.annotations.Service;
30import org.jboss.netty.bootstrap.ServerBootstrap;
31import org.jboss.netty.channel.Channel;
32import org.jboss.netty.channel.ChannelException;
33import org.jboss.netty.channel.ChannelFactory;
34import org.jboss.netty.channel.ChannelPipeline;
35import org.jboss.netty.channel.ChannelPipelineFactory;
36import org.jboss.netty.channel.Channels;
37import org.jboss.netty.channel.group.ChannelGroup;
38import org.jboss.netty.channel.group.DefaultChannelGroup;
39import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
40import org.jboss.netty.handler.timeout.IdleStateHandler;
41import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070044import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000045import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000046import org.onlab.util.KryoNamespace;
47import org.onlab.util.Tools;
48import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080049import org.onosproject.cluster.ClusterEvent;
50import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000051import org.onosproject.cluster.ClusterService;
52import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070054import org.onosproject.core.CoreService;
55import org.onosproject.net.host.InterfaceIpAddress;
56import org.onosproject.net.intf.Interface;
57import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000058import org.onosproject.routeservice.Route;
59import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070060import org.onosproject.routing.fpm.api.FpmPrefixStore;
61import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
62import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.routing.fpm.protocol.FpmHeader;
64import org.onosproject.routing.fpm.protocol.Netlink;
65import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
66import org.onosproject.routing.fpm.protocol.RouteAttribute;
67import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
68import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
69import org.onosproject.routing.fpm.protocol.RtNetlink;
70import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070071import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000072import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080073import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000074import org.onosproject.store.service.ConsistentMap;
75import org.onosproject.store.service.Serializer;
76import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000077import org.osgi.service.component.ComponentContext;
78import org.slf4j.Logger;
79import org.slf4j.LoggerFactory;
80
81import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080082import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
shalde064280feec2018-06-15 19:01:29 -040089import java.util.ArrayList;
90import java.util.Arrays;
Kalhee Kimba366062017-11-07 16:32:09 +000091import java.util.Map;
92import java.util.Set;
93import java.util.concurrent.ConcurrentHashMap;
94import java.util.stream.Collectors;
95
96import static java.util.concurrent.Executors.newCachedThreadPool;
97import static org.onlab.util.Tools.groupedThreads;
Kalhee Kimba366062017-11-07 16:32:09 +000098
99/**
100 * Forwarding Plane Manager (FPM) route source.
101 */
102@Service
103@Component(immediate = true)
104public class FpmManager implements FpmInfoService {
105 private final Logger log = LoggerFactory.getLogger(getClass());
106
107 private static final int FPM_PORT = 2620;
108 private static final String APP_NAME = "org.onosproject.fpm";
109 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800110 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected CoreService coreService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected ComponentConfigService componentConfigService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected RouteAdminService routeService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterService clusterService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected InterfaceService interfaceService;
129
130 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
131 bind = "bindRipStore",
132 unbind = "unbindRipStore",
133 policy = ReferencePolicy.DYNAMIC,
134 target = "(fpm_type=RIP)")
135 protected volatile FpmPrefixStore ripStore;
136
137 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
138 bind = "bindDhcpStore",
139 unbind = "unbindDhcpStore",
140 policy = ReferencePolicy.DYNAMIC,
141 target = "(fpm_type=DHCP)")
142 protected volatile FpmPrefixStore dhcpStore;
143
144 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
145 = new FpmPrefixStoreDelegate();
146
147 private ApplicationId appId;
148 private ServerBootstrap serverBootstrap;
149 private Channel serverChannel;
150 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800151 private final InternalClusterListener clusterListener = new InternalClusterListener();
152 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000153
154 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
155
156 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
157
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700158 //Local cache for peers to be used in case of cluster partition.
159 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
160
Kalhee Kimba366062017-11-07 16:32:09 +0000161 @Property(name = "clearRoutes", boolValue = true,
162 label = "Whether to clear routes when the FPM connection goes down")
163 private boolean clearRoutes = true;
164
165 @Property(name = "pdPushEnabled", boolValue = false,
166 label = "Whether to push prefixes to Quagga over fpm connection")
167 private boolean pdPushEnabled = false;
168
169 @Property(name = "pdPushNextHopIPv4", value = "",
170 label = "IPv4 next-hop address for PD Pushing.")
shalde064280feec2018-06-15 19:01:29 -0400171 private List<Ip4Address> pdPushNextHopIPv4 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000172
173 @Property(name = "pdPushNextHopIPv6", value = "",
174 label = "IPv6 next-hop address for PD Pushing.")
shalde064280feec2018-06-15 19:01:29 -0400175 private List<Ip6Address> pdPushNextHopIPv6 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000176
177 protected void bindRipStore(FpmPrefixStore store) {
178 if ((ripStore == null) && (store != null)) {
179 ripStore = store;
180 ripStore.setDelegate(fpmPrefixStoreDelegate);
181 for (Channel ch : allChannels) {
182 processRipStaticRoutes(ch);
183 }
184 }
185 }
186
187 protected void unbindRipStore(FpmPrefixStore store) {
188 if (ripStore == store) {
189 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
190 ripStore = null;
191 }
192 }
193
194 protected void bindDhcpStore(FpmPrefixStore store) {
195 if ((dhcpStore == null) && (store != null)) {
196 dhcpStore = store;
197 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
198 for (Channel ch : allChannels) {
199 processDhcpStaticRoutes(ch);
200 }
201 }
202 }
203
204 protected void unbindDhcpStore(FpmPrefixStore store) {
205 if (dhcpStore == store) {
206 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
207 dhcpStore = null;
208 }
209 }
210
211 @Activate
212 protected void activate(ComponentContext context) {
213 componentConfigService.preSetProperty(
214 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
215 "distributed", "true");
216
217 componentConfigService.registerProperties(getClass());
218
219 KryoNamespace serializer = KryoNamespace.newBuilder()
220 .register(KryoNamespaces.API)
221 .register(FpmPeer.class)
222 .register(FpmConnectionInfo.class)
223 .build();
224 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
225 .withName("fpm-connections")
226 .withSerializer(Serializer.using(serializer))
227 .build();
228
229 modified(context);
230 startServer();
231
232 appId = coreService.registerApplication(APP_NAME, peers::destroy);
233
Charles Chan035ed1f2018-01-30 16:00:32 -0800234 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Saurav Dase7f51012018-02-09 17:26:45 -0800235 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800236
Kalhee Kimba366062017-11-07 16:32:09 +0000237 log.info("Started");
238 }
239
240 @Deactivate
241 protected void deactivate() {
242 componentConfigService.preSetProperty(
243 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
244 "distributed", "false");
245
246 stopServer();
247 fpmRoutes.clear();
248 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800249
250 clusterService.removeListener(clusterListener);
251 asyncLock.unlock();
252
Kalhee Kimba366062017-11-07 16:32:09 +0000253 log.info("Stopped");
254 }
255
256 @Modified
257 protected void modified(ComponentContext context) {
shalde064280feec2018-06-15 19:01:29 -0400258 Ip4Address rurIPv4Address;
259 Ip6Address rurIPv6Address;
Kalhee Kimba366062017-11-07 16:32:09 +0000260 Dictionary<?, ?> properties = context.getProperties();
261 if (properties == null) {
262 return;
263 }
264 String strClearRoutes = Tools.get(properties, "clearRoutes");
265 if (strClearRoutes != null) {
266 clearRoutes = Boolean.parseBoolean(strClearRoutes);
267 log.info("clearRoutes is {}", clearRoutes);
268 }
269
270 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
271 if (strPdPushEnabled != null) {
272 boolean oldValue = pdPushEnabled;
273 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
274 if (pdPushEnabled) {
275
shalde064280feec2018-06-15 19:01:29 -0400276 pdPushNextHopIPv4 = new ArrayList<Ip4Address>();
277 pdPushNextHopIPv6 = new ArrayList<Ip6Address>();
Kalhee Kimba366062017-11-07 16:32:09 +0000278
279 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
280 if (strPdPushNextHopIPv4 != null) {
shalde064280feec2018-06-15 19:01:29 -0400281 List<String> strPdPushNextHopIPv4List = Arrays.asList(strPdPushNextHopIPv4.split(","));
282 for (String nextHop : strPdPushNextHopIPv4List) {
283 log.debug("IPv4 next hop added is:" + nextHop);
284 pdPushNextHopIPv4.add(Ip4Address.valueOf(nextHop));
285 }
Kalhee Kimba366062017-11-07 16:32:09 +0000286 }
287 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
288 if (strPdPushNextHopIPv6 != null) {
shalde064280feec2018-06-15 19:01:29 -0400289 List<String> strPdPushNextHopIPv6List = Arrays.asList(strPdPushNextHopIPv6.split(","));
290 for (String nextHop : strPdPushNextHopIPv6List) {
291 log.debug("IPv6 next hop added is:" + nextHop);
292 pdPushNextHopIPv6.add(Ip6Address.valueOf(nextHop));
293 }
Kalhee Kimba366062017-11-07 16:32:09 +0000294 }
295
shalde064280feec2018-06-15 19:01:29 -0400296 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
297 rurIPv4Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000298 .stream()
299 .filter(iface -> iface.name().contains("RUR"))
300 .map(Interface::ipAddressesList)
301 .flatMap(Collection::stream)
302 .map(InterfaceIpAddress::ipAddress)
303 .filter(IpAddress::isIp4)
304 .map(IpAddress::getIp4Address)
305 .findFirst()
306 .orElse(null);
shalde064280feec2018-06-15 19:01:29 -0400307 if (rurIPv4Address != null) {
308 pdPushNextHopIPv4.add(rurIPv4Address);
309 }
310
Kalhee Kimba366062017-11-07 16:32:09 +0000311 }
312
313 if (pdPushNextHopIPv6 == null) {
shalde064280feec2018-06-15 19:01:29 -0400314 rurIPv6Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000315 .stream()
316 .filter(iface -> iface.name().contains("RUR"))
317 .map(Interface::ipAddressesList)
318 .flatMap(Collection::stream)
319 .map(InterfaceIpAddress::ipAddress)
320 .filter(IpAddress::isIp6)
321 .map(IpAddress::getIp6Address)
322 .findFirst()
323 .orElse(null);
shalde064280feec2018-06-15 19:01:29 -0400324 if (rurIPv6Address != null) {
325 pdPushNextHopIPv6.add(rurIPv6Address);
326 }
Kalhee Kimba366062017-11-07 16:32:09 +0000327 }
328
329 log.info("PD pushing is enabled.");
shalde064280feec2018-06-15 19:01:29 -0400330 if (pdPushNextHopIPv4 != null || pdPushNextHopIPv4.size() != 0) {
331 log.info("ipv4 next-hop {} with {} items", pdPushNextHopIPv4.toString(), pdPushNextHopIPv4.size());
332
Kalhee Kimba366062017-11-07 16:32:09 +0000333 } else {
334 log.info("ipv4 next-hop is null");
335 }
shalde064280feec2018-06-15 19:01:29 -0400336 if (pdPushNextHopIPv6 != null || pdPushNextHopIPv6.size() != 0) {
337 log.info("ipv6 next-hop={} with {} items", pdPushNextHopIPv6.toString(), pdPushNextHopIPv6.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000338 } else {
339 log.info("ipv6 next-hop is null");
340 }
shalde064280feec2018-06-15 19:01:29 -0400341 processStaticRoutes();
Kalhee Kimba366062017-11-07 16:32:09 +0000342 } else {
343 log.info("PD pushing is disabled.");
344 }
345 }
346 }
347
348 private void startServer() {
349 HashedWheelTimer timer = new HashedWheelTimer(
350 groupedThreads("onos/fpm", "fpm-timer-%d", log));
351
352 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
353 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
354 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
355 ChannelPipelineFactory pipelineFactory = () -> {
356 // Allocate a new session per connection
357 IdleStateHandler idleHandler =
358 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
359 FpmSessionHandler fpmSessionHandler =
360 new FpmSessionHandler(this, new InternalFpmListener());
361 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
362
363 // Setup the processing pipeline
364 ChannelPipeline pipeline = Channels.pipeline();
365 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
366 pipeline.addLast("idle", idleHandler);
367 pipeline.addLast("FpmSession", fpmSessionHandler);
368 return pipeline;
369 };
370
371 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
372
373 serverBootstrap = new ServerBootstrap(channelFactory);
374 serverBootstrap.setOption("child.reuseAddr", true);
375 serverBootstrap.setOption("child.keepAlive", true);
376 serverBootstrap.setOption("child.tcpNoDelay", true);
377 serverBootstrap.setPipelineFactory(pipelineFactory);
378 try {
379 serverChannel = serverBootstrap.bind(listenAddress);
380 allChannels.add(serverChannel);
381 } catch (ChannelException e) {
382 log.debug("Exception binding to FPM port {}: ",
383 listenAddress.getPort(), e);
384 stopServer();
385 }
386 }
387
388 private void stopServer() {
389 allChannels.close().awaitUninterruptibly();
390 allChannels.clear();
391 if (serverBootstrap != null) {
392 serverBootstrap.releaseExternalResources();
393 }
394
395 if (clearRoutes) {
396 peers.keySet().forEach(this::clearRoutes);
397 }
398 }
399
Kalhee Kim40beb722018-01-16 20:32:04 +0000400 private boolean routeInDhcpStore(IpPrefix prefix) {
401
402 if (dhcpStore != null) {
403 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
404 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
405 }
406 return false;
407 }
408
409 private boolean routeInRipStore(IpPrefix prefix) {
410
411 if (ripStore != null) {
412 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
413 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
414 }
415 return false;
416 }
417
Kalhee Kimba366062017-11-07 16:32:09 +0000418 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
419 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
420 return;
421 }
422
423 Netlink netlink = fpmMessage.netlink();
424 RtNetlink rtNetlink = netlink.rtNetlink();
425
426 if (log.isTraceEnabled()) {
427 log.trace("Received FPM message: {}", fpmMessage);
428 }
429
430 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
431 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
432 log.trace("Ignoring non-zebra route");
433 return;
434 }
435
436 IpAddress dstAddress = null;
437 IpAddress gateway = null;
438
439 for (RouteAttribute attribute : rtNetlink.attributes()) {
440 if (attribute.type() == RouteAttribute.RTA_DST) {
441 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
442 dstAddress = raDst.dstAddress();
443 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
444 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
445 gateway = raGateway.gateway();
446 }
447 }
448
449 if (dstAddress == null) {
450 log.error("Dst address missing!");
451 return;
452 }
453
454 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
455
Kalhee Kim40beb722018-01-16 20:32:04 +0000456 // Ignore routes that we sent.
Ray Milkeyffe1a332018-01-24 10:41:14 -0800457 if (gateway != null && ((prefix.isIp4() && (gateway.equals(pdPushNextHopIPv4))) ||
458 gateway.equals(pdPushNextHopIPv6))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000459 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
460 return;
461 }
462 }
463
Kalhee Kimba366062017-11-07 16:32:09 +0000464 List<Route> updates = new LinkedList<>();
465 List<Route> withdraws = new LinkedList<>();
466
467 Route route;
468 switch (netlink.type()) {
469 case RTM_NEWROUTE:
470 if (gateway == null) {
471 // We ignore interface routes with no gateway for now.
472 return;
473 }
474 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
475
476
477 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
478
479 if (oldRoute != null) {
480 log.trace("Swapping {} with {}", oldRoute, route);
481 withdraws.add(oldRoute);
482 }
483 updates.add(route);
484 break;
485 case RTM_DELROUTE:
486 Route existing = fpmRoutes.get(peer).remove(prefix);
487 if (existing == null) {
488 log.warn("Got delete for non-existent prefix");
489 return;
490 }
491
492 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
493
494 withdraws.add(route);
495 break;
496 case RTM_GETROUTE:
497 default:
498 break;
499 }
500
Charles Chan035ed1f2018-01-30 16:00:32 -0800501 updateRouteStore(updates, withdraws);
502 }
503
504 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
505 routeService.withdraw(routesToRemove);
506 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000507 }
508
509 private void clearRoutes(FpmPeer peer) {
510 log.info("Clearing all routes for peer {}", peer);
511 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
512 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800513 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000514 }
515 }
516
517 public void processStaticRoutes() {
shalde064280feec2018-06-15 19:01:29 -0400518 log.debug("processStaticRoutes function is called");
Kalhee Kimba366062017-11-07 16:32:09 +0000519 for (Channel ch : allChannels) {
520 processStaticRoutes(ch);
521 }
522 }
523
524 public void processStaticRoutes(Channel ch) {
525 processRipStaticRoutes(ch);
526 processDhcpStaticRoutes(ch);
527 }
528
529 private void processRipStaticRoutes(Channel ch) {
530
531 /* Get RIP static routes. */
532 if (ripStore != null) {
533 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
534 log.info("RIP store size is {}", ripRecords.size());
535
536 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
537 record.ipPrefix(), ch));
538 }
539 }
540
541 private void processDhcpStaticRoutes(Channel ch) {
542
543 /* Get Dhcp static routes. */
544 if (dhcpStore != null) {
545 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
546 log.info("Dhcp store size is {}", dhcpRecords.size());
547
548 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
549 record.ipPrefix(), ch));
550 }
551 }
552
shalde064280feec2018-06-15 19:01:29 -0400553 private void updateRoute(IpAddress pdPushNextHop, boolean isAdd, IpPrefix prefix,
554 Channel ch, int raLength, short addrFamily) {
Kalhee Kimba366062017-11-07 16:32:09 +0000555 try {
shalde064280feec2018-06-15 19:01:29 -0400556 RouteAttributeDst raDst = RouteAttributeDst.builder()
557 .length(raLength)
558 .type(RouteAttribute.RTA_DST)
559 .dstAddress(prefix.address())
560 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000561
Kalhee Kim715dd732018-01-23 14:39:56 +0000562 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
shalde064280feec2018-06-15 19:01:29 -0400563 .length(raLength)
564 .type(RouteAttribute.RTA_GATEWAY)
565 .gateway(pdPushNextHop)
566 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000567
Kalhee Kim715dd732018-01-23 14:39:56 +0000568 // Build RtNetlink.
569 RtNetlink rtNetlink = RtNetlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400570 .addressFamily(addrFamily)
571 .dstLength(prefix.prefixLength())
572 .routeAttribute(raDst)
573 .routeAttribute(raGateway)
574 .build();
Kalhee Kim715dd732018-01-23 14:39:56 +0000575
576 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000577 int messageLength = raDst.length() + raGateway.length() +
shalde064280feec2018-06-15 19:01:29 -0400578 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000579 Netlink netLink = Netlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400580 .length(messageLength)
581 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
582 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
583 .rtNetlink(rtNetlink)
584 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000585
Kalhee Kim715dd732018-01-23 14:39:56 +0000586 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000587 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000588 FpmHeader fpmMessage = FpmHeader.builder()
shalde064280feec2018-06-15 19:01:29 -0400589 .version(FpmHeader.FPM_VERSION_1)
590 .type(FpmHeader.FPM_TYPE_NETLINK)
591 .length(messageLength)
592 .netlink(netLink)
593 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000594
595 // Encode message in a channel buffer and transmit.
596 ch.write(fpmMessage.encode());
597
598 } catch (RuntimeException e) {
599 log.info("Route not sent over fpm connection.");
600 }
601 }
602
shalde064280feec2018-06-15 19:01:29 -0400603 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
604
605 if (!pdPushEnabled) {
606 return;
607 }
608 int raLength;
609 short addrFamily;
610
611 // Build route attributes.
612 if (prefix.isIp4()) {
613 List<Ip4Address> pdPushNextHopList;
614 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
615 log.info("Prefix not pushed because ipv4 next-hop is null.");
616 return;
617 }
618 pdPushNextHopList = pdPushNextHopIPv4;
619 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
620 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
621 for (Ip4Address pdPushNextHop: pdPushNextHopList) {
622 log.debug("IPv4 next hop is:" + pdPushNextHop);
623 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
624 }
625 } else {
626 List<Ip6Address> pdPushNextHopList;
627 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
628 log.info("Prefix not pushed because ipv6 next-hop is null.");
629 return;
630 }
631 pdPushNextHopList = pdPushNextHopIPv6;
632 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
633 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
634 for (Ip6Address pdPushNextHop: pdPushNextHopList) {
635 log.debug("IPv6 next hop is:" + pdPushNextHop);
636 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
637 }
638 }
639 }
640
Kalhee Kimba366062017-11-07 16:32:09 +0000641 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
642
643 for (Channel ch : allChannels) {
644 sendRouteUpdateToChannel(isAdd, prefix, ch);
645 }
646 }
647
648 public boolean isPdPushEnabled() {
649 return pdPushEnabled;
650 }
651
652 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
653 return new FpmPeerInfo(connections,
654 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
655 }
656
657 @Override
658 public Map<FpmPeer, FpmPeerInfo> peers() {
659 return peers.asJavaMap().entrySet().stream()
660 .collect(Collectors.toMap(
661 e -> e.getKey(),
662 e -> toFpmInfo(e.getKey(), e.getValue())));
663 }
664
665 private class InternalFpmListener implements FpmListener {
666 @Override
667 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
668 FpmManager.this.fpmMessage(peer, fpmMessage);
669 }
670
671 @Override
672 public boolean peerConnected(FpmPeer peer) {
673 if (peers.keySet().contains(peer)) {
674 return false;
675 }
676
677 NodeId localNode = clusterService.getLocalNode().id();
678 peers.compute(peer, (p, infos) -> {
679 if (infos == null) {
680 infos = new HashSet<>();
681 }
682
683 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700684 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000685 return infos;
686 });
687
688 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
689 return true;
690 }
691
692 @Override
693 public void peerDisconnected(FpmPeer peer) {
694 log.info("FPM connection to {} went down", peer);
695
696 if (clearRoutes) {
697 clearRoutes(peer);
698 }
699
700 peers.compute(peer, (p, infos) -> {
701 if (infos == null) {
702 return null;
703 }
704
705 infos.stream()
706 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
707 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700708 .ifPresent(i -> {
709 infos.remove(i);
710 localPeers.get(peer).remove(i);
711 });
Kalhee Kimba366062017-11-07 16:32:09 +0000712
713 if (infos.isEmpty()) {
714 return null;
715 }
716
717 return infos;
718 });
719 }
720 }
721
722 /**
723 * Adds a channel to the channel group.
724 *
725 * @param channel the channel to add
726 */
727 public void addSessionChannel(Channel channel) {
728 allChannels.add(channel);
729 }
730
731 /**
732 * Removes a channel from the channel group.
733 *
734 * @param channel the channel to remove
735 */
736 public void removeSessionChannel(Channel channel) {
737 allChannels.remove(channel);
738 }
739
740 /**
741 * Store delegate for Fpm Prefix store.
742 * Handles Fpm prefix store event.
743 */
744 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
745
746 @Override
747 public void notify(FpmPrefixStoreEvent e) {
748
749 log.trace("FpmPrefixStoreEvent notify");
750
751 FpmRecord record = e.subject();
752 switch (e.type()) {
753 case ADD:
754 sendRouteUpdate(true, record.ipPrefix());
755 break;
756 case REMOVE:
757 sendRouteUpdate(false, record.ipPrefix());
758 break;
759 default:
760 log.warn("unsupported store event type", e.type());
761 return;
762 }
763 }
764 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800765
766 private class InternalClusterListener implements ClusterEventListener {
767 @Override
768 public void event(ClusterEvent event) {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700769 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Charles Chan035ed1f2018-01-30 16:00:32 -0800770 switch (event.type()) {
771 case INSTANCE_READY:
772 // When current node is healing from a network partition,
773 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
774 // but it is possible that current node still can't write to the cluster at this moment.
775 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
776 // after current node can write.
777 // Adding 15 seconds retry for the current node to be able to write.
778 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
779 if (result != null && result.isPresent()) {
780 log.debug("Lock obtained. Push local FPM routes to route store");
781 // All FPM routes on current node will be pushed again even when current node is not
782 // the one that becomes READY. A better way is to do this only on the minority nodes.
783 pushFpmRoutes();
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700784 localPeers.forEach((key, value) -> peers.put(key, value));
Charles Chan035ed1f2018-01-30 16:00:32 -0800785 asyncLock.unlock();
786 } else {
787 log.debug("Fail to obtain lock. Abort.");
788 }
789 });
790 break;
791 case INSTANCE_DEACTIVATED:
Charles Chan035ed1f2018-01-30 16:00:32 -0800792 case INSTANCE_REMOVED:
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700793 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
794 if (value != null) {
795 value.stream()
796 .filter(i -> i.connectedTo().equals(event.subject().id()))
797 .findAny()
798 .ifPresent(value::remove);
799
800 if (value.isEmpty()) {
801 peers.remove(key);
802 }
803 }
804 });
805 break;
806 case INSTANCE_ADDED:
Charles Chan035ed1f2018-01-30 16:00:32 -0800807 case INSTANCE_ACTIVATED:
808 default:
809 break;
810 }
811 }
812 }
813
Saurav Dase7f51012018-02-09 17:26:45 -0800814 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800815 public void pushFpmRoutes() {
816 Set<Route> routes = fpmRoutes.values().stream()
817 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
818 .collect(Collectors.toSet());
819 updateRouteStore(routes, Lists.newArrayList());
820 log.info("{} FPM routes have been updated to route store", routes.size());
821 }
Kalhee Kimba366062017-11-07 16:32:09 +0000822}