blob: e9227556324f7495efac4166366f006e3ec456ec [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.jboss.netty.bootstrap.ServerBootstrap;
22import org.jboss.netty.channel.Channel;
23import org.jboss.netty.channel.ChannelException;
24import org.jboss.netty.channel.ChannelFactory;
25import org.jboss.netty.channel.ChannelPipeline;
26import org.jboss.netty.channel.ChannelPipelineFactory;
27import org.jboss.netty.channel.Channels;
28import org.jboss.netty.channel.group.ChannelGroup;
29import org.jboss.netty.channel.group.DefaultChannelGroup;
30import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
31import org.jboss.netty.handler.timeout.IdleStateHandler;
32import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000033import org.onlab.packet.Ip4Address;
34import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070035import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000036import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000037import org.onlab.util.KryoNamespace;
38import org.onlab.util.Tools;
39import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080040import org.onosproject.cluster.ClusterEvent;
41import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onosproject.cluster.ClusterService;
43import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000044import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070045import org.onosproject.core.CoreService;
46import org.onosproject.net.host.InterfaceIpAddress;
47import org.onosproject.net.intf.Interface;
48import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000049import org.onosproject.routeservice.Route;
50import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070051import org.onosproject.routing.fpm.api.FpmPrefixStore;
52import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
53import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000054import org.onosproject.routing.fpm.protocol.FpmHeader;
55import org.onosproject.routing.fpm.protocol.Netlink;
56import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
57import org.onosproject.routing.fpm.protocol.RouteAttribute;
58import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
59import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
60import org.onosproject.routing.fpm.protocol.RtNetlink;
61import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070062import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080064import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000065import org.onosproject.store.service.ConsistentMap;
66import org.onosproject.store.service.Serializer;
67import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000068import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
72import org.osgi.service.component.annotations.Modified;
73import org.osgi.service.component.annotations.Reference;
74import org.osgi.service.component.annotations.ReferenceCardinality;
75import org.osgi.service.component.annotations.ReferencePolicy;
Kalhee Kimba366062017-11-07 16:32:09 +000076import org.slf4j.Logger;
77import org.slf4j.LoggerFactory;
78
79import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080080import java.time.Duration;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081import java.util.ArrayList;
82import java.util.Arrays;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
89import java.util.Map;
90import java.util.Set;
91import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanaa2faca2018-08-13 02:41:50 -070092import java.util.concurrent.ExecutorService;
93import java.util.concurrent.Executors;
Kalhee Kimba366062017-11-07 16:32:09 +000094import java.util.stream.Collectors;
95
96import static java.util.concurrent.Executors.newCachedThreadPool;
97import static org.onlab.util.Tools.groupedThreads;
Ray Milkey8e406512018-10-24 15:56:50 -070098import static org.onosproject.routing.fpm.OsgiPropertyConstants.CLEAR_ROUTES;
99import static org.onosproject.routing.fpm.OsgiPropertyConstants.CLEAR_ROUTES_DEFAULT;
100import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_ENABLED;
101import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_ENABLED_DEFAULT;
102import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV4;
103import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV4_DEFAULT;
104import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV6;
105import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV6_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000106
107/**
108 * Forwarding Plane Manager (FPM) route source.
109 */
Ray Milkey8e406512018-10-24 15:56:50 -0700110@Component(
111 immediate = true,
112 service = FpmInfoService.class,
113 property = {
114 CLEAR_ROUTES + ":Boolean=" + CLEAR_ROUTES_DEFAULT,
115 PD_PUSH_ENABLED + ":Boolean=" + PD_PUSH_ENABLED_DEFAULT,
116 PD_PUSH_NEXT_HOP_IPV4 + "=" + PD_PUSH_NEXT_HOP_IPV4_DEFAULT,
117 PD_PUSH_NEXT_HOP_IPV6 + "=" + PD_PUSH_NEXT_HOP_IPV6_DEFAULT,
118 }
119)
Kalhee Kimba366062017-11-07 16:32:09 +0000120public class FpmManager implements FpmInfoService {
121 private final Logger log = LoggerFactory.getLogger(getClass());
122
123 private static final int FPM_PORT = 2620;
124 private static final String APP_NAME = "org.onosproject.fpm";
125 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800126 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000127
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000129 protected CoreService coreService;
130
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000132 protected ComponentConfigService componentConfigService;
133
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000135 protected RouteAdminService routeService;
136
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000138 protected ClusterService clusterService;
139
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000141 protected StorageService storageService;
142
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000144 protected InterfaceService interfaceService;
145
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700146 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
Kalhee Kimba366062017-11-07 16:32:09 +0000147 bind = "bindRipStore",
148 unbind = "unbindRipStore",
149 policy = ReferencePolicy.DYNAMIC,
Ray Milkey5504bd22019-03-22 16:24:38 -0700150 target = "(_fpm_type=RIP)")
Kalhee Kimba366062017-11-07 16:32:09 +0000151 protected volatile FpmPrefixStore ripStore;
152
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700153 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
Kalhee Kimba366062017-11-07 16:32:09 +0000154 bind = "bindDhcpStore",
155 unbind = "unbindDhcpStore",
156 policy = ReferencePolicy.DYNAMIC,
Ray Milkey5504bd22019-03-22 16:24:38 -0700157 target = "(_fpm_type=DHCP)")
Kalhee Kimba366062017-11-07 16:32:09 +0000158 protected volatile FpmPrefixStore dhcpStore;
159
160 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
161 = new FpmPrefixStoreDelegate();
162
163 private ApplicationId appId;
164 private ServerBootstrap serverBootstrap;
165 private Channel serverChannel;
166 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800167 private final InternalClusterListener clusterListener = new InternalClusterListener();
168 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000169
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700170 private ExecutorService clusterEventExecutor;
171
Kalhee Kimba366062017-11-07 16:32:09 +0000172 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
173
174 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
175
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700176 //Local cache for peers to be used in case of cluster partition.
177 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
178
Ray Milkey8e406512018-10-24 15:56:50 -0700179 /** Whether to clear routes when the FPM connection goes down. */
180 private boolean clearRoutes = CLEAR_ROUTES_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000181
Ray Milkey8e406512018-10-24 15:56:50 -0700182 /** Whether to push prefixes to Quagga over fpm connection. */
183 private boolean pdPushEnabled = PD_PUSH_ENABLED_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000184
Ray Milkey8e406512018-10-24 15:56:50 -0700185 /** IPv4 next-hop address for PD Pushing. */
shalde064280feec2018-06-15 19:01:29 -0400186 private List<Ip4Address> pdPushNextHopIPv4 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000187
Ray Milkey8e406512018-10-24 15:56:50 -0700188 /** IPv6 next-hop address for PD Pushing. */
shalde064280feec2018-06-15 19:01:29 -0400189 private List<Ip6Address> pdPushNextHopIPv6 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000190
191 protected void bindRipStore(FpmPrefixStore store) {
192 if ((ripStore == null) && (store != null)) {
193 ripStore = store;
194 ripStore.setDelegate(fpmPrefixStoreDelegate);
195 for (Channel ch : allChannels) {
196 processRipStaticRoutes(ch);
197 }
198 }
199 }
200
201 protected void unbindRipStore(FpmPrefixStore store) {
202 if (ripStore == store) {
203 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
204 ripStore = null;
205 }
206 }
207
208 protected void bindDhcpStore(FpmPrefixStore store) {
209 if ((dhcpStore == null) && (store != null)) {
210 dhcpStore = store;
211 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
212 for (Channel ch : allChannels) {
213 processDhcpStaticRoutes(ch);
214 }
215 }
216 }
217
218 protected void unbindDhcpStore(FpmPrefixStore store) {
219 if (dhcpStore == store) {
220 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
221 dhcpStore = null;
222 }
223 }
224
225 @Activate
226 protected void activate(ComponentContext context) {
227 componentConfigService.preSetProperty(
228 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
229 "distributed", "true");
230
231 componentConfigService.registerProperties(getClass());
232
233 KryoNamespace serializer = KryoNamespace.newBuilder()
234 .register(KryoNamespaces.API)
235 .register(FpmPeer.class)
236 .register(FpmConnectionInfo.class)
237 .build();
238 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
239 .withName("fpm-connections")
240 .withSerializer(Serializer.using(serializer))
241 .build();
242
243 modified(context);
244 startServer();
245
246 appId = coreService.registerApplication(APP_NAME, peers::destroy);
247
Charles Chan035ed1f2018-01-30 16:00:32 -0800248 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700249
250 clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
Saurav Dase7f51012018-02-09 17:26:45 -0800251 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800252
Kalhee Kimba366062017-11-07 16:32:09 +0000253 log.info("Started");
254 }
255
256 @Deactivate
257 protected void deactivate() {
258 componentConfigService.preSetProperty(
259 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
260 "distributed", "false");
261
262 stopServer();
263 fpmRoutes.clear();
264 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800265
266 clusterService.removeListener(clusterListener);
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700267 clusterEventExecutor.shutdown();
Charles Chan035ed1f2018-01-30 16:00:32 -0800268 asyncLock.unlock();
269
Kalhee Kimba366062017-11-07 16:32:09 +0000270 log.info("Stopped");
271 }
272
273 @Modified
274 protected void modified(ComponentContext context) {
shalde064280feec2018-06-15 19:01:29 -0400275 Ip4Address rurIPv4Address;
276 Ip6Address rurIPv6Address;
Kalhee Kimba366062017-11-07 16:32:09 +0000277 Dictionary<?, ?> properties = context.getProperties();
278 if (properties == null) {
279 return;
280 }
Ray Milkey8e406512018-10-24 15:56:50 -0700281 String strClearRoutes = Tools.get(properties, CLEAR_ROUTES);
Kalhee Kimba366062017-11-07 16:32:09 +0000282 if (strClearRoutes != null) {
283 clearRoutes = Boolean.parseBoolean(strClearRoutes);
284 log.info("clearRoutes is {}", clearRoutes);
285 }
286
Ray Milkey8e406512018-10-24 15:56:50 -0700287 String strPdPushEnabled = Tools.get(properties, PD_PUSH_ENABLED);
Kalhee Kimba366062017-11-07 16:32:09 +0000288 if (strPdPushEnabled != null) {
289 boolean oldValue = pdPushEnabled;
290 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
291 if (pdPushEnabled) {
292
shalde064280feec2018-06-15 19:01:29 -0400293 pdPushNextHopIPv4 = new ArrayList<Ip4Address>();
294 pdPushNextHopIPv6 = new ArrayList<Ip6Address>();
Kalhee Kimba366062017-11-07 16:32:09 +0000295
Ray Milkey8e406512018-10-24 15:56:50 -0700296 String strPdPushNextHopIPv4 = Tools.get(properties, PD_PUSH_NEXT_HOP_IPV4);
Kalhee Kimba366062017-11-07 16:32:09 +0000297 if (strPdPushNextHopIPv4 != null) {
shalde064280feec2018-06-15 19:01:29 -0400298 List<String> strPdPushNextHopIPv4List = Arrays.asList(strPdPushNextHopIPv4.split(","));
299 for (String nextHop : strPdPushNextHopIPv4List) {
Mayank Tiwari2d3a3082018-11-23 16:18:50 -0500300 log.trace("IPv4 next hop added is:" + nextHop);
shalde064280feec2018-06-15 19:01:29 -0400301 pdPushNextHopIPv4.add(Ip4Address.valueOf(nextHop));
302 }
Kalhee Kimba366062017-11-07 16:32:09 +0000303 }
Ray Milkey8e406512018-10-24 15:56:50 -0700304 String strPdPushNextHopIPv6 = Tools.get(properties, PD_PUSH_NEXT_HOP_IPV6);
Kalhee Kimba366062017-11-07 16:32:09 +0000305 if (strPdPushNextHopIPv6 != null) {
shalde064280feec2018-06-15 19:01:29 -0400306 List<String> strPdPushNextHopIPv6List = Arrays.asList(strPdPushNextHopIPv6.split(","));
307 for (String nextHop : strPdPushNextHopIPv6List) {
Mayank Tiwari2d3a3082018-11-23 16:18:50 -0500308 log.trace("IPv6 next hop added is:" + nextHop);
shalde064280feec2018-06-15 19:01:29 -0400309 pdPushNextHopIPv6.add(Ip6Address.valueOf(nextHop));
310 }
Kalhee Kimba366062017-11-07 16:32:09 +0000311 }
312
Ray Milkey032b9642018-06-21 08:28:12 -0700313 if (pdPushNextHopIPv4.size() == 0) {
shalde064280feec2018-06-15 19:01:29 -0400314 rurIPv4Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000315 .stream()
316 .filter(iface -> iface.name().contains("RUR"))
317 .map(Interface::ipAddressesList)
318 .flatMap(Collection::stream)
319 .map(InterfaceIpAddress::ipAddress)
320 .filter(IpAddress::isIp4)
321 .map(IpAddress::getIp4Address)
322 .findFirst()
323 .orElse(null);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400324 log.debug("RUR IPv4 address extracted from netcfg is: {}", rurIPv4Address);
shalde064280feec2018-06-15 19:01:29 -0400325 if (rurIPv4Address != null) {
326 pdPushNextHopIPv4.add(rurIPv4Address);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400327 } else {
328 log.debug("Unable to extract RUR IPv4 address from netcfg");
shalde064280feec2018-06-15 19:01:29 -0400329 }
330
Kalhee Kimba366062017-11-07 16:32:09 +0000331 }
332
Mayank Tiwaric679a022018-06-23 11:19:08 -0400333 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
shalde064280feec2018-06-15 19:01:29 -0400334 rurIPv6Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000335 .stream()
336 .filter(iface -> iface.name().contains("RUR"))
337 .map(Interface::ipAddressesList)
338 .flatMap(Collection::stream)
339 .map(InterfaceIpAddress::ipAddress)
340 .filter(IpAddress::isIp6)
341 .map(IpAddress::getIp6Address)
342 .findFirst()
343 .orElse(null);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400344 log.debug("RUR IPv6 address extracted from netcfg is: {}", rurIPv6Address);
shalde064280feec2018-06-15 19:01:29 -0400345 if (rurIPv6Address != null) {
346 pdPushNextHopIPv6.add(rurIPv6Address);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400347 } else {
348 log.debug("Unable to extract RUR IPv6 address from netcfg");
shalde064280feec2018-06-15 19:01:29 -0400349 }
Kalhee Kimba366062017-11-07 16:32:09 +0000350 }
351
352 log.info("PD pushing is enabled.");
Ray Milkey032b9642018-06-21 08:28:12 -0700353 if (pdPushNextHopIPv4.size() != 0) {
shalde064280feec2018-06-15 19:01:29 -0400354 log.info("ipv4 next-hop {} with {} items", pdPushNextHopIPv4.toString(), pdPushNextHopIPv4.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000355 } else {
356 log.info("ipv4 next-hop is null");
357 }
Ray Milkey032b9642018-06-21 08:28:12 -0700358 if (pdPushNextHopIPv6.size() != 0) {
shalde064280feec2018-06-15 19:01:29 -0400359 log.info("ipv6 next-hop={} with {} items", pdPushNextHopIPv6.toString(), pdPushNextHopIPv6.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000360 } else {
361 log.info("ipv6 next-hop is null");
362 }
shalde064280feec2018-06-15 19:01:29 -0400363 processStaticRoutes();
Kalhee Kimba366062017-11-07 16:32:09 +0000364 } else {
365 log.info("PD pushing is disabled.");
366 }
367 }
368 }
369
370 private void startServer() {
371 HashedWheelTimer timer = new HashedWheelTimer(
372 groupedThreads("onos/fpm", "fpm-timer-%d", log));
373
374 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
375 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
376 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
377 ChannelPipelineFactory pipelineFactory = () -> {
378 // Allocate a new session per connection
379 IdleStateHandler idleHandler =
380 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
381 FpmSessionHandler fpmSessionHandler =
382 new FpmSessionHandler(this, new InternalFpmListener());
383 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
384
385 // Setup the processing pipeline
386 ChannelPipeline pipeline = Channels.pipeline();
387 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
388 pipeline.addLast("idle", idleHandler);
389 pipeline.addLast("FpmSession", fpmSessionHandler);
390 return pipeline;
391 };
392
393 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
394
395 serverBootstrap = new ServerBootstrap(channelFactory);
396 serverBootstrap.setOption("child.reuseAddr", true);
397 serverBootstrap.setOption("child.keepAlive", true);
398 serverBootstrap.setOption("child.tcpNoDelay", true);
399 serverBootstrap.setPipelineFactory(pipelineFactory);
400 try {
401 serverChannel = serverBootstrap.bind(listenAddress);
402 allChannels.add(serverChannel);
403 } catch (ChannelException e) {
404 log.debug("Exception binding to FPM port {}: ",
405 listenAddress.getPort(), e);
406 stopServer();
407 }
408 }
409
410 private void stopServer() {
411 allChannels.close().awaitUninterruptibly();
412 allChannels.clear();
413 if (serverBootstrap != null) {
414 serverBootstrap.releaseExternalResources();
415 }
416
417 if (clearRoutes) {
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000418 log.debug("Clearing routes for the peer");
Kalhee Kimba366062017-11-07 16:32:09 +0000419 peers.keySet().forEach(this::clearRoutes);
420 }
421 }
422
Kalhee Kim40beb722018-01-16 20:32:04 +0000423 private boolean routeInDhcpStore(IpPrefix prefix) {
424
425 if (dhcpStore != null) {
426 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
427 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
428 }
429 return false;
430 }
431
432 private boolean routeInRipStore(IpPrefix prefix) {
433
434 if (ripStore != null) {
435 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
436 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
437 }
438 return false;
439 }
440
Kalhee Kimba366062017-11-07 16:32:09 +0000441 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
442 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
443 return;
444 }
445
446 Netlink netlink = fpmMessage.netlink();
447 RtNetlink rtNetlink = netlink.rtNetlink();
448
449 if (log.isTraceEnabled()) {
450 log.trace("Received FPM message: {}", fpmMessage);
451 }
452
453 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
454 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
455 log.trace("Ignoring non-zebra route");
456 return;
457 }
458
459 IpAddress dstAddress = null;
460 IpAddress gateway = null;
461
462 for (RouteAttribute attribute : rtNetlink.attributes()) {
463 if (attribute.type() == RouteAttribute.RTA_DST) {
464 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
465 dstAddress = raDst.dstAddress();
466 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
467 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
468 gateway = raGateway.gateway();
469 }
470 }
471
472 if (dstAddress == null) {
473 log.error("Dst address missing!");
474 return;
475 }
476
477 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
478
Kalhee Kim40beb722018-01-16 20:32:04 +0000479 // Ignore routes that we sent.
Charles Chaneb42a732018-06-25 13:01:35 -0700480 if (gateway != null && (
481 (prefix.isIp4() && pdPushNextHopIPv4 != null &&
482 pdPushNextHopIPv4.contains(gateway.getIp4Address())) ||
483 (prefix.isIp6() && pdPushNextHopIPv6 != null &&
484 pdPushNextHopIPv6.contains(gateway.getIp6Address())))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000485 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
486 return;
487 }
488 }
489
Kalhee Kimba366062017-11-07 16:32:09 +0000490 List<Route> updates = new LinkedList<>();
491 List<Route> withdraws = new LinkedList<>();
492
493 Route route;
494 switch (netlink.type()) {
495 case RTM_NEWROUTE:
496 if (gateway == null) {
497 // We ignore interface routes with no gateway for now.
498 return;
499 }
500 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
501
502
503 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
504
505 if (oldRoute != null) {
506 log.trace("Swapping {} with {}", oldRoute, route);
507 withdraws.add(oldRoute);
508 }
509 updates.add(route);
510 break;
511 case RTM_DELROUTE:
512 Route existing = fpmRoutes.get(peer).remove(prefix);
513 if (existing == null) {
514 log.warn("Got delete for non-existent prefix");
515 return;
516 }
517
518 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
519
520 withdraws.add(route);
521 break;
522 case RTM_GETROUTE:
523 default:
524 break;
525 }
526
Charles Chan035ed1f2018-01-30 16:00:32 -0800527 updateRouteStore(updates, withdraws);
528 }
529
530 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
531 routeService.withdraw(routesToRemove);
532 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000533 }
534
535 private void clearRoutes(FpmPeer peer) {
536 log.info("Clearing all routes for peer {}", peer);
537 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
538 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800539 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000540 }
541 }
542
543 public void processStaticRoutes() {
shalde064280feec2018-06-15 19:01:29 -0400544 log.debug("processStaticRoutes function is called");
Kalhee Kimba366062017-11-07 16:32:09 +0000545 for (Channel ch : allChannels) {
546 processStaticRoutes(ch);
547 }
548 }
549
550 public void processStaticRoutes(Channel ch) {
551 processRipStaticRoutes(ch);
552 processDhcpStaticRoutes(ch);
553 }
554
555 private void processRipStaticRoutes(Channel ch) {
556
557 /* Get RIP static routes. */
558 if (ripStore != null) {
559 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
560 log.info("RIP store size is {}", ripRecords.size());
561
562 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
563 record.ipPrefix(), ch));
564 }
565 }
566
567 private void processDhcpStaticRoutes(Channel ch) {
568
569 /* Get Dhcp static routes. */
570 if (dhcpStore != null) {
571 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
572 log.info("Dhcp store size is {}", dhcpRecords.size());
573
574 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
575 record.ipPrefix(), ch));
576 }
577 }
578
shalde064280feec2018-06-15 19:01:29 -0400579 private void updateRoute(IpAddress pdPushNextHop, boolean isAdd, IpPrefix prefix,
580 Channel ch, int raLength, short addrFamily) {
Kalhee Kimba366062017-11-07 16:32:09 +0000581 try {
shalde064280feec2018-06-15 19:01:29 -0400582 RouteAttributeDst raDst = RouteAttributeDst.builder()
583 .length(raLength)
584 .type(RouteAttribute.RTA_DST)
585 .dstAddress(prefix.address())
586 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000587
Kalhee Kim715dd732018-01-23 14:39:56 +0000588 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
shalde064280feec2018-06-15 19:01:29 -0400589 .length(raLength)
590 .type(RouteAttribute.RTA_GATEWAY)
591 .gateway(pdPushNextHop)
592 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000593
Kalhee Kim715dd732018-01-23 14:39:56 +0000594 // Build RtNetlink.
595 RtNetlink rtNetlink = RtNetlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400596 .addressFamily(addrFamily)
597 .dstLength(prefix.prefixLength())
598 .routeAttribute(raDst)
599 .routeAttribute(raGateway)
600 .build();
Kalhee Kim715dd732018-01-23 14:39:56 +0000601
602 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000603 int messageLength = raDst.length() + raGateway.length() +
shalde064280feec2018-06-15 19:01:29 -0400604 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000605 Netlink netLink = Netlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400606 .length(messageLength)
607 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
608 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
609 .rtNetlink(rtNetlink)
610 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000611
Kalhee Kim715dd732018-01-23 14:39:56 +0000612 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000613 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000614 FpmHeader fpmMessage = FpmHeader.builder()
shalde064280feec2018-06-15 19:01:29 -0400615 .version(FpmHeader.FPM_VERSION_1)
616 .type(FpmHeader.FPM_TYPE_NETLINK)
617 .length(messageLength)
618 .netlink(netLink)
619 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000620
621 // Encode message in a channel buffer and transmit.
622 ch.write(fpmMessage.encode());
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000623 log.debug("Fpm Message for updated route {}", fpmMessage.toString());
Kalhee Kimba366062017-11-07 16:32:09 +0000624 } catch (RuntimeException e) {
625 log.info("Route not sent over fpm connection.");
626 }
627 }
628
shalde064280feec2018-06-15 19:01:29 -0400629 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
630
631 if (!pdPushEnabled) {
632 return;
633 }
634 int raLength;
635 short addrFamily;
636
637 // Build route attributes.
638 if (prefix.isIp4()) {
639 List<Ip4Address> pdPushNextHopList;
640 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
641 log.info("Prefix not pushed because ipv4 next-hop is null.");
642 return;
643 }
644 pdPushNextHopList = pdPushNextHopIPv4;
645 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
646 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
647 for (Ip4Address pdPushNextHop: pdPushNextHopList) {
Harshada Chaundkar1a098eb2019-01-15 00:05:57 +0000648 log.trace("IPv4 next hop is:" + pdPushNextHop);
shalde064280feec2018-06-15 19:01:29 -0400649 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
650 }
651 } else {
652 List<Ip6Address> pdPushNextHopList;
653 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
654 log.info("Prefix not pushed because ipv6 next-hop is null.");
655 return;
656 }
657 pdPushNextHopList = pdPushNextHopIPv6;
658 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
659 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
660 for (Ip6Address pdPushNextHop: pdPushNextHopList) {
Harshada Chaundkar1a098eb2019-01-15 00:05:57 +0000661 log.trace("IPv6 next hop is:" + pdPushNextHop);
shalde064280feec2018-06-15 19:01:29 -0400662 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
663 }
664 }
665 }
666
Kalhee Kimba366062017-11-07 16:32:09 +0000667 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
668
669 for (Channel ch : allChannels) {
670 sendRouteUpdateToChannel(isAdd, prefix, ch);
671 }
672 }
673
674 public boolean isPdPushEnabled() {
675 return pdPushEnabled;
676 }
677
678 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
679 return new FpmPeerInfo(connections,
680 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
681 }
682
683 @Override
684 public Map<FpmPeer, FpmPeerInfo> peers() {
685 return peers.asJavaMap().entrySet().stream()
686 .collect(Collectors.toMap(
687 e -> e.getKey(),
688 e -> toFpmInfo(e.getKey(), e.getValue())));
689 }
690
691 private class InternalFpmListener implements FpmListener {
692 @Override
693 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
694 FpmManager.this.fpmMessage(peer, fpmMessage);
695 }
696
697 @Override
698 public boolean peerConnected(FpmPeer peer) {
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000699 log.info("FPM connection to {} was connected", peer);
Kalhee Kimba366062017-11-07 16:32:09 +0000700 if (peers.keySet().contains(peer)) {
701 return false;
702 }
703
704 NodeId localNode = clusterService.getLocalNode().id();
705 peers.compute(peer, (p, infos) -> {
706 if (infos == null) {
707 infos = new HashSet<>();
708 }
709
710 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700711 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000712 return infos;
713 });
714
715 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
716 return true;
717 }
718
719 @Override
720 public void peerDisconnected(FpmPeer peer) {
721 log.info("FPM connection to {} went down", peer);
722
723 if (clearRoutes) {
724 clearRoutes(peer);
725 }
726
727 peers.compute(peer, (p, infos) -> {
728 if (infos == null) {
729 return null;
730 }
731
732 infos.stream()
733 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
734 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700735 .ifPresent(i -> {
736 infos.remove(i);
737 localPeers.get(peer).remove(i);
738 });
Kalhee Kimba366062017-11-07 16:32:09 +0000739
740 if (infos.isEmpty()) {
741 return null;
742 }
743
744 return infos;
745 });
746 }
747 }
748
749 /**
750 * Adds a channel to the channel group.
751 *
752 * @param channel the channel to add
753 */
754 public void addSessionChannel(Channel channel) {
755 allChannels.add(channel);
756 }
757
758 /**
759 * Removes a channel from the channel group.
760 *
761 * @param channel the channel to remove
762 */
763 public void removeSessionChannel(Channel channel) {
764 allChannels.remove(channel);
765 }
766
767 /**
768 * Store delegate for Fpm Prefix store.
769 * Handles Fpm prefix store event.
770 */
771 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
772
773 @Override
774 public void notify(FpmPrefixStoreEvent e) {
775
776 log.trace("FpmPrefixStoreEvent notify");
777
778 FpmRecord record = e.subject();
779 switch (e.type()) {
780 case ADD:
781 sendRouteUpdate(true, record.ipPrefix());
782 break;
783 case REMOVE:
784 sendRouteUpdate(false, record.ipPrefix());
785 break;
786 default:
787 log.warn("unsupported store event type", e.type());
788 return;
789 }
790 }
791 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800792
793 private class InternalClusterListener implements ClusterEventListener {
794 @Override
795 public void event(ClusterEvent event) {
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700796 clusterEventExecutor.execute(() -> {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700797 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700798 switch (event.type()) {
799 case INSTANCE_READY:
800 // When current node is healing from a network partition,
801 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
802 // but it is possible that current node still can't write to the cluster at this moment.
803 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
804 // after current node can write.
805 // Adding 15 seconds retry for the current node to be able to write.
806 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
807 if (result != null && result.isPresent()) {
808 log.debug("Lock obtained. Push local FPM routes to route store");
809 // All FPM routes on current node will be pushed again even when current node is not
810 // the one that becomes READY. A better way is to do this only on the minority nodes.
811 pushFpmRoutes();
812 localPeers.forEach((key, value) -> peers.put(key, value));
813 asyncLock.unlock();
814 } else {
815 log.debug("Fail to obtain lock. Abort.");
816 }
817 });
818 break;
819 case INSTANCE_DEACTIVATED:
820 case INSTANCE_REMOVED:
821 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
822 if (value != null) {
823 value.stream()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700824 .filter(i -> i.connectedTo().equals(event.subject().id()))
825 .findAny()
826 .ifPresent(value::remove);
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000827 log.info("Connection {} removed for disabled peer {}", value, key);
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700828 if (value.isEmpty()) {
829 peers.remove(key);
830 }
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700831 }
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700832 });
833 break;
834 case INSTANCE_ADDED:
835 case INSTANCE_ACTIVATED:
836 default:
837 break;
838 }
839 });
Charles Chan035ed1f2018-01-30 16:00:32 -0800840 }
841 }
842
Saurav Dase7f51012018-02-09 17:26:45 -0800843 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800844 public void pushFpmRoutes() {
845 Set<Route> routes = fpmRoutes.values().stream()
846 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
847 .collect(Collectors.toSet());
848 updateRouteStore(routes, Lists.newArrayList());
849 log.info("{} FPM routes have been updated to route store", routes.size());
850 }
Kalhee Kimba366062017-11-07 16:32:09 +0000851}