blob: 706480105e4190e6cdec267943db89a8e49dd673 [file] [log] [blame]
package org.onlab.onos.store.cluster.messaging.impl;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static com.google.common.base.Preconditions.checkState;
17
18/**
19 * Factory for parsing messages sent between cluster members.
20 */
tom81583142014-09-30 01:40:29 -070021@Component(immediate = true)
22@Service
tom1d416c52014-09-29 20:55:24 -070023public class MessageSerializer implements SerializationService {
24
tom81583142014-09-30 01:40:29 -070025 private final Logger log = LoggerFactory.getLogger(getClass());
26
tom1d416c52014-09-29 20:55:24 -070027 private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
28 private static final int LENGTH_OFFSET = 12;
29
30 private static final long MARKER = 0xfeedcafebeaddeadL;
31
32 @Override
33 public ClusterMessage decode(ByteBuffer buffer) {
34 try {
35 // Do we have enough bytes to read the header? If not, bail.
36 if (buffer.remaining() < METADATA_LENGTH) {
37 return null;
38 }
39
40 // Peek at the length and if we have enough to read the entire message
41 // go ahead, otherwise bail.
42 int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
43 if (buffer.remaining() < length) {
44 return null;
45 }
46
47 // At this point, we have enough data to read a complete message.
48 long marker = buffer.getLong();
49 checkState(marker == MARKER, "Incorrect message marker");
50
51 int subjectOrdinal = buffer.getInt();
52 MessageSubject subject = MessageSubject.values()[subjectOrdinal];
53 length = buffer.getInt();
54
55 // TODO: sanity checking for length
56 byte[] data = new byte[length - METADATA_LENGTH];
57 buffer.get(data);
58
59 // TODO: add deserialization hook here; for now this hack
tom81583142014-09-30 01:40:29 -070060 String[] fields = new String(data).split(":");
61 return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
tom1d416c52014-09-29 20:55:24 -070062
63 } catch (Exception e) {
64 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -070065 log.warn("Unable to decode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -070066 }
67 return null;
68 }
69
70 @Override
71 public void encode(ClusterMessage message, ByteBuffer buffer) {
72 try {
tom81583142014-09-30 01:40:29 -070073 HelloMessage helloMessage = (HelloMessage) message;
74 buffer.putLong(MARKER);
75 buffer.putInt(message.subject().ordinal());
76
77 String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
78 byte[] data = str.getBytes();
79 buffer.putInt(data.length + METADATA_LENGTH);
80 buffer.put(data);
81
tom1d416c52014-09-29 20:55:24 -070082 } catch (Exception e) {
83 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -070084 log.warn("Unable to encode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -070085 }
86 }
87
88}