blob: c34d3ccad6e8632ba8875b03820619f5c40befbb [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
Madan Jampaniab6d3112014-10-02 16:30:14 -070016package org.onlab.netty;
17
Madan Jampani86ed0552014-10-03 16:45:42 -070018import static com.google.common.base.Preconditions.checkState;
19import io.netty.buffer.ByteBuf;
20import io.netty.channel.ChannelHandlerContext;
21import io.netty.handler.codec.ReplayingDecoder;
22
Madan Jampaniab6d3112014-10-02 16:30:14 -070023import java.util.List;
24
Madan Jampani2e5f87b2015-02-22 10:37:15 -080025import org.onlab.packet.IpAddress;
26import org.onlab.packet.IpAddress.Version;
Madan Jampanic26eede2015-04-16 11:42:16 -070027import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampani29e5dfd2014-10-07 17:26:25 -070028import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
Madan Jampani49115e92015-03-14 10:43:33 -070031import com.google.common.base.Charsets;
32
Madan Jampani938aa432014-10-04 17:37:23 -070033/**
34 * Decoder for inbound messages.
35 */
36public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070037
Madan Jampani29e5dfd2014-10-07 17:26:25 -070038 private final Logger log = LoggerFactory.getLogger(getClass());
39
Madan Jampani2e5f87b2015-02-22 10:37:15 -080040 private long messageId;
41 private Version ipVersion;
42 private IpAddress senderIp;
43 private int senderPort;
Madan Jampani49115e92015-03-14 10:43:33 -070044 private int messageTypeLength;
45 private String messageType;
Madan Jampani938aa432014-10-04 17:37:23 -070046 private int contentLength;
47
Madan Jampanic26eede2015-04-16 11:42:16 -070048 public MessageDecoder() {
Madan Jampani2e5f87b2015-02-22 10:37:15 -080049 super(DecoderState.READ_MESSAGE_ID);
Madan Jampaniab6d3112014-10-02 16:30:14 -070050 }
51
52 @Override
Madan Jampani86ed0552014-10-03 16:45:42 -070053 protected void decode(
54 ChannelHandlerContext context,
55 ByteBuf buffer,
56 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070057
Yuta HIGUCHI5e8ceb42014-11-04 17:22:26 -080058 switch (state()) {
Madan Jampani2e5f87b2015-02-22 10:37:15 -080059 case READ_MESSAGE_ID:
60 messageId = buffer.readLong();
61 checkpoint(DecoderState.READ_SENDER_IP_VERSION);
62 case READ_SENDER_IP_VERSION:
63 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
64 checkpoint(DecoderState.READ_SENDER_IP);
65 case READ_SENDER_IP:
66 byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
67 buffer.readBytes(octects);
68 senderIp = IpAddress.valueOf(ipVersion, octects);
69 checkpoint(DecoderState.READ_SENDER_PORT);
70 case READ_SENDER_PORT:
71 senderPort = buffer.readInt();
Madan Jampani49115e92015-03-14 10:43:33 -070072 checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
73 case READ_MESSAGE_TYPE_LENGTH:
74 messageTypeLength = buffer.readInt();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080075 checkpoint(DecoderState.READ_MESSAGE_TYPE);
76 case READ_MESSAGE_TYPE:
Madan Jampani49115e92015-03-14 10:43:33 -070077 byte[] messageTypeBytes = new byte[messageTypeLength];
78 buffer.readBytes(messageTypeBytes);
79 messageType = new String(messageTypeBytes, Charsets.UTF_8);
Madan Jampani938aa432014-10-04 17:37:23 -070080 checkpoint(DecoderState.READ_CONTENT_LENGTH);
81 case READ_CONTENT_LENGTH:
82 contentLength = buffer.readInt();
Madan Jampani938aa432014-10-04 17:37:23 -070083 checkpoint(DecoderState.READ_CONTENT);
84 case READ_CONTENT:
Madan Jampani2e5f87b2015-02-22 10:37:15 -080085 byte[] payload = new byte[contentLength];
86 buffer.readBytes(payload);
87 InternalMessage message = new InternalMessage(
88 messageId,
89 new Endpoint(senderIp, senderPort),
90 messageType,
91 payload);
Madan Jampani938aa432014-10-04 17:37:23 -070092 out.add(message);
Madan Jampani2e5f87b2015-02-22 10:37:15 -080093 checkpoint(DecoderState.READ_MESSAGE_ID);
Madan Jampani938aa432014-10-04 17:37:23 -070094 break;
95 default:
96 checkState(false, "Must not be here");
97 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070098 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -070099
100 @Override
101 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
102 log.error("Exception inside channel handling pipeline.", cause);
103 context.close();
104 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700105}