ONOS-6561 BMv2 handshaker via P4Runtime

+ support for device-specific default pipeconf
+ improvements to P4Runtime and gRPC protocol code

Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index e6603ea..35e9726 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -20,6 +20,7 @@
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.TextFormat;
+import io.grpc.Context;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
@@ -64,6 +65,7 @@
     private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
     private ExecutorService executorService;
     private StreamObserver<StreamMessageRequest> streamRequestObserver;
+    private Context.CancellableContext streamContext;
 
 
     P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
@@ -74,8 +76,7 @@
         this.executorService = executorService;
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
                 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
-        this.asyncStub = P4RuntimeGrpc.newStub(channel)
-                .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+        this.asyncStub = P4RuntimeGrpc.newStub(channel);
     }
 
     @Override
@@ -85,21 +86,44 @@
 
     private boolean doInitStreamChannel() {
         if (this.streamRequestObserver == null) {
-            this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
+
+            streamContext = Context.current().withCancellation();
+            streamContext.run(
+                    () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
+
             // To listen for packets and other events, we need to start the RPC.
-            // Here we do it by sending an empty packet out.
-            try {
-                this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
-                                                          .setPacket(PacketOut.getDefaultInstance())
-                                                          .build());
-            } catch (StatusRuntimeException e) {
-                log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
+            // Here we do it by sending a master arbitration update.
+            if (!doArbitrationUpdate()) {
+                log.warn("Unable to initialize stream channel for {}", deviceId);
                 return false;
             }
         }
         return true;
     }
 
+    /**
+     * Sends a master arbitration update for p4DeviceId; false if no stream observer or the send fails.
+     */
+    private boolean doArbitrationUpdate() {
+        if (streamRequestObserver == null) {
+            log.error("Null request stream observer for {}", deviceId);
+            return false;
+        }
+        try {
+            final MasterArbitrationUpdate arbitration = MasterArbitrationUpdate.newBuilder()
+                    .setDeviceId(p4DeviceId)
+                    .build();
+            final StreamMessageRequest arbitrationRequest = StreamMessageRequest.newBuilder()
+                    .setArbitration(arbitration)
+                    .build();
+            streamRequestObserver.onNext(arbitrationRequest);
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.warn("Arbitration update failed for {}: {}", deviceId, e);
+            return false;
+        }
+    }
+
     @Override
     public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
         return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
@@ -168,12 +192,15 @@
     @Override
     public void shutdown() {
 
-        if (this.streamRequestObserver != null) {
-            this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
-            this.streamRequestObserver.onCompleted();
+        log.info("Shutting down client for {}...", deviceId);
+
+        if (streamRequestObserver != null) {
+            streamRequestObserver.onCompleted();
+            streamContext.cancel(null);
+            streamContext = null;
         }
 
-        this.executorService.shutdownNow();
+        this.executorService.shutdown();
         try {
             executorService.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -216,12 +243,16 @@
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}", deviceId, throwable);
+            log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
+            // FIXME: we might want to recreate the channel.
+            // In general, we want to be robust against any transient error and, if the channel is open, make sure the
+            // stream channel is always on.
         }
 
         @Override
         public void onCompleted() {
-            // TODO: declare the device as disconnected?
+            log.warn("Stream channel for {} has completed", deviceId);
+            // FIXME: same concern as in onError(): we might want to recreate the channel.
         }
     }
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index cd4151c..06376ca 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -39,8 +39,8 @@
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
@@ -62,10 +62,11 @@
     private final Logger log = getLogger(getClass());
 
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
-    private final Map<DeviceId, P4RuntimeClient> clients = Maps.newConcurrentMap();
-    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newConcurrentMap();
+    // Concurrent maps: per-device locks do not serialize access made on behalf of different devices.
+    private final Map<DeviceId, P4RuntimeClient> clients = Maps.newConcurrentMap();
+    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newConcurrentMap();
     // TODO: should use a cache to delete unused locks.
-    private final Map<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
+    private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
 
     @Activate
     public void activate() {
@@ -85,8 +86,8 @@
         checkNotNull(deviceId);
         checkNotNull(channelBuilder);
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
-        deviceLocks.get(deviceId).lock();
+        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+        deviceLocks.get(deviceId).writeLock().lock();
 
         log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
 
@@ -97,11 +98,12 @@
                 return doCreateClient(deviceId, p4DeviceId, channelBuilder);
             }
         } finally {
-            deviceLocks.get(deviceId).unlock();
+            deviceLocks.get(deviceId).writeLock().unlock();
         }
     }
 
     private boolean doCreateClient(DeviceId deviceId, int p4DeviceId, ManagedChannelBuilder channelBuilder) {
+
         GrpcChannelId channelId = GrpcChannelId.of(deviceId, "p4runtime");
 
         // Channel defaults.
@@ -127,43 +129,62 @@
     @Override
     public P4RuntimeClient getClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
-        deviceLocks.get(deviceId).lock();
+        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+        deviceLocks.get(deviceId).readLock().lock();
 
         try {
             return clients.get(deviceId);
         } finally {
-            deviceLocks.get(deviceId).unlock();
+            deviceLocks.get(deviceId).readLock().unlock();
         }
     }
 
     @Override
     public void removeClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
-        deviceLocks.get(deviceId).lock();
+        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+        deviceLocks.get(deviceId).writeLock().lock();
 
         try {
             if (clients.containsKey(deviceId)) {
                 clients.get(deviceId).shutdown();
                 grpcController.disconnectChannel(channelIds.get(deviceId));
                 clients.remove(deviceId);
+                channelIds.remove(deviceId);
             }
         } finally {
-            deviceLocks.get(deviceId).unlock();
+            deviceLocks.get(deviceId).writeLock().unlock();
         }
     }
 
     @Override
     public boolean hasClient(DeviceId deviceId) {
 
-        deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
-        deviceLocks.get(deviceId).lock();
+        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+        deviceLocks.get(deviceId).readLock().lock();
 
         try {
             return clients.containsKey(deviceId);
         } finally {
-            deviceLocks.get(deviceId).unlock();
+            deviceLocks.get(deviceId).readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isReacheable(DeviceId deviceId) {
+
+        deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+        deviceLocks.get(deviceId).readLock().lock();
+
+        try {
+            if (!clients.containsKey(deviceId)) {
+                log.warn("No client for {}, can't check for reachability", deviceId);
+                return false;
+            }
+
+            return grpcController.isChannelOpen(channelIds.get(deviceId));
+        } finally {
+            deviceLocks.get(deviceId).readLock().unlock();
         }
     }