[ONOS-7584] Adding Capability of re-connecting to a P4Runtime Device.
Also addresses ONOS-7359

Change-Id: I47ec4ed429af82feb225ab5ac180b94c91366a53
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
index 7370dc7..8c83bee 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.p4runtime.ctl;
 
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
 import p4.P4RuntimeOuterClass.Uint128;
@@ -26,14 +27,17 @@
 public class DefaultArbitration implements P4RuntimeEventSubject {
     private MastershipRole role;
     private Uint128 electionId;
+    private DeviceId deviceId;
 
     /**
      * Creates arbitration with given role and election id.
      *
-     * @param role the role
+     * @param deviceId   the device
+     * @param role       the role
      * @param electionId the election id
      */
-    public DefaultArbitration(MastershipRole role, Uint128 electionId) {
+    public DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
+        this.deviceId = deviceId;
         this.role = role;
         this.electionId = electionId;
     }
@@ -55,4 +59,9 @@
     public Uint128 electionId() {
         return electionId;
     }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
new file mode 100644
index 0000000..5f5b3b9
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelEvent.Type;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Default implementation of channel event in P4Runtime. It allows passing any type of event.
+ * If the event is an error a throwable can be directly passed.
+ * Any other type of event cause can be passed as string.
+ */
+public class DefaultChannelEvent implements P4RuntimeEventSubject {
+    private final DeviceId deviceId;
+    private final Type type;
+    private final Throwable throwable;
+    private final String message;
+
+    /**
+     * Creates channel event with given type and throwable.
+     * The message of the event is the message of the throwable, if any.
+     *
+     * @param deviceId  the device
+     * @param type      event type
+     * @param throwable the cause
+     */
+    public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
+        this.deviceId = deviceId;
+        this.type = type;
+        this.message = throwable == null ? null : throwable.getMessage();
+        this.throwable = throwable;
+    }
+
+    /**
+     * Creates channel event with given type and string cause.
+     *
+     * @param deviceId the device
+     * @param type     event type
+     * @param message  the message
+     */
+    public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
+        this.deviceId = deviceId;
+        this.type = type;
+        this.message = message;
+        this.throwable = null;
+    }
+
+    /**
+     * Creates channel event with given type, message and throwable.
+     *
+     * @param deviceId  the device
+     * @param type      event type
+     * @param message   the message
+     * @param throwable the cause
+     */
+    public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
+        this.deviceId = deviceId;
+        this.type = type;
+        this.message = message;
+        this.throwable = throwable;
+    }
+
+    /**
+     * Gets the type of this event.
+     *
+     * @return the event type
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Gets the message related to this event.
+     *
+     * @return the message
+     */
+    public String message() {
+        return message;
+    }
+
+    /**
+     * Gets throwable of this event.
+     * If no throwable is present returns null.
+     *
+     * @return the throwable
+     */
+    public Throwable throwable() {
+        return throwable;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 5fc153a..624c41c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -33,6 +33,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.ChannelEvent;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -272,6 +273,7 @@
     /* Blocking method implementations below */
 
     private boolean doArbitrationUpdate() {
+
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         // TODO: currently we use 64-bit Long type for election id, should
         // we use 128-bit ?
@@ -502,7 +504,7 @@
             arbitrationRole = MastershipRole.STANDBY;
         }
 
-        DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
+        DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
         P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
                                                   arbitrationEventSubject);
         controller.postEvent(event);
@@ -976,6 +978,9 @@
         @Override
         public void onError(Throwable throwable) {
             log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
+            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
+                            throwable)));
             // FIXME: we might want to recreate the channel.
             // In general, we want to be robust against any transient error and, if the channel is open, make sure the
             // stream channel is always on.
@@ -984,7 +989,9 @@
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            // FIXME: same concern as before.
+            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
+                            "Stream channel has completed")));
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 3c28478..47ebe68 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,9 +16,9 @@
 
 package org.onosproject.p4runtime.ctl;
 
-import com.google.common.cache.LoadingCache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -34,6 +34,8 @@
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelEvent;
+import org.onosproject.net.device.ChannelListener;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -43,6 +45,8 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -67,6 +71,7 @@
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
     private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
             .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
             .build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -111,6 +116,8 @@
 
         try {
             if (clients.containsKey(deviceId)) {
+                // TODO might want to consider a more fine-grained check such as same port/p4DeviceId
+                log.warn("A client already exists for {}", deviceId);
                 throw new IllegalStateException(format("A client already exists for %s", deviceId));
             } else {
                 return doCreateClient(deviceId, p4DeviceId, channelBuilder);
@@ -205,7 +212,62 @@
         return electionIdGenerator.incrementAndGet();
     }
 
+    @Override
+    public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
+        // Copy-on-write: published lists are never mutated, so postEvent can
+        // iterate a list while listeners are concurrently added or removed.
+        channelListeners.compute(deviceId, (devId, listeners) -> {
+            List<ChannelListener> newListeners = new ArrayList<>();
+            if (listeners != null) {
+                newListeners.addAll(listeners);
+            }
+            newListeners.add(listener);
+            return newListeners;
+        });
+    }
+
+    @Override
+    public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
+        channelListeners.compute(deviceId, (devId, listeners) -> {
+            if (listeners == null) {
+                log.debug("Device {} has no listener registered", deviceId);
+                return null;
+            }
+            List<ChannelListener> newListeners = new ArrayList<>(listeners); // copy-on-write
+            newListeners.remove(listener);
+            return newListeners;
+        });
+    }
+
     public void postEvent(P4RuntimeEvent event) {
-        post(event);
+        if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
+            DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
+            DeviceId deviceId = event.subject().deviceId();
+            ChannelEvent channelEvent = null;
+            // If disconnection is already known we propagate it.
+            if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
+                channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+                        channelError.throwable());
+            } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
+                // Unknown error: an unreachable device means the channel has
+                // disconnected, otherwise the error is propagated as is.
+                if (!isReacheable(deviceId)) {
+                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+                            channelError.throwable());
+                } else {
+                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
+                            channelError.throwable());
+                }
+            }
+            // CHANNEL_CONNECTED is ignored; the listeners are looked up only once.
+            List<ChannelListener> listeners = channelListeners.get(deviceId);
+            if (channelEvent != null && listeners != null) {
+                for (ChannelListener listener : listeners) {
+                    listener.event(channelEvent);
+                }
+            }
+        } else {
+            post(event);
+        }
     }
 }