blob: c0f74faa708c7a1b6b5f58732832118ad63a59de [file] [log] [blame]
tomcb43d602014-09-28 22:46:16 -07001package org.onlab.onos.ccc;
2
3import com.google.common.collect.ImmutableSet;
4import org.apache.felix.scr.annotations.Activate;
5import org.apache.felix.scr.annotations.Component;
6import org.apache.felix.scr.annotations.Deactivate;
7import org.apache.felix.scr.annotations.Service;
8import org.onlab.nio.AcceptorLoop;
9import org.onlab.nio.IOLoop;
10import org.onlab.nio.MessageStream;
11import org.onlab.onos.cluster.ClusterEvent;
12import org.onlab.onos.cluster.ClusterStore;
13import org.onlab.onos.cluster.ClusterStoreDelegate;
14import org.onlab.onos.cluster.ControllerNode;
15import org.onlab.onos.cluster.DefaultControllerNode;
16import org.onlab.onos.cluster.NodeId;
17import org.onlab.onos.store.AbstractStore;
18import org.onlab.packet.IpPrefix;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.io.IOException;
23import java.net.InetSocketAddress;
tom16555622014-09-29 08:49:27 -070024import java.net.Socket;
25import java.net.SocketAddress;
tomcb43d602014-09-28 22:46:16 -070026import java.nio.channels.ByteChannel;
27import java.nio.channels.ServerSocketChannel;
tom16555622014-09-29 08:49:27 -070028import java.nio.channels.SocketChannel;
tomcb43d602014-09-28 22:46:16 -070029import java.util.ArrayList;
30import java.util.List;
31import java.util.Map;
32import java.util.Set;
tom16555622014-09-29 08:49:27 -070033import java.util.Timer;
34import java.util.TimerTask;
tomcb43d602014-09-28 22:46:16 -070035import java.util.concurrent.ConcurrentHashMap;
36import java.util.concurrent.ExecutorService;
37import java.util.concurrent.Executors;
38
39import static java.net.InetAddress.getByAddress;
40import static org.onlab.onos.cluster.ControllerNode.State;
41import static org.onlab.packet.IpPrefix.valueOf;
42import static org.onlab.util.Tools.namedThreads;
43
44/**
45 * Distributed implementation of the cluster nodes store.
46 */
47@Component(immediate = true)
48@Service
49public class DistributedClusterStore
50 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
51 implements ClusterStore {
52
53 private final Logger log = LoggerFactory.getLogger(getClass());
54
tom16555622014-09-29 08:49:27 -070055 private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
56 private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
57
tomcb43d602014-09-28 22:46:16 -070058 private static final long SELECT_TIMEOUT = 50;
59 private static final int WORKERS = 3;
tom16555622014-09-29 08:49:27 -070060 private static final int INITIATORS = 2;
tomcb43d602014-09-28 22:46:16 -070061 private static final int COMM_BUFFER_SIZE = 16 * 1024;
62 private static final int COMM_IDLE_TIME = 500;
63
tom16555622014-09-29 08:49:27 -070064 private static final boolean SO_NO_DELAY = false;
65 private static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
66 private static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
67
tomcb43d602014-09-28 22:46:16 -070068 private DefaultControllerNode self;
69 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
70 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom16555622014-09-29 08:49:27 -070071 private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
72 private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
tomcb43d602014-09-28 22:46:16 -070073
74 private final ExecutorService listenExecutor =
tom16555622014-09-29 08:49:27 -070075 Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
tomcb43d602014-09-28 22:46:16 -070076 private final ExecutorService commExecutors =
tom16555622014-09-29 08:49:27 -070077 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
tomcb43d602014-09-28 22:46:16 -070078 private final ExecutorService heartbeatExecutor =
tom16555622014-09-29 08:49:27 -070079 Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
80 private final ExecutorService initiatorExecutors =
81 Executors.newFixedThreadPool(INITIATORS, namedThreads("onos-comm-initiator"));
82
83 private final Timer timer = new Timer();
84 private final TimerTask connectionCustodian = new ConnectionCustodian();
tomcb43d602014-09-28 22:46:16 -070085
86 private ListenLoop listenLoop;
87 private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
88
89 @Activate
90 public void activate() {
91 establishIdentity();
92 startCommunications();
93 startListening();
tom16555622014-09-29 08:49:27 -070094 startInitiating();
tomcb43d602014-09-28 22:46:16 -070095 log.info("Started");
96 }
97
tom16555622014-09-29 08:49:27 -070098 @Deactivate
99 public void deactivate() {
100 listenLoop.shutdown();
101 for (CommLoop loop : commLoops) {
102 loop.shutdown();
103 }
104 log.info("Stopped");
105 }
106
107
108 // Establishes the controller's own identity.
109 private void establishIdentity() {
110 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
111 self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
112 nodes.put(self.id(), self);
113 }
114
115 // Kicks off the IO loops.
tomcb43d602014-09-28 22:46:16 -0700116 private void startCommunications() {
117 for (int i = 0; i < WORKERS; i++) {
118 try {
119 CommLoop loop = new CommLoop();
120 commLoops.add(loop);
121 commExecutors.execute(loop);
122 } catch (IOException e) {
123 log.warn("Unable to start comm IO loop", e);
124 }
125 }
126 }
127
128 // Starts listening for connections from peer cluster members.
129 private void startListening() {
130 try {
131 listenLoop = new ListenLoop(self.ip(), self.tcpPort());
132 listenExecutor.execute(listenLoop);
133 } catch (IOException e) {
134 log.error("Unable to listen for cluster connections", e);
135 }
136 }
137
tom16555622014-09-29 08:49:27 -0700138 /**
139 * Initiates open connection request and registers the pending socket
140 * channel with the given IO loop.
141 *
142 * @param loop loop with which the channel should be registered
143 * @throws java.io.IOException if the socket could not be open or connected
144 */
145 private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
146 SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
147 SocketChannel ch = SocketChannel.open();
148 nodesByChannel.put(ch, node);
149 ch.configureBlocking(false);
150 loop.connectStream(ch);
151 ch.connect(sa);
tomcb43d602014-09-28 22:46:16 -0700152 }
153
tom16555622014-09-29 08:49:27 -0700154
155 // Attempts to connect to any nodes that do not have an associated connection.
156 private void startInitiating() {
157 timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
tomcb43d602014-09-28 22:46:16 -0700158 }
159
160 @Override
161 public ControllerNode getLocalNode() {
162 return self;
163 }
164
165 @Override
166 public Set<ControllerNode> getNodes() {
167 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
168 return builder.addAll(nodes.values()).build();
169 }
170
171 @Override
172 public ControllerNode getNode(NodeId nodeId) {
173 return nodes.get(nodeId);
174 }
175
176 @Override
177 public State getState(NodeId nodeId) {
178 State state = states.get(nodeId);
179 return state == null ? State.INACTIVE : state;
180 }
181
182 @Override
183 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
184 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
185 nodes.put(nodeId, node);
186 return node;
187 }
188
189 @Override
190 public void removeNode(NodeId nodeId) {
191 nodes.remove(nodeId);
192 }
193
194 // Listens and accepts inbound connections from other cluster nodes.
195 private class ListenLoop extends AcceptorLoop {
196 ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
197 super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
198 }
199
200 @Override
201 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
tom16555622014-09-29 08:49:27 -0700202 SocketChannel sc = channel.accept();
203 sc.configureBlocking(false);
tomcb43d602014-09-28 22:46:16 -0700204
tom16555622014-09-29 08:49:27 -0700205 Socket so = sc.socket();
206 so.setTcpNoDelay(SO_NO_DELAY);
207 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
208 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
209
210 findLeastUtilizedLoop().acceptStream(sc);
211 log.info("Connected client");
tomcb43d602014-09-28 22:46:16 -0700212 }
213 }
214
215 private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
216 CommLoop() throws IOException {
217 super(SELECT_TIMEOUT);
218 }
219
220 @Override
221 protected TLVMessageStream createStream(ByteChannel byteChannel) {
222 return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
223 }
224
225 @Override
226 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
227
228 }
tom16555622014-09-29 08:49:27 -0700229
230 @Override
231 public TLVMessageStream acceptStream(SocketChannel channel) {
232 TLVMessageStream stream = super.acceptStream(channel);
233 try {
234 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
235 log.info("Accepted a new connection from {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
236 } catch (IOException e) {
237 log.warn("Unable to accept connection from an unknown end-point", e);
238 }
239 return stream;
240 }
241
242 @Override
243 public TLVMessageStream connectStream(SocketChannel channel) {
244 TLVMessageStream stream = super.connectStream(channel);
245 DefaultControllerNode node = nodesByChannel.get(channel);
246 if (node != null) {
247 log.info("Opened connection to {}", node.id());
248 streams.put(node.id(), stream);
249 }
250 return stream;
251 }
252 }
253
254
255 // Sweeps through all controller nodes and attempts to open connection to
256 // those that presently do not have one.
257 private class ConnectionCustodian extends TimerTask {
258 @Override
259 public void run() {
260 for (DefaultControllerNode node : nodes.values()) {
261 if (node != self && !streams.containsKey(node.id())) {
262 try {
263 openConnection(node, findLeastUtilizedLoop());
264 } catch (IOException e) {
265 log.warn("Unable to connect", e);
266 }
267 }
268 }
269 }
270 }
271
272 // Finds the least utilities IO loop.
273 private CommLoop findLeastUtilizedLoop() {
274 CommLoop leastUtilized = null;
275 int minCount = Integer.MAX_VALUE;
276 for (CommLoop loop : commLoops) {
277 int count = loop.streamCount();
278 if (count == 0) {
279 return loop;
280 }
281
282 if (count < minCount) {
283 leastUtilized = loop;
284 minCount = count;
285 }
286 }
287 return leastUtilized;
tomcb43d602014-09-28 22:46:16 -0700288 }
289}