blob: c4616be89dec2b076fef2e19f6aae88ef66b3a4a [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
YuanyouZhangebe12612015-08-05 18:19:09 +08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070018import io.netty.bootstrap.Bootstrap;
YuanyouZhangebe12612015-08-05 18:19:09 +080019import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070022import io.netty.channel.ChannelDuplexHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080023import io.netty.channel.ChannelFuture;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070024import io.netty.channel.ChannelFutureListener;
25import io.netty.channel.ChannelHandlerContext;
YuanyouZhangebe12612015-08-05 18:19:09 +080026import io.netty.channel.ChannelInitializer;
27import io.netty.channel.ChannelOption;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070028import io.netty.channel.ChannelPipeline;
29import io.netty.channel.EventLoop;
YuanyouZhangebe12612015-08-05 18:19:09 +080030import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
debanshur37cf6ba2018-05-08 20:07:30 +053032import io.netty.channel.group.ChannelGroup;
33import io.netty.channel.group.DefaultChannelGroup;
YuanyouZhangebe12612015-08-05 18:19:09 +080034import io.netty.channel.nio.NioEventLoopGroup;
35import io.netty.channel.socket.SocketChannel;
36import io.netty.channel.socket.nio.NioServerSocketChannel;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070037import io.netty.channel.socket.nio.NioSocketChannel;
YuanyouZhangebe12612015-08-05 18:19:09 +080038import io.netty.handler.codec.string.StringEncoder;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070039import io.netty.handler.timeout.IdleState;
40import io.netty.handler.timeout.IdleStateEvent;
41import io.netty.handler.timeout.IdleStateHandler;
YuanyouZhangebe12612015-08-05 18:19:09 +080042import io.netty.util.CharsetUtil;
43
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070044import static org.onlab.util.Tools.groupedThreads;
45
debanshur37cf6ba2018-05-08 20:07:30 +053046import java.io.FileInputStream;
47import java.io.IOException;
YuanyouZhangebe12612015-08-05 18:19:09 +080048import java.net.InetSocketAddress;
debanshur37cf6ba2018-05-08 20:07:30 +053049import java.security.KeyManagementException;
50import java.security.KeyStore;
51import java.security.KeyStoreException;
52import java.security.NoSuchAlgorithmException;
53import java.security.UnrecoverableKeyException;
54import java.security.cert.CertificateException;
55import java.util.Objects;
YuanyouZhangebe12612015-08-05 18:19:09 +080056import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070058import java.util.concurrent.TimeUnit;
59import java.util.concurrent.atomic.AtomicInteger;
jaegonkim1af0ae52017-01-01 10:46:55 +090060import java.util.function.Consumer;
YuanyouZhangebe12612015-08-05 18:19:09 +080061
debanshur37cf6ba2018-05-08 20:07:30 +053062import io.netty.util.concurrent.GlobalEventExecutor;
YuanyouZhangebe12612015-08-05 18:19:09 +080063import org.onlab.packet.IpAddress;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070064import org.onlab.packet.TpPort;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070065import org.onlab.util.Tools;
YuanyouZhangebe12612015-08-05 18:19:09 +080066import org.onosproject.ovsdb.controller.OvsdbConstant;
67import org.onosproject.ovsdb.controller.OvsdbNodeId;
68import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
69import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
70import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
71import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
72import org.slf4j.Logger;
73import org.slf4j.LoggerFactory;
74
debanshur37cf6ba2018-05-08 20:07:30 +053075import javax.net.ssl.KeyManagerFactory;
76import javax.net.ssl.SSLContext;
77import javax.net.ssl.TrustManagerFactory;
78
YuanyouZhangebe12612015-08-05 18:19:09 +080079/**
80 * The main controller class. Handles all setup and network listeners -
BitOhenry3e104cd2015-11-10 12:18:33 +080081 * distributed OVSDBClient.
YuanyouZhangebe12612015-08-05 18:19:09 +080082 */
BitOhenryf006ddd2015-11-17 13:25:41 +080083public class Controller {
Ray Milkey9c9cde42018-01-12 14:22:06 -080084 private static final Logger log = LoggerFactory
BitOhenryf006ddd2015-11-17 13:25:41 +080085 .getLogger(Controller.class);
YuanyouZhangebe12612015-08-05 18:19:09 +080086
87 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
88
89 private OvsdbAgent agent;
90 private Callback monitorCallback;
91
92 private final ExecutorService executorService = Executors
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070093 .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +080094
95 private EventLoopGroup bossGroup;
96 private EventLoopGroup workerGroup;
97 private Class<? extends ServerChannel> serverChannelClass;
98
Hyunsun Moon5fb20a52015-09-25 17:02:33 -070099 private static final int MAX_RETRY = 5;
100 private static final int IDLE_TIMEOUT_SEC = 10;
101
debanshur37cf6ba2018-05-08 20:07:30 +0530102 private ChannelGroup cg;
103
104 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
105
106 protected TlsParams tlsParams = new TlsParams();
107 protected SSLContext sslContext;
108 protected KeyStore keyStore;
109
110 protected static final short MIN_KS_LENGTH = 6;
111 private static final String JAVA_KEY_STORE = "JKS";
112
YuanyouZhangebe12612015-08-05 18:19:09 +0800113 /**
114 * Initialization.
115 */
116 private void initEventLoopGroup() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700117 bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
118 workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
YuanyouZhangebe12612015-08-05 18:19:09 +0800119 serverChannelClass = NioServerSocketChannel.class;
120 }
121
122 /**
123 * Accepts incoming connections.
124 */
125 private void startAcceptingConnections() throws InterruptedException {
debanshur37cf6ba2018-05-08 20:07:30 +0530126 if (cg == null) {
127 return;
128 }
129 final ServerBootstrap b = new ServerBootstrap();
YuanyouZhangebe12612015-08-05 18:19:09 +0800130
131 b.group(bossGroup, workerGroup).channel(serverChannelClass)
debanshur37cf6ba2018-05-08 20:07:30 +0530132 .childHandler(new OvsdbChannelInitializer(this, sslContext));
133 b.option(ChannelOption.SO_REUSEADDR, true);
YuanyouZhangebe12612015-08-05 18:19:09 +0800134 b.option(ChannelOption.SO_BACKLOG, 128);
YuanyouZhangebe12612015-08-05 18:19:09 +0800135 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
debanshur37cf6ba2018-05-08 20:07:30 +0530136 b.childOption(ChannelOption.SO_SNDBUF, Controller.SEND_BUFFER_SIZE);
YuanyouZhangebe12612015-08-05 18:19:09 +0800137 b.childOption(ChannelOption.SO_KEEPALIVE, true);
debanshur37cf6ba2018-05-08 20:07:30 +0530138
139 cg.add(b.bind(ovsdbPort).syncUninterruptibly().channel());
YuanyouZhangebe12612015-08-05 18:19:09 +0800140 }
141
142 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800143 * Tells controller that we're ready to accept OVSDB node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -0700144 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +0800145 */
146 public void run() throws InterruptedException {
147 initEventLoopGroup();
148 startAcceptingConnections();
149 }
150
151 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800152 * Adds channel pipeline to handle a new connected node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800153 */
154 private class OnosCommunicationChannelInitializer
155 extends ChannelInitializer<SocketChannel> {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700156 @Override
YuanyouZhangebe12612015-08-05 18:19:09 +0800157 protected void initChannel(SocketChannel channel) throws Exception {
158 log.info("New channel created");
159 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
160 channel.pipeline().addLast(new MessageDecoder());
161 handleNewNodeConnection(channel);
162
163 }
164 }
165
166 /**
debanshur37cf6ba2018-05-08 20:07:30 +0530167 * Sets new TLS parameters.
168 *
169 * @param newTlsParams Modified Tls Params
170 * @return true if restart is required
171 */
172 protected boolean setTlsParameters(TlsParams newTlsParams) {
173 TlsParams oldParams = this.tlsParams;
174 this.tlsParams = newTlsParams;
175 return !Objects.equals(this.tlsParams, oldParams); // restart if TLS params change
176 }
177
178 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800179 * Handles the new connection of node.
YuanyouZhangebe12612015-08-05 18:19:09 +0800180 *
181 * @param channel the channel to use.
182 */
debanshur37cf6ba2018-05-08 20:07:30 +0530183 protected void handleNewNodeConnection(final Channel channel) {
kdarapufce5abb2018-05-10 19:37:53 +0530184 executorService.execute(() -> {
185 log.info("Handle new node connection");
YuanyouZhangebe12612015-08-05 18:19:09 +0800186
kdarapufce5abb2018-05-10 19:37:53 +0530187 IpAddress ipAddress = IpAddress
188 .valueOf(((InetSocketAddress) channel.remoteAddress())
189 .getAddress().getHostAddress());
190 long port = ((InetSocketAddress) channel.remoteAddress())
191 .getPort();
YuanyouZhangebe12612015-08-05 18:19:09 +0800192
kdarapufce5abb2018-05-10 19:37:53 +0530193 log.info("Get connection from ip address {} : {}",
194 ipAddress.toString(), port);
YuanyouZhangebe12612015-08-05 18:19:09 +0800195
kdarapufce5abb2018-05-10 19:37:53 +0530196 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
197 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
198 agent,
199 monitorCallback,
200 channel);
201 ovsdbProviderService.setConnection(true);
202 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
203 nodeId);
204 ovsdbJsonRpcHandler
205 .setOvsdbProviderService(ovsdbProviderService);
206 channel.pipeline().addLast(ovsdbJsonRpcHandler);
YuanyouZhangebe12612015-08-05 18:19:09 +0800207
kdarapufce5abb2018-05-10 19:37:53 +0530208 ovsdbProviderService.nodeAdded();
209 ChannelFuture closeFuture = channel.closeFuture();
210 closeFuture
211 .addListener(new ChannelConnectionListener(
212 ovsdbProviderService));
YuanyouZhangebe12612015-08-05 18:19:09 +0800213 });
214 }
215
216 /**
BitOhenry3e104cd2015-11-10 12:18:33 +0800217 * Gets an OVSDB client instance.
YuanyouZhangebe12612015-08-05 18:19:09 +0800218 *
BitOhenry3e104cd2015-11-10 12:18:33 +0800219 * @param nodeId data OVSDB node id
YuanyouZhangebe12612015-08-05 18:19:09 +0800220 * @param agent OvsdbAgent
221 * @param monitorCallback Callback
222 * @param channel Channel
223 * @return OvsdbProviderService instance
224 */
225 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
226 OvsdbAgent agent,
227 Callback monitorCallback,
228 Channel channel) {
229 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
230 nodeId);
231 ovsdbProviderService.setAgent(agent);
232 ovsdbProviderService.setCallback(monitorCallback);
233 ovsdbProviderService.setChannel(channel);
234 return ovsdbProviderService;
235 }
236
237 /**
238 * Starts controller.
239 *
240 * @param agent OvsdbAgent
241 * @param monitorCallback Callback
Jian Lieaf31032018-05-03 15:54:03 +0900242 * @param mode OVSDB server mode flag
YuanyouZhangebe12612015-08-05 18:19:09 +0800243 */
Jian Lieaf31032018-05-03 15:54:03 +0900244 public void start(OvsdbAgent agent, Callback monitorCallback, boolean mode) {
YuanyouZhangebe12612015-08-05 18:19:09 +0800245 this.agent = agent;
246 this.monitorCallback = monitorCallback;
Jian Lieaf31032018-05-03 15:54:03 +0900247 // if the OVSDB server flag is configured as false, we do NOT listen on 6640 port
248 // therefore, ONOS only runs as an OVSDB client
249 if (mode) {
250 try {
debanshur37cf6ba2018-05-08 20:07:30 +0530251 this.init();
Jian Lieaf31032018-05-03 15:54:03 +0900252 this.run();
253 } catch (InterruptedException e) {
254 log.warn("Interrupted while waiting to start");
255 Thread.currentThread().interrupt();
256 }
Miguel Borges de Freitas3daf7ce2021-10-13 10:39:54 +0100257 } else {
258 initEventLoopGroup();
YuanyouZhangebe12612015-08-05 18:19:09 +0800259 }
260 }
261
262 /**
263 * Stops controller.
264 *
265 */
266 public void stop() {
debanshur37cf6ba2018-05-08 20:07:30 +0530267 if (cg != null) {
268 cg.close();
269 cg = null;
270
271 workerGroup.shutdownGracefully();
272 bossGroup.shutdownGracefully();
273 // Wait until all threads are terminated.
274 try {
275 bossGroup.terminationFuture().sync();
276 workerGroup.terminationFuture().sync();
277 } catch (InterruptedException e) {
278 log.warn("Interrupted while stopping", e);
279 Thread.currentThread().interrupt();
280 }
281 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800282 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700283
284 /**
285 * Connect to the ovsdb server with given ip address and port number.
286 *
287 * @param ip ip address
288 * @param port port number
289 */
290 public void connect(IpAddress ip, TpPort port) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900291 connect(ip, port, e -> log.warn("Connection to the ovsdb {}:{} failed(cause: {})", ip, port, e));
292 }
293
294 /**
295 * Connect to the ovsdb server with given ip address, port number, and failhandler.
296 *
297 * @param ip ip address
298 * @param port port number
299 * @param failhandler connection failure handler
300 */
301 public void connect(IpAddress ip, TpPort port, Consumer<Exception> failhandler) {
302 ChannelFutureListener listener = new ConnectionListener(this, ip, port, failhandler);
303 try {
304 connectRetry(ip, port, listener);
305 } catch (Exception e) {
306 failhandler.accept(e);
307 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700308 }
309
310 private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
jaegonkim1af0ae52017-01-01 10:46:55 +0900311 Bootstrap b = new Bootstrap();
312 b.group(workerGroup)
313 .channel(NioSocketChannel.class)
314 .option(ChannelOption.TCP_NODELAY, true)
315 .handler(new ChannelInitializer<SocketChannel>() {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700316
jaegonkim1af0ae52017-01-01 10:46:55 +0900317 @Override
318 protected void initChannel(SocketChannel channel) throws Exception {
debanshur37cf6ba2018-05-08 20:07:30 +0530319 ChannelPipeline pipeline = channel.pipeline();
320 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
321 pipeline.addLast(new MessageDecoder());
322 pipeline.addLast(new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0));
323 pipeline.addLast(new ConnectionHandler());
jaegonkim1af0ae52017-01-01 10:46:55 +0900324 }
325 });
326 b.remoteAddress(ip.toString(), port.toInt());
327 b.connect().addListener(listener);
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700328 }
329
330 private class ConnectionListener implements ChannelFutureListener {
BitOhenryf006ddd2015-11-17 13:25:41 +0800331 private Controller controller;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700332 private IpAddress ip;
333 private TpPort port;
334 private AtomicInteger count = new AtomicInteger();
jaegonkim1af0ae52017-01-01 10:46:55 +0900335 private Consumer<Exception> failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700336
BitOhenryf006ddd2015-11-17 13:25:41 +0800337 public ConnectionListener(Controller controller,
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700338 IpAddress ip,
jaegonkim1af0ae52017-01-01 10:46:55 +0900339 TpPort port,
340 Consumer<Exception> failhandler) {
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700341 this.controller = controller;
342 this.ip = ip;
343 this.port = port;
jaegonkim1af0ae52017-01-01 10:46:55 +0900344 this.failhandler = failhandler;
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700345 }
346
347 @Override
348 public void operationComplete(ChannelFuture channelFuture) throws Exception {
349 if (!channelFuture.isSuccess()) {
350 channelFuture.channel().close();
351
352 if (count.incrementAndGet() < MAX_RETRY) {
353 final EventLoop loop = channelFuture.channel().eventLoop();
354
355 loop.schedule(() -> {
jaegonkim1af0ae52017-01-01 10:46:55 +0900356 try {
357 controller.connectRetry(this.ip, this.port, this);
358 } catch (Exception e) {
359 log.warn("Connection to the ovsdb server {}:{} failed(cause: {})", ip, port, e);
360 }
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700361 }, 1L, TimeUnit.SECONDS);
362 } else {
jaegonkim1af0ae52017-01-01 10:46:55 +0900363 failhandler.accept(new Exception("max connection retry(" + MAX_RETRY + ") exceeded"));
Hyunsun Moon5fb20a52015-09-25 17:02:33 -0700364 }
365 } else {
366 handleNewNodeConnection(channelFuture.channel());
367 }
368 }
369 }
370
371 private class ConnectionHandler extends ChannelDuplexHandler {
372
373 @Override
374 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
375 IdleStateEvent e = (IdleStateEvent) evt;
376
377 if (e.state() == IdleState.READER_IDLE) {
378 ctx.close();
379 }
380 }
381 }
debanshur37cf6ba2018-05-08 20:07:30 +0530382
383 /**
384 * Initialize internal data structures.
385 */
386 public void init() {
387 cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
388
389 if (tlsParams.isTlsEnabled()) {
390 initSsl();
391 }
392
393 }
394
395 private void initSsl() {
396 try {
397 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
398 KeyStore ts = KeyStore.getInstance(JAVA_KEY_STORE);
399 ts.load(new FileInputStream(tlsParams.tsLocation), tlsParams.tsPwd());
400 tmFactory.init(ts);
401
402 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
403 keyStore = KeyStore.getInstance(JAVA_KEY_STORE);
404 keyStore.load(new FileInputStream(tlsParams.ksLocation), tlsParams.ksPwd());
405 kmf.init(keyStore, tlsParams.ksPwd());
406
407 sslContext = SSLContext.getInstance("TLS");
408 sslContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
409 } catch (NoSuchAlgorithmException | KeyStoreException | CertificateException |
410 IOException | KeyManagementException | UnrecoverableKeyException ex) {
411 log.error("SSL init failed: {}", ex.getMessage());
412 }
413 }
YuanyouZhangebe12612015-08-05 18:19:09 +0800414}