blob: 367193a421132756d2247c4854f6b178dc3b6b0c [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampaniab6d3112014-10-02 16:30:14 -070017
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import com.google.common.base.Charsets;
Madan Jampania9e70a62016-03-02 16:28:18 -080019
Madan Jampani86ed0552014-10-03 16:45:42 -070020import io.netty.buffer.ByteBuf;
21import io.netty.channel.ChannelHandlerContext;
22import io.netty.handler.codec.ReplayingDecoder;
Madan Jampania9e70a62016-03-02 16:28:18 -080023
Madan Jampani2e5f87b2015-02-22 10:37:15 -080024import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpAddress.Version;
Madan Jampani05833872016-07-12 23:01:39 -070026import org.onosproject.core.HybridLogicalTime;
Madan Jampanic26eede2015-04-16 11:42:16 -070027import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080028import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampani29e5dfd2014-10-07 17:26:25 -070029import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
Aaron Kruglikov1b727382016-02-09 16:17:47 -080032import java.util.List;
Madan Jampani49115e92015-03-14 10:43:33 -070033
Ray Milkey9f87e512016-01-05 10:00:22 -080034import static com.google.common.base.Preconditions.checkState;
Ray Milkey9f87e512016-01-05 10:00:22 -080035
Madan Jampani938aa432014-10-04 17:37:23 -070036/**
37 * Decoder for inbound messages.
38 */
39public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070040
Madan Jampani29e5dfd2014-10-07 17:26:25 -070041 private final Logger log = LoggerFactory.getLogger(getClass());
42
Madan Jampani05833872016-07-12 23:01:39 -070043 private long logicalTime;
44 private long logicalCounter;
Madan Jampani2e5f87b2015-02-22 10:37:15 -080045 private long messageId;
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080046 private int preamble;
Madan Jampani2e5f87b2015-02-22 10:37:15 -080047 private Version ipVersion;
48 private IpAddress senderIp;
49 private int senderPort;
Madan Jampani49115e92015-03-14 10:43:33 -070050 private int messageTypeLength;
51 private String messageType;
Madan Jampania9e70a62016-03-02 16:28:18 -080052 private Status status;
Madan Jampani938aa432014-10-04 17:37:23 -070053 private int contentLength;
54
Madan Jampanib825aeb2016-04-01 15:18:25 -070055 public MessageDecoder() {
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080056 super(DecoderState.READ_MESSAGE_PREAMBLE);
Madan Jampaniab6d3112014-10-02 16:30:14 -070057 }
58
59 @Override
Aaron Kruglikov1b727382016-02-09 16:17:47 -080060 @SuppressWarnings("squid:S128") // suppress switch fall through warning
Madan Jampani86ed0552014-10-03 16:45:42 -070061 protected void decode(
62 ChannelHandlerContext context,
63 ByteBuf buffer,
64 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070065
Yuta HIGUCHI5e8ceb42014-11-04 17:22:26 -080066 switch (state()) {
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080067 case READ_MESSAGE_PREAMBLE:
68 preamble = buffer.readInt();
Madan Jampani05833872016-07-12 23:01:39 -070069 checkpoint(DecoderState.READ_LOGICAL_TIME);
70 case READ_LOGICAL_TIME:
71 logicalTime = buffer.readLong();
72 checkpoint(DecoderState.READ_LOGICAL_COUNTER);
73 case READ_LOGICAL_COUNTER:
74 logicalCounter = buffer.readLong();
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080075 checkpoint(DecoderState.READ_MESSAGE_ID);
Madan Jampani2e5f87b2015-02-22 10:37:15 -080076 case READ_MESSAGE_ID:
77 messageId = buffer.readLong();
78 checkpoint(DecoderState.READ_SENDER_IP_VERSION);
79 case READ_SENDER_IP_VERSION:
80 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
81 checkpoint(DecoderState.READ_SENDER_IP);
82 case READ_SENDER_IP:
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080083 byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
84 buffer.readBytes(octets);
85 senderIp = IpAddress.valueOf(ipVersion, octets);
Madan Jampani2e5f87b2015-02-22 10:37:15 -080086 checkpoint(DecoderState.READ_SENDER_PORT);
87 case READ_SENDER_PORT:
88 senderPort = buffer.readInt();
Madan Jampani49115e92015-03-14 10:43:33 -070089 checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
90 case READ_MESSAGE_TYPE_LENGTH:
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070091 messageTypeLength = buffer.readShort();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080092 checkpoint(DecoderState.READ_MESSAGE_TYPE);
93 case READ_MESSAGE_TYPE:
Madan Jampani49115e92015-03-14 10:43:33 -070094 byte[] messageTypeBytes = new byte[messageTypeLength];
95 buffer.readBytes(messageTypeBytes);
96 messageType = new String(messageTypeBytes, Charsets.UTF_8);
Madan Jampania9e70a62016-03-02 16:28:18 -080097 checkpoint(DecoderState.READ_MESSAGE_STATUS);
98 case READ_MESSAGE_STATUS:
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070099 int statusId = buffer.readByte();
100 if (statusId == -1) {
101 status = null;
102 } else {
103 status = Status.forId(statusId);
104 }
Madan Jampani938aa432014-10-04 17:37:23 -0700105 checkpoint(DecoderState.READ_CONTENT_LENGTH);
106 case READ_CONTENT_LENGTH:
107 contentLength = buffer.readInt();
Madan Jampani938aa432014-10-04 17:37:23 -0700108 checkpoint(DecoderState.READ_CONTENT);
109 case READ_CONTENT:
Madan Jampania9e70a62016-03-02 16:28:18 -0800110 byte[] payload;
111 if (contentLength > 0) {
112 //TODO Perform a sanity check on the size before allocating
113 payload = new byte[contentLength];
114 buffer.readBytes(payload);
115 } else {
116 payload = new byte[0];
117 }
Madan Jampanib825aeb2016-04-01 15:18:25 -0700118 InternalMessage message = new InternalMessage(preamble,
Madan Jampani05833872016-07-12 23:01:39 -0700119 new HybridLogicalTime(logicalTime, logicalCounter),
Madan Jampanib825aeb2016-04-01 15:18:25 -0700120 messageId,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800121 new Endpoint(senderIp, senderPort),
122 messageType,
Madan Jampania9e70a62016-03-02 16:28:18 -0800123 payload,
124 status);
Madan Jampani938aa432014-10-04 17:37:23 -0700125 out.add(message);
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800126 checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
Madan Jampani938aa432014-10-04 17:37:23 -0700127 break;
128 default:
129 checkState(false, "Must not be here");
130 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700131 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700132
133 @Override
134 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
135 log.error("Exception inside channel handling pipeline.", cause);
136 context.close();
137 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700138}