Timeout local messages in NettyMessagingManager to avoid hanging when receivers are blocked on external calls.

Change-Id: Ic104a21317f4223921f1acba231e3f97039c2f2e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index b7e121b..0359bae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -116,8 +116,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final ClientConnection localClientConnection = new LocalClientConnection();
-    private final ServerConnection localServerConnection = new LocalServerConnection(null);
+    private final LocalClientConnection localClientConnection = new LocalClientConnection();
+    private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
 
     //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
     private static final String CONFIG_DIR = "../config";
@@ -287,6 +287,7 @@
      */
     private void timeoutAllCallbacks() {
         // Iterate through all connections and time out callbacks.
+        localClientConnection.timeoutCallbacks();
         for (RemoteClientConnection connection : clientConnections.values()) {
             connection.timeoutCallbacks();
         }
@@ -406,11 +407,11 @@
             CompletableFuture<T> future) {
         if (endpoint.equals(localEndpoint)) {
             callback.apply(localClientConnection).whenComplete((result, error) -> {
-               if (error == null) {
-                   executor.execute(() -> future.complete(result));
-               } else {
-                   executor.execute(() -> future.completeExceptionally(error));
-               }
+                if (error == null) {
+                    executor.execute(() -> future.complete(result));
+                } else {
+                    executor.execute(() -> future.completeExceptionally(error));
+                }
             });
             return;
         }
@@ -656,7 +657,6 @@
          *
          * @param msg inbound message
          * @return true if {@code msg} is {@link InternalMessage} instance.
-         *
          * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
          */
         @Override
@@ -724,7 +724,7 @@
          * Sends a reply to the other side of the connection.
          *
          * @param message the message to which to reply
-         * @param status the reply status
+         * @param status  the reply status
          * @param payload the response payload
          */
         void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
@@ -737,9 +737,84 @@
     }
 
     /**
+     * Remote connection implementation.
+     */
+    private abstract class AbstractClientConnection implements ClientConnection {
+        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+        private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+                .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+                .build();
+
+        /**
+         * Times out callbacks for this connection.
+         */
+        protected void timeoutCallbacks() {
+            // Store the current time.
+            long currentTime = System.currentTimeMillis();
+
+            // Iterate through future callbacks and time out callbacks that have been alive
+            // longer than the current timeout according to the message type.
+            Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Callback callback = iterator.next().getValue();
+                try {
+                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
+                    long currentTimeout = timeoutHistory.currentTimeout;
+                    if (currentTime - callback.time > currentTimeout) {
+                        iterator.remove();
+                        long elapsedTime = currentTime - callback.time;
+                        timeoutHistory.addReplyTime(elapsedTime);
+                        callback.completeExceptionally(
+                                new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
+                    }
+                } catch (ExecutionException e) {
+                    throw new AssertionError();
+                }
+            }
+
+            // Iterate through all timeout histories and recompute the timeout.
+            for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+                timeoutHistory.recomputeTimeoutMillis();
+            }
+        }
+
+        protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
+            futures.put(id, new Callback(subject, future));
+        }
+
+        protected Callback completeCallback(long id) {
+            Callback callback = futures.remove(id);
+            if (callback != null) {
+                try {
+                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
+                    timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
+                } catch (ExecutionException e) {
+                    throw new AssertionError();
+                }
+            }
+            return callback;
+        }
+
+        protected Callback failCallback(long id) {
+            return futures.remove(id);
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                timeoutFuture.cancel(false);
+                for (Callback callback : futures.values()) {
+                    callback.completeExceptionally(new ConnectException());
+                }
+            }
+        }
+    }
+
+    /**
      * Local connection implementation.
      */
-    private final class LocalClientConnection implements ClientConnection {
+    private final class LocalClientConnection extends AbstractClientConnection {
         @Override
         public CompletableFuture<Void> sendAsync(InternalRequest message) {
             BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
@@ -754,6 +829,8 @@
         @Override
         public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
+            future.whenComplete((r, e) -> completeCallback(message.id()));
+            registerCallback(message.id(), message.subject(), future);
             BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, new LocalServerConnection(future));
@@ -795,51 +872,13 @@
     /**
      * Remote connection implementation.
      */
-    private final class RemoteClientConnection implements ClientConnection {
+    private final class RemoteClientConnection extends AbstractClientConnection {
         private final Channel channel;
-        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
-        private final AtomicBoolean closed = new AtomicBoolean(false);
-        private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
-                .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
-                .build();
 
         RemoteClientConnection(Channel channel) {
             this.channel = channel;
         }
 
-        /**
-         * Times out callbacks for this connection.
-         */
-        private void timeoutCallbacks() {
-            // Store the current time.
-            long currentTime = System.currentTimeMillis();
-
-            // Iterate through future callbacks and time out callbacks that have been alive
-            // longer than the current timeout according to the message type.
-            Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Callback callback = iterator.next().getValue();
-                try {
-                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
-                    long currentTimeout = timeoutHistory.currentTimeout;
-                    if (currentTime - callback.time > currentTimeout) {
-                        iterator.remove();
-                        long elapsedTime = currentTime - callback.time;
-                        timeoutHistory.addReplyTime(elapsedTime);
-                        callback.completeExceptionally(
-                                new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
-                    }
-                } catch (ExecutionException e) {
-                    throw new AssertionError();
-                }
-            }
-
-            // Iterate through all timeout histories and recompute the timeout.
-            for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
-                timeoutHistory.recomputeTimeoutMillis();
-            }
-        }
-
         @Override
         public CompletableFuture<Void> sendAsync(InternalRequest message) {
             CompletableFuture<Void> future = new CompletableFuture<>();
@@ -856,12 +895,13 @@
         @Override
         public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            Callback callback = new Callback(message.subject(), future);
-            futures.put(message.id(), callback);
+            registerCallback(message.id(), message.subject(), future);
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
-                    futures.remove(message.id());
-                    callback.completeExceptionally(channelFuture.cause());
+                    Callback callback = failCallback(message.id());
+                    if (callback != null) {
+                        callback.completeExceptionally(channelFuture.cause());
+                    }
                 }
             });
             return future;
@@ -880,7 +920,7 @@
 
             clockService.recordEventTime(message.time());
 
-            Callback callback = futures.remove(message.id());
+            Callback callback = completeCallback(message.id());
             if (callback != null) {
                 if (message.status() == InternalReply.Status.OK) {
                     callback.complete(message.payload());
@@ -891,29 +931,12 @@
                 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
                     callback.completeExceptionally(new MessagingException.ProtocolException());
                 }
-
-                try {
-                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
-                    timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
-                } catch (ExecutionException e) {
-                    throw new AssertionError();
-                }
             } else {
                 log.debug("Received a reply for message id:[{}] "
                         + "but was unable to locate the"
                         + " request handle", message.id());
             }
         }
-
-        @Override
-        public void close() {
-            if (closed.compareAndSet(false, true)) {
-                timeoutFuture.cancel(false);
-                for (Callback callback : futures.values()) {
-                    callback.completeExceptionally(new ConnectException());
-                }
-            }
-        }
     }
 
     /**