ONOS-1983: Migrating all copycat Raft protocol specific communication to use ONOS cluster communication primitives
Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 4a93068..61d3c56 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -25,6 +25,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -34,9 +35,11 @@
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -388,6 +391,37 @@
}
}
+ /**
+ * Returns a future that is completed exceptionally.
+ * @param t exception
+ * @param <T> future value type
+ * @return future
+ */
+ public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(t);
+ return future;
+ }
+
+ /**
+ * Returns the contents of {@code ByteBuffer} as byte array.
+ * <p>
+ * WARNING: There is a performance cost due to array copy
+ * when using this method.
+ * @param buffer byte buffer
+ * @return byte array containing the byte buffer contents
+ */
+ public static byte[] byteBuffertoArray(ByteBuffer buffer) {
+ int length = buffer.remaining();
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ return Arrays.copyOfRange(buffer.array(), offset, offset + length);
+ }
+ byte[] bytes = new byte[length];
+ buffer.duplicate().get(bytes);
+ return bytes;
+ }
+
// Auxiliary path visitor for recursive directory structure copying.
private static class DirectoryCopier extends SimpleFileVisitor<Path> {
private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 44b7027..c19dc59 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -203,6 +203,25 @@
}
@Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> {
+ handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
+ }
+ });
+ });
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index 69b312a..37a6535 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -212,6 +212,23 @@
}
@Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ DefaultMessage response = new DefaultMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
+ }
+ }));
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
@@ -312,4 +329,4 @@
return stream.isClosed();
}
}
-}
\ No newline at end of file
+}