blob: a7dd3c0423cd5b1b5a29ae08c91afcfce2c7ce9b [file] [log] [blame]
Madan Jampanic26eede2015-04-16 11:42:16 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.nio.service;
17
18import java.nio.ByteBuffer;
19import java.nio.channels.ByteChannel;
20import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.atomic.AtomicInteger;
22
23import org.onlab.nio.IOLoop;
24import org.onlab.nio.MessageStream;
25import org.onlab.packet.IpAddress;
26import org.onlab.packet.IpAddress.Version;
27import org.onosproject.store.cluster.messaging.Endpoint;
28
29import com.google.common.base.Charsets;
30
31/**
32 * Default bi-directional message stream for transferring messages to & from the
33 * network via two byte buffers.
34 */
35public class DefaultMessageStream extends MessageStream<DefaultMessage> {
36
37 private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
38
39 public DefaultMessageStream(
40 IOLoop<DefaultMessage, ?> loop,
41 ByteChannel byteChannel,
42 int bufferSize,
43 int maxIdleMillis) {
44 super(loop, byteChannel, bufferSize, maxIdleMillis);
45 }
46
47 public CompletableFuture<DefaultMessageStream> connectedFuture() {
48 return connectFuture.thenApply(v -> this);
49 }
50
51 private final AtomicInteger messageLength = new AtomicInteger(-1);
52
53 @Override
54 protected DefaultMessage read(ByteBuffer buffer) {
55 if (messageLength.get() == -1) {
56 // check if we can read the message length.
57 if (buffer.remaining() < Integer.BYTES) {
58 return null;
59 } else {
60 messageLength.set(buffer.getInt());
61 }
62 }
63
64 if (buffer.remaining() < messageLength.get()) {
65 return null;
66 }
67
68 long id = buffer.getLong();
69 Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
70 byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
71 buffer.get(octects);
72 IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
73 int senderPort = buffer.getInt();
74 int messageTypeByteLength = buffer.getInt();
75 byte[] messageTypeBytes = new byte[messageTypeByteLength];
76 buffer.get(messageTypeBytes);
77 String messageType = new String(messageTypeBytes, Charsets.UTF_8);
78 int payloadLength = buffer.getInt();
79 byte[] payloadBytes = new byte[payloadLength];
80 buffer.get(payloadBytes);
81
82 // reset for next message
83 messageLength.set(-1);
84
85 return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
86 }
87
88 @Override
89 protected void write(DefaultMessage message, ByteBuffer buffer) {
90 Endpoint sender = message.sender();
91 byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
92 IpAddress senderIp = sender.host();
93 byte[] ipOctets = senderIp.toOctets();
94 byte[] payload = message.payload();
95
96 int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
97
98 buffer.putInt(messageLength);
99
100 buffer.putLong(message.id());
101
102 if (senderIp.version() == Version.INET) {
103 buffer.put((byte) 0x0);
104 } else {
105 buffer.put((byte) 0x1);
106 }
107 buffer.put(ipOctets);
108
109 // write sender port
110 buffer.putInt(sender.port());
111
112 // write length of message type
113 buffer.putInt(messageTypeBytes.length);
114
115 // write message type bytes
116 buffer.put(messageTypeBytes);
117
118 // write payload length
119 buffer.putInt(payload.length);
120
121 // write payload.
122 buffer.put(payload);
123 }
124
125 /**
126 * Callback invoked when the stream is successfully connected.
127 */
128 public void connected() {
129 connectFuture.complete(null);
130 }
131
132 /**
133 * Callback invoked when the stream fails to connect.
134 * @param cause failure cause
135 */
136 public void connectFailed(Throwable cause) {
137 connectFuture.completeExceptionally(cause);
138 }
139}