[ONOS-7829] Implement AbstractGrpcClient and AbstractGrpcClientController

Change-Id: I39cba6834e7fe8d1b60b576b9934c0b3cfa7104b
diff --git a/protocols/p4runtime/ctl/BUILD b/protocols/p4runtime/ctl/BUILD
index d6c0c7a..66aab43 100644
--- a/protocols/p4runtime/ctl/BUILD
+++ b/protocols/p4runtime/ctl/BUILD
@@ -1,6 +1,7 @@
 COMPILE_DEPS = CORE_DEPS + KRYO + [
     "//core/store/serializers:onos-core-serializers",
     "//protocols/grpc/api:onos-protocols-grpc-api",
+    "//protocols/grpc/ctl:onos-protocols-grpc-ctl",
     "//protocols/p4runtime/api:onos-protocols-p4runtime-api",
     "//protocols/p4runtime/proto:onos-protocols-p4runtime-proto",
     "@com_google_protobuf//:protobuf_java",
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
deleted file mode 100644
index cc4bed0..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-final class ClientKey {
-
-    private final DeviceId deviceId;
-    private final String serverAddr;
-    private final int serverPort;
-    private final long p4DeviceId;
-
-    /**
-     * Creates a new client key.
-     *
-     * @param deviceId   ONOS device ID
-     * @param serverAddr P4Runtime server address
-     * @param serverPort P4Runtime server port
-     * @param p4DeviceId P4Runtime server-internal device ID
-     */
-    ClientKey(DeviceId deviceId, String serverAddr, int serverPort, long p4DeviceId) {
-        this.deviceId = deviceId;
-        this.serverAddr = serverAddr;
-        this.serverPort = serverPort;
-        this.p4DeviceId = p4DeviceId;
-    }
-
-    /**
-     * Returns the device ID.
-     *
-     * @return device ID.
-     */
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-
-    /**
-     * Returns the P4Runtime server address.
-     *
-     * @return P4Runtime server address
-     */
-    public String serverAddr() {
-        return serverAddr;
-    }
-
-    /**
-     * Returns the P4Runtime server port.
-     *
-     * @return P4Runtime server port
-     */
-    public int serverPort() {
-        return serverPort;
-    }
-
-    /**
-     * Returns the P4Runtime server-internal device ID.
-     *
-     * @return P4Runtime server-internal device ID
-     */
-    public long p4DeviceId() {
-        return p4DeviceId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(serverAddr, serverPort, p4DeviceId);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final ClientKey other = (ClientKey) obj;
-        return Objects.equals(this.serverAddr, other.serverAddr)
-                && Objects.equals(this.serverPort, other.serverPort)
-                && Objects.equals(this.p4DeviceId, other.p4DeviceId);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("deviceId", deviceId)
-                .add("serverAddr", serverAddr)
-                .add("serverPort", serverPort)
-                .add("p4DeviceId", p4DeviceId)
-                .toString();
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 821e744..dd968ce 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
-import io.grpc.Context;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.Status;
@@ -32,9 +31,8 @@
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
-import org.onosproject.net.DeviceId;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -52,8 +50,8 @@
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.slf4j.Logger;
 import p4.config.v1.P4InfoOuterClass.P4Info;
 import p4.tmp.P4Config;
 import p4.v1.P4RuntimeGrpc;
@@ -87,22 +85,13 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.Collections.singletonList;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
@@ -115,10 +104,7 @@
 /**
  * Implementation of a P4Runtime client.
  */
-final class P4RuntimeClientImpl implements P4RuntimeClient {
-
-    // Timeout in seconds to obtain the request lock.
-    private static final int LOCK_TIMEOUT = 60;
+final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
 
     private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
             Metadata.Key.of("grpc-status-details-bin",
@@ -132,18 +118,9 @@
             WriteOperationType.DELETE, Update.Type.DELETE
     );
 
-    private final Logger log = getLogger(getClass());
-
-    private final Lock requestLock = new ReentrantLock();
-    private final Context.CancellableContext cancellableContext =
-            Context.current().withCancellation();
-
-    private final DeviceId deviceId;
     private final long p4DeviceId;
     private final P4RuntimeControllerImpl controller;
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
-    private final ExecutorService executorService;
-    private final Executor contextExecutor;
     private StreamChannelManager streamChannelManager;
 
     // Used by this client for write requests.
@@ -154,70 +131,22 @@
     /**
      * Default constructor.
      *
-     * @param deviceId   the ONOS device id
-     * @param p4DeviceId the P4 device id
+     * @param clientKey  the client key of this client
      * @param channel    gRPC channel
      * @param controller runtime client controller
      */
-    P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
+    P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
                         P4RuntimeControllerImpl controller) {
-        this.deviceId = deviceId;
-        this.p4DeviceId = p4DeviceId;
+
+        super(clientKey, channel);
+        this.p4DeviceId = clientKey.p4DeviceId();
         this.controller = controller;
-        this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
-                "onos-p4runtime-client-" + deviceId.toString(), "%d"));
-        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
         //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
         this.streamChannelManager = new StreamChannelManager(channel);
     }
 
-    /**
-     * Submits a task for async execution via the given executor. All tasks
-     * submitted with this method will be executed sequentially.
-     */
-    private <U> CompletableFuture<U> supplyWithExecutor(
-            Supplier<U> supplier, String opDescription, Executor executor) {
-        return CompletableFuture.supplyAsync(() -> {
-            // TODO: explore a more relaxed locking strategy.
-            try {
-                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
-                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
-                                      + "please debug (executing {})",
-                              opDescription);
-                    throw new IllegalThreadStateException("Lock timeout");
-                }
-            } catch (InterruptedException e) {
-                log.warn("Thread interrupted while waiting for lock (executing {})",
-                         opDescription);
-                throw new IllegalStateException(e);
-            }
-            try {
-                return supplier.get();
-            } catch (StatusRuntimeException ex) {
-                log.warn("Unable to execute {} on {}: {}",
-                         opDescription, deviceId, ex.toString());
-                throw ex;
-            } catch (Throwable ex) {
-                log.error("Exception in client of {}, executing {}",
-                          deviceId, opDescription, ex);
-                throw ex;
-            } finally {
-                requestLock.unlock();
-            }
-        }, executor);
-    }
-
-    /**
-     * Equivalent of supplyWithExecutor using the gRPC context executor of this
-     * client, such that if the context is cancelled (e.g. client shutdown) the
-     * RPC is automatically cancelled.
-     */
-    private <U> CompletableFuture<U> supplyInContext(
-            Supplier<U> supplier, String opDescription) {
-        return supplyWithExecutor(supplier, opDescription, contextExecutor);
-    }
-
     @Override
     public CompletableFuture<Boolean> startStreamChannel() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(false),
@@ -225,12 +154,6 @@
     }
 
     @Override
-    public CompletableFuture<Void> shutdown() {
-        return supplyWithExecutor(this::doShutdown, "shutdown",
-                                  SharedExecutors.getPoolThreadExecutor());
-    }
-
-    @Override
     public CompletableFuture<Boolean> becomeMaster() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(true),
                                "becomeMaster");
@@ -1154,19 +1077,9 @@
                 .build();
     }
 
-    private Void doShutdown() {
-        log.debug("Shutting down client for {}...", deviceId);
+    protected Void doShutdown() {
         streamChannelManager.complete();
-        cancellableContext.cancel(new InterruptedException(
-                "Requested client shutdown"));
-        this.executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Executor service didn't shutdown in time.");
-            Thread.currentThread().interrupt();
-        }
-        return null;
+        return super.doShutdown();
     }
 
     // Returns the collection of succesfully write entities.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index a140491..abafaf5 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -19,36 +19,29 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -58,19 +51,12 @@
 @Component(immediate = true)
 @Service
 public class P4RuntimeControllerImpl
-        extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
+        extends AbstractGrpcClientController
+        <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    // Getting the pipeline config from the device can take tens of MBs.
-    private static final int MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
-    private static final int MEGABYTES = 1024 * 1024;
-
     private final Logger log = getLogger(getClass());
 
-    private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
-    private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
-    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-
     private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
             deviceAgentListeners = Maps.newConcurrentMap();
     private final Striped<Lock> stripedLocks = Striped.lock(30);
@@ -78,151 +64,28 @@
     private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private GrpcController grpcController;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private StorageService storageService;
 
     @Activate
     public void activate() {
+        super.activate();
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
         electionIdGenerator = new DistributedElectionIdGenerator(storageService);
         log.info("Started");
     }
 
-
     @Deactivate
     public void deactivate() {
-        clientKeys.keySet().forEach(this::removeClient);
-        clientKeys.clear();
-        clients.clear();
-        channelIds.clear();
+        super.deactivate();
         deviceAgentListeners.clear();
-        grpcController = null;
         electionIdGenerator.destroy();
         electionIdGenerator = null;
-        eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
 
     @Override
-    public boolean createClient(DeviceId deviceId, String serverAddr,
-                                int serverPort, long p4DeviceId) {
-        checkNotNull(deviceId);
-        checkNotNull(serverAddr);
-        checkArgument(serverPort > 0, "Invalid server port");
-
-        return withDeviceLock(() -> doCreateClient(
-                deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
-    }
-
-    private boolean doCreateClient(DeviceId deviceId, String serverAddr,
-                                   int serverPort, long p4DeviceId) {
-
-        ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
-
-        if (clientKeys.containsKey(deviceId)) {
-            final ClientKey existingKey = clientKeys.get(deviceId);
-            if (clientKey.equals(existingKey)) {
-                log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
-                          deviceId, serverAddr, serverPort, p4DeviceId);
-                return true;
-            } else {
-                log.info("Requested client for {} with new " +
-                                 "endpoint, removing old client (server={}:{}, " +
-                                 "p4DeviceId={})...",
-                         deviceId, existingKey.serverAddr(),
-                         existingKey.serverPort(), existingKey.p4DeviceId());
-                doRemoveClient(deviceId);
-            }
-        }
-
-        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                 deviceId, serverAddr, serverPort, p4DeviceId);
-
-        GrpcChannelId channelId = GrpcChannelId.of(
-                clientKey.deviceId(), "p4runtime-" + clientKey);
-
-        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(serverAddr, serverPort)
-                .maxInboundMessageSize(MAX_INBOUND_MSG_SIZE * MEGABYTES)
-                .usePlaintext();
-
-        ManagedChannel channel;
-        try {
-            channel = grpcController.connectChannel(channelId, channelBuilder);
-        } catch (IOException e) {
-            log.warn("Unable to connect to gRPC server of {}: {}",
-                     clientKey.deviceId(), e.getMessage());
-            return false;
-        }
-
-        P4RuntimeClient client = new P4RuntimeClientImpl(
-                clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
-
-        clientKeys.put(clientKey.deviceId(), clientKey);
-        clients.put(clientKey, client);
-        channelIds.put(clientKey.deviceId(), channelId);
-
-        return true;
-    }
-
-    @Override
-    public P4RuntimeClient getClient(DeviceId deviceId) {
-        if (deviceId == null) {
-            return null;
-        }
-        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
-    }
-
-    private P4RuntimeClient doGetClient(DeviceId deviceId) {
-        if (!clientKeys.containsKey(deviceId)) {
-            return null;
-        } else {
-            return clients.get(clientKeys.get(deviceId));
-        }
-    }
-
-    @Override
-    public void removeClient(DeviceId deviceId) {
-        if (deviceId == null) {
-            return;
-        }
-        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
-    }
-
-    private Void doRemoveClient(DeviceId deviceId) {
-        if (clientKeys.containsKey(deviceId)) {
-            final ClientKey clientKey = clientKeys.get(deviceId);
-            clients.get(clientKey).shutdown();
-            grpcController.disconnectChannel(channelIds.get(deviceId));
-            clientKeys.remove(deviceId);
-            clients.remove(clientKey);
-            channelIds.remove(deviceId);
-        }
-        return null;
-    }
-
-    @Override
-    public boolean hasClient(DeviceId deviceId) {
-        return clientKeys.containsKey(deviceId);
-    }
-
-    @Override
-    public boolean isReachable(DeviceId deviceId) {
-        if (deviceId == null) {
-            return false;
-        }
-        return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
-    }
-
-    private boolean doIsReacheable(DeviceId deviceId) {
-        // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
-        if (!clientKeys.containsKey(deviceId)) {
-            log.debug("No client for {}, can't check for reachability", deviceId);
-            return false;
-        }
-        return grpcController.isChannelOpen(channelIds.get(deviceId));
+    protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(clientKey, channel, this);
     }
 
     @Override
@@ -244,16 +107,6 @@
         });
     }
 
-    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
-        final Lock lock = stripedLocks.get(deviceId);
-        lock.lock();
-        try {
-            return task.get();
-        } finally {
-            lock.unlock();
-        }
-    }
-
     BigInteger newMasterElectionId(DeviceId deviceId) {
         return electionIdGenerator.generate(deviceId);
     }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 36e60ea..301d5e1 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -44,6 +44,7 @@
 import org.onosproject.net.pi.runtime.PiActionGroupMember;
 import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
 import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
 import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
 import p4.v1.P4RuntimeOuterClass.Entity;
@@ -100,6 +101,8 @@
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
     private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+    private static final String P4R_IP = "127.0.0.1";
+    private static final int P4R_PORT = 50010;
 
     private P4RuntimeClientImpl client;
     private P4RuntimeControllerImpl controller;
@@ -152,9 +155,8 @@
     @Before
     public void setup() {
         controller = niceMock(P4RuntimeControllerImpl.class);
-        client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
-                                         grpcChannel,
-                                         controller);
+        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
+        client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller);
         client.becomeMaster();
     }