blob: 4b9ef6c6bcf9f01c5e38c40d6dc3429f792ced6c [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampaniab6d3112014-10-02 16:30:14 -070017
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import com.google.common.base.Charsets;
Madan Jampania9e70a62016-03-02 16:28:18 -080019
Madan Jampani86ed0552014-10-03 16:45:42 -070020import io.netty.buffer.ByteBuf;
21import io.netty.channel.ChannelHandlerContext;
22import io.netty.handler.codec.ReplayingDecoder;
Madan Jampania9e70a62016-03-02 16:28:18 -080023
Madan Jampani2e5f87b2015-02-22 10:37:15 -080024import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpAddress.Version;
Madan Jampani05833872016-07-12 23:01:39 -070026import org.onosproject.core.HybridLogicalTime;
Madan Jampanic26eede2015-04-16 11:42:16 -070027import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampani29e5dfd2014-10-07 17:26:25 -070028import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
Aaron Kruglikov1b727382016-02-09 16:17:47 -080031import java.util.List;
Madan Jampani49115e92015-03-14 10:43:33 -070032
Ray Milkey9f87e512016-01-05 10:00:22 -080033import static com.google.common.base.Preconditions.checkState;
Ray Milkey9f87e512016-01-05 10:00:22 -080034
Madan Jampani938aa432014-10-04 17:37:23 -070035/**
36 * Decoder for inbound messages.
37 */
38public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070039
Madan Jampani29e5dfd2014-10-07 17:26:25 -070040 private final Logger log = LoggerFactory.getLogger(getClass());
41
Madan Jampani2e5f87b2015-02-22 10:37:15 -080042 private Version ipVersion;
43 private IpAddress senderIp;
44 private int senderPort;
Jordan Haltermane3813a92017-07-29 14:10:31 -070045
46 private InternalMessage.Type type;
47 private int preamble;
48 private long logicalTime;
49 private long logicalCounter;
50 private long messageId;
Madan Jampani938aa432014-10-04 17:37:23 -070051 private int contentLength;
Jordan Haltermane3813a92017-07-29 14:10:31 -070052 private byte[] content;
53 private int subjectLength;
54 private String subject;
55 private InternalReply.Status status;
Madan Jampani938aa432014-10-04 17:37:23 -070056
Madan Jampanib825aeb2016-04-01 15:18:25 -070057 public MessageDecoder() {
Jordan Haltermane3813a92017-07-29 14:10:31 -070058 super(DecoderState.READ_SENDER_IP_VERSION);
Madan Jampaniab6d3112014-10-02 16:30:14 -070059 }
60
61 @Override
Aaron Kruglikov1b727382016-02-09 16:17:47 -080062 @SuppressWarnings("squid:S128") // suppress switch fall through warning
Madan Jampani86ed0552014-10-03 16:45:42 -070063 protected void decode(
64 ChannelHandlerContext context,
65 ByteBuf buffer,
66 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070067
Yuta HIGUCHI5e8ceb42014-11-04 17:22:26 -080068 switch (state()) {
Jordan Haltermane3813a92017-07-29 14:10:31 -070069 case READ_SENDER_IP_VERSION:
70 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
71 checkpoint(DecoderState.READ_SENDER_IP);
Ray Milkey7dac7da2017-08-01 16:56:05 -070072 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070073 case READ_SENDER_IP:
74 byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
75 buffer.readBytes(octets);
76 senderIp = IpAddress.valueOf(ipVersion, octets);
77 checkpoint(DecoderState.READ_SENDER_PORT);
Ray Milkey7dac7da2017-08-01 16:56:05 -070078 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070079 case READ_SENDER_PORT:
80 senderPort = buffer.readInt();
81 checkpoint(DecoderState.READ_TYPE);
Ray Milkey7dac7da2017-08-01 16:56:05 -070082 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070083 case READ_TYPE:
84 type = InternalMessage.Type.forId(buffer.readByte());
85 checkpoint(DecoderState.READ_PREAMBLE);
Ray Milkey7dac7da2017-08-01 16:56:05 -070086 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070087 case READ_PREAMBLE:
88 preamble = buffer.readInt();
89 checkpoint(DecoderState.READ_LOGICAL_TIME);
Ray Milkey7dac7da2017-08-01 16:56:05 -070090 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070091 case READ_LOGICAL_TIME:
92 logicalTime = buffer.readLong();
93 checkpoint(DecoderState.READ_LOGICAL_COUNTER);
Ray Milkey7dac7da2017-08-01 16:56:05 -070094 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070095 case READ_LOGICAL_COUNTER:
96 logicalCounter = buffer.readLong();
97 checkpoint(DecoderState.READ_MESSAGE_ID);
Ray Milkey7dac7da2017-08-01 16:56:05 -070098 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -070099 case READ_MESSAGE_ID:
100 messageId = buffer.readLong();
101 checkpoint(DecoderState.READ_CONTENT_LENGTH);
Ray Milkey7dac7da2017-08-01 16:56:05 -0700102 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -0700103 case READ_CONTENT_LENGTH:
104 contentLength = buffer.readInt();
105 checkpoint(DecoderState.READ_CONTENT);
Ray Milkey7dac7da2017-08-01 16:56:05 -0700106 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -0700107 case READ_CONTENT:
108 if (contentLength > 0) {
109 //TODO Perform a sanity check on the size before allocating
110 content = new byte[contentLength];
111 buffer.readBytes(content);
112 } else {
113 content = new byte[0];
114 }
115
116 switch (type) {
117 case REQUEST:
118 checkpoint(DecoderState.READ_SUBJECT_LENGTH);
119 break;
120 case REPLY:
121 checkpoint(DecoderState.READ_STATUS);
122 break;
123 default:
124 checkState(false, "Must not be here");
125 }
126 break;
127 default:
128 break;
129 }
130
131 switch (type) {
132 case REQUEST:
133 switch (state()) {
134 case READ_SUBJECT_LENGTH:
135 subjectLength = buffer.readShort();
136 checkpoint(DecoderState.READ_SUBJECT);
Ray Milkey7dac7da2017-08-01 16:56:05 -0700137 // FALLTHROUGH
Jordan Haltermane3813a92017-07-29 14:10:31 -0700138 case READ_SUBJECT:
139 byte[] messageTypeBytes = new byte[subjectLength];
140 buffer.readBytes(messageTypeBytes);
141 subject = new String(messageTypeBytes, Charsets.UTF_8);
142 InternalRequest message = new InternalRequest(preamble,
143 new HybridLogicalTime(logicalTime, logicalCounter),
144 messageId,
145 new Endpoint(senderIp, senderPort),
146 subject,
147 content);
148 out.add(message);
149 checkpoint(DecoderState.READ_TYPE);
150 break;
151 default:
152 break;
153 }
154 break;
155 case REPLY:
156 switch (state()) {
157 case READ_STATUS:
158 status = InternalReply.Status.forId(buffer.readByte());
159 InternalReply message = new InternalReply(preamble,
160 new HybridLogicalTime(logicalTime, logicalCounter),
161 messageId,
162 content,
163 status);
164 out.add(message);
165 checkpoint(DecoderState.READ_TYPE);
166 break;
167 default:
168 break;
169 }
170 break;
171 default:
172 checkState(false, "Must not be here");
Madan Jampani938aa432014-10-04 17:37:23 -0700173 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700174 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700175
176 @Override
177 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
178 log.error("Exception inside channel handling pipeline.", cause);
179 context.close();
180 }
Jordan Haltermane3813a92017-07-29 14:10:31 -0700181}