Added Netty-based messaging. Updated cluster management to use Netty-based messaging.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
index c6ebca9..98e80f7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -21,12 +21,7 @@
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.EchoMessage;
-import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
@@ -43,12 +38,9 @@
import org.slf4j.LoggerFactory;
import java.net.URI;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Factory for parsing messages sent between cluster members.
*/
@@ -96,11 +88,7 @@
Link.Type.class,
- MessageSubject.class,
- HelloMessage.class,
- NewMemberMessage.class,
- LeavingMemberMessage.class,
- EchoMessage.class
+ MessageSubject.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
@@ -118,49 +106,12 @@
@Override
- public ClusterMessage decode(ByteBuffer buffer) {
- try {
- // Do we have enough bytes to read the header? If not, bail.
- if (buffer.remaining() < METADATA_LENGTH) {
- return null;
- }
-
- // Peek at the length and if we have enough to read the entire message
- // go ahead, otherwise bail.
- int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
- if (buffer.remaining() < length) {
- return null;
- }
-
- // At this point, we have enough data to read a complete message.
- long marker = buffer.getLong();
- checkState(marker == MARKER, "Incorrect message marker");
- length = buffer.getInt();
-
- // TODO: sanity checking for length
- byte[] data = new byte[length - METADATA_LENGTH];
- buffer.get(data);
- return (ClusterMessage) serializerPool.deserialize(data);
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to decode message due to: " + e);
- }
- return null;
+ public Object decode(byte[] data) {
+ return serializerPool.deserialize(data);
}
@Override
- public void encode(ClusterMessage message, ByteBuffer buffer) {
- try {
- byte[] data = serializerPool.serialize(message);
- buffer.putLong(MARKER);
- buffer.putInt(data.length + METADATA_LENGTH);
- buffer.put(data);
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to encode message due to: " + e);
- }
+ public byte[] encode(Object payload) {
+ return serializerPool.serialize(payload);
}
-
-}
+}
\ No newline at end of file