// blob: 492afbc9294e7c937953406fe44062fa09853e4a [file] [log] [blame]
// tom8158314 2014-09-30 01:40:29 -0700
package org.onlab.onos.store.cluster.impl;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;

43/**
44 * Implements the cluster communication services to use by other stores.
45 */
46@Component(immediate = true)
47@Service
48public class ClusterCommunicationManager
49 implements ClusterCommunicationService, ClusterCommunicationAdminService {
50
51 private final Logger log = LoggerFactory.getLogger(getClass());
52
53 private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
54 private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
55
56 private static final long START_TIMEOUT = 1000;
57 private static final int WORKERS = 3;
58
59 private ClusterConnectionListener connectionListener;
60 private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
61
62 private DefaultControllerNode localNode;
63 private ClusterNodesDelegate nodesDelegate;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected SerializationService serializationService;
67
68 // Nodes to be monitored to make sure they have a connection.
69 private final Set<DefaultControllerNode> nodes = new HashSet<>();
70
71 // Means to track message streams to other nodes.
72 private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
73
74 // TODO: use something different that won't require synchronization
75 private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
76
77 // Executor pools for listening and managing connections to other nodes.
78 private final ExecutorService listenExecutor =
79 Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
80 private final ExecutorService commExecutors =
81 Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
82 private final ExecutorService heartbeatExecutor =
83 Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
84
85 private final Timer timer = new Timer("onos-comm-initiator");
86 private final TimerTask connectionCustodian = new ConnectionCustodian();
87
88 @Activate
89 public void activate() {
90 log.info("Activated but waiting for delegate");
91 }
92
93 @Deactivate
94 public void deactivate() {
95 connectionCustodian.cancel();
96 if (connectionListener != null) {
97 connectionListener.shutdown();
98 for (ClusterIOWorker worker : workers) {
99 worker.shutdown();
100 }
101 }
102 log.info("Stopped");
103 }
104
105 @Override
106 public boolean send(ClusterMessage message, NodeId toNodeId) {
107 ClusterMessageStream stream = streams.get(toNodeId);
108 if (stream != null) {
109 try {
110 stream.write(message);
111 return true;
112 } catch (IOException e) {
113 log.warn("Unable to send message {} to node {}",
114 message.subject(), toNodeId);
115 }
116 }
117 return false;
118 }
119
120 @Override
121 public synchronized void addSubscriber(MessageSubject subject,
122 MessageSubscriber subscriber) {
123 subscribers.put(subject, subscriber);
124 }
125
126 @Override
127 public synchronized void removeSubscriber(MessageSubject subject,
128 MessageSubscriber subscriber) {
129 subscribers.remove(subject, subscriber);
130 }
131
132 @Override
133 public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
134 return ImmutableSet.copyOf(subscribers.get(subject));
135 }
136
137 @Override
138 public void addNode(DefaultControllerNode node) {
139 nodes.add(node);
140 }
141
142 @Override
143 public void removeNode(DefaultControllerNode node) {
144 nodes.remove(node);
145 ClusterMessageStream stream = streams.remove(node.id());
146 if (stream != null) {
147 stream.close();
148 }
149 }
150
151 @Override
152 public void startUp(DefaultControllerNode localNode,
153 ClusterNodesDelegate delegate) {
154 this.localNode = localNode;
155 this.nodesDelegate = delegate;
156
157 startCommunications();
158 startListening();
159 startInitiatingConnections();
160 log.info("Started");
161 }
162
163 /**
164 * Dispatches the specified message to all subscribers to its subject.
165 *
166 * @param message message to dispatch
167 * @param fromNodeId node from which the message was received
168 */
169 void dispatch(ClusterMessage message, NodeId fromNodeId) {
170 Set<MessageSubscriber> set = getSubscribers(message.subject());
171 if (set != null) {
172 for (MessageSubscriber subscriber : set) {
173 subscriber.receive(message, fromNodeId);
174 }
175 }
176 }
177
178 /**
179 * Removes the stream associated with the specified node.
180 *
181 * @param nodeId newly detected cluster node id
182 * @param ip node IP listen address
183 * @param tcpPort node TCP listen port
184 * @return controller node bound to the stream
185 */
186 DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
187 ClusterMessageStream stream) {
188 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
189 stream.setNode(node);
190 streams.put(node.id(), stream);
191 return node;
192 }
193
194 /**
195 * Removes the stream associated with the specified node.
196 *
197 * @param node node whose stream to remove
198 */
199 void removeNodeStream(DefaultControllerNode node) {
200 nodesDelegate.nodeVanished(node.id());
201 streams.remove(node.id());
202 }
203
204 /**
205 * Finds the least utilized IO worker.
206 *
207 * @return IO worker
208 */
209 ClusterIOWorker findWorker() {
210 ClusterIOWorker leastUtilized = null;
211 int minCount = Integer.MAX_VALUE;
212 for (ClusterIOWorker worker : workers) {
213 int count = worker.streamCount();
214 if (count == 0) {
215 return worker;
216 }
217
218 if (count < minCount) {
219 leastUtilized = worker;
220 minCount = count;
221 }
222 }
223 return leastUtilized;
224 }
225
226 /**
227 * Kicks off the IO loops and waits for them to startup.
228 */
229 private void startCommunications() {
230 HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
231 localNode.tcpPort());
232 for (int i = 0; i < WORKERS; i++) {
233 try {
234 ClusterIOWorker worker =
235 new ClusterIOWorker(this, serializationService, hello);
236 workers.add(worker);
237 commExecutors.execute(worker);
238 } catch (IOException e) {
239 log.warn("Unable to start communication worker", e);
240 }
241 }
242
243 // Wait for the IO loops to start
244 for (ClusterIOWorker loop : workers) {
245 if (!loop.awaitStart(START_TIMEOUT)) {
246 log.warn("Comm loop did not start on-time; moving on...");
247 }
248 }
249 }
250
251 /**
252 * Starts listening for connections from peer cluster members.
253 */
254 private void startListening() {
255 try {
256 connectionListener =
257 new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
258 listenExecutor.execute(connectionListener);
259 if (!connectionListener.awaitStart(START_TIMEOUT)) {
260 log.warn("Listener did not start on-time; moving on...");
261 }
262 } catch (IOException e) {
263 log.error("Unable to listen for cluster connections", e);
264 }
265 }
266
267 /**
268 * Attempts to connect to any nodes that do not have an associated connection.
269 */
270 private void startInitiatingConnections() {
271 timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
272 CONNECTION_CUSTODIAN_FREQUENCY);
273 }
274
275 /**
276 * Initiates open connection request and registers the pending socket
277 * channel with the given IO worker.
278 *
279 * @param worker loop with which the channel should be registered
280 * @throws java.io.IOException if the socket could not be open or connected
281 */
282 private void initiateConnection(DefaultControllerNode node,
283 ClusterIOWorker worker) throws IOException {
284 SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
285 SocketChannel ch = SocketChannel.open();
286 ch.configureBlocking(false);
287 ch.connect(sa);
288 worker.connectStream(ch);
289 }
290
291 // Sweeps through all controller nodes and attempts to open connection to
292 // those that presently do not have one.
293 private class ConnectionCustodian extends TimerTask {
294 @Override
295 public void run() {
296 for (DefaultControllerNode node : nodes) {
297 if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
298 try {
299 initiateConnection(node, findWorker());
300 } catch (IOException e) {
301 log.debug("Unable to connect", e);
302 }
303 }
304 }
305 }
306 }
307
308}