blob: 4cec95d014cb3c855c483c8c2c6c73cd65a3ab2d [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.ReferencePolicy;
29import org.apache.felix.scr.annotations.Service;
30import org.jboss.netty.bootstrap.ServerBootstrap;
31import org.jboss.netty.channel.Channel;
32import org.jboss.netty.channel.ChannelException;
33import org.jboss.netty.channel.ChannelFactory;
34import org.jboss.netty.channel.ChannelPipeline;
35import org.jboss.netty.channel.ChannelPipelineFactory;
36import org.jboss.netty.channel.Channels;
37import org.jboss.netty.channel.group.ChannelGroup;
38import org.jboss.netty.channel.group.DefaultChannelGroup;
39import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
40import org.jboss.netty.handler.timeout.IdleStateHandler;
41import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070044import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000045import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000046import org.onlab.util.KryoNamespace;
47import org.onlab.util.Tools;
48import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080049import org.onosproject.cluster.ClusterEvent;
50import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000051import org.onosproject.cluster.ClusterService;
52import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070054import org.onosproject.core.CoreService;
55import org.onosproject.net.host.InterfaceIpAddress;
56import org.onosproject.net.intf.Interface;
57import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000058import org.onosproject.routeservice.Route;
59import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070060import org.onosproject.routing.fpm.api.FpmPrefixStore;
61import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
62import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.routing.fpm.protocol.FpmHeader;
64import org.onosproject.routing.fpm.protocol.Netlink;
65import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
66import org.onosproject.routing.fpm.protocol.RouteAttribute;
67import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
68import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
69import org.onosproject.routing.fpm.protocol.RtNetlink;
70import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070071import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000072import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080073import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000074import org.onosproject.store.service.ConsistentMap;
75import org.onosproject.store.service.Serializer;
76import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000077import org.osgi.service.component.ComponentContext;
78import org.slf4j.Logger;
79import org.slf4j.LoggerFactory;
80
81import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080082import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
shalde064d1fe85b2018-06-15 19:01:29 -040089import java.util.ArrayList;
90import java.util.Arrays;
Kalhee Kimba366062017-11-07 16:32:09 +000091import java.util.Map;
92import java.util.Set;
William Daviesd3687502019-06-19 20:23:17 +000093import java.util.Iterator;
94
Kalhee Kimba366062017-11-07 16:32:09 +000095import java.util.concurrent.ConcurrentHashMap;
Jordan Halterman1c9a0b42018-08-13 02:41:50 -070096import java.util.concurrent.ExecutorService;
97import java.util.concurrent.Executors;
Kalhee Kimba366062017-11-07 16:32:09 +000098import java.util.stream.Collectors;
99
100import static java.util.concurrent.Executors.newCachedThreadPool;
101import static org.onlab.util.Tools.groupedThreads;
Kalhee Kimba366062017-11-07 16:32:09 +0000102
103/**
104 * Forwarding Plane Manager (FPM) route source.
105 */
106@Service
107@Component(immediate = true)
108public class FpmManager implements FpmInfoService {
109 private final Logger log = LoggerFactory.getLogger(getClass());
110
111 private static final int FPM_PORT = 2620;
112 private static final String APP_NAME = "org.onosproject.fpm";
113 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800114 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected CoreService coreService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ComponentConfigService componentConfigService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected RouteAdminService routeService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected StorageService storageService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected InterfaceService interfaceService;
133
134 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
135 bind = "bindRipStore",
136 unbind = "unbindRipStore",
137 policy = ReferencePolicy.DYNAMIC,
138 target = "(fpm_type=RIP)")
139 protected volatile FpmPrefixStore ripStore;
140
141 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
142 bind = "bindDhcpStore",
143 unbind = "unbindDhcpStore",
144 policy = ReferencePolicy.DYNAMIC,
145 target = "(fpm_type=DHCP)")
146 protected volatile FpmPrefixStore dhcpStore;
147
148 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
149 = new FpmPrefixStoreDelegate();
150
151 private ApplicationId appId;
152 private ServerBootstrap serverBootstrap;
153 private Channel serverChannel;
154 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800155 private final InternalClusterListener clusterListener = new InternalClusterListener();
156 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000157
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700158 private ExecutorService clusterEventExecutor;
159
Kalhee Kimba366062017-11-07 16:32:09 +0000160 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
161
162 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
163
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700164 //Local cache for peers to be used in case of cluster partition.
165 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
166
Kalhee Kimba366062017-11-07 16:32:09 +0000167 @Property(name = "clearRoutes", boolValue = true,
168 label = "Whether to clear routes when the FPM connection goes down")
169 private boolean clearRoutes = true;
170
171 @Property(name = "pdPushEnabled", boolValue = false,
172 label = "Whether to push prefixes to Quagga over fpm connection")
173 private boolean pdPushEnabled = false;
174
175 @Property(name = "pdPushNextHopIPv4", value = "",
176 label = "IPv4 next-hop address for PD Pushing.")
shalde064d1fe85b2018-06-15 19:01:29 -0400177 private List<Ip4Address> pdPushNextHopIPv4 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000178
179 @Property(name = "pdPushNextHopIPv6", value = "",
180 label = "IPv6 next-hop address for PD Pushing.")
shalde064d1fe85b2018-06-15 19:01:29 -0400181 private List<Ip6Address> pdPushNextHopIPv6 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000182
183 protected void bindRipStore(FpmPrefixStore store) {
184 if ((ripStore == null) && (store != null)) {
185 ripStore = store;
186 ripStore.setDelegate(fpmPrefixStoreDelegate);
187 for (Channel ch : allChannels) {
188 processRipStaticRoutes(ch);
189 }
190 }
191 }
192
193 protected void unbindRipStore(FpmPrefixStore store) {
194 if (ripStore == store) {
195 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
196 ripStore = null;
197 }
198 }
199
200 protected void bindDhcpStore(FpmPrefixStore store) {
201 if ((dhcpStore == null) && (store != null)) {
202 dhcpStore = store;
203 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
204 for (Channel ch : allChannels) {
205 processDhcpStaticRoutes(ch);
206 }
207 }
208 }
209
210 protected void unbindDhcpStore(FpmPrefixStore store) {
211 if (dhcpStore == store) {
212 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
213 dhcpStore = null;
214 }
215 }
216
217 @Activate
218 protected void activate(ComponentContext context) {
219 componentConfigService.preSetProperty(
220 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
221 "distributed", "true");
222
223 componentConfigService.registerProperties(getClass());
224
225 KryoNamespace serializer = KryoNamespace.newBuilder()
226 .register(KryoNamespaces.API)
227 .register(FpmPeer.class)
228 .register(FpmConnectionInfo.class)
229 .build();
230 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
231 .withName("fpm-connections")
232 .withSerializer(Serializer.using(serializer))
233 .build();
234
235 modified(context);
236 startServer();
237
238 appId = coreService.registerApplication(APP_NAME, peers::destroy);
239
Charles Chan035ed1f2018-01-30 16:00:32 -0800240 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700241
242 clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
Saurav Dase7f51012018-02-09 17:26:45 -0800243 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800244
Kalhee Kimba366062017-11-07 16:32:09 +0000245 log.info("Started");
246 }
247
248 @Deactivate
249 protected void deactivate() {
250 componentConfigService.preSetProperty(
251 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
252 "distributed", "false");
253
254 stopServer();
255 fpmRoutes.clear();
256 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800257
258 clusterService.removeListener(clusterListener);
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700259 clusterEventExecutor.shutdown();
Charles Chan035ed1f2018-01-30 16:00:32 -0800260 asyncLock.unlock();
261
Kalhee Kimba366062017-11-07 16:32:09 +0000262 log.info("Stopped");
263 }
264
265 @Modified
266 protected void modified(ComponentContext context) {
shalde064d1fe85b2018-06-15 19:01:29 -0400267 Ip4Address rurIPv4Address;
268 Ip6Address rurIPv6Address;
Kalhee Kimba366062017-11-07 16:32:09 +0000269 Dictionary<?, ?> properties = context.getProperties();
270 if (properties == null) {
271 return;
272 }
273 String strClearRoutes = Tools.get(properties, "clearRoutes");
274 if (strClearRoutes != null) {
275 clearRoutes = Boolean.parseBoolean(strClearRoutes);
276 log.info("clearRoutes is {}", clearRoutes);
277 }
278
279 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
280 if (strPdPushEnabled != null) {
281 boolean oldValue = pdPushEnabled;
282 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
283 if (pdPushEnabled) {
284
shalde064d1fe85b2018-06-15 19:01:29 -0400285 pdPushNextHopIPv4 = new ArrayList<Ip4Address>();
286 pdPushNextHopIPv6 = new ArrayList<Ip6Address>();
Kalhee Kimba366062017-11-07 16:32:09 +0000287
288 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
289 if (strPdPushNextHopIPv4 != null) {
shalde064d1fe85b2018-06-15 19:01:29 -0400290 List<String> strPdPushNextHopIPv4List = Arrays.asList(strPdPushNextHopIPv4.split(","));
291 for (String nextHop : strPdPushNextHopIPv4List) {
Mayank Tiwariaa3c7ea2018-11-23 16:18:50 -0500292 log.trace("IPv4 next hop added is:" + nextHop);
shalde064d1fe85b2018-06-15 19:01:29 -0400293 pdPushNextHopIPv4.add(Ip4Address.valueOf(nextHop));
294 }
Kalhee Kimba366062017-11-07 16:32:09 +0000295 }
296 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
297 if (strPdPushNextHopIPv6 != null) {
shalde064d1fe85b2018-06-15 19:01:29 -0400298 List<String> strPdPushNextHopIPv6List = Arrays.asList(strPdPushNextHopIPv6.split(","));
299 for (String nextHop : strPdPushNextHopIPv6List) {
Mayank Tiwariaa3c7ea2018-11-23 16:18:50 -0500300 log.trace("IPv6 next hop added is:" + nextHop);
shalde064d1fe85b2018-06-15 19:01:29 -0400301 pdPushNextHopIPv6.add(Ip6Address.valueOf(nextHop));
302 }
Kalhee Kimba366062017-11-07 16:32:09 +0000303 }
304
shalde064d1fe85b2018-06-15 19:01:29 -0400305 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
306 rurIPv4Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000307 .stream()
308 .filter(iface -> iface.name().contains("RUR"))
309 .map(Interface::ipAddressesList)
310 .flatMap(Collection::stream)
311 .map(InterfaceIpAddress::ipAddress)
312 .filter(IpAddress::isIp4)
313 .map(IpAddress::getIp4Address)
314 .findFirst()
315 .orElse(null);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400316 log.debug("RUR IPv4 address extracted from netcfg is: {}", rurIPv4Address);
shalde064d1fe85b2018-06-15 19:01:29 -0400317 if (rurIPv4Address != null) {
318 pdPushNextHopIPv4.add(rurIPv4Address);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400319 } else {
320 log.debug("Unable to extract RUR IPv4 address from netcfg");
shalde064d1fe85b2018-06-15 19:01:29 -0400321 }
322
Kalhee Kimba366062017-11-07 16:32:09 +0000323 }
324
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400325 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
shalde064d1fe85b2018-06-15 19:01:29 -0400326 rurIPv6Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000327 .stream()
328 .filter(iface -> iface.name().contains("RUR"))
329 .map(Interface::ipAddressesList)
330 .flatMap(Collection::stream)
331 .map(InterfaceIpAddress::ipAddress)
332 .filter(IpAddress::isIp6)
333 .map(IpAddress::getIp6Address)
334 .findFirst()
335 .orElse(null);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400336 log.debug("RUR IPv6 address extracted from netcfg is: {}", rurIPv6Address);
shalde064d1fe85b2018-06-15 19:01:29 -0400337 if (rurIPv6Address != null) {
338 pdPushNextHopIPv6.add(rurIPv6Address);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400339 } else {
340 log.debug("Unable to extract RUR IPv6 address from netcfg");
shalde064d1fe85b2018-06-15 19:01:29 -0400341 }
Kalhee Kimba366062017-11-07 16:32:09 +0000342 }
343
344 log.info("PD pushing is enabled.");
shalde064d1fe85b2018-06-15 19:01:29 -0400345 if (pdPushNextHopIPv4 != null || pdPushNextHopIPv4.size() != 0) {
346 log.info("ipv4 next-hop {} with {} items", pdPushNextHopIPv4.toString(), pdPushNextHopIPv4.size());
347
Kalhee Kimba366062017-11-07 16:32:09 +0000348 } else {
349 log.info("ipv4 next-hop is null");
350 }
shalde064d1fe85b2018-06-15 19:01:29 -0400351 if (pdPushNextHopIPv6 != null || pdPushNextHopIPv6.size() != 0) {
352 log.info("ipv6 next-hop={} with {} items", pdPushNextHopIPv6.toString(), pdPushNextHopIPv6.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000353 } else {
354 log.info("ipv6 next-hop is null");
355 }
shalde064d1fe85b2018-06-15 19:01:29 -0400356 processStaticRoutes();
Kalhee Kimba366062017-11-07 16:32:09 +0000357 } else {
358 log.info("PD pushing is disabled.");
359 }
360 }
361 }
362
363 private void startServer() {
364 HashedWheelTimer timer = new HashedWheelTimer(
365 groupedThreads("onos/fpm", "fpm-timer-%d", log));
366
367 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
368 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
369 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
370 ChannelPipelineFactory pipelineFactory = () -> {
371 // Allocate a new session per connection
372 IdleStateHandler idleHandler =
373 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
374 FpmSessionHandler fpmSessionHandler =
375 new FpmSessionHandler(this, new InternalFpmListener());
376 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
377
378 // Setup the processing pipeline
379 ChannelPipeline pipeline = Channels.pipeline();
380 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
381 pipeline.addLast("idle", idleHandler);
382 pipeline.addLast("FpmSession", fpmSessionHandler);
383 return pipeline;
384 };
385
386 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
387
388 serverBootstrap = new ServerBootstrap(channelFactory);
389 serverBootstrap.setOption("child.reuseAddr", true);
390 serverBootstrap.setOption("child.keepAlive", true);
391 serverBootstrap.setOption("child.tcpNoDelay", true);
392 serverBootstrap.setPipelineFactory(pipelineFactory);
393 try {
394 serverChannel = serverBootstrap.bind(listenAddress);
395 allChannels.add(serverChannel);
396 } catch (ChannelException e) {
397 log.debug("Exception binding to FPM port {}: ",
398 listenAddress.getPort(), e);
399 stopServer();
400 }
401 }
402
403 private void stopServer() {
404 allChannels.close().awaitUninterruptibly();
405 allChannels.clear();
406 if (serverBootstrap != null) {
407 serverBootstrap.releaseExternalResources();
408 }
409
410 if (clearRoutes) {
Harshada Chaundkar11ac6e62019-09-25 17:42:33 +0000411 log.debug("Clearing routes for the peer");
Kalhee Kimba366062017-11-07 16:32:09 +0000412 peers.keySet().forEach(this::clearRoutes);
413 }
414 }
415
Kalhee Kim40beb722018-01-16 20:32:04 +0000416 private boolean routeInDhcpStore(IpPrefix prefix) {
417
418 if (dhcpStore != null) {
419 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
420 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
421 }
422 return false;
423 }
424
425 private boolean routeInRipStore(IpPrefix prefix) {
426
427 if (ripStore != null) {
428 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
429 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
430 }
431 return false;
432 }
433
Kalhee Kimba366062017-11-07 16:32:09 +0000434 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
435 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
436 return;
437 }
438
439 Netlink netlink = fpmMessage.netlink();
440 RtNetlink rtNetlink = netlink.rtNetlink();
441
442 if (log.isTraceEnabled()) {
443 log.trace("Received FPM message: {}", fpmMessage);
444 }
445
446 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
447 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
448 log.trace("Ignoring non-zebra route");
449 return;
450 }
451
452 IpAddress dstAddress = null;
453 IpAddress gateway = null;
454
455 for (RouteAttribute attribute : rtNetlink.attributes()) {
456 if (attribute.type() == RouteAttribute.RTA_DST) {
457 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
458 dstAddress = raDst.dstAddress();
459 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
460 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
461 gateway = raGateway.gateway();
462 }
463 }
464
465 if (dstAddress == null) {
466 log.error("Dst address missing!");
467 return;
468 }
469
470 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
471
Kalhee Kim40beb722018-01-16 20:32:04 +0000472 // Ignore routes that we sent.
Charles Chanc3a48e32018-06-25 13:01:35 -0700473 if (gateway != null && (
474 (prefix.isIp4() && pdPushNextHopIPv4 != null &&
475 pdPushNextHopIPv4.contains(gateway.getIp4Address())) ||
476 (prefix.isIp6() && pdPushNextHopIPv6 != null &&
477 pdPushNextHopIPv6.contains(gateway.getIp6Address())))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000478 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
479 return;
480 }
481 }
482
Kalhee Kimba366062017-11-07 16:32:09 +0000483 List<Route> updates = new LinkedList<>();
484 List<Route> withdraws = new LinkedList<>();
485
486 Route route;
487 switch (netlink.type()) {
488 case RTM_NEWROUTE:
489 if (gateway == null) {
490 // We ignore interface routes with no gateway for now.
491 return;
492 }
493 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
494
495
496 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
497
498 if (oldRoute != null) {
499 log.trace("Swapping {} with {}", oldRoute, route);
500 withdraws.add(oldRoute);
501 }
502 updates.add(route);
503 break;
504 case RTM_DELROUTE:
505 Route existing = fpmRoutes.get(peer).remove(prefix);
506 if (existing == null) {
507 log.warn("Got delete for non-existent prefix");
508 return;
509 }
510
511 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
512
513 withdraws.add(route);
514 break;
515 case RTM_GETROUTE:
516 default:
517 break;
518 }
519
Charles Chan035ed1f2018-01-30 16:00:32 -0800520 updateRouteStore(updates, withdraws);
521 }
522
523 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
524 routeService.withdraw(routesToRemove);
525 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000526 }
527
528 private void clearRoutes(FpmPeer peer) {
529 log.info("Clearing all routes for peer {}", peer);
530 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
William Daviesd3687502019-06-19 20:23:17 +0000531
Kalhee Kimba366062017-11-07 16:32:09 +0000532 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800533 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000534 }
535 }
536
537 public void processStaticRoutes() {
shalde064d1fe85b2018-06-15 19:01:29 -0400538 log.debug("processStaticRoutes function is called");
Kalhee Kimba366062017-11-07 16:32:09 +0000539 for (Channel ch : allChannels) {
540 processStaticRoutes(ch);
541 }
542 }
543
544 public void processStaticRoutes(Channel ch) {
545 processRipStaticRoutes(ch);
546 processDhcpStaticRoutes(ch);
547 }
548
549 private void processRipStaticRoutes(Channel ch) {
550
551 /* Get RIP static routes. */
552 if (ripStore != null) {
553 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
554 log.info("RIP store size is {}", ripRecords.size());
555
556 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
557 record.ipPrefix(), ch));
558 }
559 }
560
561 private void processDhcpStaticRoutes(Channel ch) {
562
563 /* Get Dhcp static routes. */
564 if (dhcpStore != null) {
565 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
566 log.info("Dhcp store size is {}", dhcpRecords.size());
567
568 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
569 record.ipPrefix(), ch));
570 }
571 }
572
shalde064d1fe85b2018-06-15 19:01:29 -0400573 private void updateRoute(IpAddress pdPushNextHop, boolean isAdd, IpPrefix prefix,
574 Channel ch, int raLength, short addrFamily) {
Kalhee Kimba366062017-11-07 16:32:09 +0000575 try {
shalde064d1fe85b2018-06-15 19:01:29 -0400576 RouteAttributeDst raDst = RouteAttributeDst.builder()
577 .length(raLength)
578 .type(RouteAttribute.RTA_DST)
579 .dstAddress(prefix.address())
580 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000581
Kalhee Kim715dd732018-01-23 14:39:56 +0000582 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400583 .length(raLength)
584 .type(RouteAttribute.RTA_GATEWAY)
585 .gateway(pdPushNextHop)
586 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000587
Kalhee Kim715dd732018-01-23 14:39:56 +0000588 // Build RtNetlink.
589 RtNetlink rtNetlink = RtNetlink.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400590 .addressFamily(addrFamily)
591 .dstLength(prefix.prefixLength())
592 .routeAttribute(raDst)
593 .routeAttribute(raGateway)
594 .build();
Kalhee Kim715dd732018-01-23 14:39:56 +0000595
596 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000597 int messageLength = raDst.length() + raGateway.length() +
shalde064d1fe85b2018-06-15 19:01:29 -0400598 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000599 Netlink netLink = Netlink.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400600 .length(messageLength)
601 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
602 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
603 .rtNetlink(rtNetlink)
604 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000605
Kalhee Kim715dd732018-01-23 14:39:56 +0000606 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000607 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000608 FpmHeader fpmMessage = FpmHeader.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400609 .version(FpmHeader.FPM_VERSION_1)
610 .type(FpmHeader.FPM_TYPE_NETLINK)
611 .length(messageLength)
612 .netlink(netLink)
613 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000614
615 // Encode message in a channel buffer and transmit.
616 ch.write(fpmMessage.encode());
Harshada Chaundkar11ac6e62019-09-25 17:42:33 +0000617 log.debug("Fpm Message for updated route {}", fpmMessage.toString());
Kalhee Kimba366062017-11-07 16:32:09 +0000618 } catch (RuntimeException e) {
619 log.info("Route not sent over fpm connection.");
620 }
621 }
622
shalde064d1fe85b2018-06-15 19:01:29 -0400623 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
624
625 if (!pdPushEnabled) {
626 return;
627 }
628 int raLength;
629 short addrFamily;
630
631 // Build route attributes.
632 if (prefix.isIp4()) {
633 List<Ip4Address> pdPushNextHopList;
634 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
635 log.info("Prefix not pushed because ipv4 next-hop is null.");
636 return;
637 }
638 pdPushNextHopList = pdPushNextHopIPv4;
639 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
640 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
641 for (Ip4Address pdPushNextHop: pdPushNextHopList) {
Ruchi Sahota3304e792019-01-15 00:05:57 +0000642 log.trace("IPv4 next hop is:" + pdPushNextHop);
shalde064d1fe85b2018-06-15 19:01:29 -0400643 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
644 }
645 } else {
646 List<Ip6Address> pdPushNextHopList;
647 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
648 log.info("Prefix not pushed because ipv6 next-hop is null.");
649 return;
650 }
651 pdPushNextHopList = pdPushNextHopIPv6;
652 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
653 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
654 for (Ip6Address pdPushNextHop: pdPushNextHopList) {
Ruchi Sahota3304e792019-01-15 00:05:57 +0000655 log.trace("IPv6 next hop is:" + pdPushNextHop);
shalde064d1fe85b2018-06-15 19:01:29 -0400656 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
657 }
658 }
659 }
660
Kalhee Kimba366062017-11-07 16:32:09 +0000661 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
662
663 for (Channel ch : allChannels) {
664 sendRouteUpdateToChannel(isAdd, prefix, ch);
665 }
666 }
667
668 public boolean isPdPushEnabled() {
669 return pdPushEnabled;
670 }
671
672 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
673 return new FpmPeerInfo(connections,
674 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
675 }
676
677 @Override
678 public Map<FpmPeer, FpmPeerInfo> peers() {
679 return peers.asJavaMap().entrySet().stream()
680 .collect(Collectors.toMap(
681 e -> e.getKey(),
682 e -> toFpmInfo(e.getKey(), e.getValue())));
683 }
684
William Daviesd3687502019-06-19 20:23:17 +0000685
686
687 @Override
688 public void updateAcceptRouteFlag(Collection<FpmPeerAcceptRoutes> modifiedPeers) {
689 modifiedPeers.forEach(modifiedPeer -> {
690 log.debug("FPM connection to {} is disabled", modifiedPeer);
691 NodeId localNode = clusterService.getLocalNode().id();
692 log.debug("Peer Flag {}", modifiedPeer.isAcceptRoutes());
693 peers.compute(modifiedPeer.peer(), (p, infos) -> {
694 if (infos == null) {
695 return null;
696 }
697 Iterator<FpmConnectionInfo> iterator = infos.iterator();
698 if (iterator.hasNext()) {
699 FpmConnectionInfo connectionInfo = iterator.next();
700 if (connectionInfo.isAcceptRoutes() == modifiedPeer.isAcceptRoutes()) {
701 return null;
702 }
703 localPeers.remove(modifiedPeer.peer());
704 infos.remove(connectionInfo);
705 infos.add(new FpmConnectionInfo(localNode, modifiedPeer.peer(),
706 System.currentTimeMillis(), modifiedPeer.isAcceptRoutes()));
707 localPeers.put(modifiedPeer.peer(), infos);
708 }
709 Map<IpPrefix, Route> routes = fpmRoutes.get(modifiedPeer.peer());
710 if (routes != null && !modifiedPeer.isAcceptRoutes()) {
711 updateRouteStore(Lists.newArrayList(), routes.values());
712 } else {
713 updateRouteStore(routes.values(), Lists.newArrayList());
714 }
715
716 return infos;
717 });
718 });
719
720 }
721
722
Kalhee Kimba366062017-11-07 16:32:09 +0000723 private class InternalFpmListener implements FpmListener {
724 @Override
725 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
726 FpmManager.this.fpmMessage(peer, fpmMessage);
727 }
728
729 @Override
730 public boolean peerConnected(FpmPeer peer) {
Harshada Chaundkar11ac6e62019-09-25 17:42:33 +0000731 log.info("FPM connection to {} was connected", peer);
Kalhee Kimba366062017-11-07 16:32:09 +0000732 if (peers.keySet().contains(peer)) {
733 return false;
734 }
735
736 NodeId localNode = clusterService.getLocalNode().id();
737 peers.compute(peer, (p, infos) -> {
738 if (infos == null) {
739 infos = new HashSet<>();
740 }
741
William Daviesd3687502019-06-19 20:23:17 +0000742 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis(), true));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700743 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000744 return infos;
745 });
746
747 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
748 return true;
749 }
750
751 @Override
752 public void peerDisconnected(FpmPeer peer) {
753 log.info("FPM connection to {} went down", peer);
754
755 if (clearRoutes) {
756 clearRoutes(peer);
757 }
758
759 peers.compute(peer, (p, infos) -> {
760 if (infos == null) {
761 return null;
762 }
763
764 infos.stream()
765 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
766 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700767 .ifPresent(i -> {
768 infos.remove(i);
769 localPeers.get(peer).remove(i);
770 });
Kalhee Kimba366062017-11-07 16:32:09 +0000771
772 if (infos.isEmpty()) {
773 return null;
774 }
775
776 return infos;
777 });
778 }
779 }
780
781 /**
782 * Adds a channel to the channel group.
783 *
784 * @param channel the channel to add
785 */
786 public void addSessionChannel(Channel channel) {
787 allChannels.add(channel);
788 }
789
790 /**
791 * Removes a channel from the channel group.
792 *
793 * @param channel the channel to remove
794 */
795 public void removeSessionChannel(Channel channel) {
796 allChannels.remove(channel);
797 }
798
799 /**
800 * Store delegate for Fpm Prefix store.
801 * Handles Fpm prefix store event.
802 */
803 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
804
805 @Override
806 public void notify(FpmPrefixStoreEvent e) {
807
808 log.trace("FpmPrefixStoreEvent notify");
809
810 FpmRecord record = e.subject();
811 switch (e.type()) {
812 case ADD:
813 sendRouteUpdate(true, record.ipPrefix());
814 break;
815 case REMOVE:
816 sendRouteUpdate(false, record.ipPrefix());
817 break;
818 default:
819 log.warn("unsupported store event type", e.type());
820 return;
821 }
822 }
823 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800824
825 private class InternalClusterListener implements ClusterEventListener {
826 @Override
827 public void event(ClusterEvent event) {
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700828 clusterEventExecutor.execute(() -> {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700829 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700830 switch (event.type()) {
831 case INSTANCE_READY:
832 // When current node is healing from a network partition,
833 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
834 // but it is possible that current node still can't write to the cluster at this moment.
835 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
836 // after current node can write.
837 // Adding 15 seconds retry for the current node to be able to write.
838 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
839 if (result != null && result.isPresent()) {
840 log.debug("Lock obtained. Push local FPM routes to route store");
841 // All FPM routes on current node will be pushed again even when current node is not
842 // the one that becomes READY. A better way is to do this only on the minority nodes.
843 pushFpmRoutes();
844 localPeers.forEach((key, value) -> peers.put(key, value));
845 asyncLock.unlock();
846 } else {
847 log.debug("Fail to obtain lock. Abort.");
848 }
849 });
850 break;
851 case INSTANCE_DEACTIVATED:
852 case INSTANCE_REMOVED:
853 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
854 if (value != null) {
855 value.stream()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700856 .filter(i -> i.connectedTo().equals(event.subject().id()))
857 .findAny()
858 .ifPresent(value::remove);
Harshada Chaundkar11ac6e62019-09-25 17:42:33 +0000859 log.info("Connection {} removed for disabled peer {}", value, key);
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700860 if (value.isEmpty()) {
861 peers.remove(key);
862 }
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700863 }
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700864 });
865 break;
866 case INSTANCE_ADDED:
867 case INSTANCE_ACTIVATED:
868 default:
869 break;
870 }
871 });
Charles Chan035ed1f2018-01-30 16:00:32 -0800872 }
873 }
874
Saurav Dase7f51012018-02-09 17:26:45 -0800875 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800876 public void pushFpmRoutes() {
877 Set<Route> routes = fpmRoutes.values().stream()
878 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
879 .collect(Collectors.toSet());
880 updateRouteStore(routes, Lists.newArrayList());
881 log.info("{} FPM routes have been updated to route store", routes.size());
882 }
Kalhee Kimba366062017-11-07 16:32:09 +0000883}