blob: 075823275f1dd58549cbe149d6f8f9e29f097c6a [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
18import io.netty.bootstrap.ServerBootstrap;
19import io.netty.buffer.PooledByteBufAllocator;
20import io.netty.channel.Channel;
21import io.netty.channel.ChannelFuture;
22import io.netty.channel.ChannelInitializer;
23import io.netty.channel.ChannelOption;
24import io.netty.channel.EventLoopGroup;
25import io.netty.channel.ServerChannel;
26import io.netty.channel.nio.NioEventLoopGroup;
27import io.netty.channel.socket.SocketChannel;
28import io.netty.channel.socket.nio.NioServerSocketChannel;
29import io.netty.handler.codec.string.StringEncoder;
30import io.netty.util.CharsetUtil;
31
32import java.net.InetSocketAddress;
33import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
35
36import org.onlab.packet.IpAddress;
37import org.onosproject.ovsdb.controller.OvsdbConstant;
38import org.onosproject.ovsdb.controller.OvsdbNodeId;
39import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
40import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
41import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
42import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46/**
47 * The main controller class. Handles all setup and network listeners -
48 * Distributed ovsdbClient.
49 */
50public class Controller {
51 protected static final Logger log = LoggerFactory
52 .getLogger(Controller.class);
53
54 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
55
56 private OvsdbAgent agent;
57 private Callback monitorCallback;
58
59 private final ExecutorService executorService = Executors
60 .newFixedThreadPool(10);
61
62 private EventLoopGroup bossGroup;
63 private EventLoopGroup workerGroup;
64 private Class<? extends ServerChannel> serverChannelClass;
65
66 /**
67 * Initialization.
68 */
69 private void initEventLoopGroup() {
70 bossGroup = new NioEventLoopGroup();
71 workerGroup = new NioEventLoopGroup();
72 serverChannelClass = NioServerSocketChannel.class;
73 }
74
75 /**
76 * Accepts incoming connections.
77 */
78 private void startAcceptingConnections() throws InterruptedException {
79 ServerBootstrap b = new ServerBootstrap();
80
81 b.group(bossGroup, workerGroup).channel(serverChannelClass)
82 .childHandler(new OnosCommunicationChannelInitializer());
83 b.option(ChannelOption.SO_BACKLOG, 128);
84 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
85 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
86 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
87 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +080088 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +080089 }
90
91 /**
92 * Tells controller that we're ready to accept ovsdb node loop.
Ray Milkey9b36d812015-09-09 15:24:54 -070093 * @throws InterruptedException if thread is interrupted
YuanyouZhangebe12612015-08-05 18:19:09 +080094 */
95 public void run() throws InterruptedException {
96 initEventLoopGroup();
97 startAcceptingConnections();
98 }
99
100 /**
101 * Adds channel pipiline to handle a new connected node.
102 */
103 private class OnosCommunicationChannelInitializer
104 extends ChannelInitializer<SocketChannel> {
105 protected void initChannel(SocketChannel channel) throws Exception {
106 log.info("New channel created");
107 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
108 channel.pipeline().addLast(new MessageDecoder());
109 handleNewNodeConnection(channel);
110
111 }
112 }
113
114 /**
115 * Handles the new connection of a node.
116 *
117 * @param channel the channel to use.
118 */
119 private void handleNewNodeConnection(final Channel channel) {
120 executorService.execute(new Runnable() {
121 @Override
122 public void run() {
123 log.info("Handle new node connection");
124
125 IpAddress ipAddress = IpAddress
126 .valueOf(((InetSocketAddress) channel.remoteAddress())
127 .getAddress().getHostAddress());
128 long port = ((InetSocketAddress) channel.remoteAddress())
129 .getPort();
130
131 log.info("Get connection from ip address {} : {}",
132 ipAddress.toString(), port);
133
134 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
135 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
136 agent,
137 monitorCallback,
138 channel);
139 ovsdbProviderService.setConnection(true);
140 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
141 nodeId);
142 ovsdbJsonRpcHandler
143 .setOvsdbProviderService(ovsdbProviderService);
144 channel.pipeline().addLast(ovsdbJsonRpcHandler);
145
146 ovsdbProviderService.nodeAdded();
147 ChannelFuture closeFuture = channel.closeFuture();
148 closeFuture
149 .addListener(new ChannelConnectionListener(
150 ovsdbProviderService));
151 }
152 });
153 }
154
155 /**
156 * Gets an ovsdb client instance.
157 *
158 * @param nodeId data ovsdb node id
159 * @param agent OvsdbAgent
160 * @param monitorCallback Callback
161 * @param channel Channel
162 * @return OvsdbProviderService instance
163 */
164 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
165 OvsdbAgent agent,
166 Callback monitorCallback,
167 Channel channel) {
168 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
169 nodeId);
170 ovsdbProviderService.setAgent(agent);
171 ovsdbProviderService.setCallback(monitorCallback);
172 ovsdbProviderService.setChannel(channel);
173 return ovsdbProviderService;
174 }
175
176 /**
177 * Starts controller.
178 *
179 * @param agent OvsdbAgent
180 * @param monitorCallback Callback
181 */
182 public void start(OvsdbAgent agent, Callback monitorCallback) {
183 this.agent = agent;
184 this.monitorCallback = monitorCallback;
185 try {
186 this.run();
187 } catch (InterruptedException e) {
188 log.warn("Interrupted while waiting to start");
189 Thread.currentThread().interrupt();
190 }
191 }
192
193 /**
194 * Stops controller.
195 *
196 */
197 public void stop() {
198 workerGroup.shutdownGracefully();
199 bossGroup.shutdownGracefully();
200 }
201}