blob: 86998c0f64d03d107e6ad43c2c4c316d2739ba34 [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
YuanyouZhangebe12612015-08-05 18:19:09 +08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070018import io.netty.bootstrap.Bootstrap;
YuanyouZhangebe12612015-08-05 18:19:09 +080019import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070022import io.netty.channel.ChannelDuplexHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080023import io.netty.channel.ChannelFuture;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070024import io.netty.channel.ChannelFutureListener;
25import io.netty.channel.ChannelHandlerContext;
YuanyouZhangebe12612015-08-05 18:19:09 +080026import io.netty.channel.ChannelInitializer;
27import io.netty.channel.ChannelOption;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070028import io.netty.channel.ChannelPipeline;
29import io.netty.channel.EventLoop;
YuanyouZhangebe12612015-08-05 18:19:09 +080030import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
debanshur37cf6ba2018-05-08 20:07:30 +053032import io.netty.channel.group.ChannelGroup;
33import io.netty.channel.group.DefaultChannelGroup;
YuanyouZhangebe12612015-08-05 18:19:09 +080034import io.netty.channel.nio.NioEventLoopGroup;
35import io.netty.channel.socket.SocketChannel;
36import io.netty.channel.socket.nio.NioServerSocketChannel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070037import io.netty.channel.socket.nio.NioSocketChannel;
YuanyouZhangebe12612015-08-05 18:19:09 +080038import io.netty.handler.codec.string.StringEncoder;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070039import io.netty.handler.timeout.IdleState;
40import io.netty.handler.timeout.IdleStateEvent;
41import io.netty.handler.timeout.IdleStateHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080042import io.netty.util.CharsetUtil;
43
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070044import static org.onlab.util.Tools.groupedThreads;
45
debanshur37cf6ba2018-05-08 20:07:30 +053046import java.io.FileInputStream;
47import java.io.IOException;
YuanyouZhangebe12612015-08-05 18:19:09 +080048import java.net.InetSocketAddress;
debanshur37cf6ba2018-05-08 20:07:30 +053049import java.security.KeyManagementException;
50import java.security.KeyStore;
51import java.security.KeyStoreException;
52import java.security.NoSuchAlgorithmException;
53import java.security.UnrecoverableKeyException;
54import java.security.cert.CertificateException;
55import java.util.Objects;
YuanyouZhangebe12612015-08-05 18:19:09 +080056import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070058import java.util.concurrent.TimeUnit;
59import java.util.concurrent.atomic.AtomicInteger;
jaegonkim1af0ae52017-01-01 10:46:55 +090060import java.util.function.Consumer;
YuanyouZhangebe12612015-08-05 18:19:09 +080061
debanshur37cf6ba2018-05-08 20:07:30 +053062import io.netty.util.concurrent.GlobalEventExecutor;
YuanyouZhangebe12612015-08-05 18:19:09 +080063import org.onlab.packet.IpAddress;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070064import org.onlab.packet.TpPort;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070065import org.onlab.util.Tools;
YuanyouZhangebe12612015-08-05 18:19:09 +080066import org.onosproject.ovsdb.controller.OvsdbConstant;
67import org.onosproject.ovsdb.controller.OvsdbNodeId;
68import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
69import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
70import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
71import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
72import org.slf4j.Logger;
73import org.slf4j.LoggerFactory;
74
debanshur37cf6ba2018-05-08 20:07:30 +053075import javax.net.ssl.KeyManagerFactory;
76import javax.net.ssl.SSLContext;
77import javax.net.ssl.TrustManagerFactory;
78
YuanyouZhangebe12612015-08-05 18:19:09 +080079/**
80 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080081 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080082 */
BitOhenryf006ddd2015-11-17 13:25:41 +080083public class Controller {
Ray Milkey9c9cde42018-01-12 14:22:06 -080084 private static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080085 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080086
87 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
88
89 private OvsdbAgent agent;
90 private Callback monitorCallback;
91
92 private final ExecutorService executorService = Executors
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070093 .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080094
95 private EventLoopGroup bossGroup;
96 private EventLoopGroup workerGroup;
97 private Class<? extends ServerChannel> serverChannelClass;
98
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070099 private static final int MAX_RETRY = 5;
100 private static final int IDLE_TIMEOUT_SEC = 10;
101
debanshur37cf6ba2018-05-08 20:07:30 +0530102 private ChannelGroup cg;
103
104 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
105
106 protected TlsParams tlsParams = new TlsParams();
107 protected SSLContext sslContext;
108 protected KeyStore keyStore;
109
110 protected static final short MIN_KS_LENGTH = 6;
111 private static final String JAVA_KEY_STORE = "JKS";
112
YuanyouZhangebe12612015-08-05 18:19:09 +0800113 /**
114 * Initialization.
115 */
116 private void initEventLoopGroup() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700117 bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
118 workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +0800119 serverChannelClass = NioServerSocketChannel.class;
120 }
121
122 /**
123 * Accepts incoming connections.
124 */
125 private void startAcceptingConnections() throws InterruptedException {
debanshur37cf6ba2018-05-08 20:07:30 +0530126 if (cg == null) {
127 return;
128 }
129 final ServerBootstrap b = new ServerBootstrap();
YuanyouZhangebe12612015-08-05 18:19:09 +0800130
131 b.group(bossGroup, workerGroup).channel(serverChannelClass)
debanshur37cf6ba2018-05-08 20:07:30 +0530132 .childHandler(new OvsdbChannelInitializer(this, sslContext));
133 b.option(ChannelOption.SO_REUSEADDR, true);
YuanyouZhangebe12612015-08-05 18:19:09 +0800134 b.option(ChannelOption.SO_BACKLOG, 128);
YuanyouZhangebe12612015-08-05 18:19:09 +0800135 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
debanshur37cf6ba2018-05-08 20:07:30 +0530136 b.childOption(ChannelOption.SO_SNDBUF, Controller.SEND_BUFFER_SIZE);
YuanyouZhangebe12612015-08-05 18:19:09 +0800137 b.childOption(ChannelOption.SO_KEEPALIVE, true);
debanshur37cf6ba2018-05-08 20:07:30 +0530138
139 cg.add(b.bind(ovsdbPort).syncUninterruptibly().channel());
YuanyouZhangebe12612015-08-05 18:19:09 +0800140 }
141
142 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800143 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700144 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800145 */
146 public void run() throws InterruptedException {
147 initEventLoopGroup();
148 startAcceptingConnections();
149 }
150
151 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800152 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800153 */
154 private class OnosCommunicationChannelInitializer
155 extends ChannelInitializer<SocketChannel> {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700156 @Override
YuanyouZhangebe12612015-08-05 18:19:09 +0800157 protected void initChannel(SocketChannel channel) throws Exception {
158 log.info("New channel created");
159 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
160 channel.pipeline().addLast(new MessageDecoder());
161 handleNewNodeConnection(channel);
162
163 }
164 }
165
166 /**
debanshur37cf6ba2018-05-08 20:07:30 +0530167 * Sets new TLS parameters.
168 *
169 * @param newTlsParams Modified Tls Params
170 * @return true if restart is required
171 */
172 protected boolean setTlsParameters(TlsParams newTlsParams) {
173 TlsParams oldParams = this.tlsParams;
174 this.tlsParams = newTlsParams;
175 return !Objects.equals(this.tlsParams, oldParams); // restart if TLS params change
176 }
177
178 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800179 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800180 *
181 * @param channel the channel to use.
182 */
debanshur37cf6ba2018-05-08 20:07:30 +0530183 protected void handleNewNodeConnection(final Channel channel) {
kdarapufce5abb2018-05-10 19:37:53 +0530184 executorService.execute(() -> {
185 log.info("Handle new node connection");
YuanyouZhangebe12612015-08-05 18:19:09 +0800186
kdarapufce5abb2018-05-10 19:37:53 +0530187 IpAddress ipAddress = IpAddress
188 .valueOf(((InetSocketAddress) channel.remoteAddress())
189 .getAddress().getHostAddress());
190 long port = ((InetSocketAddress) channel.remoteAddress())
191 .getPort();
YuanyouZhangebe12612015-08-05 18:19:09 +0800192
kdarapufce5abb2018-05-10 19:37:53 +0530193 log.info("Get connection from ip address {} : {}",
194 ipAddress.toString(), port);
YuanyouZhangebe12612015-08-05 18:19:09 +0800195
kdarapufce5abb2018-05-10 19:37:53 +0530196 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
197 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
198 agent,
199 monitorCallback,
200 channel);
201 ovsdbProviderService.setConnection(true);
202 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
203 nodeId);
204 ovsdbJsonRpcHandler
205 .setOvsdbProviderService(ovsdbProviderService);
206 channel.pipeline().addLast(ovsdbJsonRpcHandler);
YuanyouZhangebe12612015-08-05 18:19:09 +0800207
kdarapufce5abb2018-05-10 19:37:53 +0530208 ovsdbProviderService.nodeAdded();
209 ChannelFuture closeFuture = channel.closeFuture();
210 closeFuture
211 .addListener(new ChannelConnectionListener(
212 ovsdbProviderService));
YuanyouZhangebe12612015-08-05 18:19:09 +0800213 });
214 }
215
216 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800217 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800218 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800219 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800220 * @param agent OvsdbAgent
221 * @param monitorCallback Callback
222 * @param channel Channel
223 * @return OvsdbProviderService instance
224 */
225 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
226 OvsdbAgent agent,
227 Callback monitorCallback,
228 Channel channel) {
229 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
230 nodeId);
231 ovsdbProviderService.setAgent(agent);
232 ovsdbProviderService.setCallback(monitorCallback);
233 ovsdbProviderService.setChannel(channel);
234 return ovsdbProviderService;
235 }
236
237 /**
238 * Starts controller.
239 *
240 * @param agent OvsdbAgent
241 * @param monitorCallback Callback
Jian Lieaf31032018-05-03 15:54:03 +0900242 * @param mode OVSDB server mode flag
YuanyouZhangebe12612015-08-05 18:19:09 +0800243 */
Jian Lieaf31032018-05-03 15:54:03 +0900244 public void start(OvsdbAgent agent, Callback monitorCallback, boolean mode) {
YuanyouZhangebe12612015-08-05 18:19:09 +0800245 this.agent = agent;
246 this.monitorCallback = monitorCallback;
Jian Lieaf31032018-05-03 15:54:03 +0900247 // if the OVSDB server flag is configured as false, we do NOT listen on 6640 port
248 // therefore, ONOS only runs as an OVSDB client
249 if (mode) {
250 try {
debanshur37cf6ba2018-05-08 20:07:30 +0530251 this.init();
Jian Lieaf31032018-05-03 15:54:03 +0900252 this.run();
253 } catch (InterruptedException e) {
254 log.warn("Interrupted while waiting to start");
255 Thread.currentThread().interrupt();
256 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800257 }
258 }
259
260 /**
261 * Stops controller.
262 *
263 */
264 public void stop() {
debanshur37cf6ba2018-05-08 20:07:30 +0530265 if (cg != null) {
266 cg.close();
267 cg = null;
268
269 workerGroup.shutdownGracefully();
270 bossGroup.shutdownGracefully();
271 // Wait until all threads are terminated.
272 try {
273 bossGroup.terminationFuture().sync();
274 workerGroup.terminationFuture().sync();
275 } catch (InterruptedException e) {
276 log.warn("Interrupted while stopping", e);
277 Thread.currentThread().interrupt();
278 }
279 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800280 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700281
282 /**
283 * Connect to the ovsdb server with given ip address and port number.
284 *
285 * @param ip ip address
286 * @param port port number
287 */
288 public void connect(IpAddress ip, TpPort port) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900289 connect(ip, port, e -> log.warn("Connection to the ovsdb {}:{} failed(cause: {})", ip, port, e));
290 }
291
292 /**
293 * Connect to the ovsdb server with given ip address, port number, and failhandler.
294 *
295 * @param ip ip address
296 * @param port port number
297 * @param failhandler connection failure handler
298 */
299 public void connect(IpAddress ip, TpPort port, Consumer<Exception> failhandler) {
300 ChannelFutureListener listener = new ConnectionListener(this, ip, port, failhandler);
301 try {
302 connectRetry(ip, port, listener);
303 } catch (Exception e) {
304 failhandler.accept(e);
305 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700306 }
307
308 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900309 Bootstrap b = new Bootstrap();
310 b.group(workerGroup)
311 .channel(NioSocketChannel.class)
312 .option(ChannelOption.TCP_NODELAY, true)
313 .handler(new ChannelInitializer<SocketChannel>() {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700314
jaegonkim1af0ae52017-01-01 10:46:55 +0900315 @Override
316 protected void initChannel(SocketChannel channel) throws Exception {
debanshur37cf6ba2018-05-08 20:07:30 +0530317 ChannelPipeline pipeline = channel.pipeline();
318 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
319 pipeline.addLast(new MessageDecoder());
320 pipeline.addLast(new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0));
321 pipeline.addLast(new ConnectionHandler());
jaegonkim1af0ae52017-01-01 10:46:55 +0900322 }
323 });
324 b.remoteAddress(ip.toString(), port.toInt());
325 b.connect().addListener(listener);
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700326 }
327
328 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800329 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700330 private IpAddress ip;
331 private TpPort port;
332 private AtomicInteger count = new AtomicInteger();
jaegonkim1af0ae52017-01-01 10:46:55 +0900333 private Consumer<Exception> failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700334
BitOhenryf006ddd2015-11-17 13:25:41 +0800335 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700336 IpAddress ip,
jaegonkim1af0ae52017-01-01 10:46:55 +0900337 TpPort port,
338 Consumer<Exception> failhandler) {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700339 this.controller = controller;
340 this.ip = ip;
341 this.port = port;
jaegonkim1af0ae52017-01-01 10:46:55 +0900342 this.failhandler = failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700343 }
344
345 @Override
346 public void operationComplete(ChannelFuture channelFuture) throws Exception {
347 if (!channelFuture.isSuccess()) {
348 channelFuture.channel().close();
349
350 if (count.incrementAndGet() < MAX_RETRY) {
351 final EventLoop loop = channelFuture.channel().eventLoop();
352
353 loop.schedule(() -> {
jaegonkim1af0ae52017-01-01 10:46:55 +0900354 try {
355 controller.connectRetry(this.ip, this.port, this);
356 } catch (Exception e) {
357 log.warn("Connection to the ovsdb server {}:{} failed(cause: {})", ip, port, e);
358 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700359 }, 1L, TimeUnit.SECONDS);
360 } else {
jaegonkim1af0ae52017-01-01 10:46:55 +0900361 failhandler.accept(new Exception("max connection retry(" + MAX_RETRY + ") exceeded"));
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700362 }
363 } else {
364 handleNewNodeConnection(channelFuture.channel());
365 }
366 }
367 }
368
369 private class ConnectionHandler extends ChannelDuplexHandler {
370
371 @Override
372 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
373 IdleStateEvent e = (IdleStateEvent) evt;
374
375 if (e.state() == IdleState.READER_IDLE) {
376 ctx.close();
377 }
378 }
379 }
debanshur37cf6ba2018-05-08 20:07:30 +0530380
381 /**
382 * Initialize internal data structures.
383 */
384 public void init() {
385 cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
386
387 if (tlsParams.isTlsEnabled()) {
388 initSsl();
389 }
390
391 }
392
393 private void initSsl() {
394 try {
395 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
396 KeyStore ts = KeyStore.getInstance(JAVA_KEY_STORE);
397 ts.load(new FileInputStream(tlsParams.tsLocation), tlsParams.tsPwd());
398 tmFactory.init(ts);
399
400 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
401 keyStore = KeyStore.getInstance(JAVA_KEY_STORE);
402 keyStore.load(new FileInputStream(tlsParams.ksLocation), tlsParams.ksPwd());
403 kmf.init(keyStore, tlsParams.ksPwd());
404
405 sslContext = SSLContext.getInstance("TLS");
406 sslContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
407 } catch (NoSuchAlgorithmException | KeyStoreException | CertificateException |
408 IOException | KeyManagementException | UnrecoverableKeyException ex) {
409 log.error("SSL init failed: {}", ex.getMessage());
410 }
411 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800412}