blob: d89ae4e090746f8857aec896e248fec11db97b2d [file] [log] [blame]
Jonathan Hart3930f632015-10-19 12:12:51 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.routing.fpm;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Service;
22import org.jboss.netty.bootstrap.ServerBootstrap;
23import org.jboss.netty.channel.Channel;
24import org.jboss.netty.channel.ChannelException;
25import org.jboss.netty.channel.ChannelFactory;
26import org.jboss.netty.channel.ChannelPipeline;
27import org.jboss.netty.channel.ChannelPipelineFactory;
28import org.jboss.netty.channel.Channels;
29import org.jboss.netty.channel.group.ChannelGroup;
30import org.jboss.netty.channel.group.DefaultChannelGroup;
31import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
32import org.onlab.packet.IpAddress;
33import org.onlab.packet.IpPrefix;
34import org.onosproject.routing.RouteEntry;
35import org.onosproject.routing.RouteListener;
36import org.onosproject.routing.RouteSourceService;
37import org.onosproject.routing.RouteUpdate;
38import org.onosproject.routing.fpm.protocol.FpmHeader;
39import org.onosproject.routing.fpm.protocol.Netlink;
40import org.onosproject.routing.fpm.protocol.RouteAttribute;
41import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
42import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
43import org.onosproject.routing.fpm.protocol.RtNetlink;
44import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
47import java.net.InetSocketAddress;
48import java.util.Collections;
49import java.util.Map;
50import java.util.concurrent.ConcurrentHashMap;
51
52import static java.util.concurrent.Executors.newCachedThreadPool;
53import static org.onlab.util.Tools.groupedThreads;
54
55/**
56 * Forwarding Plane Manager (FPM) route source.
57 */
58@Service
59@Component(immediate = true, enabled = false)
60public class FpmManager implements RouteSourceService {
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
63 private ServerBootstrap serverBootstrap;
64 private Channel serverChannel;
65 private ChannelGroup allChannels = new DefaultChannelGroup();
66
67 private Map<IpPrefix, RouteEntry> fpmRoutes = new ConcurrentHashMap<>();
68
69 private RouteListener routeListener;
70
71 private static final int FPM_PORT = 2620;
72
73 @Activate
74 protected void activate() {
75 log.info("Started");
76 }
77
78 @Deactivate
79 protected void deactivate() {
80 stopServer();
81 log.info("Stopped");
82 }
83
84 private void startServer() {
85 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
86 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")),
87 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d")));
88 ChannelPipelineFactory pipelineFactory = () -> {
89 // Allocate a new session per connection
90 FpmSessionHandler fpmSessionHandler =
91 new FpmSessionHandler(new InternalFpmListener());
92 FpmFrameDecoder fpmFrameDecoder =
93 new FpmFrameDecoder();
94
95 // Setup the processing pipeline
96 ChannelPipeline pipeline = Channels.pipeline();
97 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
98 pipeline.addLast("FpmSession", fpmSessionHandler);
99 return pipeline;
100 };
101
102 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
103
104 serverBootstrap = new ServerBootstrap(channelFactory);
105 serverBootstrap.setOption("child.reuseAddr", true);
106 serverBootstrap.setOption("child.keepAlive", true);
107 serverBootstrap.setOption("child.tcpNoDelay", true);
108 serverBootstrap.setPipelineFactory(pipelineFactory);
109 try {
110 serverChannel = serverBootstrap.bind(listenAddress);
111 allChannels.add(serverChannel);
112 } catch (ChannelException e) {
113 log.debug("Exception binding to FPM port {}: ",
114 listenAddress.getPort(), e);
115 stopServer();
116 }
117 }
118
119 private void stopServer() {
120 allChannels.close().awaitUninterruptibly();
121 allChannels.clear();
122 if (serverBootstrap != null) {
123 serverBootstrap.releaseExternalResources();
124 }
125 }
126
127 @Override
128 public void start(RouteListener routeListener) {
129 this.routeListener = routeListener;
130
131 startServer();
132 }
133
134 @Override
135 public void stop() {
136 fpmRoutes.clear();
137 stopServer();
138 }
139
140 private void fpmMessage(FpmHeader fpmMessage) {
141 Netlink netlink = fpmMessage.netlink();
142 RtNetlink rtNetlink = netlink.rtNetlink();
143
144 if (log.isTraceEnabled()) {
145 log.trace("Received FPM message: {}", fpmMessage);
146 }
147
148 IpAddress dstAddress = null;
149 IpAddress gateway = null;
150
151 for (RouteAttribute attribute : rtNetlink.attributes()) {
152 if (attribute.type() == RouteAttribute.RTA_DST) {
153 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
154 dstAddress = raDst.dstAddress();
155 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
156 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
157 gateway = raGateway.gateway();
158 }
159 }
160
161 if (dstAddress == null) {
162 log.error("Dst address missing!");
163 return;
164 }
165
166 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
167
168 RouteUpdate routeUpdate = null;
169 RouteEntry entry;
170 switch (netlink.type()) {
171 case RTM_NEWROUTE:
172 if (gateway == null) {
173 // We ignore interface routes with no gateway for now.
174 return;
175 }
176 entry = new RouteEntry(prefix, gateway);
177
178 fpmRoutes.put(entry.prefix(), entry);
179
180 routeUpdate = new RouteUpdate(RouteUpdate.Type.UPDATE, entry);
181 break;
182 case RTM_DELROUTE:
183 RouteEntry existing = fpmRoutes.remove(prefix);
184 if (existing == null) {
185 log.warn("Got delete for non-existent prefix");
186 return;
187 }
188
189 entry = new RouteEntry(prefix, existing.nextHop());
190
191 routeUpdate = new RouteUpdate(RouteUpdate.Type.DELETE, entry);
192 break;
193 case RTM_GETROUTE:
194 default:
195 break;
196 }
197
198 if (routeUpdate == null) {
199 log.warn("Unsupported FPM message: {}", fpmMessage);
200 return;
201 }
202
203 routeListener.update(Collections.singletonList(routeUpdate));
204 }
205
206 private class InternalFpmListener implements FpmMessageListener {
207 @Override
208 public void fpmMessage(FpmHeader fpmMessage) {
209 FpmManager.this.fpmMessage(fpmMessage);
210 }
211 }
212
213}