blob: efda592519edf7c04e15ec43aed78fea15d8b419 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.routing.fpm;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Service;
22import org.jboss.netty.bootstrap.ServerBootstrap;
23import org.jboss.netty.channel.Channel;
24import org.jboss.netty.channel.ChannelException;
25import org.jboss.netty.channel.ChannelFactory;
26import org.jboss.netty.channel.ChannelPipeline;
27import org.jboss.netty.channel.ChannelPipelineFactory;
28import org.jboss.netty.channel.Channels;
29import org.jboss.netty.channel.group.ChannelGroup;
30import org.jboss.netty.channel.group.DefaultChannelGroup;
31import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
32import org.onlab.packet.IpAddress;
33import org.onlab.packet.IpPrefix;
34import org.onosproject.routing.RouteEntry;
35import org.onosproject.routing.RouteListener;
36import org.onosproject.routing.RouteSourceService;
37import org.onosproject.routing.RouteUpdate;
38import org.onosproject.routing.fpm.protocol.FpmHeader;
39import org.onosproject.routing.fpm.protocol.Netlink;
40import org.onosproject.routing.fpm.protocol.RouteAttribute;
41import org.onosproject.routing.fpm.protocol.RouteAttributeDst;
42import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
43import org.onosproject.routing.fpm.protocol.RtNetlink;
Jonathan Hart916bf892016-01-27 16:42:55 -080044import org.onosproject.routing.fpm.protocol.RtProtocol;
Jonathan Hart3930f632015-10-19 12:12:51 -070045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.net.InetSocketAddress;
49import java.util.Collections;
50import java.util.Map;
51import java.util.concurrent.ConcurrentHashMap;
52
53import static java.util.concurrent.Executors.newCachedThreadPool;
54import static org.onlab.util.Tools.groupedThreads;
55
56/**
57 * Forwarding Plane Manager (FPM) route source.
58 */
59@Service
60@Component(immediate = true, enabled = false)
61public class FpmManager implements RouteSourceService {
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
64 private ServerBootstrap serverBootstrap;
65 private Channel serverChannel;
66 private ChannelGroup allChannels = new DefaultChannelGroup();
67
68 private Map<IpPrefix, RouteEntry> fpmRoutes = new ConcurrentHashMap<>();
69
70 private RouteListener routeListener;
71
72 private static final int FPM_PORT = 2620;
73
74 @Activate
75 protected void activate() {
76 log.info("Started");
77 }
78
79 @Deactivate
80 protected void deactivate() {
81 stopServer();
82 log.info("Stopped");
83 }
84
85 private void startServer() {
86 ChannelFactory channelFactory = new NioServerSocketChannelFactory(
87 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")),
88 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d")));
89 ChannelPipelineFactory pipelineFactory = () -> {
90 // Allocate a new session per connection
91 FpmSessionHandler fpmSessionHandler =
92 new FpmSessionHandler(new InternalFpmListener());
93 FpmFrameDecoder fpmFrameDecoder =
94 new FpmFrameDecoder();
95
96 // Setup the processing pipeline
97 ChannelPipeline pipeline = Channels.pipeline();
98 pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
99 pipeline.addLast("FpmSession", fpmSessionHandler);
100 return pipeline;
101 };
102
103 InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT);
104
105 serverBootstrap = new ServerBootstrap(channelFactory);
106 serverBootstrap.setOption("child.reuseAddr", true);
107 serverBootstrap.setOption("child.keepAlive", true);
108 serverBootstrap.setOption("child.tcpNoDelay", true);
109 serverBootstrap.setPipelineFactory(pipelineFactory);
110 try {
111 serverChannel = serverBootstrap.bind(listenAddress);
112 allChannels.add(serverChannel);
113 } catch (ChannelException e) {
114 log.debug("Exception binding to FPM port {}: ",
115 listenAddress.getPort(), e);
116 stopServer();
117 }
118 }
119
120 private void stopServer() {
121 allChannels.close().awaitUninterruptibly();
122 allChannels.clear();
123 if (serverBootstrap != null) {
124 serverBootstrap.releaseExternalResources();
125 }
126 }
127
128 @Override
129 public void start(RouteListener routeListener) {
130 this.routeListener = routeListener;
131
132 startServer();
133 }
134
135 @Override
136 public void stop() {
137 fpmRoutes.clear();
138 stopServer();
139 }
140
141 private void fpmMessage(FpmHeader fpmMessage) {
142 Netlink netlink = fpmMessage.netlink();
143 RtNetlink rtNetlink = netlink.rtNetlink();
144
145 if (log.isTraceEnabled()) {
146 log.trace("Received FPM message: {}", fpmMessage);
147 }
148
Jonathan Hart916bf892016-01-27 16:42:55 -0800149 if (rtNetlink.protocol() != RtProtocol.ZEBRA) {
150 log.trace("Ignoring non-zebra route");
151 return;
152 }
153
Jonathan Hart3930f632015-10-19 12:12:51 -0700154 IpAddress dstAddress = null;
155 IpAddress gateway = null;
156
157 for (RouteAttribute attribute : rtNetlink.attributes()) {
158 if (attribute.type() == RouteAttribute.RTA_DST) {
159 RouteAttributeDst raDst = (RouteAttributeDst) attribute;
160 dstAddress = raDst.dstAddress();
161 } else if (attribute.type() == RouteAttribute.RTA_GATEWAY) {
162 RouteAttributeGateway raGateway = (RouteAttributeGateway) attribute;
163 gateway = raGateway.gateway();
164 }
165 }
166
167 if (dstAddress == null) {
168 log.error("Dst address missing!");
169 return;
170 }
171
172 IpPrefix prefix = IpPrefix.valueOf(dstAddress, rtNetlink.dstLength());
173
174 RouteUpdate routeUpdate = null;
175 RouteEntry entry;
176 switch (netlink.type()) {
177 case RTM_NEWROUTE:
178 if (gateway == null) {
179 // We ignore interface routes with no gateway for now.
180 return;
181 }
182 entry = new RouteEntry(prefix, gateway);
183
184 fpmRoutes.put(entry.prefix(), entry);
185
186 routeUpdate = new RouteUpdate(RouteUpdate.Type.UPDATE, entry);
187 break;
188 case RTM_DELROUTE:
189 RouteEntry existing = fpmRoutes.remove(prefix);
190 if (existing == null) {
191 log.warn("Got delete for non-existent prefix");
192 return;
193 }
194
195 entry = new RouteEntry(prefix, existing.nextHop());
196
197 routeUpdate = new RouteUpdate(RouteUpdate.Type.DELETE, entry);
198 break;
199 case RTM_GETROUTE:
200 default:
201 break;
202 }
203
204 if (routeUpdate == null) {
205 log.warn("Unsupported FPM message: {}", fpmMessage);
206 return;
207 }
208
209 routeListener.update(Collections.singletonList(routeUpdate));
210 }
211
212 private class InternalFpmListener implements FpmMessageListener {
213 @Override
214 public void fpmMessage(FpmHeader fpmMessage) {
215 FpmManager.this.fpmMessage(fpmMessage);
216 }
217 }
218
219}