blob: 8bf0609838a6f93760e2b2172d0a71788229e6c5 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.ovsdb.controller.impl;
17
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070018import io.netty.bootstrap.Bootstrap;
YuanyouZhangebe12612015-08-05 18:19:09 +080019import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070022import io.netty.channel.ChannelDuplexHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080023import io.netty.channel.ChannelFuture;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070024import io.netty.channel.ChannelFutureListener;
25import io.netty.channel.ChannelHandlerContext;
YuanyouZhangebe12612015-08-05 18:19:09 +080026import io.netty.channel.ChannelInitializer;
27import io.netty.channel.ChannelOption;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070028import io.netty.channel.ChannelPipeline;
29import io.netty.channel.EventLoop;
YuanyouZhangebe12612015-08-05 18:19:09 +080030import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
32import io.netty.channel.nio.NioEventLoopGroup;
33import io.netty.channel.socket.SocketChannel;
34import io.netty.channel.socket.nio.NioServerSocketChannel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070035import io.netty.channel.socket.nio.NioSocketChannel;
YuanyouZhangebe12612015-08-05 18:19:09 +080036import io.netty.handler.codec.string.StringEncoder;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070037import io.netty.handler.timeout.IdleState;
38import io.netty.handler.timeout.IdleStateEvent;
39import io.netty.handler.timeout.IdleStateHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080040import io.netty.util.CharsetUtil;
41
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070042import static org.onlab.util.Tools.groupedThreads;
43
YuanyouZhangebe12612015-08-05 18:19:09 +080044import java.net.InetSocketAddress;
45import java.util.concurrent.ExecutorService;
46import java.util.concurrent.Executors;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070047import java.util.concurrent.TimeUnit;
48import java.util.concurrent.atomic.AtomicInteger;
jaegonkim1af0ae52017-01-01 10:46:55 +090049import java.util.function.Consumer;
YuanyouZhangebe12612015-08-05 18:19:09 +080050
51import org.onlab.packet.IpAddress;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070052import org.onlab.packet.TpPort;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070053import org.onlab.util.Tools;
YuanyouZhangebe12612015-08-05 18:19:09 +080054import org.onosproject.ovsdb.controller.OvsdbConstant;
55import org.onosproject.ovsdb.controller.OvsdbNodeId;
56import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
57import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
58import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
59import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
60import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
63/**
64 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080065 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080066 */
BitOhenryf006ddd2015-11-17 13:25:41 +080067public class Controller {
Ray Milkey9c9cde42018-01-12 14:22:06 -080068 private static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080069 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080070
71 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
72
73 private OvsdbAgent agent;
74 private Callback monitorCallback;
75
76 private final ExecutorService executorService = Executors
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070077 .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080078
79 private EventLoopGroup bossGroup;
80 private EventLoopGroup workerGroup;
81 private Class<? extends ServerChannel> serverChannelClass;
82
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070083 private static final int MAX_RETRY = 5;
84 private static final int IDLE_TIMEOUT_SEC = 10;
85
YuanyouZhangebe12612015-08-05 18:19:09 +080086 /**
87 * Initialization.
88 */
89 private void initEventLoopGroup() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070090 bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
91 workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080092 serverChannelClass = NioServerSocketChannel.class;
93 }
94
95 /**
96 * Accepts incoming connections.
97 */
98 private void startAcceptingConnections() throws InterruptedException {
99 ServerBootstrap b = new ServerBootstrap();
100
101 b.group(bossGroup, workerGroup).channel(serverChannelClass)
102 .childHandler(new OnosCommunicationChannelInitializer());
103 b.option(ChannelOption.SO_BACKLOG, 128);
104 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
105 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
106 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
107 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +0800108 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +0800109 }
110
111 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800112 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700113 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800114 */
115 public void run() throws InterruptedException {
116 initEventLoopGroup();
117 startAcceptingConnections();
118 }
119
120 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800121 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800122 */
123 private class OnosCommunicationChannelInitializer
124 extends ChannelInitializer<SocketChannel> {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700125 @Override
YuanyouZhangebe12612015-08-05 18:19:09 +0800126 protected void initChannel(SocketChannel channel) throws Exception {
127 log.info("New channel created");
128 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
129 channel.pipeline().addLast(new MessageDecoder());
130 handleNewNodeConnection(channel);
131
132 }
133 }
134
135 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800136 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800137 *
138 * @param channel the channel to use.
139 */
140 private void handleNewNodeConnection(final Channel channel) {
kdarapufce5abb2018-05-10 19:37:53 +0530141 executorService.execute(() -> {
142 log.info("Handle new node connection");
YuanyouZhangebe12612015-08-05 18:19:09 +0800143
kdarapufce5abb2018-05-10 19:37:53 +0530144 IpAddress ipAddress = IpAddress
145 .valueOf(((InetSocketAddress) channel.remoteAddress())
146 .getAddress().getHostAddress());
147 long port = ((InetSocketAddress) channel.remoteAddress())
148 .getPort();
YuanyouZhangebe12612015-08-05 18:19:09 +0800149
kdarapufce5abb2018-05-10 19:37:53 +0530150 log.info("Get connection from ip address {} : {}",
151 ipAddress.toString(), port);
YuanyouZhangebe12612015-08-05 18:19:09 +0800152
kdarapufce5abb2018-05-10 19:37:53 +0530153 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
154 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
155 agent,
156 monitorCallback,
157 channel);
158 ovsdbProviderService.setConnection(true);
159 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
160 nodeId);
161 ovsdbJsonRpcHandler
162 .setOvsdbProviderService(ovsdbProviderService);
163 channel.pipeline().addLast(ovsdbJsonRpcHandler);
YuanyouZhangebe12612015-08-05 18:19:09 +0800164
kdarapufce5abb2018-05-10 19:37:53 +0530165 ovsdbProviderService.nodeAdded();
166 ChannelFuture closeFuture = channel.closeFuture();
167 closeFuture
168 .addListener(new ChannelConnectionListener(
169 ovsdbProviderService));
YuanyouZhangebe12612015-08-05 18:19:09 +0800170 });
171 }
172
173 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800174 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800175 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800176 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800177 * @param agent OvsdbAgent
178 * @param monitorCallback Callback
179 * @param channel Channel
180 * @return OvsdbProviderService instance
181 */
182 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
183 OvsdbAgent agent,
184 Callback monitorCallback,
185 Channel channel) {
186 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
187 nodeId);
188 ovsdbProviderService.setAgent(agent);
189 ovsdbProviderService.setCallback(monitorCallback);
190 ovsdbProviderService.setChannel(channel);
191 return ovsdbProviderService;
192 }
193
194 /**
195 * Starts controller.
196 *
197 * @param agent OvsdbAgent
198 * @param monitorCallback Callback
Jian Lieaf31032018-05-03 15:54:03 +0900199 * @param mode OVSDB server mode flag
YuanyouZhangebe12612015-08-05 18:19:09 +0800200 */
Jian Lieaf31032018-05-03 15:54:03 +0900201 public void start(OvsdbAgent agent, Callback monitorCallback, boolean mode) {
YuanyouZhangebe12612015-08-05 18:19:09 +0800202 this.agent = agent;
203 this.monitorCallback = monitorCallback;
Jian Lieaf31032018-05-03 15:54:03 +0900204 // if the OVSDB server flag is configured as false, we do NOT listen on 6640 port
205 // therefore, ONOS only runs as an OVSDB client
206 if (mode) {
207 try {
208 this.run();
209 } catch (InterruptedException e) {
210 log.warn("Interrupted while waiting to start");
211 Thread.currentThread().interrupt();
212 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800213 }
214 }
215
216 /**
217 * Stops controller.
218 *
219 */
220 public void stop() {
221 workerGroup.shutdownGracefully();
222 bossGroup.shutdownGracefully();
223 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700224
225 /**
226 * Connect to the ovsdb server with given ip address and port number.
227 *
228 * @param ip ip address
229 * @param port port number
230 */
231 public void connect(IpAddress ip, TpPort port) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900232 connect(ip, port, e -> log.warn("Connection to the ovsdb {}:{} failed(cause: {})", ip, port, e));
233 }
234
235 /**
236 * Connect to the ovsdb server with given ip address, port number, and failhandler.
237 *
238 * @param ip ip address
239 * @param port port number
240 * @param failhandler connection failure handler
241 */
242 public void connect(IpAddress ip, TpPort port, Consumer<Exception> failhandler) {
243 ChannelFutureListener listener = new ConnectionListener(this, ip, port, failhandler);
244 try {
245 connectRetry(ip, port, listener);
246 } catch (Exception e) {
247 failhandler.accept(e);
248 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700249 }
250
251 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900252 Bootstrap b = new Bootstrap();
253 b.group(workerGroup)
254 .channel(NioSocketChannel.class)
255 .option(ChannelOption.TCP_NODELAY, true)
256 .handler(new ChannelInitializer<SocketChannel>() {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700257
jaegonkim1af0ae52017-01-01 10:46:55 +0900258 @Override
259 protected void initChannel(SocketChannel channel) throws Exception {
260 ChannelPipeline p = channel.pipeline();
261 p.addLast(new MessageDecoder(),
262 new StringEncoder(CharsetUtil.UTF_8),
263 new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
264 new ConnectionHandler());
265 }
266 });
267 b.remoteAddress(ip.toString(), port.toInt());
268 b.connect().addListener(listener);
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700269 }
270
271 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800272 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700273 private IpAddress ip;
274 private TpPort port;
275 private AtomicInteger count = new AtomicInteger();
jaegonkim1af0ae52017-01-01 10:46:55 +0900276 private Consumer<Exception> failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700277
BitOhenryf006ddd2015-11-17 13:25:41 +0800278 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700279 IpAddress ip,
jaegonkim1af0ae52017-01-01 10:46:55 +0900280 TpPort port,
281 Consumer<Exception> failhandler) {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700282 this.controller = controller;
283 this.ip = ip;
284 this.port = port;
jaegonkim1af0ae52017-01-01 10:46:55 +0900285 this.failhandler = failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700286 }
287
288 @Override
289 public void operationComplete(ChannelFuture channelFuture) throws Exception {
290 if (!channelFuture.isSuccess()) {
291 channelFuture.channel().close();
292
293 if (count.incrementAndGet() < MAX_RETRY) {
294 final EventLoop loop = channelFuture.channel().eventLoop();
295
296 loop.schedule(() -> {
jaegonkim1af0ae52017-01-01 10:46:55 +0900297 try {
298 controller.connectRetry(this.ip, this.port, this);
299 } catch (Exception e) {
300 log.warn("Connection to the ovsdb server {}:{} failed(cause: {})", ip, port, e);
301 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700302 }, 1L, TimeUnit.SECONDS);
303 } else {
jaegonkim1af0ae52017-01-01 10:46:55 +0900304 failhandler.accept(new Exception("max connection retry(" + MAX_RETRY + ") exceeded"));
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700305 }
306 } else {
307 handleNewNodeConnection(channelFuture.channel());
308 }
309 }
310 }
311
312 private class ConnectionHandler extends ChannelDuplexHandler {
313
314 @Override
315 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
316 IdleStateEvent e = (IdleStateEvent) evt;
317
318 if (e.state() == IdleState.READER_IDLE) {
319 ctx.close();
320 }
321 }
322 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800323}