[ONOS-7829] Implement AbstractGrpcClient and AbstractGrpcClientControl

Change-Id: I39cba6834e7fe8d1b60b576b9934c0b3cfa7104b
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
new file mode 100644
index 0000000..4764a56
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import io.grpc.Context;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract client for gRPC service.
+ *
+ */
+public abstract class AbstractGrpcClient implements GrpcClient {
+
+    // Timeout in seconds to obtain the request lock.
+    protected static final int LOCK_TIMEOUT = 60;
+    private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+
+    protected final Logger log = getLogger(getClass());
+
+    protected final Lock requestLock = new ReentrantLock();
+    protected final Context.CancellableContext cancellableContext =
+            Context.current().withCancellation();
+    protected final ExecutorService executorService;
+    protected final Executor contextExecutor;
+
+    protected ManagedChannel channel;
+    protected DeviceId deviceId;
+
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel) {
+        this.deviceId = clientKey.deviceId();
+        this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
+                "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
+        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+        this.channel = channel;
+    }
+
+    @Override
+    public CompletableFuture<Void> shutdown() {
+        return supplyWithExecutor(this::doShutdown, "shutdown",
+                SharedExecutors.getPoolThreadExecutor());
+    }
+
+    protected Void doShutdown() {
+        log.debug("Shutting down client for {}...", deviceId);
+        cancellableContext.cancel(new InterruptedException(
+                "Requested client shutdown"));
+        this.executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Executor service didn't shutdown in time.");
+            Thread.currentThread().interrupt();
+        }
+        return null;
+    }
+
+    /**
+     * Equivalent of supplyWithExecutor using the gRPC context executor of this
+     * client, such that if the context is cancelled (e.g. client shutdown) the
+     * RPC is automatically cancelled.
+     *
+     * @param <U> return type of supplier
+     * @param supplier the supplier to be executed
+     * @param opDescription the description of this supplier
+     * @return CompletableFuture includes the result of supplier
+     */
+    protected  <U> CompletableFuture<U> supplyInContext(
+            Supplier<U> supplier, String opDescription) {
+        return supplyWithExecutor(supplier, opDescription, contextExecutor);
+    }
+
+    /**
+     * Submits a task for async execution via the given executor. All tasks
+     * submitted with this method will be executed sequentially.
+     *
+     * @param <U> return type of supplier
+     * @param supplier the supplier to be executed
+     * @param opDescription the description of this supplier
+     * @param executor the executor to execute this supplier
+     * @return CompletableFuture includes the result of supplier
+     */
+    protected <U> CompletableFuture<U> supplyWithExecutor(
+            Supplier<U> supplier, String opDescription, Executor executor) {
+        return CompletableFuture.supplyAsync(() -> {
+            // TODO: explore a more relaxed locking strategy.
+            try {
+                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
+                                    + "please debug (executing {})",
+                            opDescription);
+                    throw new IllegalThreadStateException("Lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.warn("Thread interrupted while waiting for lock (executing {})",
+                        opDescription);
+                throw new IllegalStateException(e);
+            }
+            try {
+                return supplier.get();
+            } catch (StatusRuntimeException ex) {
+                log.warn("Unable to execute {} on {}: {}",
+                        opDescription, deviceId, ex.toString());
+                throw ex;
+            } catch (Throwable ex) {
+                log.error("Exception in client of {}, executing {}",
+                        deviceId, opDescription, ex);
+                throw ex;
+            } finally {
+                requestLock.unlock();
+            }
+        }, executor);
+    }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
new file mode 100644
index 0000000..9ae14da
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventListener;
+import org.onosproject.grpc.api.GrpcChannelController;
+import org.onosproject.grpc.api.GrpcChannelId;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientController;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract class of a gRPC based client controller for specific gRPC client
+ * which provides basic gRPC client management and thread safe mechanism.
+ *
+ * @param <C> the gRPC client type
+ * @param <K> the key type of the gRPC client
+ * @param <E> the event type of the gRPC client
+ * @param <L> the event listener of event {@link E}
+ */
+@Component
+public abstract class AbstractGrpcClientController
+        <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+        extends AbstractListenerManager<E, L>
+        implements GrpcClientController<K, C> {
+
+    /**
+     * The default max inbound message size (MB).
+     */
+    private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
+    private static final int MEGABYTES = 1024 * 1024;
+    private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
+
+    private final Logger log = getLogger(getClass());
+    private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
+    private final Map<K, C> clients = Maps.newHashMap();
+    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private GrpcChannelController grpcChannelController;
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clientKeys.keySet().forEach(this::removeClient);
+        clientKeys.clear();
+        clients.clear();
+        channelIds.clear();
+        log.info("Stopped");
+    }
+
+    @Override
+    public boolean createClient(K clientKey) {
+        checkNotNull(clientKey);
+        return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
+    }
+
+    private boolean doCreateClient(K clientKey) {
+        DeviceId deviceId = clientKey.deviceId();
+        String serverAddr = clientKey.serverAddr();
+        int serverPort = clientKey.serverPort();
+
+        if (clientKeys.containsKey(deviceId)) {
+            final GrpcClientKey existingKey = clientKeys.get(deviceId);
+            if (clientKey.equals(existingKey)) {
+                log.debug("Not creating client for {} as it already exists (key={})...",
+                        deviceId, clientKey);
+                return true;
+            } else {
+                log.info("Requested client for {} with new " +
+                                "endpoint, removing old client (key={})...",
+                        deviceId, clientKey);
+                doRemoveClient(deviceId);
+            }
+        }
+        log.info("Creating client for {} (server={}:{})...",
+                deviceId, serverAddr, serverPort);
+        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
+        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(serverAddr, serverPort)
+                .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
+                .usePlaintext();
+
+        ManagedChannel channel;
+        try {
+            channel = grpcChannelController.connectChannel(channelId, channelBuilder);
+        } catch (IOException e) {
+            log.warn("Unable to connect to gRPC server of {}: {}",
+                    clientKey.deviceId(), e.getMessage());
+            return false;
+        }
+
+        C client = createClientInstance(clientKey, channel);
+        if (client == null) {
+            log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
+            return false;
+        }
+        clientKeys.put(deviceId, clientKey);
+        clients.put(clientKey, client);
+        channelIds.put(deviceId, channelId);
+
+        return true;
+    }
+
+    protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+
+    @Override
+    public C getClient(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+    }
+
+    protected C doGetClient(DeviceId deviceId) {
+        if (!clientKeys.containsKey(deviceId)) {
+            return null;
+        }
+        return clients.get(clientKeys.get(deviceId));
+    }
+
+    @Override
+    public void removeClient(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+    }
+
+    private Void doRemoveClient(DeviceId deviceId) {
+        if (clientKeys.containsKey(deviceId)) {
+            final K clientKey = clientKeys.get(deviceId);
+            clients.get(clientKey).shutdown();
+            grpcChannelController.disconnectChannel(channelIds.get(deviceId));
+            clientKeys.remove(deviceId);
+            clients.remove(clientKey);
+            channelIds.remove(deviceId);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isReachable(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+    }
+
+    protected boolean doIsReachable(DeviceId deviceId) {
+        // Default behaviour checks only the gRPC channel, should
+        // check according to different gRPC service
+        if (!clientKeys.containsKey(deviceId)) {
+            log.debug("No client for {}, can't check for reachability", deviceId);
+            return false;
+        }
+        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
+    }
+
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = stripedLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
similarity index 97%
rename from protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
rename to protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index fb87571..6bd6a82 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,8 +41,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.grpc.api.GrpcChannelController;
 import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.grpc.proto.dummy.Dummy;
 import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
 import org.onosproject.net.DeviceId;
@@ -64,12 +64,13 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * Default implementation of the GrpcController.
+ * Default implementation of the GrpcChannelController.
  */
 @Component(immediate = true)
 @Service
-public class GrpcControllerImpl implements GrpcController {
+public class GrpcChannelControllerImpl implements GrpcChannelController {
 
+    // FIXME: Should use message size to determine whether it needs to log the message or not.
     private  static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)