ClusterMessagingProtocol: stop processing in netty handler thread
- Fix for io.netty.util.concurrent.BlockingOperationException
Change-Id: Ie0f4dee2c3a49aa4b03674f6f7678f32fcf07a44
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 475db82..d96ecbf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -4,12 +4,12 @@
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import static java.util.concurrent.Executors.newCachedThreadPool;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +49,7 @@
private ControllerNode remoteNode;
private final AtomicBoolean connectionOK = new AtomicBoolean(true);
- // TODO: make this non-static and stop on close
- private static final ExecutorService THREAD_POOL
- = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
+ private ExecutorService pool;
public ClusterMessagingProtocolClient(
ClusterService clusterService,
@@ -87,11 +85,19 @@
@Override
public synchronized CompletableFuture<Void> connect() {
+ if (pool == null || pool.isShutdown()) {
+ // TODO include remote name?
+ pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
+ }
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized CompletableFuture<Void> close() {
+ if (pool != null) {
+ pool.shutdownNow();
+ pool = null;
+ }
return CompletableFuture.completedFuture(null);
}
@@ -112,7 +118,11 @@
private <I, O> CompletableFuture<O> requestReply(I request) {
CompletableFuture<O> future = new CompletableFuture<>();
- THREAD_POOL.submit(new RPCTask<I, O>(request, future));
+ if (pool == null) {
+ log.info("Attempted to use closed client, connecting now. {}", request);
+ connect();
+ }
+ pool.submit(new RPCTask<I, O>(request, future));
return future;
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 373bc97..d5532d5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -1,19 +1,20 @@
package org.onlab.onos.store.service.impl;
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.*;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -27,12 +28,15 @@
public class ClusterMessagingProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
+
+ private final ClusterCommunicationService clusterCommunicator;
+
private volatile RequestHandler handler;
- private ClusterCommunicationService clusterCommunicator;
+
+ private ExecutorService pool;
public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
this.clusterCommunicator = clusterCommunicator;
-
}
@Override
@@ -42,67 +46,128 @@
@Override
public CompletableFuture<Void> listen() {
- clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
- new CopycatMessageHandler<PingRequest>());
- clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
- new CopycatMessageHandler<SyncRequest>());
- clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
- new CopycatMessageHandler<PollRequest>());
- clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
- new CopycatMessageHandler<SubmitRequest>());
+ if (pool == null || pool.isShutdown()) {
+ pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
+ }
+
+ clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
+ clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
+ clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
+ clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
- clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
- clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
- clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
- clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
+ clusterCommunicator.removeSubscriber(COPYCAT_PING);
+ clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
+ clusterCommunicator.removeSubscriber(COPYCAT_POLL);
+ clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
+ if (pool != null) {
+ pool.shutdownNow();
+ pool = null;
+ }
return CompletableFuture.completedFuture(null);
}
- private class CopycatMessageHandler<T> implements ClusterMessageHandler {
+ private final class PingHandler extends CopycatMessageHandler<PingRequest> {
+
+ @Override
+ public void raftHandle(PingRequest request, ClusterMessage message) {
+ pool.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ currentHandler().ping(request)
+ .whenComplete(new PostExecutionTask<>(message));
+ }
+ });
+ }
+ }
+
+ private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
+
+ @Override
+ public void raftHandle(SyncRequest request, ClusterMessage message) {
+ pool.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ currentHandler().sync(request)
+ .whenComplete(new PostExecutionTask<>(message));
+ }
+ });
+ }
+ }
+
+ private final class PollHandler extends CopycatMessageHandler<PollRequest> {
+
+ @Override
+ public void raftHandle(PollRequest request, ClusterMessage message) {
+ pool.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ currentHandler().poll(request)
+ .whenComplete(new PostExecutionTask<>(message));
+ }
+ });
+ }
+ }
+
+ private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
+
+ @Override
+ public void raftHandle(SubmitRequest request, ClusterMessage message) {
+ pool.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ currentHandler().submit(request)
+ .whenComplete(new PostExecutionTask<>(message));
+ }
+ });
+ }
+ }
+
+ private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
+
+ public abstract void raftHandle(T request, ClusterMessage message);
@Override
public void handle(ClusterMessage message) {
- T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
- if (handler == null) {
+ T request = DB_SERIALIZER.decode(message.payload());
+ raftHandle(request, message);
+ }
+
+ RequestHandler currentHandler() {
+ RequestHandler currentHandler = handler;
+ if (currentHandler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
+ long sleepMs = 1;
for (int i = 0; i < 10; ++i) {
- if (handler != null) {
+ currentHandler = handler;
+ if (currentHandler != null) {
break;
}
try {
- Thread.sleep(1);
+ sleepMs <<= 1;
+ Thread.sleep(sleepMs);
} catch (InterruptedException e) {
- log.trace("Exception", e);
+ log.error("Interrupted", e);
+ return handler;
}
}
- if (handler == null) {
+ if (currentHandler == null) {
log.error("There was no handler registered!");
- return;
+ return handler;
}
}
- if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request)
- .whenComplete(new PostExecutionTask<PingResponse>(message));
- } else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request)
- .whenComplete(new PostExecutionTask<PollResponse>(message));
- } else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request)
- .whenComplete(new PostExecutionTask<SyncResponse>(message));
- } else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request)
- .whenComplete(new PostExecutionTask<SubmitResponse>(message));
- } else {
- throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
- }
+ return currentHandler;
}
- private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+ final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
private final ClusterMessage message;
@@ -111,15 +176,15 @@
}
@Override
- public void accept(R response, Throwable t) {
- if (t != null) {
- log.error("Processing for " + message.subject() + " failed.", t);
+ public void accept(R response, Throwable error) {
+ if (error != null) {
+ log.error("Processing {} failed.", message.subject(), error);
} else {
try {
log.trace("responding to {}", message.subject());
- message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
+ message.respond(DB_SERIALIZER.encode(response));
} catch (Exception e) {
- log.error("Failed to respond to " + response.getClass().getName(), e);
+ log.error("Failed responding with {}", response.getClass().getName(), e);
}
}
}