blob: bac32e9362efe37c07abfefa5768a8ce230441db [file] [log] [blame]
tom81583142014-09-30 01:40:29 -07001package org.onlab.onos.store.cluster.impl;
2
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
42
43/**
44 * Implements the cluster communication services to use by other stores.
45 */
46@Component(immediate = true)
47@Service
48public class ClusterCommunicationManager
49 implements ClusterCommunicationService, ClusterCommunicationAdminService {
50
51 private final Logger log = LoggerFactory.getLogger(getClass());
52
tom43e836a2014-09-30 01:50:29 -070053 private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
54 private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
tom81583142014-09-30 01:40:29 -070055
56 private static final long START_TIMEOUT = 1000;
57 private static final int WORKERS = 3;
58
59 private ClusterConnectionListener connectionListener;
60 private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
61
62 private DefaultControllerNode localNode;
63 private ClusterNodesDelegate nodesDelegate;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected SerializationService serializationService;
67
68 // Nodes to be monitored to make sure they have a connection.
69 private final Set<DefaultControllerNode> nodes = new HashSet<>();
70
71 // Means to track message streams to other nodes.
72 private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
73
74 // TODO: use something different that won't require synchronization
75 private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
76
77 // Executor pools for listening and managing connections to other nodes.
78 private final ExecutorService listenExecutor =
79 Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
80 private final ExecutorService commExecutors =
81 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
82 private final ExecutorService heartbeatExecutor =
83 Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
84
85 private final Timer timer = new Timer("onos-comm-initiator");
86 private final TimerTask connectionCustodian = new ConnectionCustodian();
tomd33e6402014-09-30 03:14:43 -070087 private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
tom81583142014-09-30 01:40:29 -070088
89 @Activate
90 public void activate() {
tomd33e6402014-09-30 03:14:43 -070091 addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
tom81583142014-09-30 01:40:29 -070092 log.info("Activated but waiting for delegate");
93 }
94
95 @Deactivate
96 public void deactivate() {
97 connectionCustodian.cancel();
98 if (connectionListener != null) {
99 connectionListener.shutdown();
100 for (ClusterIOWorker worker : workers) {
101 worker.shutdown();
102 }
103 }
104 log.info("Stopped");
105 }
106
107 @Override
tomd33e6402014-09-30 03:14:43 -0700108 public boolean send(ClusterMessage message) {
109 boolean ok = true;
110 for (DefaultControllerNode node : nodes) {
111 if (!node.equals(localNode)) {
112 ok = send(message, node.id()) && ok;
113 }
114 }
115 return ok;
116 }
117
118 @Override
tom81583142014-09-30 01:40:29 -0700119 public boolean send(ClusterMessage message, NodeId toNodeId) {
120 ClusterMessageStream stream = streams.get(toNodeId);
tomd33e6402014-09-30 03:14:43 -0700121 if (stream != null && !toNodeId.equals(localNode.id())) {
tom81583142014-09-30 01:40:29 -0700122 try {
123 stream.write(message);
124 return true;
125 } catch (IOException e) {
126 log.warn("Unable to send message {} to node {}",
127 message.subject(), toNodeId);
128 }
129 }
130 return false;
131 }
132
133 @Override
134 public synchronized void addSubscriber(MessageSubject subject,
135 MessageSubscriber subscriber) {
136 subscribers.put(subject, subscriber);
137 }
138
139 @Override
140 public synchronized void removeSubscriber(MessageSubject subject,
141 MessageSubscriber subscriber) {
142 subscribers.remove(subject, subscriber);
143 }
144
145 @Override
146 public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
147 return ImmutableSet.copyOf(subscribers.get(subject));
148 }
149
150 @Override
151 public void addNode(DefaultControllerNode node) {
152 nodes.add(node);
153 }
154
155 @Override
156 public void removeNode(DefaultControllerNode node) {
tomd33e6402014-09-30 03:14:43 -0700157 send(new GoodbyeMessage(node.id()));
tom81583142014-09-30 01:40:29 -0700158 nodes.remove(node);
159 ClusterMessageStream stream = streams.remove(node.id());
160 if (stream != null) {
161 stream.close();
162 }
163 }
164
165 @Override
166 public void startUp(DefaultControllerNode localNode,
167 ClusterNodesDelegate delegate) {
168 this.localNode = localNode;
169 this.nodesDelegate = delegate;
170
171 startCommunications();
172 startListening();
173 startInitiatingConnections();
174 log.info("Started");
175 }
176
tomd33e6402014-09-30 03:14:43 -0700177 @Override
178 public void clearAllNodesAndStreams() {
179 nodes.clear();
180 send(new GoodbyeMessage(localNode.id()));
181 for (ClusterMessageStream stream : streams.values()) {
182 stream.close();
183 }
184 streams.clear();
185 }
186
tom81583142014-09-30 01:40:29 -0700187 /**
188 * Dispatches the specified message to all subscribers to its subject.
189 *
190 * @param message message to dispatch
191 * @param fromNodeId node from which the message was received
192 */
193 void dispatch(ClusterMessage message, NodeId fromNodeId) {
194 Set<MessageSubscriber> set = getSubscribers(message.subject());
195 if (set != null) {
196 for (MessageSubscriber subscriber : set) {
197 subscriber.receive(message, fromNodeId);
198 }
199 }
200 }
201
202 /**
203 * Removes the stream associated with the specified node.
204 *
205 * @param nodeId newly detected cluster node id
206 * @param ip node IP listen address
207 * @param tcpPort node TCP listen port
208 * @return controller node bound to the stream
209 */
210 DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
211 ClusterMessageStream stream) {
212 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
213 stream.setNode(node);
214 streams.put(node.id(), stream);
215 return node;
216 }
217
218 /**
219 * Removes the stream associated with the specified node.
220 *
221 * @param node node whose stream to remove
222 */
223 void removeNodeStream(DefaultControllerNode node) {
224 nodesDelegate.nodeVanished(node.id());
225 streams.remove(node.id());
226 }
227
228 /**
229 * Finds the least utilized IO worker.
230 *
231 * @return IO worker
232 */
233 ClusterIOWorker findWorker() {
234 ClusterIOWorker leastUtilized = null;
235 int minCount = Integer.MAX_VALUE;
236 for (ClusterIOWorker worker : workers) {
237 int count = worker.streamCount();
238 if (count == 0) {
239 return worker;
240 }
241
242 if (count < minCount) {
243 leastUtilized = worker;
244 minCount = count;
245 }
246 }
247 return leastUtilized;
248 }
249
250 /**
251 * Kicks off the IO loops and waits for them to startup.
252 */
253 private void startCommunications() {
254 HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
255 localNode.tcpPort());
256 for (int i = 0; i < WORKERS; i++) {
257 try {
258 ClusterIOWorker worker =
259 new ClusterIOWorker(this, serializationService, hello);
260 workers.add(worker);
261 commExecutors.execute(worker);
262 } catch (IOException e) {
263 log.warn("Unable to start communication worker", e);
264 }
265 }
266
267 // Wait for the IO loops to start
268 for (ClusterIOWorker loop : workers) {
269 if (!loop.awaitStart(START_TIMEOUT)) {
270 log.warn("Comm loop did not start on-time; moving on...");
271 }
272 }
273 }
274
275 /**
276 * Starts listening for connections from peer cluster members.
277 */
278 private void startListening() {
279 try {
280 connectionListener =
281 new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
282 listenExecutor.execute(connectionListener);
283 if (!connectionListener.awaitStart(START_TIMEOUT)) {
284 log.warn("Listener did not start on-time; moving on...");
285 }
286 } catch (IOException e) {
287 log.error("Unable to listen for cluster connections", e);
288 }
289 }
290
291 /**
292 * Attempts to connect to any nodes that do not have an associated connection.
293 */
294 private void startInitiatingConnections() {
295 timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
296 CONNECTION_CUSTODIAN_FREQUENCY);
297 }
298
299 /**
300 * Initiates open connection request and registers the pending socket
301 * channel with the given IO worker.
302 *
303 * @param worker loop with which the channel should be registered
304 * @throws java.io.IOException if the socket could not be open or connected
305 */
306 private void initiateConnection(DefaultControllerNode node,
307 ClusterIOWorker worker) throws IOException {
308 SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
309 SocketChannel ch = SocketChannel.open();
310 ch.configureBlocking(false);
311 ch.connect(sa);
312 worker.connectStream(ch);
313 }
314
315 // Sweeps through all controller nodes and attempts to open connection to
316 // those that presently do not have one.
317 private class ConnectionCustodian extends TimerTask {
318 @Override
319 public void run() {
320 for (DefaultControllerNode node : nodes) {
321 if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
322 try {
323 initiateConnection(node, findWorker());
324 } catch (IOException e) {
325 log.debug("Unable to connect", e);
326 }
327 }
328 }
329 }
330 }
331
tomd33e6402014-09-30 03:14:43 -0700332 private class GoodbyeSubscriber implements MessageSubscriber {
333 @Override
334 public void receive(ClusterMessage message, NodeId fromNodeId) {
335 log.info("Received goodbye message from {}", fromNodeId);
336 nodesDelegate.nodeRemoved(fromNodeId);
337 }
338 }
tom81583142014-09-30 01:40:29 -0700339}