Ensure client events are handled in the correct thread in consistent primitive tests.
Change-Id: Ic6db44ec8fc393d6194eeba8b3f84c28245a254a
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
index 791a488..b7a1b89 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
@@ -127,8 +127,8 @@
public <M, R> void addSubscriber(
MessageSubject subject,
Function<byte[], M> decoder,
-
- Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
subscribers.put(subject, message -> {
CompletableFuture<byte[]> future = new CompletableFuture<>();
try {
@@ -153,12 +153,16 @@
Consumer<M> handler,
Executor executor) {
subscribers.put(subject, message -> {
- try {
- handler.accept(decoder.apply(message));
- } catch (Exception e) {
- return Futures.exceptionalFuture(new MessagingException.RemoteHandlerFailure());
- }
- return Futures.completedFuture(null);
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ handler.accept(decoder.apply(message));
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
+ });
+ return future;
});
}