blob: 1b285ec0271678f198e3aa79d949365793e85ec4 [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
YuanyouZhangebe12612015-08-05 18:19:09 +08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070018import io.netty.bootstrap.Bootstrap;
YuanyouZhangebe12612015-08-05 18:19:09 +080019import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070022import io.netty.channel.ChannelDuplexHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080023import io.netty.channel.ChannelFuture;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070024import io.netty.channel.ChannelFutureListener;
25import io.netty.channel.ChannelHandlerContext;
YuanyouZhangebe12612015-08-05 18:19:09 +080026import io.netty.channel.ChannelInitializer;
27import io.netty.channel.ChannelOption;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070028import io.netty.channel.ChannelPipeline;
29import io.netty.channel.EventLoop;
YuanyouZhangebe12612015-08-05 18:19:09 +080030import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
32import io.netty.channel.nio.NioEventLoopGroup;
33import io.netty.channel.socket.SocketChannel;
34import io.netty.channel.socket.nio.NioServerSocketChannel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070035import io.netty.channel.socket.nio.NioSocketChannel;
YuanyouZhangebe12612015-08-05 18:19:09 +080036import io.netty.handler.codec.string.StringEncoder;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070037import io.netty.handler.timeout.IdleState;
38import io.netty.handler.timeout.IdleStateEvent;
39import io.netty.handler.timeout.IdleStateHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080040import io.netty.util.CharsetUtil;
41
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070042import static org.onlab.util.Tools.groupedThreads;
43
YuanyouZhangebe12612015-08-05 18:19:09 +080044import java.net.InetSocketAddress;
45import java.util.concurrent.ExecutorService;
46import java.util.concurrent.Executors;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070047import java.util.concurrent.TimeUnit;
48import java.util.concurrent.atomic.AtomicInteger;
jaegonkim1af0ae52017-01-01 10:46:55 +090049import java.util.function.Consumer;
YuanyouZhangebe12612015-08-05 18:19:09 +080050
51import org.onlab.packet.IpAddress;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070052import org.onlab.packet.TpPort;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070053import org.onlab.util.Tools;
YuanyouZhangebe12612015-08-05 18:19:09 +080054import org.onosproject.ovsdb.controller.OvsdbConstant;
55import org.onosproject.ovsdb.controller.OvsdbNodeId;
56import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
57import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
58import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
59import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
60import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
63/**
64 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080065 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080066 */
BitOhenryf006ddd2015-11-17 13:25:41 +080067public class Controller {
YuanyouZhangebe12612015-08-05 18:19:09 +080068 protected static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080069 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080070
71 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
72
73 private OvsdbAgent agent;
74 private Callback monitorCallback;
75
76 private final ExecutorService executorService = Executors
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070077 .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080078
79 private EventLoopGroup bossGroup;
80 private EventLoopGroup workerGroup;
81 private Class<? extends ServerChannel> serverChannelClass;
82
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070083 private static final int MAX_RETRY = 5;
84 private static final int IDLE_TIMEOUT_SEC = 10;
85
YuanyouZhangebe12612015-08-05 18:19:09 +080086 /**
87 * Initialization.
88 */
89 private void initEventLoopGroup() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070090 bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
91 workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080092 serverChannelClass = NioServerSocketChannel.class;
93 }
94
95 /**
96 * Accepts incoming connections.
97 */
98 private void startAcceptingConnections() throws InterruptedException {
99 ServerBootstrap b = new ServerBootstrap();
100
101 b.group(bossGroup, workerGroup).channel(serverChannelClass)
102 .childHandler(new OnosCommunicationChannelInitializer());
103 b.option(ChannelOption.SO_BACKLOG, 128);
104 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
105 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
106 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
107 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +0800108 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +0800109 }
110
111 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800112 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700113 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800114 */
115 public void run() throws InterruptedException {
116 initEventLoopGroup();
117 startAcceptingConnections();
118 }
119
120 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800121 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800122 */
123 private class OnosCommunicationChannelInitializer
124 extends ChannelInitializer<SocketChannel> {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700125 @Override
YuanyouZhangebe12612015-08-05 18:19:09 +0800126 protected void initChannel(SocketChannel channel) throws Exception {
127 log.info("New channel created");
128 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
129 channel.pipeline().addLast(new MessageDecoder());
130 handleNewNodeConnection(channel);
131
132 }
133 }
134
135 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800136 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800137 *
138 * @param channel the channel to use.
139 */
140 private void handleNewNodeConnection(final Channel channel) {
141 executorService.execute(new Runnable() {
142 @Override
143 public void run() {
144 log.info("Handle new node connection");
145
146 IpAddress ipAddress = IpAddress
147 .valueOf(((InetSocketAddress) channel.remoteAddress())
148 .getAddress().getHostAddress());
149 long port = ((InetSocketAddress) channel.remoteAddress())
150 .getPort();
151
152 log.info("Get connection from ip address {} : {}",
153 ipAddress.toString(), port);
154
155 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
156 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
157 agent,
158 monitorCallback,
159 channel);
160 ovsdbProviderService.setConnection(true);
161 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
162 nodeId);
163 ovsdbJsonRpcHandler
164 .setOvsdbProviderService(ovsdbProviderService);
165 channel.pipeline().addLast(ovsdbJsonRpcHandler);
166
167 ovsdbProviderService.nodeAdded();
168 ChannelFuture closeFuture = channel.closeFuture();
169 closeFuture
170 .addListener(new ChannelConnectionListener(
171 ovsdbProviderService));
172 }
173 });
174 }
175
176 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800177 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800178 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800179 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800180 * @param agent OvsdbAgent
181 * @param monitorCallback Callback
182 * @param channel Channel
183 * @return OvsdbProviderService instance
184 */
185 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
186 OvsdbAgent agent,
187 Callback monitorCallback,
188 Channel channel) {
189 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
190 nodeId);
191 ovsdbProviderService.setAgent(agent);
192 ovsdbProviderService.setCallback(monitorCallback);
193 ovsdbProviderService.setChannel(channel);
194 return ovsdbProviderService;
195 }
196
197 /**
198 * Starts controller.
199 *
200 * @param agent OvsdbAgent
201 * @param monitorCallback Callback
202 */
203 public void start(OvsdbAgent agent, Callback monitorCallback) {
204 this.agent = agent;
205 this.monitorCallback = monitorCallback;
206 try {
207 this.run();
208 } catch (InterruptedException e) {
209 log.warn("Interrupted while waiting to start");
210 Thread.currentThread().interrupt();
211 }
212 }
213
214 /**
215 * Stops controller.
216 *
217 */
218 public void stop() {
219 workerGroup.shutdownGracefully();
220 bossGroup.shutdownGracefully();
221 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700222
223 /**
224 * Connect to the ovsdb server with given ip address and port number.
225 *
226 * @param ip ip address
227 * @param port port number
228 */
229 public void connect(IpAddress ip, TpPort port) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900230 connect(ip, port, e -> log.warn("Connection to the ovsdb {}:{} failed(cause: {})", ip, port, e));
231 }
232
233 /**
234 * Connect to the ovsdb server with given ip address, port number, and failhandler.
235 *
236 * @param ip ip address
237 * @param port port number
238 * @param failhandler connection failure handler
239 */
240 public void connect(IpAddress ip, TpPort port, Consumer<Exception> failhandler) {
241 ChannelFutureListener listener = new ConnectionListener(this, ip, port, failhandler);
242 try {
243 connectRetry(ip, port, listener);
244 } catch (Exception e) {
245 failhandler.accept(e);
246 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700247 }
248
249 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900250 Bootstrap b = new Bootstrap();
251 b.group(workerGroup)
252 .channel(NioSocketChannel.class)
253 .option(ChannelOption.TCP_NODELAY, true)
254 .handler(new ChannelInitializer<SocketChannel>() {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700255
jaegonkim1af0ae52017-01-01 10:46:55 +0900256 @Override
257 protected void initChannel(SocketChannel channel) throws Exception {
258 ChannelPipeline p = channel.pipeline();
259 p.addLast(new MessageDecoder(),
260 new StringEncoder(CharsetUtil.UTF_8),
261 new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
262 new ConnectionHandler());
263 }
264 });
265 b.remoteAddress(ip.toString(), port.toInt());
266 b.connect().addListener(listener);
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700267 }
268
269 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800270 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700271 private IpAddress ip;
272 private TpPort port;
273 private AtomicInteger count = new AtomicInteger();
jaegonkim1af0ae52017-01-01 10:46:55 +0900274 private Consumer<Exception> failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700275
BitOhenryf006ddd2015-11-17 13:25:41 +0800276 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700277 IpAddress ip,
jaegonkim1af0ae52017-01-01 10:46:55 +0900278 TpPort port,
279 Consumer<Exception> failhandler) {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700280 this.controller = controller;
281 this.ip = ip;
282 this.port = port;
jaegonkim1af0ae52017-01-01 10:46:55 +0900283 this.failhandler = failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700284 }
285
286 @Override
287 public void operationComplete(ChannelFuture channelFuture) throws Exception {
288 if (!channelFuture.isSuccess()) {
289 channelFuture.channel().close();
290
291 if (count.incrementAndGet() < MAX_RETRY) {
292 final EventLoop loop = channelFuture.channel().eventLoop();
293
294 loop.schedule(() -> {
jaegonkim1af0ae52017-01-01 10:46:55 +0900295 try {
296 controller.connectRetry(this.ip, this.port, this);
297 } catch (Exception e) {
298 log.warn("Connection to the ovsdb server {}:{} failed(cause: {})", ip, port, e);
299 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700300 }, 1L, TimeUnit.SECONDS);
301 } else {
jaegonkim1af0ae52017-01-01 10:46:55 +0900302 failhandler.accept(new Exception("max connection retry(" + MAX_RETRY + ") exceeded"));
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700303 }
304 } else {
305 handleNewNodeConnection(channelFuture.channel());
306 }
307 }
308 }
309
310 private class ConnectionHandler extends ChannelDuplexHandler {
311
312 @Override
313 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
314 IdleStateEvent e = (IdleStateEvent) evt;
315
316 if (e.state() == IdleState.READER_IDLE) {
317 ctx.close();
318 }
319 }
320 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800321}