blob: 2e2887c9a56c98fff9576150a1325d4b6b624400 [file] [log] [blame]
tom81583142014-09-30 01:40:29 -07001package org.onlab.onos.store.cluster.impl;
2
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
44
45/**
46 * Implements the cluster communication services to use by other stores.
47 */
48@Component(immediate = true)
49@Service
50public class ClusterCommunicationManager
51 implements ClusterCommunicationService, ClusterCommunicationAdminService {
52
53 private final Logger log = LoggerFactory.getLogger(getClass());
54
tom43e836a2014-09-30 01:50:29 -070055 private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
56 private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
tom81583142014-09-30 01:40:29 -070057
58 private static final long START_TIMEOUT = 1000;
59 private static final int WORKERS = 3;
60
61 private ClusterConnectionListener connectionListener;
62 private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
63
64 private DefaultControllerNode localNode;
65 private ClusterNodesDelegate nodesDelegate;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected SerializationService serializationService;
69
70 // Nodes to be monitored to make sure they have a connection.
71 private final Set<DefaultControllerNode> nodes = new HashSet<>();
72
73 // Means to track message streams to other nodes.
74 private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
75
76 // TODO: use something different that won't require synchronization
77 private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
78
79 // Executor pools for listening and managing connections to other nodes.
80 private final ExecutorService listenExecutor =
81 Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
82 private final ExecutorService commExecutors =
83 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
84 private final ExecutorService heartbeatExecutor =
85 Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
86
87 private final Timer timer = new Timer("onos-comm-initiator");
88 private final TimerTask connectionCustodian = new ConnectionCustodian();
tom28e1fa22014-09-30 10:38:21 -070089 private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
tom81583142014-09-30 01:40:29 -070090
91 @Activate
92 public void activate() {
tom28e1fa22014-09-30 10:38:21 -070093 addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
94 addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
tom81583142014-09-30 01:40:29 -070095 log.info("Activated but waiting for delegate");
96 }
97
98 @Deactivate
99 public void deactivate() {
tom28e1fa22014-09-30 10:38:21 -0700100 removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
101 removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
102
tom81583142014-09-30 01:40:29 -0700103 connectionCustodian.cancel();
104 if (connectionListener != null) {
105 connectionListener.shutdown();
106 for (ClusterIOWorker worker : workers) {
107 worker.shutdown();
108 }
109 }
110 log.info("Stopped");
111 }
112
113 @Override
tomd33e6402014-09-30 03:14:43 -0700114 public boolean send(ClusterMessage message) {
115 boolean ok = true;
116 for (DefaultControllerNode node : nodes) {
117 if (!node.equals(localNode)) {
118 ok = send(message, node.id()) && ok;
119 }
120 }
121 return ok;
122 }
123
124 @Override
tom81583142014-09-30 01:40:29 -0700125 public boolean send(ClusterMessage message, NodeId toNodeId) {
126 ClusterMessageStream stream = streams.get(toNodeId);
tomd33e6402014-09-30 03:14:43 -0700127 if (stream != null && !toNodeId.equals(localNode.id())) {
tom81583142014-09-30 01:40:29 -0700128 try {
129 stream.write(message);
130 return true;
131 } catch (IOException e) {
132 log.warn("Unable to send message {} to node {}",
133 message.subject(), toNodeId);
134 }
135 }
136 return false;
137 }
138
139 @Override
140 public synchronized void addSubscriber(MessageSubject subject,
141 MessageSubscriber subscriber) {
142 subscribers.put(subject, subscriber);
143 }
144
145 @Override
146 public synchronized void removeSubscriber(MessageSubject subject,
147 MessageSubscriber subscriber) {
148 subscribers.remove(subject, subscriber);
149 }
150
151 @Override
152 public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
153 return ImmutableSet.copyOf(subscribers.get(subject));
154 }
155
156 @Override
157 public void addNode(DefaultControllerNode node) {
158 nodes.add(node);
159 }
160
161 @Override
162 public void removeNode(DefaultControllerNode node) {
tom28e1fa22014-09-30 10:38:21 -0700163 send(new LeavingMemberMessage(node.id()));
tom81583142014-09-30 01:40:29 -0700164 nodes.remove(node);
165 ClusterMessageStream stream = streams.remove(node.id());
166 if (stream != null) {
167 stream.close();
168 }
169 }
170
171 @Override
172 public void startUp(DefaultControllerNode localNode,
173 ClusterNodesDelegate delegate) {
174 this.localNode = localNode;
175 this.nodesDelegate = delegate;
176
177 startCommunications();
178 startListening();
179 startInitiatingConnections();
180 log.info("Started");
181 }
182
tomd33e6402014-09-30 03:14:43 -0700183 @Override
184 public void clearAllNodesAndStreams() {
185 nodes.clear();
tom28e1fa22014-09-30 10:38:21 -0700186 send(new LeavingMemberMessage(localNode.id()));
tomd33e6402014-09-30 03:14:43 -0700187 for (ClusterMessageStream stream : streams.values()) {
188 stream.close();
189 }
190 streams.clear();
191 }
192
tom81583142014-09-30 01:40:29 -0700193 /**
194 * Dispatches the specified message to all subscribers to its subject.
195 *
tom28e1fa22014-09-30 10:38:21 -0700196 * @param message message to dispatch
tom81583142014-09-30 01:40:29 -0700197 * @param fromNodeId node from which the message was received
198 */
199 void dispatch(ClusterMessage message, NodeId fromNodeId) {
200 Set<MessageSubscriber> set = getSubscribers(message.subject());
201 if (set != null) {
202 for (MessageSubscriber subscriber : set) {
203 subscriber.receive(message, fromNodeId);
204 }
205 }
206 }
207
208 /**
tom28e1fa22014-09-30 10:38:21 -0700209 * Adds the stream associated with the specified node.
tom81583142014-09-30 01:40:29 -0700210 *
211 * @param nodeId newly detected cluster node id
212 * @param ip node IP listen address
213 * @param tcpPort node TCP listen port
214 * @return controller node bound to the stream
215 */
216 DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
217 ClusterMessageStream stream) {
218 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
219 stream.setNode(node);
220 streams.put(node.id(), stream);
tom28e1fa22014-09-30 10:38:21 -0700221 send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
tom81583142014-09-30 01:40:29 -0700222 return node;
223 }
224
225 /**
226 * Removes the stream associated with the specified node.
227 *
228 * @param node node whose stream to remove
229 */
230 void removeNodeStream(DefaultControllerNode node) {
231 nodesDelegate.nodeVanished(node.id());
232 streams.remove(node.id());
233 }
234
235 /**
236 * Finds the least utilized IO worker.
237 *
238 * @return IO worker
239 */
240 ClusterIOWorker findWorker() {
241 ClusterIOWorker leastUtilized = null;
242 int minCount = Integer.MAX_VALUE;
243 for (ClusterIOWorker worker : workers) {
244 int count = worker.streamCount();
245 if (count == 0) {
246 return worker;
247 }
248
249 if (count < minCount) {
250 leastUtilized = worker;
251 minCount = count;
252 }
253 }
254 return leastUtilized;
255 }
256
257 /**
258 * Kicks off the IO loops and waits for them to startup.
259 */
260 private void startCommunications() {
261 HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
262 localNode.tcpPort());
263 for (int i = 0; i < WORKERS; i++) {
264 try {
265 ClusterIOWorker worker =
266 new ClusterIOWorker(this, serializationService, hello);
267 workers.add(worker);
268 commExecutors.execute(worker);
269 } catch (IOException e) {
270 log.warn("Unable to start communication worker", e);
271 }
272 }
273
274 // Wait for the IO loops to start
275 for (ClusterIOWorker loop : workers) {
276 if (!loop.awaitStart(START_TIMEOUT)) {
277 log.warn("Comm loop did not start on-time; moving on...");
278 }
279 }
280 }
281
282 /**
283 * Starts listening for connections from peer cluster members.
284 */
285 private void startListening() {
286 try {
287 connectionListener =
288 new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
289 listenExecutor.execute(connectionListener);
290 if (!connectionListener.awaitStart(START_TIMEOUT)) {
291 log.warn("Listener did not start on-time; moving on...");
292 }
293 } catch (IOException e) {
294 log.error("Unable to listen for cluster connections", e);
295 }
296 }
297
298 /**
299 * Attempts to connect to any nodes that do not have an associated connection.
300 */
301 private void startInitiatingConnections() {
302 timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
303 CONNECTION_CUSTODIAN_FREQUENCY);
304 }
305
306 /**
307 * Initiates open connection request and registers the pending socket
308 * channel with the given IO worker.
309 *
310 * @param worker loop with which the channel should be registered
311 * @throws java.io.IOException if the socket could not be open or connected
312 */
313 private void initiateConnection(DefaultControllerNode node,
314 ClusterIOWorker worker) throws IOException {
315 SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
316 SocketChannel ch = SocketChannel.open();
317 ch.configureBlocking(false);
318 ch.connect(sa);
319 worker.connectStream(ch);
320 }
321
322 // Sweeps through all controller nodes and attempts to open connection to
323 // those that presently do not have one.
324 private class ConnectionCustodian extends TimerTask {
325 @Override
326 public void run() {
327 for (DefaultControllerNode node : nodes) {
328 if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
329 try {
330 initiateConnection(node, findWorker());
331 } catch (IOException e) {
332 log.debug("Unable to connect", e);
333 }
334 }
335 }
336 }
337 }
338
tom28e1fa22014-09-30 10:38:21 -0700339 private class MembershipSubscriber implements MessageSubscriber {
tomd33e6402014-09-30 03:14:43 -0700340 @Override
341 public void receive(ClusterMessage message, NodeId fromNodeId) {
tom28e1fa22014-09-30 10:38:21 -0700342 MessageSubject subject = message.subject();
343 ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
344 if (message.subject() == MessageSubject.NEW_MEMBER) {
345 log.info("Node {} arrived", cmm.nodeId());
346 nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
347
348 } else if (subject == MessageSubject.LEAVING_MEMBER) {
349 log.info("Node {} is leaving", cmm.nodeId());
350 nodesDelegate.nodeRemoved(cmm.nodeId());
351 }
tomd33e6402014-09-30 03:14:43 -0700352 }
353 }
tom81583142014-09-30 01:40:29 -0700354}