MessagingService API enchancements
Change-Id: Iabfe15d4f08d7c53bd6575c5d94d0ac9f4e1a38e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index b2ee832..b537517 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -49,7 +49,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -241,9 +241,9 @@
});
}
- private class HeartbeatMessageHandler implements Consumer<byte[]> {
+ private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
@Override
- public void accept(byte[] message) {
+ public void accept(Endpoint sender, byte[] message) {
HeartbeatMessage hb = SERIALIZER.decode(message);
failureDetector.report(hb.source().id());
hb.knownPeers().forEach(node -> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 8a237ef..df4ac5c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,13 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
+
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -210,7 +213,7 @@
executor);
}
- private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
+ private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
@@ -218,14 +221,14 @@
}
@Override
- public byte[] apply(byte[] bytes) {
+ public byte[] apply(Endpoint sender, byte[] bytes) {
ClusterMessage message = ClusterMessage.fromBytes(bytes);
handler.handle(message);
return message.response();
}
}
- private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
private final Function<M, CompletableFuture<R>> handler;
@@ -239,12 +242,12 @@
}
@Override
- public CompletableFuture<byte[]> apply(byte[] bytes) {
+ public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
}
}
- private class InternalMessageConsumer<M> implements Consumer<byte[]> {
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
private final Function<byte[], M> decoder;
private final Consumer<M> consumer;
@@ -254,7 +257,7 @@
}
@Override
- public void accept(byte[] bytes) {
+ public void accept(Endpoint sender, byte[] bytes) {
consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
}
}