blob: d182aa1634271ac2bea53a0f99e5e5838f567c42 [file] [log] [blame]
package org.onlab.onos.store.cluster.impl;

import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;

import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkState;

14/**
15 * Stream for transferring messages between two cluster members.
16 */
17public class ClusterMessageStream extends MessageStream<ClusterMessage> {
18
19 private static final int COMM_BUFFER_SIZE = 32 * 1024;
20 private static final int COMM_IDLE_TIME = 500;
21
22 private DefaultControllerNode node;
23 private SerializationService serializationService;
24
25 /**
26 * Creates a message stream associated with the specified IO loop and
27 * backed by the given byte channel.
28 *
29 * @param serializationService service for encoding/decoding messages
30 * @param loop IO loop
31 * @param byteChannel backing byte channel
32 */
33 public ClusterMessageStream(SerializationService serializationService,
34 IOLoop<ClusterMessage, ?> loop,
35 ByteChannel byteChannel) {
36 super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
37 this.serializationService = serializationService;
38 }
39
40 /**
41 * Returns the node with which this stream is associated.
42 *
43 * @return controller node
44 */
45 public DefaultControllerNode node() {
46 return node;
47 }
48
49 /**
50 * Sets the node with which this stream is affiliated.
51 *
52 * @param node controller node
53 */
54 public void setNode(DefaultControllerNode node) {
55 checkState(this.node == null, "Stream is already bound to a node");
56 this.node = node;
57 }
58
59 @Override
60 protected ClusterMessage read(ByteBuffer buffer) {
61 return serializationService.decode(buffer);
62 }
63
64 @Override
65 protected void write(ClusterMessage message, ByteBuffer buffer) {
66 serializationService.encode(message, buffer);
67 }
68
69}