[ONOS-7584] Adding Capability of re-connecting to a P4Runtime Device.
Also addresses ONOS-7359
Change-Id: I47ec4ed429af82feb225ab5ac180b94c91366a53
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java b/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
new file mode 100644
index 0000000..98ea5cb
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.device;
+
+import org.onlab.util.Tools;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Describes event related to a channel established by ONOS with a device.
+ */
+public class ChannelEvent extends AbstractEvent<ChannelEvent.Type, DeviceId> {
+
+ private final Throwable throwable;
+
+ /**
+ * Type of device events.
+ */
+ public enum Type {
+ /**
+ * Signifies that the channel has properly connected.
+ */
+ CHANNEL_CONNECTED,
+
+ /**
+ * Signifies that the channel has disconnected.
+ */
+ CHANNEL_DISCONNECTED,
+
+ /**
+ * Signifies that an error happened on the channel with the given device.
+ */
+ CHANNEL_ERROR
+
+ }
+
+ /**
+ * Creates an event of a given type and for the specified device.
+ *
+ * @param type device event type
+ * @param deviceId event device subject
+ */
+ public ChannelEvent(Type type, DeviceId deviceId) {
+ this(type, deviceId, null);
+ }
+
+ /**
+ * Creates an event of a given type and for the specified device, given a certain throwable.
+ *
+ * @param type device event type
+ * @param deviceId event device subject
+ * @param throwable exception happened on the channel
+ */
+ public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable) {
+ super(type, deviceId);
+ this.throwable = throwable;
+ }
+
+ /**
+ * Creates an event of a given type and for the specified device and the current time.
+ *
+ * @param type device event type
+ * @param deviceId event device subject
+ * @param throwable exception happened on the channel
+ * @param time occurrence time
+ */
+ public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable, long time) {
+ super(type, deviceId, time);
+ this.throwable = throwable;
+ }
+
+ /**
+ * Returns the exception that happened on the channel.
+ *
+ * @return a throwable if associated to the event, otherwise null.
+ */
+ public Throwable throwable() {
+ return throwable;
+ }
+
+ @Override
+ public String toString() {
+ if (throwable == null) {
+ return super.toString();
+ }
+ return toStringHelper(this)
+ .add("time", Tools.defaultOffsetDataTime(time()))
+ .add("type", type())
+ .add("subject", subject())
+ .add("throwable", throwable)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java b/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
new file mode 100644
index 0000000..cdc90bf70
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.device;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * A listener to receive events related to a transport channel established with a device.
+ */
+public interface ChannelListener extends EventListener<ChannelEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index eb3c19c..606248a 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -46,4 +46,22 @@
*/
CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole);
+ /**
+ * Applies a listener to a channel established with the device.
+ *
+ * @param listener the channel listener
+ */
+ default void addChannelListener(ChannelListener listener) {
+ throw new UnsupportedOperationException("Listener Registration not supported");
+ }
+
+ /**
+ * Removes a listener to a channel established with the device.
+ *
+ * @param listener the channel listener
+ */
+ default void removeChannelListener(ChannelListener listener) {
+ throw new UnsupportedOperationException("Listener Removal not supported");
+ }
+
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index f49ad18..58cab8a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -18,6 +18,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.ChannelListener;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.p4runtime.api.P4RuntimeController;
@@ -86,4 +87,14 @@
}
return result;
}
+
+ @Override
+ public void addChannelListener(ChannelListener listener) {
+ controller.addChannelListener(deviceId, listener);
+ }
+
+ @Override
+ public void removeChannelListener(ChannelListener listener) {
+ controller.removeChannelListener(deviceId, listener);
+ }
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 3e015a4..d8bb9c7 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -20,6 +20,7 @@
import io.grpc.ManagedChannelBuilder;
import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelListener;
/**
* Controller of P4Runtime devices.
@@ -82,4 +83,22 @@
* @return the election id
*/
long getNewMasterElectionId();
+
+ /**
+ * Adds a listener for P4Runtime client-server channel events.
+ * If the channel for the device is not present and/or established the listener will get notified
+ * only after channel setup.
+ *
+ * @param deviceId device identifier
+ * @param listener the channel listener
+ */
+ void addChannelListener(DeviceId deviceId, ChannelListener listener);
+
+ /**
+ * Removes the listener for P4Runtime client-server channel events.
+ *
+ * @param deviceId device identifier
+ * @param listener the channel listener
+ */
+ void removeChannelListener(DeviceId deviceId, ChannelListener listener);
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index ab0146f..ae29097 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -38,6 +38,11 @@
* Arbitration reply.
*/
ARBITRATION,
+
+ /**
+ * Channel Event.
+ */
+ CHANNEL_EVENT
}
public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEventSubject.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEventSubject.java
index 7709de3..8363e53 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEventSubject.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEventSubject.java
@@ -17,6 +17,7 @@
package org.onosproject.p4runtime.api;
import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
/**
* Information about an event generated by a P4Runtime device .
@@ -24,4 +25,11 @@
@Beta
public interface P4RuntimeEventSubject {
+ /**
+ * Returns the deviceId associated to this subject.
+ *
+ * @return the deviceid
+ */
+ DeviceId deviceId();
+
}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePacketIn.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePacketIn.java
index 3319759..c217065 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePacketIn.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePacketIn.java
@@ -17,7 +17,6 @@
package org.onosproject.p4runtime.api;
import com.google.common.annotations.Beta;
-import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiPacketOperation;
/**
@@ -27,13 +26,6 @@
public interface P4RuntimePacketIn extends P4RuntimeEventSubject {
/**
- * Returns the identifier of the device that generated this packet-in.
- *
- * @return device identifier
- */
- DeviceId deviceId();
-
- /**
* Returns the packet operation corresponding to this packet-in event.
*
* @return pi packet operation
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
index 7370dc7..8c83bee 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
@@ -16,6 +16,7 @@
package org.onosproject.p4runtime.ctl;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
import p4.P4RuntimeOuterClass.Uint128;
@@ -26,14 +27,17 @@
public class DefaultArbitration implements P4RuntimeEventSubject {
private MastershipRole role;
private Uint128 electionId;
+ private DeviceId deviceId;
/**
* Creates arbitration with given role and election id.
*
- * @param role the role
+ * @param deviceId the device
+ * @param role the role
* @param electionId the election id
*/
- public DefaultArbitration(MastershipRole role, Uint128 electionId) {
+ public DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
+ this.deviceId = deviceId;
this.role = role;
this.electionId = electionId;
}
@@ -55,4 +59,9 @@
public Uint128 electionId() {
return electionId;
}
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
new file mode 100644
index 0000000..5f5b3b9
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelEvent.Type;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Default implementation of channel event in P4Runtime. It allows passing any type of event.
+ * If the event is an error a throwable can be directly passed.
+ * Any other type of event cause can be passed as string.
+ */
+public class DefaultChannelEvent implements P4RuntimeEventSubject {
+ private DeviceId deviceId;
+ private Type type;
+ private Throwable throwable;
+ private String message;
+
+ /**
+ * Creates channel event with given status and throwable.
+ *
+ * @param deviceId the device
+ * @param type error type
+ * @param throwable the cause
+ */
+ public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
+ this.deviceId = deviceId;
+ this.type = type;
+ this.message = throwable.getMessage();
+ this.throwable = throwable;
+ }
+
+ /**
+ * Creates channel event with given status and string cause.
+ *
+ * @param deviceId the device
+ * @param type error type
+ * @param message the message
+ */
+ public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
+ this.deviceId = deviceId;
+ this.type = type;
+ this.message = message;
+ this.throwable = null;
+ }
+
+ /**
+ * Creates channel event with given status, cause and throwable.
+ *
+ * @param deviceId the device
+ * @param type error type
+ * @param message the message
+ * @param throwable the cause
+ */
+ public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
+ this.deviceId = deviceId;
+ this.type = type;
+ this.message = message;
+ this.throwable = throwable;
+ }
+
+ /**
+ * Gets the type of this event.
+ *
+ * @return the error type
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Gets the message related to this event.
+ *
+ * @return the message
+ */
+ public String message() {
+ return message;
+ }
+
+
+ /**
+ * Gets throwable of this event.
+ * If no throwable is present returns null.
+ *
+ * @return the throwable
+ */
+ public Throwable throwable() {
+ return throwable;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 5fc153a..624c41c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -33,6 +33,7 @@
import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.ChannelEvent;
import org.onosproject.net.pi.model.PiActionProfileId;
import org.onosproject.net.pi.model.PiCounterId;
import org.onosproject.net.pi.model.PiMeterId;
@@ -272,6 +273,7 @@
/* Blocking method implementations below */
private boolean doArbitrationUpdate() {
+
CompletableFuture<Boolean> result = new CompletableFuture<>();
// TODO: currently we use 64-bit Long type for election id, should
// we use 128-bit ?
@@ -502,7 +504,7 @@
arbitrationRole = MastershipRole.STANDBY;
}
- DefaultArbitration arbitrationEventSubject = new DefaultArbitration(arbitrationRole, electionId);
+ DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
arbitrationEventSubject);
controller.postEvent(event);
@@ -976,6 +978,9 @@
@Override
public void onError(Throwable throwable) {
log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
+ controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
+ throwable)));
// FIXME: we might want to recreate the channel.
// In general, we want to be robust against any transient error and, if the channel is open, make sure the
// stream channel is always on.
@@ -984,7 +989,9 @@
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
- // FIXME: same concern as before.
+ controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
+ new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
+ "Stream channel has completed")));
}
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 3c28478..47ebe68 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,9 +16,9 @@
package org.onosproject.p4runtime.ctl;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -34,6 +34,8 @@
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.ChannelEvent;
+import org.onosproject.net.device.ChannelListener;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeController;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -43,6 +45,8 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
@@ -67,6 +71,7 @@
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newHashMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+ private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
.expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
.build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -111,6 +116,8 @@
try {
if (clients.containsKey(deviceId)) {
+ // TODO might want to consider a more fine-grained check such as same port/p4DeviceId
+ log.warn("A client already exists for {}", deviceId);
throw new IllegalStateException(format("A client already exists for %s", deviceId));
} else {
return doCreateClient(deviceId, p4DeviceId, channelBuilder);
@@ -205,7 +212,62 @@
return electionIdGenerator.incrementAndGet();
}
+ @Override
+ public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
+ channelListeners.compute(deviceId, (devId, listeners) -> {
+ List<ChannelListener> newListeners;
+ if (listeners != null) {
+ newListeners = listeners;
+ } else {
+ newListeners = new ArrayList<>();
+ }
+ newListeners.add(listener);
+ return newListeners;
+ });
+ }
+
+ @Override
+ public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
+ channelListeners.compute(deviceId, (devId, listeners) -> {
+ if (listeners != null) {
+ listeners.remove(listener);
+ return listeners;
+ } else {
+ log.debug("Device {} has no listener registered", deviceId);
+ return null;
+ }
+ });
+ }
+
public void postEvent(P4RuntimeEvent event) {
- post(event);
+ if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
+ DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
+ DeviceId deviceId = event.subject().deviceId();
+ ChannelEvent channelEvent = null;
+ //If disconnection is already known we propagate it.
+ if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+ channelError.throwable());
+ } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
+ //If we don't know what the error is we check for reachability
+ if (!isReacheable(deviceId)) {
+ //if false the channel has disconnected
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
+ channelError.throwable());
+ } else {
+ // else we propagate the event.
+ channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
+ channelError.throwable());
+ }
+ }
+ //Ignoring CHANNEL_CONNECTED
+ if (channelEvent != null && channelListeners.get(deviceId) != null) {
+ for (ChannelListener listener : channelListeners.get(deviceId)) {
+ listener.event(channelEvent);
+ }
+ }
+ } else {
+ post(event);
+ }
}
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 48d8007..550ca59 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,6 +50,8 @@
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.ChannelEvent;
+import org.onosproject.net.device.ChannelListener;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
@@ -118,6 +120,9 @@
public static final int REACHABILITY_TIMEOUT = 10;
public static final String DEPLOY = "deploy-";
public static final String PIPECONF_TOPIC = "-pipeconf";
+ public static final String CHECK = "check-";
+ public static final String CONNECTION = "-connection";
+ private static final String POLL_FREQUENCY = "pollFrequency";
private final Logger log = getLogger(getClass());
@@ -152,11 +157,17 @@
protected LeadershipService leadershipService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
- @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+ @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
label = "Configure poll frequency for port status and statistics; " +
"default is 10 sec")
private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+ private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
+ @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
+ label = "Configure poll frequency for checking device availability; " +
+ "default is 10 sec")
+ private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
+
protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
protected static final String URI_SCHEME = "device";
protected static final String CFG_SCHEME = "generalprovider";
@@ -185,6 +196,10 @@
= newScheduledThreadPool(CORE_POOL_SIZE,
groupedThreads("onos/generaldeviceprovider-port-stats",
"port-stats-executor-%d", log));
+ protected ScheduledExecutorService availabilityCheckExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/generaldeviceprovider-availability-check",
+ "availability-check-executor-%d", log));
protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
protected DeviceProviderService providerService;
@@ -201,10 +216,11 @@
};
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+ private ChannelListener channelListener = new InternalChannelListener();
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
componentConfigService.registerProperties(getClass());
coreService.registerApplication(APP_NAME);
@@ -216,6 +232,10 @@
// are activated, failing due to not finding the driver.
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
+ //Initiating a periodic check to see if any device is available again and reconnect it.
+ availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+ deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
+ modified(context);
log.info("Started");
}
@@ -223,7 +243,7 @@
public void modified(ComponentContext context) {
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
- pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
+ pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
DEFAULT_POLL_FREQUENCY_SECONDS);
log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
}
@@ -240,7 +260,7 @@
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
- scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
});
}
}
@@ -248,6 +268,7 @@
@Deactivate
public void deactivate() {
portStatsExecutor.shutdown();
+ availabilityCheckExecutor.shutdown();
componentConfigService.unregisterProperties(getClass(), false);
cfgService.removeListener(cfgListener);
//Not Removing the device so they can still be used from other driver providers
@@ -285,7 +306,7 @@
scheduledTasks.get(deviceId).cancel(false);
scheduledTasks.remove(deviceId);
} else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
updatePortStatistics(deviceId);
}
});
@@ -304,7 +325,8 @@
try {
return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Device {} is not reachable", deviceId, e);
+ log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
+ log.debug("Exception", e);
return false;
}
}
@@ -335,7 +357,12 @@
@Override
public void triggerDisconnect(DeviceId deviceId) {
- connectionExecutor.execute(() -> disconnectDevice(deviceId));
+ log.debug("Triggering disconnection of device {}", deviceId);
+ connectionExecutor.execute(() -> {
+ disconnectDevice(deviceId).whenComplete((success, ex) -> {
+ checkAndConnect(deviceId);
+ });
+ });
}
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -411,7 +438,7 @@
connected.thenAcceptAsync(result -> {
if (result) {
-
+ handshaker.addChannelListener(channelListener);
//Populated with the default values obtained by the driver
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -445,7 +472,6 @@
handshaker.disconnect();
return;
}
-
advertiseDevice(deviceId, description, ports);
} else {
@@ -470,6 +496,7 @@
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
+ handshaker.addChannelListener(channelListener);
handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
}
});
@@ -553,17 +580,20 @@
providerService.updatePorts(deviceId, ports);
}
- private void disconnectDevice(DeviceId deviceId) {
+ private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
+ CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+
DeviceHandshaker handshaker = handshakers.remove(deviceId);
if (handshaker != null) {
- CompletableFuture<Boolean> disconnect = handshaker.disconnect();
- disconnect.thenAcceptAsync(result -> {
+ handshaker.disconnect().thenAcceptAsync(result -> {
if (result) {
log.info("Disconnected device {}", deviceId);
providerService.deviceDisconnected(deviceId);
+ disconnectError.complete(true);
} else {
+ disconnectError.complete(false);
log.warn("Device {} was unable to disconnect", deviceId);
}
});
@@ -571,11 +601,14 @@
//gracefully ignoring.
log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
"shutdown of communication", deviceId);
+ disconnectError.complete(false);
}
ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
if (pollingStatisticsTask != null) {
pollingStatisticsTask.cancel(true);
+ scheduledTasks.remove(deviceId);
}
+ return disconnectError;
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -732,7 +765,7 @@
pipelineConfigured.remove(deviceId);
}
- private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
+ private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
int delay = 0;
if (randomize) {
delay = new SecureRandom().nextInt(10);
@@ -742,6 +775,57 @@
delay, pollFrequency, TimeUnit.SECONDS);
}
+ private void scheduleDevicePolling() {
+ cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
+ }
+
+ private void checkAndConnect(DeviceId deviceId) {
+ // Let's try and reconnect to a device which is stored in the net-cfg.
+ // One of the following conditions must be satisfied:
+ // 1) device is null in the store meaning that is was never connected or it was administratively removed
+ // 2) the device is not available and there is no MASTER instance, meaning the device lost
+ // it's connection to ONOS at some point in the past.
+ // We also check that the general device provider config and the driver config are present.
+ // We do not check for reachability using isReachable(deviceId) since the behaviour of this method
+ // can vary depending on protocol nuances. We leave this check to the device handshaker
+ // at later stages of the connection process.
+ // IF the conditions are not met but instead the device is present in the store, available and this instance is
+ // MASTER but is not reachable we remove it from the store.
+
+ if ((deviceService.getDevice(deviceId) == null || (!deviceService.isAvailable(deviceId) &&
+ mastershipService.getMasterFor(deviceId) == null)) && configIsPresent(deviceId)) {
+ log.debug("Trying to re-connect to device {}", deviceId);
+ NodeId leaderNodeId = leadershipService.runForLeadership(CHECK + deviceId.toString() + CONNECTION)
+ .leader().nodeId();
+ NodeId localNodeId = clusterService.getLocalNode().id();
+ if (localNodeId.equals(leaderNodeId)) {
+ log.debug("{} is leader for {}, initiating the connection", leaderNodeId,
+ deviceId);
+ checkAndSubmitDeviceTask(deviceId);
+ } else {
+ log.debug("{} is not leader for {}, initiating connection, {} is LEADER",
+ localNodeId, deviceId, leaderNodeId);
+ connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
+ //FIXME this will be removed when config is synced
+ cleanUpConfigInfo(deviceId);
+ }
+ } else if ((deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)
+ && mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId) && configIsPresent(deviceId))) {
+ log.info("Removing available but unreachable device {}", deviceId);
+ disconnectDevice(deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ }
+
+ private boolean configIsPresent(DeviceId deviceId) {
+ boolean present = cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
+ && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
+ if (!present) {
+ log.warn("Configuration for device {} is not complete", deviceId);
+ }
+ return present;
+ }
+
/**
* Listener for core device events.
*/
@@ -754,7 +838,7 @@
// For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (mastershipService.isLocalMaster(deviceId)) {
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
}
}
@@ -764,4 +848,29 @@
event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
}
}
+
+ /**
+ * Listener for device channel events.
+ */
+ private class InternalChannelListener implements ChannelListener {
+
+ @Override
+ public void event(ChannelEvent event) {
+ DeviceId deviceId = event.subject();
+ if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
+ //let's properly handle device disconnection
+ CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
+ disconnection.thenAcceptAsync(result -> {
+ //If master notifying of disconnection to the core.
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ });
+
+ }
+ //TODO evaluate other type of reactions.
+ }
+
+ }
}