blob: 914a3d99ba6a8ff0996ea348ef3db80a863cde2b [file] [log] [blame]
Jonathan Hart3930f632015-10-19 12:12:51 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.routing.fpm;
17
Jonathan Hart6b045582016-02-03 10:00:08 -080018import com.google.common.collect.ImmutableMap;
Jonathan Hart3930f632015-10-19 12:12:51 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Service;
23import org.jboss.netty.bootstrap.ServerBootstrap;
24import org.jboss.netty.channel.Channel;
25import org.jboss.netty.channel.ChannelException;
26import org.jboss.netty.channel.ChannelFactory;
27import org.jboss.netty.channel.ChannelPipeline;
28import org.jboss.netty.channel.ChannelPipelineFactory;
29import org.jboss.netty.channel.Channels;
30import org.jboss.netty.channel.group.ChannelGroup;
31import org.jboss.netty.channel.group.DefaultChannelGroup;
32import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
33import org.onlab.packet.IpAddress;
34import org.onlab.packet.IpPrefix;
35import org.onosproject.routing.RouteEntry;
36import org.onosproject.routing.RouteListener;
37import org.onosproject.routing.RouteSourceService;
38import org.onosproject.routing.RouteUpdate;
39import org.onosproject.routing.fpm.protocol.FpmHeader;
40import org.onosproject.routing.fpm.protocol.Netlink;
41import org.onosproject.routing.fpm.protocol.RouteAttribute;
42import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
43import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
44import org.onosproject.routing.fpm.protocol.RtNetlink;
Jonathan Hart916bf892016-01-27 16:42:55 -080045import org.onosproject.routing.fpm.protocol.RtProtocol;
Jonathan Hart3930f632015-10-19 12:12:51 -070046import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
49import java.net.InetSocketAddress;
Jonathan Hart6b045582016-02-03 10:00:08 -080050import java.net.SocketAddress;
Jonathan Hart3930f632015-10-19 12:12:51 -070051import java.util.Collections;
52import java.util.Map;
53import java.util.concurrent.ConcurrentHashMap;
54
55import static java.util.concurrent.Executors.newCachedThreadPool;
56import static org.onlab.util.Tools.groupedThreads;
57
58/**
59 * Forwarding Plane Manager (FPM) route source.
60 */
61@Service
62@Component(immediate = true, enabled = false)
Jonathan Hart6b045582016-02-03 10:00:08 -080063public class FpmManager implements RouteSourceService, FpmInfoService {
Jonathan Hart3930f632015-10-19 12:12:51 -070064 private final Logger log = LoggerFactory.getLogger(getClass());
65
66 private ServerBootstrap serverBootstrap;
67 private Channel serverChannel;
68 private ChannelGroup allChannels = new DefaultChannelGroup();
69
Jonathan Hart6b045582016-02-03 10:00:08 -080070 private Map<SocketAddress, Long> peers = new ConcurrentHashMap<>();
71
Jonathan Hart3930f632015-10-19 12:12:51 -070072 private Map<IpPrefix, RouteEntry> fpmRoutes = new ConcurrentHashMap<>();
73
74 private RouteListener routeListener;
75
76 private static final int FPM_PORT = 2620;
77
78 @Activate
79 protected void activate() {
80 log.info("Started");
81 }
82
83 @Deactivate
84 protected void deactivate() {
85 stopServer();
86 log.info("Stopped");
87 }
88
89 private void startServer() {
90 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
91 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")),
92 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d")));
93 ChannelPipelineFactory pipelineFactory = () -> {
94 // Allocate a new session per connection
95 FpmSessionHandler fpmSessionHandler =
96 new FpmSessionHandler(new InternalFpmListener());
97 FpmFrameDecoder fpmFrameDecoder =
98 new FpmFrameDecoder();
99
100 // Setup the processing pipeline
101 ChannelPipeline pipeline = Channels.pipeline();
102 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
103 pipeline.addLast("FpmSession", fpmSessionHandler);
104 return pipeline;
105 };
106
107 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
108
109 serverBootstrap = new ServerBootstrap(channelFactory);
110 serverBootstrap.setOption("child.reuseAddr", true);
111 serverBootstrap.setOption("child.keepAlive", true);
112 serverBootstrap.setOption("child.tcpNoDelay", true);
113 serverBootstrap.setPipelineFactory(pipelineFactory);
114 try {
115 serverChannel = serverBootstrap.bind(listenAddress);
116 allChannels.add(serverChannel);
117 } catch (ChannelException e) {
118 log.debug("Exception binding to FPM port {}: ",
119 listenAddress.getPort(), e);
120 stopServer();
121 }
122 }
123
124 private void stopServer() {
125 allChannels.close().awaitUninterruptibly();
126 allChannels.clear();
127 if (serverBootstrap != null) {
128 serverBootstrap.releaseExternalResources();
129 }
130 }
131
132 @Override
133 public void start(RouteListener routeListener) {
134 this.routeListener = routeListener;
135
136 startServer();
137 }
138
139 @Override
140 public void stop() {
141 fpmRoutes.clear();
142 stopServer();
143 }
144
145 private void fpmMessage(FpmHeader fpmMessage) {
146 Netlink netlink = fpmMessage.netlink();
147 RtNetlink rtNetlink = netlink.rtNetlink();
148
149 if (log.isTraceEnabled()) {
150 log.trace("Received FPM message: {}", fpmMessage);
151 }
152
Jonathan Hart916bf892016-01-27 16:42:55 -0800153 if (rtNetlink.protocol() != RtProtocol.ZEBRA) {
154 log.trace("Ignoring non-zebra route");
155 return;
156 }
157
Jonathan Hart3930f632015-10-19 12:12:51 -0700158 IpAddress dstAddress = null;
159 IpAddress gateway = null;
160
161 for (RouteAttribute attribute : rtNetlink.attributes()) {
162 if (attribute.type() == RouteAttribute.RTA_DST) {
163 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
164 dstAddress = raDst.dstAddress();
165 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
166 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
167 gateway = raGateway.gateway();
168 }
169 }
170
171 if (dstAddress == null) {
172 log.error("Dst address missing!");
173 return;
174 }
175
176 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
177
178 RouteUpdate routeUpdate = null;
179 RouteEntry entry;
180 switch (netlink.type()) {
181 case RTM_NEWROUTE:
182 if (gateway == null) {
183 // We ignore interface routes with no gateway for now.
184 return;
185 }
186 entry = new RouteEntry(prefix, gateway);
187
188 fpmRoutes.put(entry.prefix(), entry);
189
190 routeUpdate = new RouteUpdate(RouteUpdate.Type.UPDATE, entry);
191 break;
192 case RTM_DELROUTE:
193 RouteEntry existing = fpmRoutes.remove(prefix);
194 if (existing == null) {
195 log.warn("Got delete for non-existent prefix");
196 return;
197 }
198
199 entry = new RouteEntry(prefix, existing.nextHop());
200
201 routeUpdate = new RouteUpdate(RouteUpdate.Type.DELETE, entry);
202 break;
203 case RTM_GETROUTE:
204 default:
205 break;
206 }
207
208 if (routeUpdate == null) {
209 log.warn("Unsupported FPM message: {}", fpmMessage);
210 return;
211 }
212
213 routeListener.update(Collections.singletonList(routeUpdate));
214 }
215
Jonathan Hart6b045582016-02-03 10:00:08 -0800216 @Override
217 public Map<SocketAddress, Long> peers() {
218 return ImmutableMap.copyOf(peers);
219 }
220
221 private class InternalFpmListener implements FpmListener {
Jonathan Hart3930f632015-10-19 12:12:51 -0700222 @Override
223 public void fpmMessage(FpmHeader fpmMessage) {
224 FpmManager.this.fpmMessage(fpmMessage);
225 }
Jonathan Hart6b045582016-02-03 10:00:08 -0800226
227 @Override
228 public boolean peerConnected(SocketAddress address) {
229 if (peers.keySet().contains(address)) {
230 return false;
231 }
232
233 peers.put(address, System.currentTimeMillis());
234 return true;
235 }
236
237 @Override
238 public void peerDisconnected(SocketAddress address) {
239 peers.remove(address);
240 }
Jonathan Hart3930f632015-10-19 12:12:51 -0700241 }
242
243}