blob: d442cc85b7ce14d71adc18da7a24caf8810bda6d [file] [log] [blame]
tom1d416c52014-09-29 20:55:24 -07001package org.onlab.onos.store.cluster.impl;
2
3import org.onlab.nio.IOLoop;
4import org.onlab.nio.MessageStream;
5import org.onlab.onos.cluster.DefaultControllerNode;
tom81583142014-09-30 01:40:29 -07006import org.onlab.onos.cluster.NodeId;
tom1d416c52014-09-29 20:55:24 -07007import org.onlab.onos.store.cluster.messaging.ClusterMessage;
tom81583142014-09-30 01:40:29 -07008import org.onlab.onos.store.cluster.messaging.HelloMessage;
tom1d416c52014-09-29 20:55:24 -07009import org.onlab.onos.store.cluster.messaging.SerializationService;
10import org.slf4j.Logger;
11import org.slf4j.LoggerFactory;
12
13import java.io.IOException;
14import java.net.InetSocketAddress;
15import java.nio.channels.ByteChannel;
16import java.nio.channels.SelectionKey;
17import java.nio.channels.SocketChannel;
18import java.util.List;
19import java.util.Objects;
20
21import static org.onlab.packet.IpPrefix.valueOf;
22
23/**
24 * Performs the IO operations related to a cluster-wide communications.
25 */
26public class ClusterIOWorker extends
27 IOLoop<ClusterMessage, ClusterMessageStream> {
28
29 private final Logger log = LoggerFactory.getLogger(getClass());
30
31 private static final long SELECT_TIMEOUT = 50;
32
tom81583142014-09-30 01:40:29 -070033 private final ClusterCommunicationManager manager;
tom1d416c52014-09-29 20:55:24 -070034 private final SerializationService serializationService;
35 private final ClusterMessage helloMessage;
36
37 /**
38 * Creates a new cluster IO worker.
39 *
tom81583142014-09-30 01:40:29 -070040 * @param manager parent comms manager
tom1d416c52014-09-29 20:55:24 -070041 * @param serializationService serialization service for encode/decode
42 * @param helloMessage hello message for greeting peers
43 * @throws IOException if errors occur during IO loop ignition
44 */
tom81583142014-09-30 01:40:29 -070045 ClusterIOWorker(ClusterCommunicationManager manager,
tom1d416c52014-09-29 20:55:24 -070046 SerializationService serializationService,
47 ClusterMessage helloMessage) throws IOException {
48 super(SELECT_TIMEOUT);
tom81583142014-09-30 01:40:29 -070049 this.manager = manager;
tom1d416c52014-09-29 20:55:24 -070050 this.serializationService = serializationService;
51 this.helloMessage = helloMessage;
52 }
53
54 @Override
55 protected ClusterMessageStream createStream(ByteChannel byteChannel) {
56 return new ClusterMessageStream(serializationService, this, byteChannel);
57 }
58
59 @Override
60 protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
tom81583142014-09-30 01:40:29 -070061 NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
tom1d416c52014-09-29 20:55:24 -070062 for (ClusterMessage message : messages) {
tom81583142014-09-30 01:40:29 -070063 manager.dispatch(message, nodeId);
tom1d416c52014-09-29 20:55:24 -070064 }
65 }
66
tom81583142014-09-30 01:40:29 -070067 // Retrieves the node from the stream. If one is not bound, it attempts
68 // to bind it using the knowledge that the first message must be a hello.
69 private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
70 DefaultControllerNode node = stream.node();
71 if (node == null && !messages.isEmpty()) {
72 ClusterMessage firstMessage = messages.get(0);
73 if (firstMessage instanceof HelloMessage) {
74 HelloMessage hello = (HelloMessage) firstMessage;
75 node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
76 hello.tcpPort(), stream);
77 }
78 }
79 return node != null ? node.id() : null;
80 }
81
tom1d416c52014-09-29 20:55:24 -070082 @Override
83 public ClusterMessageStream acceptStream(SocketChannel channel) {
84 ClusterMessageStream stream = super.acceptStream(channel);
85 try {
86 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
87 log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
88 stream.write(helloMessage);
89
90 } catch (IOException e) {
91 log.warn("Unable to accept connection from an unknown end-point", e);
92 }
93 return stream;
94 }
95
96 @Override
97 protected void connect(SelectionKey key) throws IOException {
98 try {
99 super.connect(key);
100 ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
101 stream.write(helloMessage);
102
103 } catch (IOException e) {
104 if (!Objects.equals(e.getMessage(), "Connection refused")) {
105 throw e;
106 }
107 }
108 }
109
110 @Override
111 protected void removeStream(MessageStream<ClusterMessage> stream) {
112 DefaultControllerNode node = ((ClusterMessageStream) stream).node();
113 if (node != null) {
114 log.info("Closed connection to node {}", node.id());
tom81583142014-09-30 01:40:29 -0700115 manager.removeNodeStream(node);
tom1d416c52014-09-29 20:55:24 -0700116 }
117 super.removeStream(stream);
118 }
119
120}