blob: 6b6df43e1e2c788e865f766364822d74783b16e7 [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.jboss.netty.bootstrap.ServerBootstrap;
22import org.jboss.netty.channel.Channel;
23import org.jboss.netty.channel.ChannelException;
24import org.jboss.netty.channel.ChannelFactory;
25import org.jboss.netty.channel.ChannelPipeline;
26import org.jboss.netty.channel.ChannelPipelineFactory;
27import org.jboss.netty.channel.Channels;
28import org.jboss.netty.channel.group.ChannelGroup;
29import org.jboss.netty.channel.group.DefaultChannelGroup;
30import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
31import org.jboss.netty.handler.timeout.IdleStateHandler;
32import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000033import org.onlab.packet.Ip4Address;
34import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070035import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000036import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000037import org.onlab.util.KryoNamespace;
38import org.onlab.util.Tools;
39import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080040import org.onosproject.cluster.ClusterEvent;
41import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onosproject.cluster.ClusterService;
43import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000044import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070045import org.onosproject.core.CoreService;
46import org.onosproject.net.host.InterfaceIpAddress;
47import org.onosproject.net.intf.Interface;
48import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000049import org.onosproject.routeservice.Route;
50import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070051import org.onosproject.routing.fpm.api.FpmPrefixStore;
52import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
53import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000054import org.onosproject.routing.fpm.protocol.FpmHeader;
55import org.onosproject.routing.fpm.protocol.Netlink;
56import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
57import org.onosproject.routing.fpm.protocol.RouteAttribute;
58import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
59import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
60import org.onosproject.routing.fpm.protocol.RtNetlink;
61import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070062import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080064import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000065import org.onosproject.store.service.ConsistentMap;
66import org.onosproject.store.service.Serializer;
67import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000068import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069import org.osgi.service.component.annotations.Activate;
70import org.osgi.service.component.annotations.Component;
71import org.osgi.service.component.annotations.Deactivate;
72import org.osgi.service.component.annotations.Modified;
73import org.osgi.service.component.annotations.Reference;
74import org.osgi.service.component.annotations.ReferenceCardinality;
75import org.osgi.service.component.annotations.ReferencePolicy;
Kalhee Kimba366062017-11-07 16:32:09 +000076import org.slf4j.Logger;
77import org.slf4j.LoggerFactory;
78
79import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080080import java.time.Duration;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081import java.util.ArrayList;
82import java.util.Arrays;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
89import java.util.Map;
90import java.util.Set;
William Davies5bae5052019-10-19 01:25:01 -070091import java.util.Iterator;
Kalhee Kimba366062017-11-07 16:32:09 +000092import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanaa2faca2018-08-13 02:41:50 -070093import java.util.concurrent.ExecutorService;
94import java.util.concurrent.Executors;
Kalhee Kimba366062017-11-07 16:32:09 +000095import java.util.stream.Collectors;
96
97import static java.util.concurrent.Executors.newCachedThreadPool;
98import static org.onlab.util.Tools.groupedThreads;
Ray Milkey8e406512018-10-24 15:56:50 -070099import static org.onosproject.routing.fpm.OsgiPropertyConstants.CLEAR_ROUTES;
100import static org.onosproject.routing.fpm.OsgiPropertyConstants.CLEAR_ROUTES_DEFAULT;
101import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_ENABLED;
102import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_ENABLED_DEFAULT;
103import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV4;
104import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV4_DEFAULT;
105import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV6;
106import static org.onosproject.routing.fpm.OsgiPropertyConstants.PD_PUSH_NEXT_HOP_IPV6_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000107
108/**
109 * Forwarding Plane Manager (FPM) route source.
110 */
Ray Milkey8e406512018-10-24 15:56:50 -0700111@Component(
112 immediate = true,
113 service = FpmInfoService.class,
114 property = {
115 CLEAR_ROUTES + ":Boolean=" + CLEAR_ROUTES_DEFAULT,
116 PD_PUSH_ENABLED + ":Boolean=" + PD_PUSH_ENABLED_DEFAULT,
117 PD_PUSH_NEXT_HOP_IPV4 + "=" + PD_PUSH_NEXT_HOP_IPV4_DEFAULT,
118 PD_PUSH_NEXT_HOP_IPV6 + "=" + PD_PUSH_NEXT_HOP_IPV6_DEFAULT,
119 }
120)
Kalhee Kimba366062017-11-07 16:32:09 +0000121public class FpmManager implements FpmInfoService {
122 private final Logger log = LoggerFactory.getLogger(getClass());
123
124 private static final int FPM_PORT = 2620;
125 private static final String APP_NAME = "org.onosproject.fpm";
126 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800127 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000128
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000130 protected CoreService coreService;
131
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000133 protected ComponentConfigService componentConfigService;
134
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000136 protected RouteAdminService routeService;
137
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000139 protected ClusterService clusterService;
140
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000142 protected StorageService storageService;
143
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Kalhee Kimba366062017-11-07 16:32:09 +0000145 protected InterfaceService interfaceService;
146
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700147 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
Kalhee Kimba366062017-11-07 16:32:09 +0000148 bind = "bindRipStore",
149 unbind = "unbindRipStore",
150 policy = ReferencePolicy.DYNAMIC,
Ray Milkey5504bd22019-03-22 16:24:38 -0700151 target = "(_fpm_type=RIP)")
Kalhee Kimba366062017-11-07 16:32:09 +0000152 protected volatile FpmPrefixStore ripStore;
153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
Kalhee Kimba366062017-11-07 16:32:09 +0000155 bind = "bindDhcpStore",
156 unbind = "unbindDhcpStore",
157 policy = ReferencePolicy.DYNAMIC,
Ray Milkey5504bd22019-03-22 16:24:38 -0700158 target = "(_fpm_type=DHCP)")
Kalhee Kimba366062017-11-07 16:32:09 +0000159 protected volatile FpmPrefixStore dhcpStore;
160
161 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
162 = new FpmPrefixStoreDelegate();
163
164 private ApplicationId appId;
165 private ServerBootstrap serverBootstrap;
166 private Channel serverChannel;
167 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800168 private final InternalClusterListener clusterListener = new InternalClusterListener();
169 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000170
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700171 private ExecutorService clusterEventExecutor;
172
Kalhee Kimba366062017-11-07 16:32:09 +0000173 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
174
175 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
176
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700177 //Local cache for peers to be used in case of cluster partition.
178 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
179
Ray Milkey8e406512018-10-24 15:56:50 -0700180 /** Whether to clear routes when the FPM connection goes down. */
181 private boolean clearRoutes = CLEAR_ROUTES_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000182
Ray Milkey8e406512018-10-24 15:56:50 -0700183 /** Whether to push prefixes to Quagga over fpm connection. */
184 private boolean pdPushEnabled = PD_PUSH_ENABLED_DEFAULT;
Kalhee Kimba366062017-11-07 16:32:09 +0000185
Ray Milkey8e406512018-10-24 15:56:50 -0700186 /** IPv4 next-hop address for PD Pushing. */
shalde064280feec2018-06-15 19:01:29 -0400187 private List<Ip4Address> pdPushNextHopIPv4 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000188
Ray Milkey8e406512018-10-24 15:56:50 -0700189 /** IPv6 next-hop address for PD Pushing. */
shalde064280feec2018-06-15 19:01:29 -0400190 private List<Ip6Address> pdPushNextHopIPv6 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000191
192 protected void bindRipStore(FpmPrefixStore store) {
193 if ((ripStore == null) && (store != null)) {
194 ripStore = store;
195 ripStore.setDelegate(fpmPrefixStoreDelegate);
196 for (Channel ch : allChannels) {
197 processRipStaticRoutes(ch);
198 }
199 }
200 }
201
202 protected void unbindRipStore(FpmPrefixStore store) {
203 if (ripStore == store) {
204 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
205 ripStore = null;
206 }
207 }
208
209 protected void bindDhcpStore(FpmPrefixStore store) {
210 if ((dhcpStore == null) && (store != null)) {
211 dhcpStore = store;
212 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
213 for (Channel ch : allChannels) {
214 processDhcpStaticRoutes(ch);
215 }
216 }
217 }
218
219 protected void unbindDhcpStore(FpmPrefixStore store) {
220 if (dhcpStore == store) {
221 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
222 dhcpStore = null;
223 }
224 }
225
226 @Activate
227 protected void activate(ComponentContext context) {
228 componentConfigService.preSetProperty(
229 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
230 "distributed", "true");
231
232 componentConfigService.registerProperties(getClass());
233
234 KryoNamespace serializer = KryoNamespace.newBuilder()
235 .register(KryoNamespaces.API)
236 .register(FpmPeer.class)
237 .register(FpmConnectionInfo.class)
238 .build();
239 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
240 .withName("fpm-connections")
241 .withSerializer(Serializer.using(serializer))
242 .build();
243
244 modified(context);
245 startServer();
246
247 appId = coreService.registerApplication(APP_NAME, peers::destroy);
248
Charles Chan035ed1f2018-01-30 16:00:32 -0800249 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700250
251 clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
Saurav Dase7f51012018-02-09 17:26:45 -0800252 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800253
Kalhee Kimba366062017-11-07 16:32:09 +0000254 log.info("Started");
255 }
256
257 @Deactivate
258 protected void deactivate() {
259 componentConfigService.preSetProperty(
260 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
261 "distributed", "false");
262
263 stopServer();
264 fpmRoutes.clear();
265 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800266
267 clusterService.removeListener(clusterListener);
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700268 clusterEventExecutor.shutdown();
Charles Chan035ed1f2018-01-30 16:00:32 -0800269 asyncLock.unlock();
270
Kalhee Kimba366062017-11-07 16:32:09 +0000271 log.info("Stopped");
272 }
273
274 @Modified
275 protected void modified(ComponentContext context) {
shalde064280feec2018-06-15 19:01:29 -0400276 Ip4Address rurIPv4Address;
277 Ip6Address rurIPv6Address;
Kalhee Kimba366062017-11-07 16:32:09 +0000278 Dictionary<?, ?> properties = context.getProperties();
279 if (properties == null) {
280 return;
281 }
Ray Milkey8e406512018-10-24 15:56:50 -0700282 String strClearRoutes = Tools.get(properties, CLEAR_ROUTES);
Kalhee Kimba366062017-11-07 16:32:09 +0000283 if (strClearRoutes != null) {
284 clearRoutes = Boolean.parseBoolean(strClearRoutes);
285 log.info("clearRoutes is {}", clearRoutes);
286 }
287
Ray Milkey8e406512018-10-24 15:56:50 -0700288 String strPdPushEnabled = Tools.get(properties, PD_PUSH_ENABLED);
Kalhee Kimba366062017-11-07 16:32:09 +0000289 if (strPdPushEnabled != null) {
290 boolean oldValue = pdPushEnabled;
291 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
292 if (pdPushEnabled) {
293
shalde064280feec2018-06-15 19:01:29 -0400294 pdPushNextHopIPv4 = new ArrayList<Ip4Address>();
295 pdPushNextHopIPv6 = new ArrayList<Ip6Address>();
Kalhee Kimba366062017-11-07 16:32:09 +0000296
Ray Milkey8e406512018-10-24 15:56:50 -0700297 String strPdPushNextHopIPv4 = Tools.get(properties, PD_PUSH_NEXT_HOP_IPV4);
Kalhee Kimba366062017-11-07 16:32:09 +0000298 if (strPdPushNextHopIPv4 != null) {
shalde064280feec2018-06-15 19:01:29 -0400299 List<String> strPdPushNextHopIPv4List = Arrays.asList(strPdPushNextHopIPv4.split(","));
300 for (String nextHop : strPdPushNextHopIPv4List) {
Mayank Tiwari2d3a3082018-11-23 16:18:50 -0500301 log.trace("IPv4 next hop added is:" + nextHop);
shalde064280feec2018-06-15 19:01:29 -0400302 pdPushNextHopIPv4.add(Ip4Address.valueOf(nextHop));
303 }
Kalhee Kimba366062017-11-07 16:32:09 +0000304 }
Ray Milkey8e406512018-10-24 15:56:50 -0700305 String strPdPushNextHopIPv6 = Tools.get(properties, PD_PUSH_NEXT_HOP_IPV6);
Kalhee Kimba366062017-11-07 16:32:09 +0000306 if (strPdPushNextHopIPv6 != null) {
shalde064280feec2018-06-15 19:01:29 -0400307 List<String> strPdPushNextHopIPv6List = Arrays.asList(strPdPushNextHopIPv6.split(","));
308 for (String nextHop : strPdPushNextHopIPv6List) {
Mayank Tiwari2d3a3082018-11-23 16:18:50 -0500309 log.trace("IPv6 next hop added is:" + nextHop);
shalde064280feec2018-06-15 19:01:29 -0400310 pdPushNextHopIPv6.add(Ip6Address.valueOf(nextHop));
311 }
Kalhee Kimba366062017-11-07 16:32:09 +0000312 }
313
Ray Milkey032b9642018-06-21 08:28:12 -0700314 if (pdPushNextHopIPv4.size() == 0) {
shalde064280feec2018-06-15 19:01:29 -0400315 rurIPv4Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000316 .stream()
317 .filter(iface -> iface.name().contains("RUR"))
318 .map(Interface::ipAddressesList)
319 .flatMap(Collection::stream)
320 .map(InterfaceIpAddress::ipAddress)
321 .filter(IpAddress::isIp4)
322 .map(IpAddress::getIp4Address)
323 .findFirst()
324 .orElse(null);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400325 log.debug("RUR IPv4 address extracted from netcfg is: {}", rurIPv4Address);
shalde064280feec2018-06-15 19:01:29 -0400326 if (rurIPv4Address != null) {
327 pdPushNextHopIPv4.add(rurIPv4Address);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400328 } else {
329 log.debug("Unable to extract RUR IPv4 address from netcfg");
shalde064280feec2018-06-15 19:01:29 -0400330 }
331
Kalhee Kimba366062017-11-07 16:32:09 +0000332 }
333
Mayank Tiwaric679a022018-06-23 11:19:08 -0400334 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
shalde064280feec2018-06-15 19:01:29 -0400335 rurIPv6Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000336 .stream()
337 .filter(iface -> iface.name().contains("RUR"))
338 .map(Interface::ipAddressesList)
339 .flatMap(Collection::stream)
340 .map(InterfaceIpAddress::ipAddress)
341 .filter(IpAddress::isIp6)
342 .map(IpAddress::getIp6Address)
343 .findFirst()
344 .orElse(null);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400345 log.debug("RUR IPv6 address extracted from netcfg is: {}", rurIPv6Address);
shalde064280feec2018-06-15 19:01:29 -0400346 if (rurIPv6Address != null) {
347 pdPushNextHopIPv6.add(rurIPv6Address);
Mayank Tiwaric679a022018-06-23 11:19:08 -0400348 } else {
349 log.debug("Unable to extract RUR IPv6 address from netcfg");
shalde064280feec2018-06-15 19:01:29 -0400350 }
Kalhee Kimba366062017-11-07 16:32:09 +0000351 }
352
353 log.info("PD pushing is enabled.");
Ray Milkey032b9642018-06-21 08:28:12 -0700354 if (pdPushNextHopIPv4.size() != 0) {
shalde064280feec2018-06-15 19:01:29 -0400355 log.info("ipv4 next-hop {} with {} items", pdPushNextHopIPv4.toString(), pdPushNextHopIPv4.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000356 } else {
357 log.info("ipv4 next-hop is null");
358 }
Ray Milkey032b9642018-06-21 08:28:12 -0700359 if (pdPushNextHopIPv6.size() != 0) {
shalde064280feec2018-06-15 19:01:29 -0400360 log.info("ipv6 next-hop={} with {} items", pdPushNextHopIPv6.toString(), pdPushNextHopIPv6.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000361 } else {
362 log.info("ipv6 next-hop is null");
363 }
shalde064280feec2018-06-15 19:01:29 -0400364 processStaticRoutes();
Kalhee Kimba366062017-11-07 16:32:09 +0000365 } else {
366 log.info("PD pushing is disabled.");
367 }
368 }
369 }
370
371 private void startServer() {
372 HashedWheelTimer timer = new HashedWheelTimer(
373 groupedThreads("onos/fpm", "fpm-timer-%d", log));
374
375 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
376 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
377 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
378 ChannelPipelineFactory pipelineFactory = () -> {
379 // Allocate a new session per connection
380 IdleStateHandler idleHandler =
381 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
382 FpmSessionHandler fpmSessionHandler =
383 new FpmSessionHandler(this, new InternalFpmListener());
384 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
385
386 // Setup the processing pipeline
387 ChannelPipeline pipeline = Channels.pipeline();
388 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
389 pipeline.addLast("idle", idleHandler);
390 pipeline.addLast("FpmSession", fpmSessionHandler);
391 return pipeline;
392 };
393
394 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
395
396 serverBootstrap = new ServerBootstrap(channelFactory);
397 serverBootstrap.setOption("child.reuseAddr", true);
398 serverBootstrap.setOption("child.keepAlive", true);
399 serverBootstrap.setOption("child.tcpNoDelay", true);
400 serverBootstrap.setPipelineFactory(pipelineFactory);
401 try {
402 serverChannel = serverBootstrap.bind(listenAddress);
403 allChannels.add(serverChannel);
404 } catch (ChannelException e) {
405 log.debug("Exception binding to FPM port {}: ",
406 listenAddress.getPort(), e);
407 stopServer();
408 }
409 }
410
411 private void stopServer() {
412 allChannels.close().awaitUninterruptibly();
413 allChannels.clear();
414 if (serverBootstrap != null) {
415 serverBootstrap.releaseExternalResources();
416 }
417
418 if (clearRoutes) {
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000419 log.debug("Clearing routes for the peer");
Kalhee Kimba366062017-11-07 16:32:09 +0000420 peers.keySet().forEach(this::clearRoutes);
421 }
422 }
423
Kalhee Kim40beb722018-01-16 20:32:04 +0000424 private boolean routeInDhcpStore(IpPrefix prefix) {
425
426 if (dhcpStore != null) {
427 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
428 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
429 }
430 return false;
431 }
432
433 private boolean routeInRipStore(IpPrefix prefix) {
434
435 if (ripStore != null) {
436 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
437 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
438 }
439 return false;
440 }
441
Kalhee Kimba366062017-11-07 16:32:09 +0000442 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
443 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
444 return;
445 }
446
447 Netlink netlink = fpmMessage.netlink();
448 RtNetlink rtNetlink = netlink.rtNetlink();
449
450 if (log.isTraceEnabled()) {
451 log.trace("Received FPM message: {}", fpmMessage);
452 }
453
454 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
455 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
456 log.trace("Ignoring non-zebra route");
457 return;
458 }
459
460 IpAddress dstAddress = null;
461 IpAddress gateway = null;
462
463 for (RouteAttribute attribute : rtNetlink.attributes()) {
464 if (attribute.type() == RouteAttribute.RTA_DST) {
465 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
466 dstAddress = raDst.dstAddress();
467 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
468 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
469 gateway = raGateway.gateway();
470 }
471 }
472
473 if (dstAddress == null) {
474 log.error("Dst address missing!");
475 return;
476 }
477
478 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
479
Kalhee Kim40beb722018-01-16 20:32:04 +0000480 // Ignore routes that we sent.
Charles Chaneb42a732018-06-25 13:01:35 -0700481 if (gateway != null && (
482 (prefix.isIp4() && pdPushNextHopIPv4 != null &&
483 pdPushNextHopIPv4.contains(gateway.getIp4Address())) ||
484 (prefix.isIp6() && pdPushNextHopIPv6 != null &&
485 pdPushNextHopIPv6.contains(gateway.getIp6Address())))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000486 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
487 return;
488 }
489 }
490
Kalhee Kimba366062017-11-07 16:32:09 +0000491 List<Route> updates = new LinkedList<>();
492 List<Route> withdraws = new LinkedList<>();
493
494 Route route;
495 switch (netlink.type()) {
496 case RTM_NEWROUTE:
497 if (gateway == null) {
498 // We ignore interface routes with no gateway for now.
499 return;
500 }
501 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
502
503
504 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
505
506 if (oldRoute != null) {
507 log.trace("Swapping {} with {}", oldRoute, route);
508 withdraws.add(oldRoute);
509 }
510 updates.add(route);
511 break;
512 case RTM_DELROUTE:
513 Route existing = fpmRoutes.get(peer).remove(prefix);
514 if (existing == null) {
515 log.warn("Got delete for non-existent prefix");
516 return;
517 }
518
519 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
520
521 withdraws.add(route);
522 break;
523 case RTM_GETROUTE:
524 default:
525 break;
526 }
527
Charles Chan035ed1f2018-01-30 16:00:32 -0800528 updateRouteStore(updates, withdraws);
529 }
530
531 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
532 routeService.withdraw(routesToRemove);
533 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000534 }
535
536 private void clearRoutes(FpmPeer peer) {
537 log.info("Clearing all routes for peer {}", peer);
538 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
539 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800540 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000541 }
542 }
543
544 public void processStaticRoutes() {
shalde064280feec2018-06-15 19:01:29 -0400545 log.debug("processStaticRoutes function is called");
Kalhee Kimba366062017-11-07 16:32:09 +0000546 for (Channel ch : allChannels) {
547 processStaticRoutes(ch);
548 }
549 }
550
551 public void processStaticRoutes(Channel ch) {
552 processRipStaticRoutes(ch);
553 processDhcpStaticRoutes(ch);
554 }
555
556 private void processRipStaticRoutes(Channel ch) {
557
558 /* Get RIP static routes. */
559 if (ripStore != null) {
560 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
561 log.info("RIP store size is {}", ripRecords.size());
562
563 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
564 record.ipPrefix(), ch));
565 }
566 }
567
568 private void processDhcpStaticRoutes(Channel ch) {
569
570 /* Get Dhcp static routes. */
571 if (dhcpStore != null) {
572 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
573 log.info("Dhcp store size is {}", dhcpRecords.size());
574
575 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
576 record.ipPrefix(), ch));
577 }
578 }
579
shalde064280feec2018-06-15 19:01:29 -0400580 private void updateRoute(IpAddress pdPushNextHop, boolean isAdd, IpPrefix prefix,
581 Channel ch, int raLength, short addrFamily) {
Kalhee Kimba366062017-11-07 16:32:09 +0000582 try {
shalde064280feec2018-06-15 19:01:29 -0400583 RouteAttributeDst raDst = RouteAttributeDst.builder()
584 .length(raLength)
585 .type(RouteAttribute.RTA_DST)
586 .dstAddress(prefix.address())
587 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000588
Kalhee Kim715dd732018-01-23 14:39:56 +0000589 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
shalde064280feec2018-06-15 19:01:29 -0400590 .length(raLength)
591 .type(RouteAttribute.RTA_GATEWAY)
592 .gateway(pdPushNextHop)
593 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000594
Kalhee Kim715dd732018-01-23 14:39:56 +0000595 // Build RtNetlink.
596 RtNetlink rtNetlink = RtNetlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400597 .addressFamily(addrFamily)
598 .dstLength(prefix.prefixLength())
599 .routeAttribute(raDst)
600 .routeAttribute(raGateway)
601 .build();
Kalhee Kim715dd732018-01-23 14:39:56 +0000602
603 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000604 int messageLength = raDst.length() + raGateway.length() +
shalde064280feec2018-06-15 19:01:29 -0400605 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000606 Netlink netLink = Netlink.builder()
shalde064280feec2018-06-15 19:01:29 -0400607 .length(messageLength)
608 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
609 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
610 .rtNetlink(rtNetlink)
611 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000612
Kalhee Kim715dd732018-01-23 14:39:56 +0000613 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000614 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000615 FpmHeader fpmMessage = FpmHeader.builder()
shalde064280feec2018-06-15 19:01:29 -0400616 .version(FpmHeader.FPM_VERSION_1)
617 .type(FpmHeader.FPM_TYPE_NETLINK)
618 .length(messageLength)
619 .netlink(netLink)
620 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000621
622 // Encode message in a channel buffer and transmit.
623 ch.write(fpmMessage.encode());
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000624 log.debug("Fpm Message for updated route {}", fpmMessage.toString());
Kalhee Kimba366062017-11-07 16:32:09 +0000625 } catch (RuntimeException e) {
626 log.info("Route not sent over fpm connection.");
627 }
628 }
629
shalde064280feec2018-06-15 19:01:29 -0400630 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
631
632 if (!pdPushEnabled) {
633 return;
634 }
635 int raLength;
636 short addrFamily;
637
638 // Build route attributes.
639 if (prefix.isIp4()) {
640 List<Ip4Address> pdPushNextHopList;
641 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
642 log.info("Prefix not pushed because ipv4 next-hop is null.");
643 return;
644 }
645 pdPushNextHopList = pdPushNextHopIPv4;
646 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
647 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
648 for (Ip4Address pdPushNextHop: pdPushNextHopList) {
Harshada Chaundkar1a098eb2019-01-15 00:05:57 +0000649 log.trace("IPv4 next hop is:" + pdPushNextHop);
shalde064280feec2018-06-15 19:01:29 -0400650 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
651 }
652 } else {
653 List<Ip6Address> pdPushNextHopList;
654 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
655 log.info("Prefix not pushed because ipv6 next-hop is null.");
656 return;
657 }
658 pdPushNextHopList = pdPushNextHopIPv6;
659 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
660 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
661 for (Ip6Address pdPushNextHop: pdPushNextHopList) {
Harshada Chaundkar1a098eb2019-01-15 00:05:57 +0000662 log.trace("IPv6 next hop is:" + pdPushNextHop);
shalde064280feec2018-06-15 19:01:29 -0400663 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
664 }
665 }
666 }
667
Kalhee Kimba366062017-11-07 16:32:09 +0000668 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
669
670 for (Channel ch : allChannels) {
671 sendRouteUpdateToChannel(isAdd, prefix, ch);
672 }
673 }
674
675 public boolean isPdPushEnabled() {
676 return pdPushEnabled;
677 }
678
679 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
680 return new FpmPeerInfo(connections,
681 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
682 }
683
684 @Override
685 public Map<FpmPeer, FpmPeerInfo> peers() {
686 return peers.asJavaMap().entrySet().stream()
687 .collect(Collectors.toMap(
688 e -> e.getKey(),
689 e -> toFpmInfo(e.getKey(), e.getValue())));
690 }
691
William Davies5bae5052019-10-19 01:25:01 -0700692 @Override
693 public void updateAcceptRouteFlag(Collection<FpmPeerAcceptRoutes> modifiedPeers) {
694 modifiedPeers.forEach(modifiedPeer -> {
695 log.debug("FPM connection to {} is disabled", modifiedPeer);
696 NodeId localNode = clusterService.getLocalNode().id();
697 log.debug("Peer Flag {}", modifiedPeer.isAcceptRoutes());
698 peers.compute(modifiedPeer.peer(), (p, infos) -> {
699 if (infos == null) {
700 return null;
701 }
702 Iterator<FpmConnectionInfo> iterator = infos.iterator();
703 if (iterator.hasNext()) {
704 FpmConnectionInfo connectionInfo = iterator.next();
705 if (connectionInfo.isAcceptRoutes() == modifiedPeer.isAcceptRoutes()) {
706 return null;
707 }
708 localPeers.remove(modifiedPeer.peer());
709 infos.remove(connectionInfo);
710 infos.add(new FpmConnectionInfo(localNode, modifiedPeer.peer(),
711 System.currentTimeMillis(), modifiedPeer.isAcceptRoutes()));
712 localPeers.put(modifiedPeer.peer(), infos);
713 }
714 Map<IpPrefix, Route> routes = fpmRoutes.get(modifiedPeer.peer());
715 if (routes != null && !modifiedPeer.isAcceptRoutes()) {
716 updateRouteStore(Lists.newArrayList(), routes.values());
717 } else {
718 updateRouteStore(routes.values(), Lists.newArrayList());
719 }
720
721 return infos;
722 });
723 });
724
725 }
726
Kalhee Kimba366062017-11-07 16:32:09 +0000727 private class InternalFpmListener implements FpmListener {
728 @Override
729 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
730 FpmManager.this.fpmMessage(peer, fpmMessage);
731 }
732
733 @Override
734 public boolean peerConnected(FpmPeer peer) {
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000735 log.info("FPM connection to {} was connected", peer);
Kalhee Kimba366062017-11-07 16:32:09 +0000736 if (peers.keySet().contains(peer)) {
737 return false;
738 }
739
740 NodeId localNode = clusterService.getLocalNode().id();
741 peers.compute(peer, (p, infos) -> {
742 if (infos == null) {
743 infos = new HashSet<>();
744 }
745
William Davies5bae5052019-10-19 01:25:01 -0700746 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis(), true));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700747 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000748 return infos;
749 });
750
751 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
752 return true;
753 }
754
755 @Override
756 public void peerDisconnected(FpmPeer peer) {
757 log.info("FPM connection to {} went down", peer);
758
759 if (clearRoutes) {
760 clearRoutes(peer);
761 }
762
763 peers.compute(peer, (p, infos) -> {
764 if (infos == null) {
765 return null;
766 }
767
768 infos.stream()
769 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
770 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700771 .ifPresent(i -> {
772 infos.remove(i);
773 localPeers.get(peer).remove(i);
774 });
Kalhee Kimba366062017-11-07 16:32:09 +0000775
776 if (infos.isEmpty()) {
777 return null;
778 }
779
780 return infos;
781 });
782 }
783 }
784
785 /**
786 * Adds a channel to the channel group.
787 *
788 * @param channel the channel to add
789 */
790 public void addSessionChannel(Channel channel) {
791 allChannels.add(channel);
792 }
793
794 /**
795 * Removes a channel from the channel group.
796 *
797 * @param channel the channel to remove
798 */
799 public void removeSessionChannel(Channel channel) {
800 allChannels.remove(channel);
801 }
802
803 /**
804 * Store delegate for Fpm Prefix store.
805 * Handles Fpm prefix store event.
806 */
807 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
808
809 @Override
810 public void notify(FpmPrefixStoreEvent e) {
811
812 log.trace("FpmPrefixStoreEvent notify");
813
814 FpmRecord record = e.subject();
815 switch (e.type()) {
816 case ADD:
817 sendRouteUpdate(true, record.ipPrefix());
818 break;
819 case REMOVE:
820 sendRouteUpdate(false, record.ipPrefix());
821 break;
822 default:
823 log.warn("unsupported store event type", e.type());
824 return;
825 }
826 }
827 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800828
829 private class InternalClusterListener implements ClusterEventListener {
830 @Override
831 public void event(ClusterEvent event) {
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700832 clusterEventExecutor.execute(() -> {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700833 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700834 switch (event.type()) {
835 case INSTANCE_READY:
836 // When current node is healing from a network partition,
837 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
838 // but it is possible that current node still can't write to the cluster at this moment.
839 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
840 // after current node can write.
841 // Adding 15 seconds retry for the current node to be able to write.
842 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
843 if (result != null && result.isPresent()) {
844 log.debug("Lock obtained. Push local FPM routes to route store");
845 // All FPM routes on current node will be pushed again even when current node is not
846 // the one that becomes READY. A better way is to do this only on the minority nodes.
847 pushFpmRoutes();
848 localPeers.forEach((key, value) -> peers.put(key, value));
849 asyncLock.unlock();
850 } else {
851 log.debug("Fail to obtain lock. Abort.");
852 }
853 });
854 break;
855 case INSTANCE_DEACTIVATED:
856 case INSTANCE_REMOVED:
857 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
858 if (value != null) {
859 value.stream()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700860 .filter(i -> i.connectedTo().equals(event.subject().id()))
861 .findAny()
862 .ifPresent(value::remove);
Harshada Chaundkarf8f7e852019-09-25 17:42:33 +0000863 log.info("Connection {} removed for disabled peer {}", value, key);
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700864 if (value.isEmpty()) {
865 peers.remove(key);
866 }
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700867 }
Jordan Haltermanaa2faca2018-08-13 02:41:50 -0700868 });
869 break;
870 case INSTANCE_ADDED:
871 case INSTANCE_ACTIVATED:
872 default:
873 break;
874 }
875 });
Charles Chan035ed1f2018-01-30 16:00:32 -0800876 }
877 }
878
Saurav Dase7f51012018-02-09 17:26:45 -0800879 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800880 public void pushFpmRoutes() {
881 Set<Route> routes = fpmRoutes.values().stream()
882 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
883 .collect(Collectors.toSet());
884 updateRouteStore(routes, Lists.newArrayList());
885 log.info("{} FPM routes have been updated to route store", routes.size());
886 }
Kalhee Kimba366062017-11-07 16:32:09 +0000887}