blob: 8acc8fead5bb4790d5157568a2ac3430b385311c [file] [log] [blame]
tom81583142014-09-30 01:40:29 -07001package org.onlab.onos.store.cluster.impl;
2
3import com.google.common.collect.HashMultimap;
4import com.google.common.collect.ImmutableSet;
5import com.google.common.collect.Multimap;
6import org.apache.felix.scr.annotations.Activate;
7import org.apache.felix.scr.annotations.Component;
8import org.apache.felix.scr.annotations.Deactivate;
9import org.apache.felix.scr.annotations.Reference;
10import org.apache.felix.scr.annotations.ReferenceCardinality;
11import org.apache.felix.scr.annotations.Service;
12import org.onlab.onos.cluster.DefaultControllerNode;
13import org.onlab.onos.cluster.NodeId;
14import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15import org.onlab.onos.store.cluster.messaging.ClusterMessage;
tom81583142014-09-30 01:40:29 -070016import org.onlab.onos.store.cluster.messaging.HelloMessage;
17import org.onlab.onos.store.cluster.messaging.MessageSubject;
18import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
19import org.onlab.onos.store.cluster.messaging.SerializationService;
20import org.onlab.packet.IpPrefix;
21import org.slf4j.Logger;
22import org.slf4j.LoggerFactory;
23
24import java.io.IOException;
25import java.net.InetSocketAddress;
26import java.net.SocketAddress;
27import java.nio.channels.SocketChannel;
28import java.util.ArrayList;
29import java.util.HashSet;
30import java.util.List;
31import java.util.Map;
32import java.util.Set;
33import java.util.Timer;
34import java.util.TimerTask;
35import java.util.concurrent.ConcurrentHashMap;
36import java.util.concurrent.ExecutorService;
37import java.util.concurrent.Executors;
38
39import static java.net.InetAddress.getByAddress;
40import static org.onlab.util.Tools.namedThreads;
41
42/**
43 * Implements the cluster communication services to use by other stores.
44 */
45@Component(immediate = true)
46@Service
47public class ClusterCommunicationManager
48 implements ClusterCommunicationService, ClusterCommunicationAdminService {
49
50 private final Logger log = LoggerFactory.getLogger(getClass());
51
tom43e836a2014-09-30 01:50:29 -070052 private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
53 private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
tom81583142014-09-30 01:40:29 -070054
55 private static final long START_TIMEOUT = 1000;
56 private static final int WORKERS = 3;
57
58 private ClusterConnectionListener connectionListener;
59 private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
60
61 private DefaultControllerNode localNode;
62 private ClusterNodesDelegate nodesDelegate;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected SerializationService serializationService;
66
67 // Nodes to be monitored to make sure they have a connection.
68 private final Set<DefaultControllerNode> nodes = new HashSet<>();
69
70 // Means to track message streams to other nodes.
71 private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
72
73 // TODO: use something different that won't require synchronization
74 private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
75
76 // Executor pools for listening and managing connections to other nodes.
77 private final ExecutorService listenExecutor =
78 Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
79 private final ExecutorService commExecutors =
80 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
81 private final ExecutorService heartbeatExecutor =
82 Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
83
84 private final Timer timer = new Timer("onos-comm-initiator");
85 private final TimerTask connectionCustodian = new ConnectionCustodian();
86
87 @Activate
88 public void activate() {
89 log.info("Activated but waiting for delegate");
90 }
91
92 @Deactivate
93 public void deactivate() {
94 connectionCustodian.cancel();
95 if (connectionListener != null) {
96 connectionListener.shutdown();
97 for (ClusterIOWorker worker : workers) {
98 worker.shutdown();
99 }
100 }
101 log.info("Stopped");
102 }
103
104 @Override
105 public boolean send(ClusterMessage message, NodeId toNodeId) {
106 ClusterMessageStream stream = streams.get(toNodeId);
107 if (stream != null) {
108 try {
109 stream.write(message);
110 return true;
111 } catch (IOException e) {
112 log.warn("Unable to send message {} to node {}",
113 message.subject(), toNodeId);
114 }
115 }
116 return false;
117 }
118
119 @Override
120 public synchronized void addSubscriber(MessageSubject subject,
121 MessageSubscriber subscriber) {
122 subscribers.put(subject, subscriber);
123 }
124
125 @Override
126 public synchronized void removeSubscriber(MessageSubject subject,
127 MessageSubscriber subscriber) {
128 subscribers.remove(subject, subscriber);
129 }
130
131 @Override
132 public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
133 return ImmutableSet.copyOf(subscribers.get(subject));
134 }
135
136 @Override
137 public void addNode(DefaultControllerNode node) {
138 nodes.add(node);
139 }
140
141 @Override
142 public void removeNode(DefaultControllerNode node) {
143 nodes.remove(node);
144 ClusterMessageStream stream = streams.remove(node.id());
145 if (stream != null) {
146 stream.close();
147 }
148 }
149
150 @Override
151 public void startUp(DefaultControllerNode localNode,
152 ClusterNodesDelegate delegate) {
153 this.localNode = localNode;
154 this.nodesDelegate = delegate;
155
156 startCommunications();
157 startListening();
158 startInitiatingConnections();
159 log.info("Started");
160 }
161
162 /**
163 * Dispatches the specified message to all subscribers to its subject.
164 *
165 * @param message message to dispatch
166 * @param fromNodeId node from which the message was received
167 */
168 void dispatch(ClusterMessage message, NodeId fromNodeId) {
169 Set<MessageSubscriber> set = getSubscribers(message.subject());
170 if (set != null) {
171 for (MessageSubscriber subscriber : set) {
172 subscriber.receive(message, fromNodeId);
173 }
174 }
175 }
176
177 /**
178 * Removes the stream associated with the specified node.
179 *
180 * @param nodeId newly detected cluster node id
181 * @param ip node IP listen address
182 * @param tcpPort node TCP listen port
183 * @return controller node bound to the stream
184 */
185 DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
186 ClusterMessageStream stream) {
187 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
188 stream.setNode(node);
189 streams.put(node.id(), stream);
190 return node;
191 }
192
193 /**
194 * Removes the stream associated with the specified node.
195 *
196 * @param node node whose stream to remove
197 */
198 void removeNodeStream(DefaultControllerNode node) {
199 nodesDelegate.nodeVanished(node.id());
200 streams.remove(node.id());
201 }
202
203 /**
204 * Finds the least utilized IO worker.
205 *
206 * @return IO worker
207 */
208 ClusterIOWorker findWorker() {
209 ClusterIOWorker leastUtilized = null;
210 int minCount = Integer.MAX_VALUE;
211 for (ClusterIOWorker worker : workers) {
212 int count = worker.streamCount();
213 if (count == 0) {
214 return worker;
215 }
216
217 if (count < minCount) {
218 leastUtilized = worker;
219 minCount = count;
220 }
221 }
222 return leastUtilized;
223 }
224
225 /**
226 * Kicks off the IO loops and waits for them to startup.
227 */
228 private void startCommunications() {
229 HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
230 localNode.tcpPort());
231 for (int i = 0; i < WORKERS; i++) {
232 try {
233 ClusterIOWorker worker =
234 new ClusterIOWorker(this, serializationService, hello);
235 workers.add(worker);
236 commExecutors.execute(worker);
237 } catch (IOException e) {
238 log.warn("Unable to start communication worker", e);
239 }
240 }
241
242 // Wait for the IO loops to start
243 for (ClusterIOWorker loop : workers) {
244 if (!loop.awaitStart(START_TIMEOUT)) {
245 log.warn("Comm loop did not start on-time; moving on...");
246 }
247 }
248 }
249
250 /**
251 * Starts listening for connections from peer cluster members.
252 */
253 private void startListening() {
254 try {
255 connectionListener =
256 new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
257 listenExecutor.execute(connectionListener);
258 if (!connectionListener.awaitStart(START_TIMEOUT)) {
259 log.warn("Listener did not start on-time; moving on...");
260 }
261 } catch (IOException e) {
262 log.error("Unable to listen for cluster connections", e);
263 }
264 }
265
266 /**
267 * Attempts to connect to any nodes that do not have an associated connection.
268 */
269 private void startInitiatingConnections() {
270 timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
271 CONNECTION_CUSTODIAN_FREQUENCY);
272 }
273
274 /**
275 * Initiates open connection request and registers the pending socket
276 * channel with the given IO worker.
277 *
278 * @param worker loop with which the channel should be registered
279 * @throws java.io.IOException if the socket could not be open or connected
280 */
281 private void initiateConnection(DefaultControllerNode node,
282 ClusterIOWorker worker) throws IOException {
283 SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
284 SocketChannel ch = SocketChannel.open();
285 ch.configureBlocking(false);
286 ch.connect(sa);
287 worker.connectStream(ch);
288 }
289
290 // Sweeps through all controller nodes and attempts to open connection to
291 // those that presently do not have one.
292 private class ConnectionCustodian extends TimerTask {
293 @Override
294 public void run() {
295 for (DefaultControllerNode node : nodes) {
296 if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
297 try {
298 initiateConnection(node, findWorker());
299 } catch (IOException e) {
300 log.debug("Unable to connect", e);
301 }
302 }
303 }
304 }
305 }
306
307}