blob: cd4d5147e2d2bb86be7480a453711dd4bbab2599 [file] [log] [blame]
YuanyouZhangebe12612015-08-05 18:19:09 +08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ovsdb.controller.impl;
17
18import io.netty.bootstrap.ServerBootstrap;
19import io.netty.buffer.PooledByteBufAllocator;
20import io.netty.channel.Channel;
21import io.netty.channel.ChannelFuture;
22import io.netty.channel.ChannelInitializer;
23import io.netty.channel.ChannelOption;
24import io.netty.channel.EventLoopGroup;
25import io.netty.channel.ServerChannel;
26import io.netty.channel.nio.NioEventLoopGroup;
27import io.netty.channel.socket.SocketChannel;
28import io.netty.channel.socket.nio.NioServerSocketChannel;
29import io.netty.handler.codec.string.StringEncoder;
30import io.netty.util.CharsetUtil;
31
32import java.net.InetSocketAddress;
33import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
35
36import org.onlab.packet.IpAddress;
37import org.onosproject.ovsdb.controller.OvsdbConstant;
38import org.onosproject.ovsdb.controller.OvsdbNodeId;
39import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
40import org.onosproject.ovsdb.controller.driver.OvsdbAgent;
41import org.onosproject.ovsdb.controller.driver.OvsdbProviderService;
42import org.onosproject.ovsdb.rfc.jsonrpc.Callback;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46/**
47 * The main controller class. Handles all setup and network listeners -
48 * Distributed ovsdbClient.
49 */
50public class Controller {
51 protected static final Logger log = LoggerFactory
52 .getLogger(Controller.class);
53
54 private int ovsdbPort = OvsdbConstant.OVSDBPORT;
55
56 private OvsdbAgent agent;
57 private Callback monitorCallback;
58
59 private final ExecutorService executorService = Executors
60 .newFixedThreadPool(10);
61
62 private EventLoopGroup bossGroup;
63 private EventLoopGroup workerGroup;
64 private Class<? extends ServerChannel> serverChannelClass;
65
66 /**
67 * Initialization.
68 */
69 private void initEventLoopGroup() {
70 bossGroup = new NioEventLoopGroup();
71 workerGroup = new NioEventLoopGroup();
72 serverChannelClass = NioServerSocketChannel.class;
73 }
74
75 /**
76 * Accepts incoming connections.
77 */
78 private void startAcceptingConnections() throws InterruptedException {
79 ServerBootstrap b = new ServerBootstrap();
80
81 b.group(bossGroup, workerGroup).channel(serverChannelClass)
82 .childHandler(new OnosCommunicationChannelInitializer());
83 b.option(ChannelOption.SO_BACKLOG, 128);
84 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
85 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
86 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
87 b.childOption(ChannelOption.SO_KEEPALIVE, true);
jiangrui62fb79b2015-08-21 15:37:00 +080088 b.bind(ovsdbPort).sync();
YuanyouZhangebe12612015-08-05 18:19:09 +080089 }
90
91 /**
92 * Tells controller that we're ready to accept ovsdb node loop.
93 */
94 public void run() throws InterruptedException {
95 initEventLoopGroup();
96 startAcceptingConnections();
97 }
98
99 /**
100 * Adds channel pipiline to handle a new connected node.
101 */
102 private class OnosCommunicationChannelInitializer
103 extends ChannelInitializer<SocketChannel> {
104 protected void initChannel(SocketChannel channel) throws Exception {
105 log.info("New channel created");
106 channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
107 channel.pipeline().addLast(new MessageDecoder());
108 handleNewNodeConnection(channel);
109
110 }
111 }
112
113 /**
114 * Handles the new connection of a node.
115 *
116 * @param channel the channel to use.
117 */
118 private void handleNewNodeConnection(final Channel channel) {
119 executorService.execute(new Runnable() {
120 @Override
121 public void run() {
122 log.info("Handle new node connection");
123
124 IpAddress ipAddress = IpAddress
125 .valueOf(((InetSocketAddress) channel.remoteAddress())
126 .getAddress().getHostAddress());
127 long port = ((InetSocketAddress) channel.remoteAddress())
128 .getPort();
129
130 log.info("Get connection from ip address {} : {}",
131 ipAddress.toString(), port);
132
133 OvsdbNodeId nodeId = new OvsdbNodeId(ipAddress, port);
134 OvsdbProviderService ovsdbProviderService = getNodeInstance(nodeId,
135 agent,
136 monitorCallback,
137 channel);
138 ovsdbProviderService.setConnection(true);
139 OvsdbJsonRpcHandler ovsdbJsonRpcHandler = new OvsdbJsonRpcHandler(
140 nodeId);
141 ovsdbJsonRpcHandler
142 .setOvsdbProviderService(ovsdbProviderService);
143 channel.pipeline().addLast(ovsdbJsonRpcHandler);
144
145 ovsdbProviderService.nodeAdded();
146 ChannelFuture closeFuture = channel.closeFuture();
147 closeFuture
148 .addListener(new ChannelConnectionListener(
149 ovsdbProviderService));
150 }
151 });
152 }
153
154 /**
155 * Gets an ovsdb client instance.
156 *
157 * @param nodeId data ovsdb node id
158 * @param agent OvsdbAgent
159 * @param monitorCallback Callback
160 * @param channel Channel
161 * @return OvsdbProviderService instance
162 */
163 protected OvsdbProviderService getNodeInstance(OvsdbNodeId nodeId,
164 OvsdbAgent agent,
165 Callback monitorCallback,
166 Channel channel) {
167 OvsdbProviderService ovsdbProviderService = new DefaultOvsdbClient(
168 nodeId);
169 ovsdbProviderService.setAgent(agent);
170 ovsdbProviderService.setCallback(monitorCallback);
171 ovsdbProviderService.setChannel(channel);
172 return ovsdbProviderService;
173 }
174
175 /**
176 * Starts controller.
177 *
178 * @param agent OvsdbAgent
179 * @param monitorCallback Callback
180 */
181 public void start(OvsdbAgent agent, Callback monitorCallback) {
182 this.agent = agent;
183 this.monitorCallback = monitorCallback;
184 try {
185 this.run();
186 } catch (InterruptedException e) {
187 log.warn("Interrupted while waiting to start");
188 Thread.currentThread().interrupt();
189 }
190 }
191
192 /**
193 * Stops controller.
194 *
195 */
196 public void stop() {
197 workerGroup.shutdownGracefully();
198 bossGroup.shutdownGracefully();
199 }
200}