blob: 0e939857cfe1be92c03a95ad68bf2131e353b69b [file] [log] [blame]
tom1d416c52014-09-29 20:55:24 -07001package org.onlab.onos.store.cluster.impl;
2
3import org.onlab.nio.IOLoop;
4import org.onlab.nio.MessageStream;
5import org.onlab.onos.cluster.DefaultControllerNode;
6import org.onlab.onos.store.cluster.messaging.ClusterMessage;
7import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
8import org.onlab.onos.store.cluster.messaging.SerializationService;
9import org.slf4j.Logger;
10import org.slf4j.LoggerFactory;
11
12import java.io.IOException;
13import java.net.InetSocketAddress;
14import java.nio.channels.ByteChannel;
15import java.nio.channels.SelectionKey;
16import java.nio.channels.SocketChannel;
17import java.util.List;
18import java.util.Objects;
19
20import static org.onlab.packet.IpPrefix.valueOf;
21
22/**
23 * Performs the IO operations related to a cluster-wide communications.
24 */
25public class ClusterIOWorker extends
26 IOLoop<ClusterMessage, ClusterMessageStream> {
27
28 private final Logger log = LoggerFactory.getLogger(getClass());
29
30 private static final long SELECT_TIMEOUT = 50;
31
32 private final ConnectionManager connectionManager;
33 private final CommunicationsDelegate commsDelegate;
34 private final SerializationService serializationService;
35 private final ClusterMessage helloMessage;
36
37 /**
38 * Creates a new cluster IO worker.
39 *
40 * @param connectionManager parent connection manager
41 * @param commsDelegate communications delegate for dispatching
42 * @param serializationService serialization service for encode/decode
43 * @param helloMessage hello message for greeting peers
44 * @throws IOException if errors occur during IO loop ignition
45 */
46 ClusterIOWorker(ConnectionManager connectionManager,
47 CommunicationsDelegate commsDelegate,
48 SerializationService serializationService,
49 ClusterMessage helloMessage) throws IOException {
50 super(SELECT_TIMEOUT);
51 this.connectionManager = connectionManager;
52 this.commsDelegate = commsDelegate;
53 this.serializationService = serializationService;
54 this.helloMessage = helloMessage;
55 }
56
57 @Override
58 protected ClusterMessageStream createStream(ByteChannel byteChannel) {
59 return new ClusterMessageStream(serializationService, this, byteChannel);
60 }
61
62 @Override
63 protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
64 for (ClusterMessage message : messages) {
65 commsDelegate.dispatch(message);
66 }
67 }
68
69 @Override
70 public ClusterMessageStream acceptStream(SocketChannel channel) {
71 ClusterMessageStream stream = super.acceptStream(channel);
72 try {
73 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
74 log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
75 stream.write(helloMessage);
76
77 } catch (IOException e) {
78 log.warn("Unable to accept connection from an unknown end-point", e);
79 }
80 return stream;
81 }
82
83 @Override
84 protected void connect(SelectionKey key) throws IOException {
85 try {
86 super.connect(key);
87 ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
88 stream.write(helloMessage);
89
90 } catch (IOException e) {
91 if (!Objects.equals(e.getMessage(), "Connection refused")) {
92 throw e;
93 }
94 }
95 }
96
97 @Override
98 protected void removeStream(MessageStream<ClusterMessage> stream) {
99 DefaultControllerNode node = ((ClusterMessageStream) stream).node();
100 if (node != null) {
101 log.info("Closed connection to node {}", node.id());
102 connectionManager.removeNodeStream(node);
103 }
104 super.removeStream(stream);
105 }
106
107}