[ONOS-7584] Adding Capability of re-connecting to a P4Runtime Device.
Also addresses ONOS-7359
Change-Id: I47ec4ed429af82feb225ab5ac180b94c91366a53
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 3c28478..47ebe68 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,9 +16,9 @@
package org.onosproject.p4runtime.ctl;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -34,6 +34,8 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelEvent;
+import org.onosproject.net.device.ChannelListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -43,6 +45,8 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
@@ -67,6 +71,7 @@
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+ private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
.expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
.build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -111,6 +116,8 @@
try {
if (clients.containsKey(deviceId)) {
+ // TODO might want to consider a more fine-grained check such as same port/p4DeviceId
+ log.warn("A client already exists for {}", deviceId);
throw new IllegalStateException(format("A client already exists for %s", deviceId));
} else {
return doCreateClient(deviceId, p4DeviceId, channelBuilder);
@@ -205,7 +212,62 @@
return electionIdGenerator.incrementAndGet();
}
+ @Override
+ public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
+ channelListeners.compute(deviceId, (devId, listeners) -> {
+ List<ChannelListener> newListeners;
+ if (listeners != null) {
+ newListeners = listeners;
+ } else {
+ newListeners = new ArrayList<>();
+ }
+ newListeners.add(listener);
+ return newListeners;
+ });
+ }
+
+ @Override
+ public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
+ channelListeners.compute(deviceId, (devId, listeners) -> {
+ if (listeners != null) {
+ listeners.remove(listener);
+ return listeners;
+ } else {
+ log.debug("Device {} has no listener registered", deviceId);
+ return null;
+ }
+ });
+ }
+
public void postEvent(P4RuntimeEvent event) {
- post(event);
+ if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
+ DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
+ DeviceId deviceId = event.subject().deviceId();
+ ChannelEvent channelEvent = null;
+ //If disconnection is already known we propagate it.
+ if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+ channelError.throwable());
+ } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
+ //If we don't know what the error is we check for reachability
+ if (!isReacheable(deviceId)) {
+ //if false the channel has disconnected
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+ channelError.throwable());
+ } else {
+ // else we propagate the event.
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
+ channelError.throwable());
+ }
+ }
+ //Ignoring CHANNEL_CONNECTED
+ if (channelEvent != null && channelListeners.get(deviceId) != null) {
+ for (ChannelListener listener : channelListeners.get(deviceId)) {
+ listener.event(channelEvent);
+ }
+ }
+ } else {
+ post(event);
+ }
}
}