More IO loop work.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index b2acca4..89107bf 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -1,5 +1,6 @@
package org.onlab.nio;
+import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,10 @@
private Exception ioError;
private long lastActiveTime;
+ private final Counter bytesIn = new Counter();
+ private final Counter messagesIn = new Counter();
+ private final Counter bytesOut = new Counter();
+ private final Counter messagesOut = new Counter();
/**
* Creates a message stream associated with the specified IO loop and
@@ -93,6 +98,11 @@
closed = true;
}
+ bytesIn.freeze();
+ bytesOut.freeze();
+ messagesIn.freeze();
+ messagesOut.freeze();
+
loop.removeStream(this);
if (key != null) {
try {
@@ -176,6 +186,8 @@
inbound.flip();
while ((message = read(inbound)) != null) {
messages.add(message);
+ messagesIn.add(1);
+ bytesIn.add(message.length());
}
inbound.compact();
@@ -226,8 +238,9 @@
while (outbound.remaining() < message.length()) {
doubleSize();
}
- // Place the message into the buffer and bump the output trackers.
write(message, outbound);
+ messagesOut.add(1);
+ bytesOut.add(message.length());
}
// Forces a flush, unless one is planned already.
@@ -273,6 +286,18 @@
}
}
+
+ /**
+ * Indicates whether data has been written but not flushed yet.
+ *
+ * @return true if flush is required
+ */
+ boolean isFlushRequired() {
+ synchronized (this) {
+ return outbound.position() > 0;
+ }
+ }
+
/**
* Attempts to flush data, internal stream state and channel availability
* permitting. Invoked by the driver I/O loop during handling of writable
@@ -344,4 +369,40 @@
return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
}
+ /**
+ * Returns the inbound bytes counter.
+ *
+ * @return inbound bytes counter
+ */
+ public Counter bytesIn() {
+ return bytesIn;
+ }
+
+ /**
+ * Returns the outbound bytes counter.
+ *
+ * @return outbound bytes counter
+ */
+ public Counter bytesOut() {
+ return bytesOut;
+ }
+
+ /**
+ * Returns the inbound messages counter.
+ *
+ * @return inbound messages counter
+ */
+ public Counter messagesIn() {
+ return messagesIn;
+ }
+
+ /**
+ * Returns the outbound messages counter.
+ *
+ * @return outbound messages counter
+ */
+ public Counter messagesOut() {
+ return messagesOut;
+ }
+
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/package-info.java b/utils/nio/src/main/java/org/onlab/nio/package-info.java
new file mode 100644
index 0000000..d5ddd10
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Mechanism to transfer messages over network using IO loop and
+ * message stream, backed by NIO byte buffers.
+ */
+package org.onlab.nio;
\ No newline at end of file