package org.onlab.onos.store.cluster.messaging;

import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;

import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;

import static com.google.common.base.Preconditions.checkState;

12/**
13 * Stream for transferring messages between two cluster members.
14 */
15public class ClusterMessageStream extends MessageStream<ClusterMessage> {
16
17 private static final int COMM_BUFFER_SIZE = 32 * 1024;
18 private static final int COMM_IDLE_TIME = 500;
19
20 private DefaultControllerNode node;
21 private SerializationService serializationService;
22
23 /**
24 * Creates a message stream associated with the specified IO loop and
25 * backed by the given byte channel.
26 *
27 * @param serializationService service for encoding/decoding messages
28 * @param loop IO loop
29 * @param byteChannel backing byte channel
30 */
31 public ClusterMessageStream(SerializationService serializationService,
32 IOLoop<ClusterMessage, ?> loop,
33 ByteChannel byteChannel) {
34 super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
35 this.serializationService = serializationService;
36 }
37
38 /**
39 * Returns the node with which this stream is associated.
40 *
41 * @return controller node
42 */
43 public DefaultControllerNode node() {
44 return node;
45 }
46
47 /**
48 * Sets the node with which this stream is affiliated.
49 *
50 * @param node controller node
51 */
52 public void setNode(DefaultControllerNode node) {
53 checkState(this.node == null, "Stream is already bound to a node");
54 this.node = node;
55 }
56
57 @Override
58 protected ClusterMessage read(ByteBuffer buffer) {
59 return serializationService.decode(buffer);
60 }
61
62 @Override
63 protected void write(ClusterMessage message, ByteBuffer buffer) {
64 serializationService.encode(message, buffer);
65 }
66
67}