MapDB backed Copycat log implementation
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 7d94847..b3eaeb4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -3,12 +3,17 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -57,37 +62,37 @@
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to ping request", e);
- }
- });
+ handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to poll request", e);
- }
- });
+ handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to sync request", e);
- }
- });
+ handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+ handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
+ } else {
+ throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
+ }
+ }
+
+ private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+
+ private final ClusterMessage message;
+
+ public PostExecutionTask(ClusterMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void accept(R response, Throwable t) {
+ if (t != null) {
+ log.error("Processing for " + message.subject() + " failed.", t);
+ } else {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
- log.error("Failed to respond to submit request", e);
+ log.error("Failed to respond to " + response.getClass().getName(), e);
}
- });
+ }
}
}
}