blob: fcbbff8e511fba3947b04599d3919918dda6d1d9 [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070018import io.netty.bootstrap.Bootstrap;
YuanyouZhangebe12612015-08-05 18:19:09 +080019import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070022import io.netty.channel.ChannelDuplexHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080023import io.netty.channel.ChannelFuture;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070024import io.netty.channel.ChannelFutureListener;
25import io.netty.channel.ChannelHandlerContext;
YuanyouZhangebe12612015-08-05 18:19:09 +080026import io.netty.channel.ChannelInitializer;
27import io.netty.channel.ChannelOption;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070028import io.netty.channel.ChannelPipeline;
29import io.netty.channel.EventLoop;
YuanyouZhangebe12612015-08-05 18:19:09 +080030import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
32import io.netty.channel.nio.NioEventLoopGroup;
33import io.netty.channel.socket.SocketChannel;
34import io.netty.channel.socket.nio.NioServerSocketChannel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070035import io.netty.channel.socket.nio.NioSocketChannel;
YuanyouZhangebe12612015-08-05 18:19:09 +080036import io.netty.handler.codec.string.StringEncoder;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070037import io.netty.handler.timeout.IdleState;
38import io.netty.handler.timeout.IdleStateEvent;
39import io.netty.handler.timeout.IdleStateHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080040import io.netty.util.CharsetUtil;
41
42import java.net.InetSocketAddress;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070045import java.util.concurrent.TimeUnit;
46import java.util.concurrent.atomic.AtomicInteger;
YuanyouZhangebe12612015-08-05 18:19:09 +080047
48import org.onlab.packet.IpAddress;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070049import org.onlab.packet.TpPort;
YuanyouZhangebe12612015-08-05 18:19:09 +080050import org.onosproject.ovsdb.controller.OvsdbConstant;
51import org.onosproject.ovsdb.controller.OvsdbNodeId;
52import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
53import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
54import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
55import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
56import org.slf4j.Logger;
57import org.slf4j.LoggerFactory;
58
59/**
60 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080061 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080062 */
BitOhenryf006ddd2015-11-17 13:25:41 +080063public class Controller {
YuanyouZhangebe12612015-08-05 18:19:09 +080064 protected static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080065 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080066
67 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
68
69 private OvsdbAgent agent;
70 private Callback monitorCallback;
71
72 private final ExecutorService executorService = Executors
73 .newFixedThreadPool(10);
74
75 private EventLoopGroup bossGroup;
76 private EventLoopGroup workerGroup;
77 private Class<? extends ServerChannel> serverChannelClass;
78
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070079 private static final int MAX_RETRY = 5;
80 private static final int IDLE_TIMEOUT_SEC = 10;
81
YuanyouZhangebe12612015-08-05 18:19:09 +080082 /**
83 * Initialization.
84 */
85 private void initEventLoopGroup() {
86 bossGroup = new NioEventLoopGroup();
87 workerGroup = new NioEventLoopGroup();
88 serverChannelClass = NioServerSocketChannel.class;
89 }
90
91 /**
92 * Accepts incoming connections.
93 */
94 private void startAcceptingConnections() throws InterruptedException {
95 ServerBootstrap b = new ServerBootstrap();
96
97 b.group(bossGroup, workerGroup).channel(serverChannelClass)
98 .childHandler(new OnosCommunicationChannelInitializer());
99 b.option(ChannelOption.SO_BACKLOG, 128);
100 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
101 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
102 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
103 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +0800104 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +0800105 }
106
107 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800108 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700109 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800110 */
111 public void run() throws InterruptedException {
112 initEventLoopGroup();
113 startAcceptingConnections();
114 }
115
116 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800117 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800118 */
119 private class OnosCommunicationChannelInitializer
120 extends ChannelInitializer<SocketChannel> {
121 protected void initChannel(SocketChannel channel) throws Exception {
122 log.info("New channel created");
123 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
124 channel.pipeline().addLast(new MessageDecoder());
125 handleNewNodeConnection(channel);
126
127 }
128 }
129
130 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800131 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800132 *
133 * @param channel the channel to use.
134 */
135 private void handleNewNodeConnection(final Channel channel) {
136 executorService.execute(new Runnable() {
137 @Override
138 public void run() {
139 log.info("Handle new node connection");
140
141 IpAddress ipAddress = IpAddress
142 .valueOf(((InetSocketAddress) channel.remoteAddress())
143 .getAddress().getHostAddress());
144 long port = ((InetSocketAddress) channel.remoteAddress())
145 .getPort();
146
147 log.info("Get connection from ip address {} : {}",
148 ipAddress.toString(), port);
149
150 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
151 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
152 agent,
153 monitorCallback,
154 channel);
155 ovsdbProviderService.setConnection(true);
156 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
157 nodeId);
158 ovsdbJsonRpcHandler
159 .setOvsdbProviderService(ovsdbProviderService);
160 channel.pipeline().addLast(ovsdbJsonRpcHandler);
161
162 ovsdbProviderService.nodeAdded();
163 ChannelFuture closeFuture = channel.closeFuture();
164 closeFuture
165 .addListener(new ChannelConnectionListener(
166 ovsdbProviderService));
167 }
168 });
169 }
170
171 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800172 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800173 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800174 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800175 * @param agent OvsdbAgent
176 * @param monitorCallback Callback
177 * @param channel Channel
178 * @return OvsdbProviderService instance
179 */
180 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
181 OvsdbAgent agent,
182 Callback monitorCallback,
183 Channel channel) {
184 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
185 nodeId);
186 ovsdbProviderService.setAgent(agent);
187 ovsdbProviderService.setCallback(monitorCallback);
188 ovsdbProviderService.setChannel(channel);
189 return ovsdbProviderService;
190 }
191
192 /**
193 * Starts controller.
194 *
195 * @param agent OvsdbAgent
196 * @param monitorCallback Callback
197 */
198 public void start(OvsdbAgent agent, Callback monitorCallback) {
199 this.agent = agent;
200 this.monitorCallback = monitorCallback;
201 try {
202 this.run();
203 } catch (InterruptedException e) {
204 log.warn("Interrupted while waiting to start");
205 Thread.currentThread().interrupt();
206 }
207 }
208
209 /**
210 * Stops controller.
211 *
212 */
213 public void stop() {
214 workerGroup.shutdownGracefully();
215 bossGroup.shutdownGracefully();
216 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700217
218 /**
219 * Connect to the ovsdb server with given ip address and port number.
220 *
221 * @param ip ip address
222 * @param port port number
223 */
224 public void connect(IpAddress ip, TpPort port) {
225 ChannelFutureListener listener = new ConnectionListener(this, ip, port);
226 connectRetry(ip, port, listener);
227 }
228
229 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
230 try {
231 Bootstrap b = new Bootstrap();
232 b.group(workerGroup)
233 .channel(NioSocketChannel.class)
234 .option(ChannelOption.TCP_NODELAY, true)
235 .handler(new ChannelInitializer<SocketChannel>() {
236
237 @Override
238 protected void initChannel(SocketChannel channel) throws Exception {
239 ChannelPipeline p = channel.pipeline();
240 p.addLast(new MessageDecoder(),
241 new StringEncoder(CharsetUtil.UTF_8),
242 new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
243 new ConnectionHandler());
244 }
245 });
246 b.remoteAddress(ip.toString(), port.toInt());
247 b.connect().addListener(listener);
248 } catch (Exception e) {
249 log.warn("Connection to the ovsdb server {}:{} failed", ip.toString(), port.toString());
250 }
251 }
252
253 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800254 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700255 private IpAddress ip;
256 private TpPort port;
257 private AtomicInteger count = new AtomicInteger();
258
BitOhenryf006ddd2015-11-17 13:25:41 +0800259 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700260 IpAddress ip,
261 TpPort port) {
262 this.controller = controller;
263 this.ip = ip;
264 this.port = port;
265 }
266
267 @Override
268 public void operationComplete(ChannelFuture channelFuture) throws Exception {
269 if (!channelFuture.isSuccess()) {
270 channelFuture.channel().close();
271
272 if (count.incrementAndGet() < MAX_RETRY) {
273 final EventLoop loop = channelFuture.channel().eventLoop();
274
275 loop.schedule(() -> {
276 controller.connectRetry(this.ip, this.port, this);
277 }, 1L, TimeUnit.SECONDS);
278 } else {
279 log.info("Connection to the ovsdb {}:{} failed",
280 this.ip.toString(), this.port.toString());
281 }
282 } else {
283 handleNewNodeConnection(channelFuture.channel());
284 }
285 }
286 }
287
288 private class ConnectionHandler extends ChannelDuplexHandler {
289
290 @Override
291 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
292 IdleStateEvent e = (IdleStateEvent) evt;
293
294 if (e.state() == IdleState.READER_IDLE) {
295 ctx.close();
296 }
297 }
298 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800299}