Merge remote-tracking branch 'origin/master'
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
index a397434..dbdb3db 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
@@ -30,6 +30,13 @@
         System.exit(0);
     }
     public static void startStandalone(String... args) throws Exception {
+        // Positional arguments (all optional): host, port, warm-up count, iteration count.
+        String host = args.length > 0 ? args[0] : "localhost";
+        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
+        int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
+        int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
+        // Created once so the timed loops below do not also measure Endpoint allocation.
+        Endpoint endpoint = new Endpoint(host, port);
         NettyMessagingService messaging = new TestNettyMessagingService(9081);
         MetricsManager metrics = new MetricsManager();
         messaging.activate();
@@ -37,28 +44,26 @@
         MetricsFeature feature = new MetricsFeature("latency");
         MetricsComponent component = metrics.registerComponent("NettyMessaging");
 
-        final int warmup = 10000;
         for (int i = 0; i < warmup; i++) {
-            messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
+            messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
             Response response = messaging
-                    .sendAndReceive(new Endpoint("localhost", 8081), "echo",
+                    .sendAndReceive(endpoint, "echo",
                             "Hello World".getBytes());
         }
 
         Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
-        Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
 
-        final int iterations = 10000000;
         for (int i = 0; i < iterations; i++) {
             Timer.Context context = sendAsyncTimer.time();
-            messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
+            messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
             context.stop();
         }
 
+        Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
         for (int i = 0; i < iterations; i++) {
             Timer.Context context = sendAndReceiveTimer.time();
             Response response = messaging
-                    .sendAndReceive(new Endpoint("localhost", 8081), "echo",
+                    .sendAndReceive(endpoint, "echo",
                                     "Hello World".getBytes());
             // System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
             context.stop();
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
index 3423b23..049e71f 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
@@ -14,30 +14,26 @@
 public class SimpleNettyClientCommand extends AbstractShellCommand {
 
     //FIXME: replace these arguments with proper ones needed for the test.
-    @Argument(index = 0, name = "serverIp", description = "Server IP address",
+    @Argument(index = 0, name = "hostname", description = "Server Hostname",
             required = false, multiValued = false)
-    String serverIp = "127.0.0.1";
+    String host = "localhost";
 
-    @Argument(index = 1, name = "workers", description = "IO workers",
+    @Argument(index = 3, name = "port", description = "Port",
             required = false, multiValued = false)
-    String workers = "6";
+    String port = "8081";
+
+    @Argument(index = 1, name = "warmupCount", description = "Warm-up count",
+            required = false, multiValued = false)
+    String warmup = "10000";
 
     @Argument(index = 2, name = "messageCount", description = "Message count",
             required = false, multiValued = false)
-    String messageCount = "1000000";
-
-    @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
-            required = false, multiValued = false)
-    String messageLength = "128";
-
-    @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
-            required = false, multiValued = false)
-    String timeoutSecs = "60";
+    String messageCount = "5000000";
 
     @Override
     protected void execute() {
         try {
-            startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
+            startStandalone(new String[]{host, port, warmup, messageCount});
         } catch (Exception e) {
             error("Unable to start client %s", e);
         }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 31df710..ac726c2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -42,6 +42,7 @@
 import org.onlab.onos.store.common.impl.Timestamped;
 import org.onlab.onos.store.serializers.KryoPoolUtil;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
 import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
@@ -117,11 +118,11 @@
         protected void setupKryoPool() {
             serializerPool = KryoPool.newBuilder()
                     .register(KryoPoolUtil.API)
-                    .register(InternalDeviceEvent.class)
-                    .register(InternalPortEvent.class)
-                    .register(InternalPortStatusEvent.class)
+                    .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+                    .register(InternalPortEvent.class, new InternalPortEventSerializer())
+                    .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
                     .register(Timestamped.class)
-                    .register(MastershipBasedTimestamp.class)
+                    .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
                     .build()
                     .populate(1);
         }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
new file mode 100644
index 0000000..9cb9695
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -0,0 +1,50 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceEvent}.
+ * <p>
+ * An event is written as three class-tagged objects, in this order: provider
+ * ID, device ID, timestamped device description. Reading consumes them in the
+ * same order and rebuilds the event through its constructor.
+ */
+public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalDeviceEvent}.
+     */
+    public InternalDeviceEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.deviceDescription());
+    }
+
+    @Override
+    public InternalDeviceEvent read(Kryo kryo, Input input,
+                               Class<InternalDeviceEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+        // Kryo hands back a plain Object, so the generic cast cannot be checked
+        // at runtime; the third object is whatever write() stored for the event.
+        @SuppressWarnings("unchecked")
+        Timestamped<DeviceDescription> deviceDescription =
+                (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
+
+        return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
new file mode 100644
index 0000000..6925bea
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -0,0 +1,53 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortEvent}.
+ * <p>
+ * An event is written as three class-tagged objects, in this order: provider
+ * ID, device ID, timestamped list of port descriptions. Reading consumes them
+ * in the same order and rebuilds the event through its constructor.
+ */
+public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortEvent}.
+     */
+    public InternalPortEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescriptions());
+    }
+
+    @Override
+    public InternalPortEvent read(Kryo kryo, Input input,
+                               Class<InternalPortEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+        // Kryo hands back a plain Object, so the generic cast cannot be checked
+        // at runtime; the third object is whatever write() stored for the event.
+        @SuppressWarnings("unchecked")
+        Timestamped<List<PortDescription>> portDescriptions =
+                (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
+
+        return new InternalPortEvent(providerId, deviceId, portDescriptions);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
new file mode 100644
index 0000000..6ec4122
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -0,0 +1,50 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortStatusEvent}.
+ * <p>
+ * An event is written as three class-tagged objects, in this order: provider
+ * ID, device ID, timestamped port description. Reading consumes them in the
+ * same order and rebuilds the event through its constructor.
+ */
+public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortStatusEvent}.
+     */
+    public InternalPortStatusEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescription());
+    }
+
+    @Override
+    public InternalPortStatusEvent read(Kryo kryo, Input input,
+                               Class<InternalPortStatusEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+        // Kryo hands back a plain Object, so the generic cast cannot be checked
+        // at runtime; the third object is whatever write() stored for the event.
+        @SuppressWarnings("unchecked")
+        Timestamped<PortDescription> portDescription =
+                (Timestamped<PortDescription>) kryo.readClassAndObject(input);
+
+        return new InternalPortStatusEvent(providerId, deviceId, portDescription);
+    }
+}