blob: 22c0209d5d21494447ebc5705f767a40b24d605b [file] [log] [blame]
Kalhee Kimba366062017-11-07 16:32:09 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.routing.fpm;
18
Andrea Campanella4310f6e2018-03-27 16:35:39 -070019import com.google.common.collect.ImmutableMap;
Charles Chan035ed1f2018-01-30 16:00:32 -080020import com.google.common.collect.Lists;
Kalhee Kimba366062017-11-07 16:32:09 +000021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.ReferencePolicy;
29import org.apache.felix.scr.annotations.Service;
30import org.jboss.netty.bootstrap.ServerBootstrap;
31import org.jboss.netty.channel.Channel;
32import org.jboss.netty.channel.ChannelException;
33import org.jboss.netty.channel.ChannelFactory;
34import org.jboss.netty.channel.ChannelPipeline;
35import org.jboss.netty.channel.ChannelPipelineFactory;
36import org.jboss.netty.channel.Channels;
37import org.jboss.netty.channel.group.ChannelGroup;
38import org.jboss.netty.channel.group.DefaultChannelGroup;
39import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
40import org.jboss.netty.handler.timeout.IdleStateHandler;
41import org.jboss.netty.util.HashedWheelTimer;
Kalhee Kimba366062017-11-07 16:32:09 +000042import org.onlab.packet.Ip4Address;
43import org.onlab.packet.Ip6Address;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070044import org.onlab.packet.IpAddress;
Kalhee Kimba366062017-11-07 16:32:09 +000045import org.onlab.packet.IpPrefix;
Kalhee Kimba366062017-11-07 16:32:09 +000046import org.onlab.util.KryoNamespace;
47import org.onlab.util.Tools;
48import org.onosproject.cfg.ComponentConfigService;
Charles Chan035ed1f2018-01-30 16:00:32 -080049import org.onosproject.cluster.ClusterEvent;
50import org.onosproject.cluster.ClusterEventListener;
Kalhee Kimba366062017-11-07 16:32:09 +000051import org.onosproject.cluster.ClusterService;
52import org.onosproject.cluster.NodeId;
Kalhee Kimba366062017-11-07 16:32:09 +000053import org.onosproject.core.ApplicationId;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070054import org.onosproject.core.CoreService;
55import org.onosproject.net.host.InterfaceIpAddress;
56import org.onosproject.net.intf.Interface;
57import org.onosproject.net.intf.InterfaceService;
Kalhee Kimba366062017-11-07 16:32:09 +000058import org.onosproject.routeservice.Route;
59import org.onosproject.routeservice.RouteAdminService;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070060import org.onosproject.routing.fpm.api.FpmPrefixStore;
61import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
62import org.onosproject.routing.fpm.api.FpmRecord;
Kalhee Kimba366062017-11-07 16:32:09 +000063import org.onosproject.routing.fpm.protocol.FpmHeader;
64import org.onosproject.routing.fpm.protocol.Netlink;
65import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
66import org.onosproject.routing.fpm.protocol.RouteAttribute;
67import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
68import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
69import org.onosproject.routing.fpm.protocol.RtNetlink;
70import org.onosproject.routing.fpm.protocol.RtProtocol;
Andrea Campanella4310f6e2018-03-27 16:35:39 -070071import org.onosproject.store.StoreDelegate;
Kalhee Kimba366062017-11-07 16:32:09 +000072import org.onosproject.store.serializers.KryoNamespaces;
Charles Chan035ed1f2018-01-30 16:00:32 -080073import org.onosproject.store.service.AsyncDistributedLock;
Kalhee Kimba366062017-11-07 16:32:09 +000074import org.onosproject.store.service.ConsistentMap;
75import org.onosproject.store.service.Serializer;
76import org.onosproject.store.service.StorageService;
Kalhee Kimba366062017-11-07 16:32:09 +000077import org.osgi.service.component.ComponentContext;
78import org.slf4j.Logger;
79import org.slf4j.LoggerFactory;
80
81import java.net.InetSocketAddress;
Charles Chan035ed1f2018-01-30 16:00:32 -080082import java.time.Duration;
Kalhee Kimba366062017-11-07 16:32:09 +000083import java.util.Collection;
84import java.util.Collections;
85import java.util.Dictionary;
86import java.util.HashSet;
87import java.util.LinkedList;
Kalhee Kimba366062017-11-07 16:32:09 +000088import java.util.List;
shalde064d1fe85b2018-06-15 19:01:29 -040089import java.util.ArrayList;
90import java.util.Arrays;
Kalhee Kimba366062017-11-07 16:32:09 +000091import java.util.Map;
92import java.util.Set;
93import java.util.concurrent.ConcurrentHashMap;
Jordan Halterman1c9a0b42018-08-13 02:41:50 -070094import java.util.concurrent.ExecutorService;
95import java.util.concurrent.Executors;
Kalhee Kimba366062017-11-07 16:32:09 +000096import java.util.stream.Collectors;
97
98import static java.util.concurrent.Executors.newCachedThreadPool;
99import static org.onlab.util.Tools.groupedThreads;
Kalhee Kimba366062017-11-07 16:32:09 +0000100
101/**
102 * Forwarding Plane Manager (FPM) route source.
103 */
104@Service
105@Component(immediate = true)
106public class FpmManager implements FpmInfoService {
107 private final Logger log = LoggerFactory.getLogger(getClass());
108
109 private static final int FPM_PORT = 2620;
110 private static final String APP_NAME = "org.onosproject.fpm";
111 private static final int IDLE_TIMEOUT_SECS = 5;
Charles Chan035ed1f2018-01-30 16:00:32 -0800112 private static final String LOCK_NAME = "fpm-manager-lock";
Kalhee Kimba366062017-11-07 16:32:09 +0000113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected CoreService coreService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected ComponentConfigService componentConfigService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected RouteAdminService routeService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected ClusterService clusterService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected StorageService storageService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected InterfaceService interfaceService;
131
132 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
133 bind = "bindRipStore",
134 unbind = "unbindRipStore",
135 policy = ReferencePolicy.DYNAMIC,
136 target = "(fpm_type=RIP)")
137 protected volatile FpmPrefixStore ripStore;
138
139 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
140 bind = "bindDhcpStore",
141 unbind = "unbindDhcpStore",
142 policy = ReferencePolicy.DYNAMIC,
143 target = "(fpm_type=DHCP)")
144 protected volatile FpmPrefixStore dhcpStore;
145
146 private final StoreDelegate<FpmPrefixStoreEvent> fpmPrefixStoreDelegate
147 = new FpmPrefixStoreDelegate();
148
149 private ApplicationId appId;
150 private ServerBootstrap serverBootstrap;
151 private Channel serverChannel;
152 private ChannelGroup allChannels = new DefaultChannelGroup();
Charles Chan035ed1f2018-01-30 16:00:32 -0800153 private final InternalClusterListener clusterListener = new InternalClusterListener();
154 private AsyncDistributedLock asyncLock;
Kalhee Kimba366062017-11-07 16:32:09 +0000155
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700156 private ExecutorService clusterEventExecutor;
157
Kalhee Kimba366062017-11-07 16:32:09 +0000158 private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
159
160 private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
161
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700162 //Local cache for peers to be used in case of cluster partition.
163 private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
164
Kalhee Kimba366062017-11-07 16:32:09 +0000165 @Property(name = "clearRoutes", boolValue = true,
166 label = "Whether to clear routes when the FPM connection goes down")
167 private boolean clearRoutes = true;
168
169 @Property(name = "pdPushEnabled", boolValue = false,
170 label = "Whether to push prefixes to Quagga over fpm connection")
171 private boolean pdPushEnabled = false;
172
173 @Property(name = "pdPushNextHopIPv4", value = "",
174 label = "IPv4 next-hop address for PD Pushing.")
shalde064d1fe85b2018-06-15 19:01:29 -0400175 private List<Ip4Address> pdPushNextHopIPv4 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000176
177 @Property(name = "pdPushNextHopIPv6", value = "",
178 label = "IPv6 next-hop address for PD Pushing.")
shalde064d1fe85b2018-06-15 19:01:29 -0400179 private List<Ip6Address> pdPushNextHopIPv6 = null;
Kalhee Kimba366062017-11-07 16:32:09 +0000180
181 protected void bindRipStore(FpmPrefixStore store) {
182 if ((ripStore == null) && (store != null)) {
183 ripStore = store;
184 ripStore.setDelegate(fpmPrefixStoreDelegate);
185 for (Channel ch : allChannels) {
186 processRipStaticRoutes(ch);
187 }
188 }
189 }
190
191 protected void unbindRipStore(FpmPrefixStore store) {
192 if (ripStore == store) {
193 ripStore.unsetDelegate(fpmPrefixStoreDelegate);
194 ripStore = null;
195 }
196 }
197
198 protected void bindDhcpStore(FpmPrefixStore store) {
199 if ((dhcpStore == null) && (store != null)) {
200 dhcpStore = store;
201 dhcpStore.setDelegate(fpmPrefixStoreDelegate);
202 for (Channel ch : allChannels) {
203 processDhcpStaticRoutes(ch);
204 }
205 }
206 }
207
208 protected void unbindDhcpStore(FpmPrefixStore store) {
209 if (dhcpStore == store) {
210 dhcpStore.unsetDelegate(fpmPrefixStoreDelegate);
211 dhcpStore = null;
212 }
213 }
214
215 @Activate
216 protected void activate(ComponentContext context) {
217 componentConfigService.preSetProperty(
218 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
219 "distributed", "true");
220
221 componentConfigService.registerProperties(getClass());
222
223 KryoNamespace serializer = KryoNamespace.newBuilder()
224 .register(KryoNamespaces.API)
225 .register(FpmPeer.class)
226 .register(FpmConnectionInfo.class)
227 .build();
228 peers = storageService.<FpmPeer, Set<FpmConnectionInfo>>consistentMapBuilder()
229 .withName("fpm-connections")
230 .withSerializer(Serializer.using(serializer))
231 .build();
232
233 modified(context);
234 startServer();
235
236 appId = coreService.registerApplication(APP_NAME, peers::destroy);
237
Charles Chan035ed1f2018-01-30 16:00:32 -0800238 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700239
240 clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
Saurav Dase7f51012018-02-09 17:26:45 -0800241 clusterService.addListener(clusterListener);
Charles Chan035ed1f2018-01-30 16:00:32 -0800242
Kalhee Kimba366062017-11-07 16:32:09 +0000243 log.info("Started");
244 }
245
246 @Deactivate
247 protected void deactivate() {
248 componentConfigService.preSetProperty(
249 "org.onosproject.incubator.store.routing.impl.RouteStoreImpl",
250 "distributed", "false");
251
252 stopServer();
253 fpmRoutes.clear();
254 componentConfigService.unregisterProperties(getClass(), false);
Charles Chan035ed1f2018-01-30 16:00:32 -0800255
256 clusterService.removeListener(clusterListener);
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700257 clusterEventExecutor.shutdown();
Charles Chan035ed1f2018-01-30 16:00:32 -0800258 asyncLock.unlock();
259
Kalhee Kimba366062017-11-07 16:32:09 +0000260 log.info("Stopped");
261 }
262
263 @Modified
264 protected void modified(ComponentContext context) {
shalde064d1fe85b2018-06-15 19:01:29 -0400265 Ip4Address rurIPv4Address;
266 Ip6Address rurIPv6Address;
Kalhee Kimba366062017-11-07 16:32:09 +0000267 Dictionary<?, ?> properties = context.getProperties();
268 if (properties == null) {
269 return;
270 }
271 String strClearRoutes = Tools.get(properties, "clearRoutes");
272 if (strClearRoutes != null) {
273 clearRoutes = Boolean.parseBoolean(strClearRoutes);
274 log.info("clearRoutes is {}", clearRoutes);
275 }
276
277 String strPdPushEnabled = Tools.get(properties, "pdPushEnabled");
278 if (strPdPushEnabled != null) {
279 boolean oldValue = pdPushEnabled;
280 pdPushEnabled = Boolean.parseBoolean(strPdPushEnabled);
281 if (pdPushEnabled) {
282
shalde064d1fe85b2018-06-15 19:01:29 -0400283 pdPushNextHopIPv4 = new ArrayList<Ip4Address>();
284 pdPushNextHopIPv6 = new ArrayList<Ip6Address>();
Kalhee Kimba366062017-11-07 16:32:09 +0000285
286 String strPdPushNextHopIPv4 = Tools.get(properties, "pdPushNextHopIPv4");
287 if (strPdPushNextHopIPv4 != null) {
shalde064d1fe85b2018-06-15 19:01:29 -0400288 List<String> strPdPushNextHopIPv4List = Arrays.asList(strPdPushNextHopIPv4.split(","));
289 for (String nextHop : strPdPushNextHopIPv4List) {
290 log.debug("IPv4 next hop added is:" + nextHop);
291 pdPushNextHopIPv4.add(Ip4Address.valueOf(nextHop));
292 }
Kalhee Kimba366062017-11-07 16:32:09 +0000293 }
294 String strPdPushNextHopIPv6 = Tools.get(properties, "pdPushNextHopIPv6");
295 if (strPdPushNextHopIPv6 != null) {
shalde064d1fe85b2018-06-15 19:01:29 -0400296 List<String> strPdPushNextHopIPv6List = Arrays.asList(strPdPushNextHopIPv6.split(","));
297 for (String nextHop : strPdPushNextHopIPv6List) {
298 log.debug("IPv6 next hop added is:" + nextHop);
299 pdPushNextHopIPv6.add(Ip6Address.valueOf(nextHop));
300 }
Kalhee Kimba366062017-11-07 16:32:09 +0000301 }
302
shalde064d1fe85b2018-06-15 19:01:29 -0400303 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
304 rurIPv4Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000305 .stream()
306 .filter(iface -> iface.name().contains("RUR"))
307 .map(Interface::ipAddressesList)
308 .flatMap(Collection::stream)
309 .map(InterfaceIpAddress::ipAddress)
310 .filter(IpAddress::isIp4)
311 .map(IpAddress::getIp4Address)
312 .findFirst()
313 .orElse(null);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400314 log.debug("RUR IPv4 address extracted from netcfg is: {}", rurIPv4Address);
shalde064d1fe85b2018-06-15 19:01:29 -0400315 if (rurIPv4Address != null) {
316 pdPushNextHopIPv4.add(rurIPv4Address);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400317 } else {
318 log.debug("Unable to extract RUR IPv4 address from netcfg");
shalde064d1fe85b2018-06-15 19:01:29 -0400319 }
320
Kalhee Kimba366062017-11-07 16:32:09 +0000321 }
322
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400323 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
shalde064d1fe85b2018-06-15 19:01:29 -0400324 rurIPv6Address = interfaceService.getInterfaces()
Kalhee Kimba366062017-11-07 16:32:09 +0000325 .stream()
326 .filter(iface -> iface.name().contains("RUR"))
327 .map(Interface::ipAddressesList)
328 .flatMap(Collection::stream)
329 .map(InterfaceIpAddress::ipAddress)
330 .filter(IpAddress::isIp6)
331 .map(IpAddress::getIp6Address)
332 .findFirst()
333 .orElse(null);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400334 log.debug("RUR IPv6 address extracted from netcfg is: {}", rurIPv6Address);
shalde064d1fe85b2018-06-15 19:01:29 -0400335 if (rurIPv6Address != null) {
336 pdPushNextHopIPv6.add(rurIPv6Address);
Mayank Tiwariaab61e42018-06-23 11:19:08 -0400337 } else {
338 log.debug("Unable to extract RUR IPv6 address from netcfg");
shalde064d1fe85b2018-06-15 19:01:29 -0400339 }
Kalhee Kimba366062017-11-07 16:32:09 +0000340 }
341
342 log.info("PD pushing is enabled.");
shalde064d1fe85b2018-06-15 19:01:29 -0400343 if (pdPushNextHopIPv4 != null || pdPushNextHopIPv4.size() != 0) {
344 log.info("ipv4 next-hop {} with {} items", pdPushNextHopIPv4.toString(), pdPushNextHopIPv4.size());
345
Kalhee Kimba366062017-11-07 16:32:09 +0000346 } else {
347 log.info("ipv4 next-hop is null");
348 }
shalde064d1fe85b2018-06-15 19:01:29 -0400349 if (pdPushNextHopIPv6 != null || pdPushNextHopIPv6.size() != 0) {
350 log.info("ipv6 next-hop={} with {} items", pdPushNextHopIPv6.toString(), pdPushNextHopIPv6.size());
Kalhee Kimba366062017-11-07 16:32:09 +0000351 } else {
352 log.info("ipv6 next-hop is null");
353 }
shalde064d1fe85b2018-06-15 19:01:29 -0400354 processStaticRoutes();
Kalhee Kimba366062017-11-07 16:32:09 +0000355 } else {
356 log.info("PD pushing is disabled.");
357 }
358 }
359 }
360
361 private void startServer() {
362 HashedWheelTimer timer = new HashedWheelTimer(
363 groupedThreads("onos/fpm", "fpm-timer-%d", log));
364
365 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
366 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
367 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
368 ChannelPipelineFactory pipelineFactory = () -> {
369 // Allocate a new session per connection
370 IdleStateHandler idleHandler =
371 new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
372 FpmSessionHandler fpmSessionHandler =
373 new FpmSessionHandler(this, new InternalFpmListener());
374 FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
375
376 // Setup the processing pipeline
377 ChannelPipeline pipeline = Channels.pipeline();
378 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
379 pipeline.addLast("idle", idleHandler);
380 pipeline.addLast("FpmSession", fpmSessionHandler);
381 return pipeline;
382 };
383
384 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
385
386 serverBootstrap = new ServerBootstrap(channelFactory);
387 serverBootstrap.setOption("child.reuseAddr", true);
388 serverBootstrap.setOption("child.keepAlive", true);
389 serverBootstrap.setOption("child.tcpNoDelay", true);
390 serverBootstrap.setPipelineFactory(pipelineFactory);
391 try {
392 serverChannel = serverBootstrap.bind(listenAddress);
393 allChannels.add(serverChannel);
394 } catch (ChannelException e) {
395 log.debug("Exception binding to FPM port {}: ",
396 listenAddress.getPort(), e);
397 stopServer();
398 }
399 }
400
401 private void stopServer() {
402 allChannels.close().awaitUninterruptibly();
403 allChannels.clear();
404 if (serverBootstrap != null) {
405 serverBootstrap.releaseExternalResources();
406 }
407
408 if (clearRoutes) {
409 peers.keySet().forEach(this::clearRoutes);
410 }
411 }
412
Kalhee Kim40beb722018-01-16 20:32:04 +0000413 private boolean routeInDhcpStore(IpPrefix prefix) {
414
415 if (dhcpStore != null) {
416 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
417 return dhcpRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
418 }
419 return false;
420 }
421
422 private boolean routeInRipStore(IpPrefix prefix) {
423
424 if (ripStore != null) {
425 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
426 return ripRecords.stream().anyMatch(record -> record.ipPrefix().equals(prefix));
427 }
428 return false;
429 }
430
Kalhee Kimba366062017-11-07 16:32:09 +0000431 private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
432 if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
433 return;
434 }
435
436 Netlink netlink = fpmMessage.netlink();
437 RtNetlink rtNetlink = netlink.rtNetlink();
438
439 if (log.isTraceEnabled()) {
440 log.trace("Received FPM message: {}", fpmMessage);
441 }
442
443 if (!(rtNetlink.protocol() == RtProtocol.ZEBRA ||
444 rtNetlink.protocol() == RtProtocol.UNSPEC)) {
445 log.trace("Ignoring non-zebra route");
446 return;
447 }
448
449 IpAddress dstAddress = null;
450 IpAddress gateway = null;
451
452 for (RouteAttribute attribute : rtNetlink.attributes()) {
453 if (attribute.type() == RouteAttribute.RTA_DST) {
454 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
455 dstAddress = raDst.dstAddress();
456 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
457 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
458 gateway = raGateway.gateway();
459 }
460 }
461
462 if (dstAddress == null) {
463 log.error("Dst address missing!");
464 return;
465 }
466
467 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
468
Kalhee Kim40beb722018-01-16 20:32:04 +0000469 // Ignore routes that we sent.
Charles Chanc3a48e32018-06-25 13:01:35 -0700470 if (gateway != null && (
471 (prefix.isIp4() && pdPushNextHopIPv4 != null &&
472 pdPushNextHopIPv4.contains(gateway.getIp4Address())) ||
473 (prefix.isIp6() && pdPushNextHopIPv6 != null &&
474 pdPushNextHopIPv6.contains(gateway.getIp6Address())))) {
Kalhee Kim40beb722018-01-16 20:32:04 +0000475 if (routeInDhcpStore(prefix) || routeInRipStore(prefix)) {
476 return;
477 }
478 }
479
Kalhee Kimba366062017-11-07 16:32:09 +0000480 List<Route> updates = new LinkedList<>();
481 List<Route> withdraws = new LinkedList<>();
482
483 Route route;
484 switch (netlink.type()) {
485 case RTM_NEWROUTE:
486 if (gateway == null) {
487 // We ignore interface routes with no gateway for now.
488 return;
489 }
490 route = new Route(Route.Source.FPM, prefix, gateway, clusterService.getLocalNode().id());
491
492
493 Route oldRoute = fpmRoutes.get(peer).put(prefix, route);
494
495 if (oldRoute != null) {
496 log.trace("Swapping {} with {}", oldRoute, route);
497 withdraws.add(oldRoute);
498 }
499 updates.add(route);
500 break;
501 case RTM_DELROUTE:
502 Route existing = fpmRoutes.get(peer).remove(prefix);
503 if (existing == null) {
504 log.warn("Got delete for non-existent prefix");
505 return;
506 }
507
508 route = new Route(Route.Source.FPM, prefix, existing.nextHop(), clusterService.getLocalNode().id());
509
510 withdraws.add(route);
511 break;
512 case RTM_GETROUTE:
513 default:
514 break;
515 }
516
Charles Chan035ed1f2018-01-30 16:00:32 -0800517 updateRouteStore(updates, withdraws);
518 }
519
520 private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
521 routeService.withdraw(routesToRemove);
522 routeService.update(routesToAdd);
Kalhee Kimba366062017-11-07 16:32:09 +0000523 }
524
525 private void clearRoutes(FpmPeer peer) {
526 log.info("Clearing all routes for peer {}", peer);
527 Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
528 if (routes != null) {
Charles Chan035ed1f2018-01-30 16:00:32 -0800529 updateRouteStore(Lists.newArrayList(), routes.values());
Kalhee Kimba366062017-11-07 16:32:09 +0000530 }
531 }
532
533 public void processStaticRoutes() {
shalde064d1fe85b2018-06-15 19:01:29 -0400534 log.debug("processStaticRoutes function is called");
Kalhee Kimba366062017-11-07 16:32:09 +0000535 for (Channel ch : allChannels) {
536 processStaticRoutes(ch);
537 }
538 }
539
540 public void processStaticRoutes(Channel ch) {
541 processRipStaticRoutes(ch);
542 processDhcpStaticRoutes(ch);
543 }
544
545 private void processRipStaticRoutes(Channel ch) {
546
547 /* Get RIP static routes. */
548 if (ripStore != null) {
549 Collection<FpmRecord> ripRecords = ripStore.getFpmRecords();
550 log.info("RIP store size is {}", ripRecords.size());
551
552 ripRecords.forEach(record -> sendRouteUpdateToChannel(true,
553 record.ipPrefix(), ch));
554 }
555 }
556
557 private void processDhcpStaticRoutes(Channel ch) {
558
559 /* Get Dhcp static routes. */
560 if (dhcpStore != null) {
561 Collection<FpmRecord> dhcpRecords = dhcpStore.getFpmRecords();
562 log.info("Dhcp store size is {}", dhcpRecords.size());
563
564 dhcpRecords.forEach(record -> sendRouteUpdateToChannel(true,
565 record.ipPrefix(), ch));
566 }
567 }
568
shalde064d1fe85b2018-06-15 19:01:29 -0400569 private void updateRoute(IpAddress pdPushNextHop, boolean isAdd, IpPrefix prefix,
570 Channel ch, int raLength, short addrFamily) {
Kalhee Kimba366062017-11-07 16:32:09 +0000571 try {
shalde064d1fe85b2018-06-15 19:01:29 -0400572 RouteAttributeDst raDst = RouteAttributeDst.builder()
573 .length(raLength)
574 .type(RouteAttribute.RTA_DST)
575 .dstAddress(prefix.address())
576 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000577
Kalhee Kim715dd732018-01-23 14:39:56 +0000578 RouteAttributeGateway raGateway = RouteAttributeGateway.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400579 .length(raLength)
580 .type(RouteAttribute.RTA_GATEWAY)
581 .gateway(pdPushNextHop)
582 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000583
Kalhee Kim715dd732018-01-23 14:39:56 +0000584 // Build RtNetlink.
585 RtNetlink rtNetlink = RtNetlink.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400586 .addressFamily(addrFamily)
587 .dstLength(prefix.prefixLength())
588 .routeAttribute(raDst)
589 .routeAttribute(raGateway)
590 .build();
Kalhee Kim715dd732018-01-23 14:39:56 +0000591
592 // Build Netlink.
Kalhee Kimba366062017-11-07 16:32:09 +0000593 int messageLength = raDst.length() + raGateway.length() +
shalde064d1fe85b2018-06-15 19:01:29 -0400594 RtNetlink.RT_NETLINK_LENGTH + Netlink.NETLINK_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000595 Netlink netLink = Netlink.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400596 .length(messageLength)
597 .type(isAdd ? NetlinkMessageType.RTM_NEWROUTE : NetlinkMessageType.RTM_DELROUTE)
598 .flags(Netlink.NETLINK_REQUEST | Netlink.NETLINK_CREATE)
599 .rtNetlink(rtNetlink)
600 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000601
Kalhee Kim715dd732018-01-23 14:39:56 +0000602 // Build FpmHeader.
Kalhee Kimba366062017-11-07 16:32:09 +0000603 messageLength += FpmHeader.FPM_HEADER_LENGTH;
Kalhee Kim715dd732018-01-23 14:39:56 +0000604 FpmHeader fpmMessage = FpmHeader.builder()
shalde064d1fe85b2018-06-15 19:01:29 -0400605 .version(FpmHeader.FPM_VERSION_1)
606 .type(FpmHeader.FPM_TYPE_NETLINK)
607 .length(messageLength)
608 .netlink(netLink)
609 .build();
Kalhee Kimba366062017-11-07 16:32:09 +0000610
611 // Encode message in a channel buffer and transmit.
612 ch.write(fpmMessage.encode());
613
614 } catch (RuntimeException e) {
615 log.info("Route not sent over fpm connection.");
616 }
617 }
618
shalde064d1fe85b2018-06-15 19:01:29 -0400619 private void sendRouteUpdateToChannel(boolean isAdd, IpPrefix prefix, Channel ch) {
620
621 if (!pdPushEnabled) {
622 return;
623 }
624 int raLength;
625 short addrFamily;
626
627 // Build route attributes.
628 if (prefix.isIp4()) {
629 List<Ip4Address> pdPushNextHopList;
630 if (pdPushNextHopIPv4 == null || pdPushNextHopIPv4.size() == 0) {
631 log.info("Prefix not pushed because ipv4 next-hop is null.");
632 return;
633 }
634 pdPushNextHopList = pdPushNextHopIPv4;
635 raLength = Ip4Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
636 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET;
637 for (Ip4Address pdPushNextHop: pdPushNextHopList) {
638 log.debug("IPv4 next hop is:" + pdPushNextHop);
639 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
640 }
641 } else {
642 List<Ip6Address> pdPushNextHopList;
643 if (pdPushNextHopIPv6 == null || pdPushNextHopIPv6.size() == 0) {
644 log.info("Prefix not pushed because ipv6 next-hop is null.");
645 return;
646 }
647 pdPushNextHopList = pdPushNextHopIPv6;
648 raLength = Ip6Address.BYTE_LENGTH + RouteAttribute.ROUTE_ATTRIBUTE_HEADER_LENGTH;
649 addrFamily = RtNetlink.RT_ADDRESS_FAMILY_INET6;
650 for (Ip6Address pdPushNextHop: pdPushNextHopList) {
651 log.debug("IPv6 next hop is:" + pdPushNextHop);
652 updateRoute(pdPushNextHop, isAdd, prefix, ch, raLength, addrFamily);
653 }
654 }
655 }
656
Kalhee Kimba366062017-11-07 16:32:09 +0000657 private void sendRouteUpdate(boolean isAdd, IpPrefix prefix) {
658
659 for (Channel ch : allChannels) {
660 sendRouteUpdateToChannel(isAdd, prefix, ch);
661 }
662 }
663
664 public boolean isPdPushEnabled() {
665 return pdPushEnabled;
666 }
667
668 private FpmPeerInfo toFpmInfo(FpmPeer peer, Collection<FpmConnectionInfo> connections) {
669 return new FpmPeerInfo(connections,
670 fpmRoutes.getOrDefault(peer, Collections.emptyMap()).size());
671 }
672
673 @Override
674 public Map<FpmPeer, FpmPeerInfo> peers() {
675 return peers.asJavaMap().entrySet().stream()
676 .collect(Collectors.toMap(
677 e -> e.getKey(),
678 e -> toFpmInfo(e.getKey(), e.getValue())));
679 }
680
681 private class InternalFpmListener implements FpmListener {
682 @Override
683 public void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
684 FpmManager.this.fpmMessage(peer, fpmMessage);
685 }
686
687 @Override
688 public boolean peerConnected(FpmPeer peer) {
689 if (peers.keySet().contains(peer)) {
690 return false;
691 }
692
693 NodeId localNode = clusterService.getLocalNode().id();
694 peers.compute(peer, (p, infos) -> {
695 if (infos == null) {
696 infos = new HashSet<>();
697 }
698
699 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700700 localPeers.put(peer, infos);
Kalhee Kimba366062017-11-07 16:32:09 +0000701 return infos;
702 });
703
704 fpmRoutes.computeIfAbsent(peer, p -> new ConcurrentHashMap<>());
705 return true;
706 }
707
708 @Override
709 public void peerDisconnected(FpmPeer peer) {
710 log.info("FPM connection to {} went down", peer);
711
712 if (clearRoutes) {
713 clearRoutes(peer);
714 }
715
716 peers.compute(peer, (p, infos) -> {
717 if (infos == null) {
718 return null;
719 }
720
721 infos.stream()
722 .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
723 .findAny()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700724 .ifPresent(i -> {
725 infos.remove(i);
726 localPeers.get(peer).remove(i);
727 });
Kalhee Kimba366062017-11-07 16:32:09 +0000728
729 if (infos.isEmpty()) {
730 return null;
731 }
732
733 return infos;
734 });
735 }
736 }
737
738 /**
739 * Adds a channel to the channel group.
740 *
741 * @param channel the channel to add
742 */
743 public void addSessionChannel(Channel channel) {
744 allChannels.add(channel);
745 }
746
747 /**
748 * Removes a channel from the channel group.
749 *
750 * @param channel the channel to remove
751 */
752 public void removeSessionChannel(Channel channel) {
753 allChannels.remove(channel);
754 }
755
756 /**
757 * Store delegate for Fpm Prefix store.
758 * Handles Fpm prefix store event.
759 */
760 class FpmPrefixStoreDelegate implements StoreDelegate<FpmPrefixStoreEvent> {
761
762 @Override
763 public void notify(FpmPrefixStoreEvent e) {
764
765 log.trace("FpmPrefixStoreEvent notify");
766
767 FpmRecord record = e.subject();
768 switch (e.type()) {
769 case ADD:
770 sendRouteUpdate(true, record.ipPrefix());
771 break;
772 case REMOVE:
773 sendRouteUpdate(false, record.ipPrefix());
774 break;
775 default:
776 log.warn("unsupported store event type", e.type());
777 return;
778 }
779 }
780 }
Charles Chan035ed1f2018-01-30 16:00:32 -0800781
782 private class InternalClusterListener implements ClusterEventListener {
783 @Override
784 public void event(ClusterEvent event) {
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700785 clusterEventExecutor.execute(() -> {
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700786 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700787 switch (event.type()) {
788 case INSTANCE_READY:
789 // When current node is healing from a network partition,
790 // seeing INSTANCE_READY means current node has the ability to read from the cluster,
791 // but it is possible that current node still can't write to the cluster at this moment.
792 // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
793 // after current node can write.
794 // Adding 15 seconds retry for the current node to be able to write.
795 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
796 if (result != null && result.isPresent()) {
797 log.debug("Lock obtained. Push local FPM routes to route store");
798 // All FPM routes on current node will be pushed again even when current node is not
799 // the one that becomes READY. A better way is to do this only on the minority nodes.
800 pushFpmRoutes();
801 localPeers.forEach((key, value) -> peers.put(key, value));
802 asyncLock.unlock();
803 } else {
804 log.debug("Fail to obtain lock. Abort.");
805 }
806 });
807 break;
808 case INSTANCE_DEACTIVATED:
809 case INSTANCE_REMOVED:
810 ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
811 if (value != null) {
812 value.stream()
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700813 .filter(i -> i.connectedTo().equals(event.subject().id()))
814 .findAny()
815 .ifPresent(value::remove);
816
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700817 if (value.isEmpty()) {
818 peers.remove(key);
819 }
Andrea Campanella4310f6e2018-03-27 16:35:39 -0700820 }
Jordan Halterman1c9a0b42018-08-13 02:41:50 -0700821 });
822 break;
823 case INSTANCE_ADDED:
824 case INSTANCE_ACTIVATED:
825 default:
826 break;
827 }
828 });
Charles Chan035ed1f2018-01-30 16:00:32 -0800829 }
830 }
831
Saurav Dase7f51012018-02-09 17:26:45 -0800832 @Override
Charles Chan035ed1f2018-01-30 16:00:32 -0800833 public void pushFpmRoutes() {
834 Set<Route> routes = fpmRoutes.values().stream()
835 .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
836 .collect(Collectors.toSet());
837 updateRouteStore(routes, Lists.newArrayList());
838 log.info("{} FPM routes have been updated to route store", routes.size());
839 }
Kalhee Kimba366062017-11-07 16:32:09 +0000840}