Merge remote-tracking branch 'origin/master'
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
index a397434..dbdb3db 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
@@ -30,6 +30,10 @@
System.exit(0);
}
public static void startStandalone(String... args) throws Exception {
+ String host = args.length > 0 ? args[0] : "localhost"; // args[0]: server host
+ int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081; // args[1]: server port
+ int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000; // args[2]: untimed warm-up rounds
+ int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000; // args[3]: iterations of each timed loop
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
messaging.activate();
@@ -37,28 +41,26 @@
MetricsFeature feature = new MetricsFeature("latency");
MetricsComponent component = metrics.registerComponent("NettyMessaging");
- final int warmup = 10000;
for (int i = 0; i < warmup; i++) {
- messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
+ messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes()); // warm-up send, not timed
Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8081), "echo",
+ .sendAndReceive(new Endpoint(host, port), "echo", // warm-up request, not timed
"Hello World".getBytes());
}
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
- Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
- final int iterations = 10000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8081), "simple", "Hello World".getBytes());
+ messaging.sendAsync(new Endpoint(host, port), "simple", "Hello World".getBytes()); // timed by sendAsyncTimer
context.stop();
}
+ Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive"); // used by the second loop only
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
- .sendAndReceive(new Endpoint("localhost", 8081), "echo",
+ .sendAndReceive(new Endpoint(host, port), "echo", // timed by sendAndReceiveTimer
"Hello World".getBytes());
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
index 3423b23..049e71f 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClientCommand.java
@@ -14,30 +14,26 @@
public class SimpleNettyClientCommand extends AbstractShellCommand {
//FIXME: replace these arguments with proper ones needed for the test.
- @Argument(index = 0, name = "serverIp", description = "Server IP address",
+ @Argument(index = 0, name = "hostname", description = "Server Hostname",
required = false, multiValued = false)
- String serverIp = "127.0.0.1";
+ String host = "localhost"; // forwarded to startStandalone as args[0]
- @Argument(index = 1, name = "workers", description = "IO workers",
+ @Argument(index = 3, name = "port", description = "Port", // NOTE(review): CLI position 3, yet forwarded as args[1] - confirm order
required = false, multiValued = false)
- String workers = "6";
+ String port = "8081"; // forwarded to startStandalone as args[1]
+
+ @Argument(index = 1, name = "warmupCount", description = "Warm-up count",
+ required = false, multiValued = false)
+ String warmup = "10000"; // forwarded as args[2]; NOTE(review): startStandalone itself defaults to 1000 - confirm
@Argument(index = 2, name = "messageCount", description = "Message count",
required = false, multiValued = false)
- String messageCount = "1000000";
-
- @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
- required = false, multiValued = false)
- String messageLength = "128";
-
- @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
- required = false, multiValued = false)
- String timeoutSecs = "60";
+ String messageCount = "5000000"; // forwarded to startStandalone as args[3]
@Override
protected void execute() {
try {
- startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
+ startStandalone(new String[]{host, port, warmup, messageCount}); // array order is host, port, warm-up, message count
} catch (Exception e) {
error("Unable to start client %s", e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 31df710..ac726c2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -42,6 +42,7 @@
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
@@ -117,11 +118,11 @@
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(InternalDeviceEvent.class)
- .register(InternalPortEvent.class)
- .register(InternalPortStatusEvent.class)
+ .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer()) // internal events now get explicit serializers
+ .register(InternalPortEvent.class, new InternalPortEventSerializer())
+ .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(Timestamped.class)
- .register(MastershipBasedTimestamp.class)
+ .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer()) // explicit serializer for timestamps as well
.build()
.populate(1);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
new file mode 100644
index 0000000..9cb9695
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -0,0 +1,66 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceEvent}.
+ * <p>
+ * An event is encoded as three class-tagged objects, in this order: the
+ * provider ID, the device ID and the timestamped device description.
+ */
+public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalDeviceEvent}.
+     */
+    public InternalDeviceEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    /**
+     * Writes the provider ID, device ID and device description of the event.
+     *
+     * @param kryo   Kryo instance performing the serialization
+     * @param output sink receiving the encoded fields
+     * @param event  event to encode
+     */
+    @Override
+    public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.deviceDescription());
+    }
+
+    /**
+     * Reads the fields back in the same order {@code write} emitted them.
+     *
+     * @param kryo  Kryo instance performing the deserialization
+     * @param input source of the encoded fields
+     * @param type  class being deserialized (not consulted)
+     * @return reconstructed event
+     */
+    @Override
+    public InternalDeviceEvent read(Kryo kryo, Input input,
+                                    Class<InternalDeviceEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+
+        // Generic parameters are erased at runtime, so this cast cannot be
+        // checked; the slot holds whatever write() stored from
+        // event.deviceDescription().
+        @SuppressWarnings("unchecked")
+        Timestamped<DeviceDescription> deviceDescription =
+                (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
+
+        return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
new file mode 100644
index 0000000..6925bea
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -0,0 +1,69 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortEvent}.
+ * <p>
+ * An event is encoded as three class-tagged objects, in this order: the
+ * provider ID, the device ID and the timestamped list of port descriptions.
+ */
+public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortEvent}.
+     */
+    public InternalPortEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    /**
+     * Writes the provider ID, device ID and port descriptions of the event.
+     *
+     * @param kryo   Kryo instance performing the serialization
+     * @param output sink receiving the encoded fields
+     * @param event  event to encode
+     */
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescriptions());
+    }
+
+    /**
+     * Reads the fields back in the same order {@code write} emitted them.
+     *
+     * @param kryo  Kryo instance performing the deserialization
+     * @param input source of the encoded fields
+     * @param type  class being deserialized (not consulted)
+     * @return reconstructed event
+     */
+    @Override
+    public InternalPortEvent read(Kryo kryo, Input input,
+                                  Class<InternalPortEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+
+        // Generic parameters are erased at runtime, so this cast cannot be
+        // checked; the slot holds whatever write() stored from
+        // event.portDescriptions().
+        @SuppressWarnings("unchecked")
+        Timestamped<List<PortDescription>> portDescriptions =
+                (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
+
+        return new InternalPortEvent(providerId, deviceId, portDescriptions);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
new file mode 100644
index 0000000..6ec4122
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -0,0 +1,66 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortStatusEvent}.
+ * <p>
+ * An event is encoded as three class-tagged objects, in this order: the
+ * provider ID, the device ID and the timestamped port description.
+ */
+public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortStatusEvent}.
+     */
+    public InternalPortStatusEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    /**
+     * Writes the provider ID, device ID and port description of the event.
+     *
+     * @param kryo   Kryo instance performing the serialization
+     * @param output sink receiving the encoded fields
+     * @param event  event to encode
+     */
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescription());
+    }
+
+    /**
+     * Reads the fields back in the same order {@code write} emitted them.
+     *
+     * @param kryo  Kryo instance performing the deserialization
+     * @param input source of the encoded fields
+     * @param type  class being deserialized (not consulted)
+     * @return reconstructed event
+     */
+    @Override
+    public InternalPortStatusEvent read(Kryo kryo, Input input,
+                                        Class<InternalPortStatusEvent> type) {
+        ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+        DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+
+        // Generic parameters are erased at runtime, so this cast cannot be
+        // checked; the slot holds whatever write() stored from
+        // event.portDescription().
+        @SuppressWarnings("unchecked")
+        Timestamped<PortDescription> portDescription =
+                (Timestamped<PortDescription>) kryo.readClassAndObject(input);
+
+        return new InternalPortStatusEvent(providerId, deviceId, portDescription);
+    }
+}