Merge remote-tracking branch 'origin/master'
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index dceb7c6..2bdf5a0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -11,7 +11,7 @@
 
     private final NodeId sender;
     private final MessageSubject subject;
-    private final Object payload;
+    private final byte[] payload;
     // TODO: add field specifying Serializer for payload
 
     /**
@@ -19,7 +19,7 @@
      *
      * @param subject message subject
      */
-    public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+    public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
         this.sender = sender;
         this.subject = subject;
         this.payload = payload;
@@ -48,7 +48,7 @@
      *
      * @return message payload.
      */
-    public Object payload() {
+    public byte[] payload() {
         return payload;
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 98be0b1..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -23,6 +23,9 @@
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.netty.Endpoint;
 import org.onlab.netty.Message;
 import org.onlab.netty.MessageHandler;
@@ -48,6 +51,18 @@
     //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MessagingService messagingService;
 
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(ClusterMessage.class)
+                    .register(ClusterMembershipEvent.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         // TODO: initialize messagingService
@@ -92,7 +107,7 @@
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
-            messagingService.sendAsync(nodeEp, message.subject().value(), message);
+            messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
             return true;
         } catch (IOException e) {
             log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -126,7 +141,7 @@
         broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
         members.remove(node.id());
     }
 
@@ -138,7 +153,7 @@
             broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
         }
     }
 
@@ -147,7 +162,7 @@
         @Override
         public void handle(ClusterMessage message) {
 
-            ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+            ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
             ControllerNode node = event.node();
             if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
                 log.info("Node {} sent a hearbeat", node.id());
@@ -172,7 +187,8 @@
 
         @Override
         public void handle(Message message) {
-            handler.handle((ClusterMessage) message.payload());
+            ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+            handler.handle(clusterMessage);
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ab9ae3c..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -13,6 +13,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
@@ -37,7 +38,11 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
 import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
@@ -104,6 +109,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(InternalDeviceEvent.class)
+                    .register(InternalPortEvent.class)
+                    .register(InternalPortStatusEvent.class)
+                    .register(Timestamped.class)
+                    .register(MastershipBasedTimestamp.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -779,17 +802,26 @@
     }
 
     private void notifyPeers(InternalDeviceEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortStatusEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
@@ -797,7 +829,7 @@
         @Override
         public void handle(ClusterMessage message) {
             log.info("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+            InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@@ -810,7 +842,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = (InternalPortEvent) message.payload();
+            InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -825,7 +857,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+            InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index dbd88c3..f4dadad 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -17,27 +17,20 @@
     }
 
     @Override
-    public void write(Kryo kryo, Output output, ClusterMessage object) {
-        kryo.writeClassAndObject(output, object.sender());
-        kryo.writeClassAndObject(output, object.subject());
-        // TODO: write bytes serialized using ClusterMessage specified serializer
-        // write serialized payload size
-        //output.writeInt(...);
-        // write serialized payload
-        //output.writeBytes(...);
+    public void write(Kryo kryo, Output output, ClusterMessage message) {
+        kryo.writeClassAndObject(output, message.sender());
+        kryo.writeClassAndObject(output, message.subject());
+        output.writeInt(message.payload().length);
+        output.writeBytes(message.payload());
     }
 
     @Override
     public ClusterMessage read(Kryo kryo, Input input,
                                Class<ClusterMessage> type) {
-        // TODO Auto-generated method stub
         NodeId sender = (NodeId) kryo.readClassAndObject(input);
         MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
-        int size = input.readInt();
-        byte[] payloadBytes = input.readBytes(size);
-        // TODO: deserialize payload using ClusterMessage specified serializer
-        Object payload = null;
+        int payloadSize = input.readInt();
+        byte[] payload = input.readBytes(payloadSize);
         return new ClusterMessage(sender, subject, payload);
     }
-
 }
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index 361b071..fa42a6b 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -20,6 +20,11 @@
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.Annotations;
@@ -42,6 +47,7 @@
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.packet.IpPrefix;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -111,8 +117,9 @@
         deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
 
         ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+        ClusterService clusterService = new TestClusterService();
 
-        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
+        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
         gossipDeviceStore.activate();
         deviceStore = gossipDeviceStore;
     }
@@ -548,8 +555,12 @@
 
     private static final class TestGossipDeviceStore extends GossipDeviceStore {
 
-        public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
+        public TestGossipDeviceStore(
+                ClockService clockService,
+                ClusterService clusterService,
+                ClusterCommunicationService clusterCommunicator) {
             this.clockService = clockService;
+            this.clusterService = clusterService;
             this.clusterCommunicator = clusterCommunicator;
         }
     }
@@ -564,4 +575,45 @@
         @Override
         public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
     }
+
+    private static final class TestClusterService implements ClusterService {
+
+        private static final ControllerNode ONOS1 =
+            new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
+        private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
+        private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
+
+        public TestClusterService() {
+            nodes.put(new NodeId("N1"), ONOS1);
+            nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+        }
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return ONOS1;
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return Sets.newHashSet(nodes.values());
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return nodes.get(nodeId);
+        }
+
+        @Override
+        public State getState(NodeId nodeId) {
+            return nodeStates.get(nodeId);
+        }
+
+        @Override
+        public void addListener(ClusterEventListener listener) {
+        }
+
+        @Override
+        public void removeListener(ClusterEventListener listener) {
+        }
+    }
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index f1a12fe..0c33cfe 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -2,6 +2,7 @@
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 
 import org.onlab.onos.cluster.ControllerNode;
@@ -21,6 +22,8 @@
 import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.net.Port;
 import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
@@ -47,6 +50,7 @@
             .register(
                     //
                     ArrayList.class,
+                    Arrays.asList().getClass(),
                     HashMap.class,
                     //
                     ControllerNode.State.class,
@@ -54,8 +58,10 @@
                     DefaultAnnotations.class,
                     DefaultControllerNode.class,
                     DefaultDevice.class,
+                    DefaultDeviceDescription.class,
                     MastershipRole.class,
                     Port.class,
+                    DefaultPortDescription.class,
                     Element.class,
                     Link.Type.class
                     )
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
index 19517f3..93ee854 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -12,7 +12,7 @@
 public class KryoSerializer implements Serializer {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private KryoPool serializerPool;
+    protected KryoPool serializerPool;
 
 
     public KryoSerializer() {
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
index f4024a4..1772a3c 100644
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -8,16 +8,15 @@
  * This class provides a base implementation of Response, with methods to retrieve the
  * result and query to see if the result is ready. The result can only be retrieved when
  * it is ready and the get methods will block if the result is not ready yet.
- * @param <T> type of response.
  */
-public class AsyncResponse<T> implements Response<T> {
+public class AsyncResponse implements Response {
 
-    private T value;
+    private byte[] value;
     private boolean done = false;
     private final long start = System.nanoTime();
 
     @Override
-    public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
+    public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
         timeout = timeUnit.toNanos(timeout);
         boolean interrupted = false;
         try {
@@ -43,7 +42,7 @@
     }
 
     @Override
-    public T get() throws InterruptedException {
+    public byte[] get() throws InterruptedException {
         throw new UnsupportedOperationException();
     }
 
@@ -57,11 +56,10 @@
      * available.
      * @param data response data.
      */
-    @SuppressWarnings("unchecked")
-    public synchronized void setResponse(Object data) {
+    public synchronized void setResponse(byte[] data) {
         if (!done) {
             done = true;
-            value = (T) data;
+            value = data;
             this.notifyAll();
         }
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 367ca91..938ec7b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -13,11 +13,8 @@
     private long id;
     private Endpoint sender;
     private String type;
-    private Object payload;
-
+    private byte[] payload;
     private transient NettyMessagingService messagingService;
-    // TODO: add transient payload serializer or change payload type to
-    //       byte[], ByteBuffer, etc.
 
     // Must be created using the Builder.
     private InternalMessage() {}
@@ -35,7 +32,7 @@
     }
 
     @Override
-    public Object payload() {
+    public byte[] payload() {
         return payload;
     }
 
@@ -44,7 +41,7 @@
     }
 
     @Override
-    public void respond(Object data) throws IOException {
+    public void respond(byte[] data) throws IOException {
         Builder builder = new Builder(messagingService);
         InternalMessage message = builder.withId(this.id)
              // FIXME: Sender should be messagingService.localEp.
@@ -81,7 +78,7 @@
             message.sender = sender;
             return this;
         }
-        public Builder withPayload(Object payload) {
+        public Builder withPayload(byte[] payload) {
             message.payload = payload;
             return this;
         }
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 4414d05..b8efb51 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -10,7 +10,7 @@
 /**
  * Kryo Serializer.
  */
-public class KryoSerializer implements PayloadSerializer {
+public class KryoSerializer {
 
     private KryoPool serializerPool;
 
@@ -28,29 +28,26 @@
                           HashMap.class,
                           ArrayList.class,
                           InternalMessage.class,
-                          Endpoint.class
+                          Endpoint.class,
+                          byte[].class
                 )
                 .build()
                 .populate(1);
     }
 
 
-    @Override
     public <T> T decode(byte[] data) {
         return serializerPool.deserialize(data);
     }
 
-    @Override
     public byte[] encode(Object payload) {
         return serializerPool.serialize(payload);
     }
 
-    @Override
     public <T> T decode(ByteBuffer buffer) {
         return serializerPool.deserialize(buffer);
     }
 
-    @Override
     public void encode(Object obj, ByteBuffer buffer) {
         serializerPool.serialize(obj, buffer);
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
index 23c4073..366898b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -12,6 +12,6 @@
 
     @Override
     public void handle(Message message) {
-        log.info("Received message. Payload: " + message.payload());
+        log.info("Received message. Payload has {} bytes", message.payload().length);
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
index 54b9526..87a8bb6 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -12,12 +12,12 @@
      * Returns the payload of this message.
      * @return message payload.
      */
-    public Object payload();
+    public byte[] payload();
 
     /**
-     * Sends a reply back to the sender of this messge.
+     * Sends a reply back to the sender of this message.
      * @param data payload of the response.
      * @throws IOException if there is a communication error.
      */
-    public void respond(Object data) throws IOException;
+    public void respond(byte[] data) throws IOException;
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index f199019..d4832e5 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -14,14 +14,14 @@
 public class MessageDecoder extends ReplayingDecoder<DecoderState> {
 
     private final NettyMessagingService messagingService;
-    private final PayloadSerializer payloadSerializer;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer();
 
     private int contentLength;
 
-    public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) {
+    public MessageDecoder(NettyMessagingService messagingService) {
         super(DecoderState.READ_HEADER_VERSION);
         this.messagingService = messagingService;
-        this.payloadSerializer = payloadSerializer;
     }
 
     @Override
@@ -48,7 +48,7 @@
             checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer());
+            InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
             message.setMessagingService(messagingService);
             out.add(message);
             checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 0ee29cb..716efb9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -17,11 +17,7 @@
     public static final int SERIALIZER_VERSION = 1;
 
 
-    private final PayloadSerializer payloadSerializer;
-
-    public MessageEncoder(PayloadSerializer payloadSerializer) {
-        this.payloadSerializer = payloadSerializer;
-    }
+    private static final KryoSerializer SERIALIZER = new KryoSerializer();
 
     @Override
     protected void encode(
@@ -35,7 +31,8 @@
         // write preamble
         out.writeBytes(PREAMBLE);
 
-        byte[] payload = payloadSerializer.encode(message);
+        // Serialize the message exactly once; any failure propagates to the caller.
+        byte[] payload = SERIALIZER.encode(message);
 
         // write payload length
         out.writeInt(payload.length);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index fece742..08676ac 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -11,10 +11,10 @@
      * The message is specified using the type and payload.
      * @param ep end point to send the message to.
      * @param type type of message.
-     * @param payload message payload.
+     * @param payload message payload bytes.
      * @throws IOException
      */
-    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
 
     /**
      * Sends a message synchronously and waits for a response.
@@ -24,7 +24,7 @@
      * @return a response future
      * @throws IOException
      */
-    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+    public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
 
     /**
      * Registers a new message handler for message type.
@@ -38,12 +38,4 @@
      * @param type message type
      */
     public void unregisterHandler(String type);
-
-    // FIXME: remove me and add PayloadSerializer to all other methods
-    /**
-     * Specify the serializer to use for encoding/decoding payload.
-     *
-     * @param payloadSerializer payloadSerializer to use
-     */
-    public void setPayloadSerializer(PayloadSerializer payloadSerializer);
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 051482e..48aeb30 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -43,7 +43,7 @@
     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
     private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
-    private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
+    private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
             .weakValues()
             // TODO: Once the entry expires, notify blocking threads (if any).
@@ -52,8 +52,6 @@
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
 
-    protected PayloadSerializer payloadSerializer;
-
     public NettyMessagingService() {
         // TODO: Default port should be configurable.
         this(8080);
@@ -83,7 +81,7 @@
     }
 
     @Override
-    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(RandomUtils.nextLong())
             .withSender(localEp)
@@ -108,9 +106,9 @@
     }
 
     @Override
-    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+    public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
             throws IOException {
-        AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+        AsyncResponse futureResponse = new AsyncResponse();
         Long messageId = RandomUtils.nextLong();
         responseFutures.put(messageId, futureResponse);
         InternalMessage message = new InternalMessage.Builder(this)
@@ -133,11 +131,6 @@
         handlers.remove(type);
     }
 
-    @Override
-    public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
-        this.payloadSerializer = payloadSerializer;
-    }
-
     private MessageHandler getMessageHandler(String type) {
         return handlers.get(type);
     }
@@ -202,13 +195,13 @@
     private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
 
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
+        private final ChannelHandler encoder = new MessageEncoder();
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                 .addLast("encoder", encoder)
-                .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
+                .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
                 .addLast("handler", dispatcher);
         }
     }
@@ -237,7 +230,7 @@
             String type = message.type();
             if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
                 try {
-                    AsyncResponse<?> futureResponse =
+                    AsyncResponse futureResponse =
                         NettyMessagingService.this.responseFutures.getIfPresent(message.id());
                     if (futureResponse != null) {
                         futureResponse.setResponse(message.payload());
diff --git a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
deleted file mode 100644
index 9874543..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.onlab.netty;
-
-import java.nio.ByteBuffer;
-
-/**
- * Interface for encoding/decoding message payloads.
- */
-public interface PayloadSerializer {
-
-    /**
-     * Decodes the specified byte array to a POJO.
-     *
-     * @param data byte array.
-     * @return POJO
-     */
-    public <T> T decode(byte[] data);
-
-    /**
-     * Encodes the specified POJO into a byte array.
-     *
-     * @param data POJO to be encoded
-     * @return byte array.
-     */
-    public byte[] encode(Object data);
-
-    /**
-     * Encodes the specified POJO into a byte buffer.
-     *
-     * @param data POJO to be encoded
-     * @param buffer to write serialized bytes
-     */
-    public void encode(final Object data, ByteBuffer buffer);
-
-    /**
-     * Decodes the specified byte buffer to a POJO.
-     *
-     * @param buffer bytes to be decoded
-     * @return POJO
-     */
-    public <T> T decode(final ByteBuffer buffer);
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
index 04675ce..150755e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Response.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -7,26 +7,24 @@
  * Response object returned when making synchronous requests.
- * Can you used to check is a response is ready and/or wait for a response
+ * Can be used to check if a response is ready and/or wait for a response
  * to become available.
- *
- * @param <T> type of response.
  */
-public interface Response<T> {
+public interface Response {
 
     /**
      * Gets the response waiting for a designated timeout period.
      * @param timeout timeout period (since request was sent out)
      * @param tu unit of time.
-     * @return response
+     * @return response payload
      * @throws TimeoutException if the timeout expires before the response arrives.
      */
-    public T get(long timeout, TimeUnit tu) throws TimeoutException;
+    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
 
     /**
      * Gets the response waiting for indefinite timeout period.
-     * @return response
+     * @return response payload
      * @throws InterruptedException if the thread is interrupted before the response arrives.
      */
-    public T get() throws InterruptedException;
+    public byte[] get() throws InterruptedException;
 
     /**
      * Checks if the response is ready without blocking.
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
index 494d410..3869948 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -24,7 +24,7 @@
         final int warmup = 100;
         for (int i = 0; i < warmup; i++) {
             Timer.Context context = sendAsyncTimer.time();
-            messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
+            messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
             context.stop();
         }
         metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
@@ -33,10 +33,10 @@
         final int iterations = 1000000;
         for (int i = 0; i < iterations; i++) {
             Timer.Context context = sendAndReceiveTimer.time();
-            Response<String> response = messaging
+            Response response = messaging
                     .sendAndReceive(new Endpoint("localhost", 8080), "echo",
-                                    "Hello World");
-            System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+                                    "Hello World".getBytes());
+            System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
             context.stop();
         }
         metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
@@ -45,8 +45,6 @@
     public static class TestNettyMessagingService extends NettyMessagingService {
         public TestNettyMessagingService(int port) throws Exception {
             super(port);
-            PayloadSerializer payloadSerializer = new KryoSerializer();
-            this.payloadSerializer = payloadSerializer;
         }
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 84984c1..b8ae5b0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -7,7 +7,6 @@
     public static void main(String... args) throws Exception {
         NettyMessagingService server = new NettyMessagingService(8080);
         server.activate();
-        server.setPayloadSerializer(new KryoSerializer());
         server.registerHandler("simple", new LoggingHandler());
         server.registerHandler("echo", new EchoHandler());
     }
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 96b877e..36d2a1e 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -2,7 +2,8 @@
 
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Assert;
+import org.apache.commons.lang3.RandomUtils;
+import static org.junit.Assert.*;
 import org.junit.Test;
 
 /**
@@ -17,11 +18,10 @@
         try {
             pinger.activate();
             ponger.activate();
-            pinger.setPayloadSerializer(new KryoSerializer());
-            ponger.setPayloadSerializer(new KryoSerializer());
             ponger.registerHandler("echo", new EchoHandler());
-            Response<String> response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello");
-            Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS));
+            byte[] payload = RandomUtils.nextBytes(100);
+            Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+            assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
         } finally {
             pinger.deactivate();
             ponger.deactivate();