blob: c096a7d386caaf79292a0b84503490c46a05bb9c [file] [log] [blame]
tom1d416c52014-09-29 20:55:24 -07001package org.onlab.onos.store.cluster.impl;
2
3import org.onlab.nio.IOLoop;
4import org.onlab.nio.MessageStream;
5import org.onlab.onos.cluster.DefaultControllerNode;
tom81583142014-09-30 01:40:29 -07006import org.onlab.onos.cluster.NodeId;
tom1d416c52014-09-29 20:55:24 -07007import org.onlab.onos.store.cluster.messaging.ClusterMessage;
8import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
tom81583142014-09-30 01:40:29 -07009import org.onlab.onos.store.cluster.messaging.HelloMessage;
tom1d416c52014-09-29 20:55:24 -070010import org.onlab.onos.store.cluster.messaging.SerializationService;
11import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
14import java.io.IOException;
15import java.net.InetSocketAddress;
16import java.nio.channels.ByteChannel;
17import java.nio.channels.SelectionKey;
18import java.nio.channels.SocketChannel;
19import java.util.List;
20import java.util.Objects;
21
22import static org.onlab.packet.IpPrefix.valueOf;
23
24/**
25 * Performs the IO operations related to a cluster-wide communications.
26 */
27public class ClusterIOWorker extends
28 IOLoop<ClusterMessage, ClusterMessageStream> {
29
30 private final Logger log = LoggerFactory.getLogger(getClass());
31
32 private static final long SELECT_TIMEOUT = 50;
33
tom81583142014-09-30 01:40:29 -070034 private final ClusterCommunicationManager manager;
tom1d416c52014-09-29 20:55:24 -070035 private final SerializationService serializationService;
36 private final ClusterMessage helloMessage;
37
38 /**
39 * Creates a new cluster IO worker.
40 *
tom81583142014-09-30 01:40:29 -070041 * @param manager parent comms manager
tom1d416c52014-09-29 20:55:24 -070042 * @param serializationService serialization service for encode/decode
43 * @param helloMessage hello message for greeting peers
44 * @throws IOException if errors occur during IO loop ignition
45 */
tom81583142014-09-30 01:40:29 -070046 ClusterIOWorker(ClusterCommunicationManager manager,
tom1d416c52014-09-29 20:55:24 -070047 SerializationService serializationService,
48 ClusterMessage helloMessage) throws IOException {
49 super(SELECT_TIMEOUT);
tom81583142014-09-30 01:40:29 -070050 this.manager = manager;
tom1d416c52014-09-29 20:55:24 -070051 this.serializationService = serializationService;
52 this.helloMessage = helloMessage;
53 }
54
55 @Override
56 protected ClusterMessageStream createStream(ByteChannel byteChannel) {
57 return new ClusterMessageStream(serializationService, this, byteChannel);
58 }
59
60 @Override
61 protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
tom81583142014-09-30 01:40:29 -070062 NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
tom1d416c52014-09-29 20:55:24 -070063 for (ClusterMessage message : messages) {
tom81583142014-09-30 01:40:29 -070064 manager.dispatch(message, nodeId);
tom1d416c52014-09-29 20:55:24 -070065 }
66 }
67
tom81583142014-09-30 01:40:29 -070068 // Retrieves the node from the stream. If one is not bound, it attempts
69 // to bind it using the knowledge that the first message must be a hello.
70 private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
71 DefaultControllerNode node = stream.node();
72 if (node == null && !messages.isEmpty()) {
73 ClusterMessage firstMessage = messages.get(0);
74 if (firstMessage instanceof HelloMessage) {
75 HelloMessage hello = (HelloMessage) firstMessage;
76 node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
77 hello.tcpPort(), stream);
78 }
79 }
80 return node != null ? node.id() : null;
81 }
82
tom1d416c52014-09-29 20:55:24 -070083 @Override
84 public ClusterMessageStream acceptStream(SocketChannel channel) {
85 ClusterMessageStream stream = super.acceptStream(channel);
86 try {
87 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
88 log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
89 stream.write(helloMessage);
90
91 } catch (IOException e) {
92 log.warn("Unable to accept connection from an unknown end-point", e);
93 }
94 return stream;
95 }
96
97 @Override
98 protected void connect(SelectionKey key) throws IOException {
99 try {
100 super.connect(key);
101 ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
102 stream.write(helloMessage);
103
104 } catch (IOException e) {
105 if (!Objects.equals(e.getMessage(), "Connection refused")) {
106 throw e;
107 }
108 }
109 }
110
111 @Override
112 protected void removeStream(MessageStream<ClusterMessage> stream) {
113 DefaultControllerNode node = ((ClusterMessageStream) stream).node();
114 if (node != null) {
115 log.info("Closed connection to node {}", node.id());
tom81583142014-09-30 01:40:29 -0700116 manager.removeNodeStream(node);
tom1d416c52014-09-29 20:55:24 -0700117 }
118 super.removeStream(stream);
119 }
120
121}