/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.ovsdb.controller.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;

import static org.onlab.util.Tools.groupedThreads;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onlab.util.Tools;
import org.onosproject.ovsdb.controller.OvsdbConstant;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

61
62/**
63 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080064 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080065 */
BitOhenryf006ddd2015-11-17 13:25:41 +080066public class Controller {
YuanyouZhangebe12612015-08-05 18:19:09 +080067 protected static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080068 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080069
70 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
71
72 private OvsdbAgent agent;
73 private Callback monitorCallback;
74
75 private final ExecutorService executorService = Executors
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070076 .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080077
78 private EventLoopGroup bossGroup;
79 private EventLoopGroup workerGroup;
80 private Class<? extends ServerChannel> serverChannelClass;
81
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070082 private static final int MAX_RETRY = 5;
83 private static final int IDLE_TIMEOUT_SEC = 10;
84
YuanyouZhangebe12612015-08-05 18:19:09 +080085 /**
86 * Initialization.
87 */
88 private void initEventLoopGroup() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070089 bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
90 workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080091 serverChannelClass = NioServerSocketChannel.class;
92 }
93
94 /**
95 * Accepts incoming connections.
96 */
97 private void startAcceptingConnections() throws InterruptedException {
98 ServerBootstrap b = new ServerBootstrap();
99
100 b.group(bossGroup, workerGroup).channel(serverChannelClass)
101 .childHandler(new OnosCommunicationChannelInitializer());
102 b.option(ChannelOption.SO_BACKLOG, 128);
103 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
104 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
105 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
106 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +0800107 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +0800108 }
109
110 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800111 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700112 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800113 */
114 public void run() throws InterruptedException {
115 initEventLoopGroup();
116 startAcceptingConnections();
117 }
118
119 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800120 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800121 */
122 private class OnosCommunicationChannelInitializer
123 extends ChannelInitializer<SocketChannel> {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700124 @Override
YuanyouZhangebe12612015-08-05 18:19:09 +0800125 protected void initChannel(SocketChannel channel) throws Exception {
126 log.info("New channel created");
127 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
128 channel.pipeline().addLast(new MessageDecoder());
129 handleNewNodeConnection(channel);
130
131 }
132 }
133
134 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800135 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800136 *
137 * @param channel the channel to use.
138 */
139 private void handleNewNodeConnection(final Channel channel) {
140 executorService.execute(new Runnable() {
141 @Override
142 public void run() {
143 log.info("Handle new node connection");
144
145 IpAddress ipAddress = IpAddress
146 .valueOf(((InetSocketAddress) channel.remoteAddress())
147 .getAddress().getHostAddress());
148 long port = ((InetSocketAddress) channel.remoteAddress())
149 .getPort();
150
151 log.info("Get connection from ip address {} : {}",
152 ipAddress.toString(), port);
153
154 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
155 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
156 agent,
157 monitorCallback,
158 channel);
159 ovsdbProviderService.setConnection(true);
160 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
161 nodeId);
162 ovsdbJsonRpcHandler
163 .setOvsdbProviderService(ovsdbProviderService);
164 channel.pipeline().addLast(ovsdbJsonRpcHandler);
165
166 ovsdbProviderService.nodeAdded();
167 ChannelFuture closeFuture = channel.closeFuture();
168 closeFuture
169 .addListener(new ChannelConnectionListener(
170 ovsdbProviderService));
171 }
172 });
173 }
174
175 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800176 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800177 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800178 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800179 * @param agent OvsdbAgent
180 * @param monitorCallback Callback
181 * @param channel Channel
182 * @return OvsdbProviderService instance
183 */
184 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
185 OvsdbAgent agent,
186 Callback monitorCallback,
187 Channel channel) {
188 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
189 nodeId);
190 ovsdbProviderService.setAgent(agent);
191 ovsdbProviderService.setCallback(monitorCallback);
192 ovsdbProviderService.setChannel(channel);
193 return ovsdbProviderService;
194 }
195
196 /**
197 * Starts controller.
198 *
199 * @param agent OvsdbAgent
200 * @param monitorCallback Callback
201 */
202 public void start(OvsdbAgent agent, Callback monitorCallback) {
203 this.agent = agent;
204 this.monitorCallback = monitorCallback;
205 try {
206 this.run();
207 } catch (InterruptedException e) {
208 log.warn("Interrupted while waiting to start");
209 Thread.currentThread().interrupt();
210 }
211 }
212
213 /**
214 * Stops controller.
215 *
216 */
217 public void stop() {
218 workerGroup.shutdownGracefully();
219 bossGroup.shutdownGracefully();
220 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700221
222 /**
223 * Connect to the ovsdb server with given ip address and port number.
224 *
225 * @param ip ip address
226 * @param port port number
227 */
228 public void connect(IpAddress ip, TpPort port) {
229 ChannelFutureListener listener = new ConnectionListener(this, ip, port);
230 connectRetry(ip, port, listener);
231 }
232
233 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
234 try {
235 Bootstrap b = new Bootstrap();
236 b.group(workerGroup)
237 .channel(NioSocketChannel.class)
238 .option(ChannelOption.TCP_NODELAY, true)
239 .handler(new ChannelInitializer<SocketChannel>() {
240
241 @Override
242 protected void initChannel(SocketChannel channel) throws Exception {
243 ChannelPipeline p = channel.pipeline();
244 p.addLast(new MessageDecoder(),
245 new StringEncoder(CharsetUtil.UTF_8),
246 new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
247 new ConnectionHandler());
248 }
249 });
250 b.remoteAddress(ip.toString(), port.toInt());
251 b.connect().addListener(listener);
252 } catch (Exception e) {
253 log.warn("Connection to the ovsdb server {}:{} failed", ip.toString(), port.toString());
254 }
255 }
256
257 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800258 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700259 private IpAddress ip;
260 private TpPort port;
261 private AtomicInteger count = new AtomicInteger();
262
BitOhenryf006ddd2015-11-17 13:25:41 +0800263 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700264 IpAddress ip,
265 TpPort port) {
266 this.controller = controller;
267 this.ip = ip;
268 this.port = port;
269 }
270
271 @Override
272 public void operationComplete(ChannelFuture channelFuture) throws Exception {
273 if (!channelFuture.isSuccess()) {
274 channelFuture.channel().close();
275
276 if (count.incrementAndGet() < MAX_RETRY) {
277 final EventLoop loop = channelFuture.channel().eventLoop();
278
279 loop.schedule(() -> {
280 controller.connectRetry(this.ip, this.port, this);
281 }, 1L, TimeUnit.SECONDS);
282 } else {
283 log.info("Connection to the ovsdb {}:{} failed",
284 this.ip.toString(), this.port.toString());
285 }
286 } else {
287 handleNewNodeConnection(channelFuture.channel());
288 }
289 }
290 }
291
292 private class ConnectionHandler extends ChannelDuplexHandler {
293
294 @Override
295 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
296 IdleStateEvent e = (IdleStateEvent) evt;
297
298 if (e.state() == IdleState.READER_IDLE) {
299 ctx.close();
300 }
301 }
302 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800303}