blob: 260d2b25601c77f2ae96019b92acbe97dad92419 [file] [log] [blame]
tom43e836a2014-09-30 01:50:29 -07001package org.onlab.onos.store.cluster.impl;
tom1d416c52014-09-29 20:55:24 -07002
tomd33e6402014-09-30 03:14:43 -07003import de.javakaffee.kryoserializers.URISerializer;
4import org.apache.felix.scr.annotations.Activate;
tom81583142014-09-30 01:40:29 -07005import org.apache.felix.scr.annotations.Component;
tomd33e6402014-09-30 03:14:43 -07006import org.apache.felix.scr.annotations.Deactivate;
tom81583142014-09-30 01:40:29 -07007import org.apache.felix.scr.annotations.Service;
tomd33e6402014-09-30 03:14:43 -07008import org.onlab.onos.cluster.ControllerNode;
9import org.onlab.onos.cluster.DefaultControllerNode;
tom81583142014-09-30 01:40:29 -070010import org.onlab.onos.cluster.NodeId;
tomd33e6402014-09-30 03:14:43 -070011import org.onlab.onos.net.ConnectPoint;
12import org.onlab.onos.net.DefaultDevice;
13import org.onlab.onos.net.DefaultLink;
14import org.onlab.onos.net.DefaultPort;
15import org.onlab.onos.net.Device;
16import org.onlab.onos.net.DeviceId;
17import org.onlab.onos.net.Element;
18import org.onlab.onos.net.Link;
19import org.onlab.onos.net.LinkKey;
20import org.onlab.onos.net.MastershipRole;
21import org.onlab.onos.net.Port;
22import org.onlab.onos.net.PortNumber;
23import org.onlab.onos.net.provider.ProviderId;
tom1d416c52014-09-29 20:55:24 -070024import org.onlab.onos.store.cluster.messaging.ClusterMessage;
tomd33e6402014-09-30 03:14:43 -070025import org.onlab.onos.store.cluster.messaging.EchoMessage;
26import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
tom81583142014-09-30 01:40:29 -070027import org.onlab.onos.store.cluster.messaging.HelloMessage;
tom1d416c52014-09-29 20:55:24 -070028import org.onlab.onos.store.cluster.messaging.MessageSubject;
29import org.onlab.onos.store.cluster.messaging.SerializationService;
tomd33e6402014-09-30 03:14:43 -070030import org.onlab.onos.store.serializers.ConnectPointSerializer;
31import org.onlab.onos.store.serializers.DefaultLinkSerializer;
32import org.onlab.onos.store.serializers.DefaultPortSerializer;
33import org.onlab.onos.store.serializers.DeviceIdSerializer;
34import org.onlab.onos.store.serializers.IpPrefixSerializer;
35import org.onlab.onos.store.serializers.LinkKeySerializer;
36import org.onlab.onos.store.serializers.NodeIdSerializer;
37import org.onlab.onos.store.serializers.PortNumberSerializer;
38import org.onlab.onos.store.serializers.ProviderIdSerializer;
tom81583142014-09-30 01:40:29 -070039import org.onlab.packet.IpPrefix;
tomd33e6402014-09-30 03:14:43 -070040import org.onlab.util.KryoPool;
tom81583142014-09-30 01:40:29 -070041import org.slf4j.Logger;
42import org.slf4j.LoggerFactory;
tom1d416c52014-09-29 20:55:24 -070043
tomd33e6402014-09-30 03:14:43 -070044import java.net.URI;
tom1d416c52014-09-29 20:55:24 -070045import java.nio.ByteBuffer;
tomd33e6402014-09-30 03:14:43 -070046import java.util.ArrayList;
47import java.util.HashMap;
tom1d416c52014-09-29 20:55:24 -070048
49import static com.google.common.base.Preconditions.checkState;
50
51/**
52 * Factory for parsing messages sent between cluster members.
53 */
tom81583142014-09-30 01:40:29 -070054@Component(immediate = true)
55@Service
tom1d416c52014-09-29 20:55:24 -070056public class MessageSerializer implements SerializationService {
57
tom81583142014-09-30 01:40:29 -070058 private final Logger log = LoggerFactory.getLogger(getClass());
59
tomd33e6402014-09-30 03:14:43 -070060 private static final int METADATA_LENGTH = 12; // 8 + 4
61 private static final int LENGTH_OFFSET = 8;
tom1d416c52014-09-29 20:55:24 -070062
63 private static final long MARKER = 0xfeedcafebeaddeadL;
64
tomd33e6402014-09-30 03:14:43 -070065 private KryoPool serializerPool;
66
67 @Activate
68 public void activate() {
69 setupKryoPool();
70 log.info("Started");
71 }
72
73 @Deactivate
74 public void deactivate() {
75 log.info("Stopped");
76 }
77
78 /**
79 * Sets up the common serialzers pool.
80 */
81 protected void setupKryoPool() {
82 // FIXME Slice out types used in common to separate pool/namespace.
83 serializerPool = KryoPool.newBuilder()
84 .register(ArrayList.class,
85 HashMap.class,
86
87 ControllerNode.State.class,
88 Device.Type.class,
89
90 DefaultControllerNode.class,
91 DefaultDevice.class,
92 MastershipRole.class,
93 Port.class,
94 Element.class,
95
96 Link.Type.class,
97
98 MessageSubject.class,
99 HelloMessage.class,
100 GoodbyeMessage.class,
101 EchoMessage.class
102 )
103 .register(IpPrefix.class, new IpPrefixSerializer())
104 .register(URI.class, new URISerializer())
105 .register(NodeId.class, new NodeIdSerializer())
106 .register(ProviderId.class, new ProviderIdSerializer())
107 .register(DeviceId.class, new DeviceIdSerializer())
108 .register(PortNumber.class, new PortNumberSerializer())
109 .register(DefaultPort.class, new DefaultPortSerializer())
110 .register(LinkKey.class, new LinkKeySerializer())
111 .register(ConnectPoint.class, new ConnectPointSerializer())
112 .register(DefaultLink.class, new DefaultLinkSerializer())
113 .build()
114 .populate(1);
115 }
116
117
tom1d416c52014-09-29 20:55:24 -0700118 @Override
119 public ClusterMessage decode(ByteBuffer buffer) {
120 try {
121 // Do we have enough bytes to read the header? If not, bail.
122 if (buffer.remaining() < METADATA_LENGTH) {
123 return null;
124 }
125
126 // Peek at the length and if we have enough to read the entire message
127 // go ahead, otherwise bail.
128 int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
129 if (buffer.remaining() < length) {
130 return null;
131 }
132
133 // At this point, we have enough data to read a complete message.
134 long marker = buffer.getLong();
135 checkState(marker == MARKER, "Incorrect message marker");
tom1d416c52014-09-29 20:55:24 -0700136 length = buffer.getInt();
137
138 // TODO: sanity checking for length
139 byte[] data = new byte[length - METADATA_LENGTH];
140 buffer.get(data);
tomd33e6402014-09-30 03:14:43 -0700141 return (ClusterMessage) serializerPool.deserialize(data);
tom1d416c52014-09-29 20:55:24 -0700142
143 } catch (Exception e) {
144 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -0700145 log.warn("Unable to decode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -0700146 }
147 return null;
148 }
149
150 @Override
151 public void encode(ClusterMessage message, ByteBuffer buffer) {
152 try {
tomd33e6402014-09-30 03:14:43 -0700153 byte[] data = serializerPool.serialize(message);
tom81583142014-09-30 01:40:29 -0700154 buffer.putLong(MARKER);
tom81583142014-09-30 01:40:29 -0700155 buffer.putInt(data.length + METADATA_LENGTH);
156 buffer.put(data);
157
tom1d416c52014-09-29 20:55:24 -0700158 } catch (Exception e) {
159 // TODO: recover from exceptions by forwarding stream to next marker
tom81583142014-09-30 01:40:29 -0700160 log.warn("Unable to encode message due to: " + e);
tom1d416c52014-09-29 20:55:24 -0700161 }
162 }
163
164}