blob: c6ebca9f730535ca5d0c9a7e289b38bd7b75eaa2 [file] [log] [blame]
tom43e836a2014-09-30 01:50:29 -07001package org.onlab.onos.store.cluster.impl;
tom1d416c52014-09-29 20:55:24 -07002
tomd33e6402014-09-30 03:14:43 -07003import de.javakaffee.kryoserializers.URISerializer;
4import org.apache.felix.scr.annotations.Activate;
tom81583142014-09-30 01:40:29 -07005import org.apache.felix.scr.annotations.Component;
tomd33e6402014-09-30 03:14:43 -07006import org.apache.felix.scr.annotations.Deactivate;
tom81583142014-09-30 01:40:29 -07007import org.apache.felix.scr.annotations.Service;
tomd33e6402014-09-30 03:14:43 -07008import org.onlab.onos.cluster.ControllerNode;
9import org.onlab.onos.cluster.DefaultControllerNode;
tom81583142014-09-30 01:40:29 -070010import org.onlab.onos.cluster.NodeId;
tomd33e6402014-09-30 03:14:43 -070011import org.onlab.onos.net.ConnectPoint;
12import org.onlab.onos.net.DefaultDevice;
13import org.onlab.onos.net.DefaultLink;
14import org.onlab.onos.net.DefaultPort;
15import org.onlab.onos.net.Device;
16import org.onlab.onos.net.DeviceId;
17import org.onlab.onos.net.Element;
18import org.onlab.onos.net.Link;
19import org.onlab.onos.net.LinkKey;
20import org.onlab.onos.net.MastershipRole;
21import org.onlab.onos.net.Port;
22import org.onlab.onos.net.PortNumber;
23import org.onlab.onos.net.provider.ProviderId;
tom1d416c52014-09-29 20:55:24 -070024import org.onlab.onos.store.cluster.messaging.ClusterMessage;
tomd33e6402014-09-30 03:14:43 -070025import org.onlab.onos.store.cluster.messaging.EchoMessage;
tom28e1fa22014-09-30 10:38:21 -070026import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
tom81583142014-09-30 01:40:29 -070027import org.onlab.onos.store.cluster.messaging.HelloMessage;
tom1d416c52014-09-29 20:55:24 -070028import org.onlab.onos.store.cluster.messaging.MessageSubject;
tom28e1fa22014-09-30 10:38:21 -070029import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
tom1d416c52014-09-29 20:55:24 -070030import org.onlab.onos.store.cluster.messaging.SerializationService;
tomd33e6402014-09-30 03:14:43 -070031import org.onlab.onos.store.serializers.ConnectPointSerializer;
32import org.onlab.onos.store.serializers.DefaultLinkSerializer;
33import org.onlab.onos.store.serializers.DefaultPortSerializer;
34import org.onlab.onos.store.serializers.DeviceIdSerializer;
35import org.onlab.onos.store.serializers.IpPrefixSerializer;
36import org.onlab.onos.store.serializers.LinkKeySerializer;
37import org.onlab.onos.store.serializers.NodeIdSerializer;
38import org.onlab.onos.store.serializers.PortNumberSerializer;
39import org.onlab.onos.store.serializers.ProviderIdSerializer;
tom81583142014-09-30 01:40:29 -070040import org.onlab.packet.IpPrefix;
tomd33e6402014-09-30 03:14:43 -070041import org.onlab.util.KryoPool;
tom81583142014-09-30 01:40:29 -070042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
tom1d416c52014-09-29 20:55:24 -070044
tomd33e6402014-09-30 03:14:43 -070045import java.net.URI;
tom1d416c52014-09-29 20:55:24 -070046import java.nio.ByteBuffer;
tomd33e6402014-09-30 03:14:43 -070047import java.util.ArrayList;
48import java.util.HashMap;
tom1d416c52014-09-29 20:55:24 -070049
50import static com.google.common.base.Preconditions.checkState;
51
52/**
53 * Factory for parsing messages sent between cluster members.
54 */
tom81583142014-09-30 01:40:29 -070055@Component(immediate = true)
56@Service
tom1d416c52014-09-29 20:55:24 -070057public class MessageSerializer implements SerializationService {
58
tom81583142014-09-30 01:40:29 -070059 private final Logger log = LoggerFactory.getLogger(getClass());
60
tomd33e6402014-09-30 03:14:43 -070061 private static final int METADATA_LENGTH = 12; // 8 + 4
62 private static final int LENGTH_OFFSET = 8;
tom1d416c52014-09-29 20:55:24 -070063
64 private static final long MARKER = 0xfeedcafebeaddeadL;
65
tomd33e6402014-09-30 03:14:43 -070066 private KryoPool serializerPool;
67
68 @Activate
69 public void activate() {
70 setupKryoPool();
71 log.info("Started");
72 }
73
74 @Deactivate
75 public void deactivate() {
76 log.info("Stopped");
77 }
78
79 /**
80 * Sets up the common serialzers pool.
81 */
82 protected void setupKryoPool() {
83 // FIXME Slice out types used in common to separate pool/namespace.
84 serializerPool = KryoPool.newBuilder()
85 .register(ArrayList.class,
86 HashMap.class,
87
88 ControllerNode.State.class,
89 Device.Type.class,
90
91 DefaultControllerNode.class,
92 DefaultDevice.class,
93 MastershipRole.class,
94 Port.class,
95 Element.class,
96
97 Link.Type.class,
98
99 MessageSubject.class,
100 HelloMessage.class,
tom28e1fa22014-09-30 10:38:21 -0700101 NewMemberMessage.class,
102 LeavingMemberMessage.class,
tomd33e6402014-09-30 03:14:43 -0700103 EchoMessage.class
104 )
105 .register(IpPrefix.class, new IpPrefixSerializer())
106 .register(URI.class, new URISerializer())
107 .register(NodeId.class, new NodeIdSerializer())
108 .register(ProviderId.class, new ProviderIdSerializer())
109 .register(DeviceId.class, new DeviceIdSerializer())
110 .register(PortNumber.class, new PortNumberSerializer())
111 .register(DefaultPort.class, new DefaultPortSerializer())
112 .register(LinkKey.class, new LinkKeySerializer())
113 .register(ConnectPoint.class, new ConnectPointSerializer())
114 .register(DefaultLink.class, new DefaultLinkSerializer())
115 .build()
116 .populate(1);
117 }
118
119
tom1d416c52014-09-29 20:55:24 -0700120 @Override
121 public ClusterMessage decode(ByteBuffer buffer) {
122 try {
123 // Do we have enough bytes to read the header? If not, bail.
124 if (buffer.remaining() < METADATA_LENGTH) {
125 return null;
126 }
127
128 // Peek at the length and if we have enough to read the entire message
129 // go ahead, otherwise bail.
130 int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
131 if (buffer.remaining() < length) {
132 return null;
133 }
134
135 // At this point, we have enough data to read a complete message.
136 long marker = buffer.getLong();
137 checkState(marker == MARKER, "Incorrect message marker");
tom1d416c52014-09-29 20:55:24 -0700138 length = buffer.getInt();
139
140 // TODO: sanity checking for length
141 byte[] data = new byte[length - METADATA_LENGTH];
142 buffer.get(data);
tomd33e6402014-09-30 03:14:43 -0700143 return (ClusterMessage) serializerPool.deserialize(data);
tom1d416c52014-09-29 20:55:24 -0700144
145 } catch (Exception e) {
146 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -0700147 log.warn("Unable to decode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -0700148 }
149 return null;
150 }
151
152 @Override
153 public void encode(ClusterMessage message, ByteBuffer buffer) {
154 try {
tomd33e6402014-09-30 03:14:43 -0700155 byte[] data = serializerPool.serialize(message);
tom81583142014-09-30 01:40:29 -0700156 buffer.putLong(MARKER);
tom81583142014-09-30 01:40:29 -0700157 buffer.putInt(data.length + METADATA_LENGTH);
158 buffer.put(data);
159
tom1d416c52014-09-29 20:55:24 -0700160 } catch (Exception e) {
161 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -0700162 log.warn("Unable to encode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -0700163 }
164 }
165
166}