blob: 149b7063a1b8c5f94124e3647ff65f2be64ce946 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampaniab6d3112014-10-02 16:30:14 -070017
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import com.google.common.base.Charsets;
Madan Jampani86ed0552014-10-03 16:45:42 -070019import io.netty.buffer.ByteBuf;
20import io.netty.channel.ChannelHandlerContext;
21import io.netty.handler.codec.ReplayingDecoder;
Madan Jampani2e5f87b2015-02-22 10:37:15 -080022import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpAddress.Version;
Madan Jampanic26eede2015-04-16 11:42:16 -070024import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampani29e5dfd2014-10-07 17:26:25 -070025import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
Aaron Kruglikov1b727382016-02-09 16:17:47 -080028import java.util.List;
Madan Jampani49115e92015-03-14 10:43:33 -070029
Ray Milkey9f87e512016-01-05 10:00:22 -080030import static com.google.common.base.Preconditions.checkState;
Ray Milkey9f87e512016-01-05 10:00:22 -080031
Madan Jampani938aa432014-10-04 17:37:23 -070032/**
33 * Decoder for inbound messages.
34 */
35public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070036
Madan Jampani29e5dfd2014-10-07 17:26:25 -070037 private final Logger log = LoggerFactory.getLogger(getClass());
38
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080039 private final int correctPreamble;
Madan Jampani2e5f87b2015-02-22 10:37:15 -080040 private long messageId;
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080041 private int preamble;
Madan Jampani2e5f87b2015-02-22 10:37:15 -080042 private Version ipVersion;
43 private IpAddress senderIp;
44 private int senderPort;
Madan Jampani49115e92015-03-14 10:43:33 -070045 private int messageTypeLength;
46 private String messageType;
Madan Jampani938aa432014-10-04 17:37:23 -070047 private int contentLength;
48
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080049 public MessageDecoder(int correctPreamble) {
50 super(DecoderState.READ_MESSAGE_PREAMBLE);
51 this.correctPreamble = correctPreamble;
Madan Jampaniab6d3112014-10-02 16:30:14 -070052 }
53
54 @Override
Aaron Kruglikov1b727382016-02-09 16:17:47 -080055 @SuppressWarnings("squid:S128") // suppress switch fall through warning
Madan Jampani86ed0552014-10-03 16:45:42 -070056 protected void decode(
57 ChannelHandlerContext context,
58 ByteBuf buffer,
59 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070060
Yuta HIGUCHI5e8ceb42014-11-04 17:22:26 -080061 switch (state()) {
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080062 case READ_MESSAGE_PREAMBLE:
63 preamble = buffer.readInt();
64 if (preamble != correctPreamble) {
65 throw new IllegalStateException("This message had an incorrect preamble.");
66 }
67 checkpoint(DecoderState.READ_MESSAGE_ID);
Madan Jampani2e5f87b2015-02-22 10:37:15 -080068 case READ_MESSAGE_ID:
69 messageId = buffer.readLong();
70 checkpoint(DecoderState.READ_SENDER_IP_VERSION);
71 case READ_SENDER_IP_VERSION:
72 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
73 checkpoint(DecoderState.READ_SENDER_IP);
74 case READ_SENDER_IP:
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080075 byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
76 buffer.readBytes(octets);
77 senderIp = IpAddress.valueOf(ipVersion, octets);
Madan Jampani2e5f87b2015-02-22 10:37:15 -080078 checkpoint(DecoderState.READ_SENDER_PORT);
79 case READ_SENDER_PORT:
80 senderPort = buffer.readInt();
Madan Jampani49115e92015-03-14 10:43:33 -070081 checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
82 case READ_MESSAGE_TYPE_LENGTH:
83 messageTypeLength = buffer.readInt();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080084 checkpoint(DecoderState.READ_MESSAGE_TYPE);
85 case READ_MESSAGE_TYPE:
Madan Jampani49115e92015-03-14 10:43:33 -070086 byte[] messageTypeBytes = new byte[messageTypeLength];
87 buffer.readBytes(messageTypeBytes);
88 messageType = new String(messageTypeBytes, Charsets.UTF_8);
Madan Jampani938aa432014-10-04 17:37:23 -070089 checkpoint(DecoderState.READ_CONTENT_LENGTH);
90 case READ_CONTENT_LENGTH:
91 contentLength = buffer.readInt();
Madan Jampani938aa432014-10-04 17:37:23 -070092 checkpoint(DecoderState.READ_CONTENT);
93 case READ_CONTENT:
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080094 //TODO Perform a sanity check on the size before allocating
Madan Jampani2e5f87b2015-02-22 10:37:15 -080095 byte[] payload = new byte[contentLength];
96 buffer.readBytes(payload);
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080097 InternalMessage message = new InternalMessage(messageId,
Aaron Kruglikov1b727382016-02-09 16:17:47 -080098 new Endpoint(senderIp, senderPort),
99 messageType,
100 payload);
Madan Jampani938aa432014-10-04 17:37:23 -0700101 out.add(message);
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800102 checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
Madan Jampani938aa432014-10-04 17:37:23 -0700103 break;
104 default:
105 checkState(false, "Must not be here");
106 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700107 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700108
109 @Override
110 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
111 log.error("Exception inside channel handling pipeline.", cause);
112 context.close();
113 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700114}