ONOS-6561 BMv2 handshaker via P4Runtime
+ support for device-specific default pipeconf
+ improvements to P4Runtime and gRPC protocol stuff
Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
index f1e279a..629aad1 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
@@ -57,7 +57,7 @@
Optional<GrpcObserverHandler> getObserverManager(GrpcStreamObserverId observerId);
/**
- * Tries to connect to a specific gRPC device, if the connection is successful
+ * Tries to connect to a specific gRPC server, if the connection is successful
* returns the ManagedChannel. This method blocks until the channel is setup or a timeout expires.
* By default the timeout is 20 seconds. If the timeout expires and thus the channel can't be set up
* a IOException is thrown.
@@ -84,6 +84,15 @@
Map<GrpcChannelId, ManagedChannel> getChannels();
/**
+ * Returns true if the channel associated with the given identifier is open, i.e. the server is able to successfully
+ * respond to RPCs.
+ *
+ * @param channelId channel identifier
+ * @return true if channel is open, false otherwise.
+ */
+ boolean isChannelOpen(GrpcChannelId channelId);
+
+ /**
* Returns all ManagedChannels associated to the given device identifier.
*
* @param deviceId the device for which we are interested.
diff --git a/protocols/grpc/ctl/BUCK b/protocols/grpc/ctl/BUCK
index 764d980..18e0411 100644
--- a/protocols/grpc/ctl/BUCK
+++ b/protocols/grpc/ctl/BUCK
@@ -1,11 +1,13 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//protocols/grpc/api:onos-protocols-grpc-api',
+ '//protocols/grpc/proto:onos-protocols-grpc-proto',
'//lib:grpc-core-1.3.0',
'//lib:grpc-protobuf-1.3.0',
'//lib:grpc-stub-1.3.0',
'//lib:grpc-netty-1.3.0',
'//lib:grpc-auth-1.3.0',
+ '//lib:protobuf-java-3.0.2',
]
TEST_DEPS = [
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 4ce097d..152106a 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -19,6 +19,8 @@
import com.google.common.collect.ImmutableSet;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -27,16 +29,20 @@
import org.onosproject.grpc.api.GrpcController;
import org.onosproject.grpc.api.GrpcObserverHandler;
import org.onosproject.grpc.api.GrpcStreamObserverId;
+import org.onosproject.grpc.ctl.dummy.Dummy;
+import org.onosproject.grpc.ctl.dummy.DummyServiceGrpc;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
/**
* Default implementation of the GrpcController.
@@ -45,6 +51,8 @@
@Service
public class GrpcControllerImpl implements GrpcController {
+ private static final int CONNECTION_TIMEOUT_SECONDS = 20;
+
public static final Logger log = LoggerFactory
.getLogger(GrpcControllerImpl.class);
@@ -87,18 +95,63 @@
}
@Override
- public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder) {
+ public ManagedChannel connectChannel(GrpcChannelId channelId, ManagedChannelBuilder<?> channelBuilder)
+ throws IOException {
ManagedChannel channel = channelBuilder.build();
- channel.getState(true);
+ // Forced connection (channel.getState(true)) is not yet implemented, so the connection is
+ // triggered by sending a probe RPC instead.
+ try {
+ doDummyMessage(channel);
+ } catch (IOException e) {
+ // The probe failed and the channel will not be registered: shut it down here so it is not leaked.
+ channel.shutdownNow();
+ throw e;
+ }
+
channelBuilders.put(channelId, channelBuilder);
channels.put(channelId, channel);
return channel;
}
+ /**
+ * Probes the given channel by invoking the dummy RPC with an empty message, bounded by
+ * CONNECTION_TIMEOUT_SECONDS. An UNIMPLEMENTED reply is treated as success, any other gRPC
+ * failure is reported as an IOException.
+ *
+ * @param channel channel to probe
+ * @throws IOException if the RPC fails with a status code other than UNIMPLEMENTED
+ */
+ private void doDummyMessage(ManagedChannel channel) throws IOException {
+ DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ try {
+ dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse.getDefaultInstance());
+ } catch (StatusRuntimeException e) {
+ // Compare status codes, not Status instances: a status that carries a description or a cause
+ // is a different object than the Status.UNIMPLEMENTED constant, so "!=" on the instances
+ // would flag a reachable server as unreachable.
+ if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
+ // UNIMPLEMENTED means that server received our message but doesn't know how to handle it.
+ // Hence, channel is open. Anything else is a failure.
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean isChannelOpen(GrpcChannelId channelId) {
+ // Single lookup: a containsKey()/get() pair could observe the entry being removed in between
+ // (e.g. by a concurrent disconnectChannel()) and hand a null channel to the probe.
+ ManagedChannel channel = channels.get(channelId);
+ if (channel == null) {
+ log.warn("Can't check if channel open for unknown channel id {}", channelId);
+ return false;
+ }
+
+ try {
+ // The channel is considered open if the probe RPC reaches the server.
+ doDummyMessage(channel);
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
@Override
public void disconnectChannel(GrpcChannelId channelId) {
- channels.get(channelId).shutdown();
+ // Single lookup instead of containsKey() followed by get().
+ ManagedChannel channel = channels.get(channelId);
+ if (channel == null) {
+ // Nothing to do.
+ return;
+ }
+
+ try {
+ // awaitTermination() signals a timeout by returning false, not by throwing.
+ if (!channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)) {
+ log.warn("Channel {} didn't shut down in time.", channelId);
+ channel.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while shutting down channel {}.", channelId);
+ channel.shutdownNow();
+ // Restore the interrupt flag so that callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+
channels.remove(channelId);
channelBuilders.remove(channelId);
}
diff --git a/protocols/grpc/proto/BUCK b/protocols/grpc/proto/BUCK
new file mode 100644
index 0000000..f354b39
--- /dev/null
+++ b/protocols/grpc/proto/BUCK
@@ -0,0 +1,28 @@
+include_defs(
+ '//bucklets/grpc.bucklet'
+)
+
+PROTOC_VER = '3.0.2'
+GRPC_VER = '1.3.0'
+
+
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+ '//lib:grpc-stub-' + GRPC_VER,
+ '//lib:grpc-protobuf-' + GRPC_VER,
+ '//lib:protobuf-java-' + PROTOC_VER,
+]
+
+
+grpc_jar(
+ proto_match_patterns = ["*.proto"],
+ proto_paths = ["$ONOS_ROOT/protocols/grpc/proto/"],
+ protoc_version = PROTOC_VER,
+ plugin_version = GRPC_VER,
+ deps = COMPILE_DEPS,
+)
+
+project_config(
+ src_target = ':onos-protocols-grpc-proto'
+)
\ No newline at end of file
diff --git a/protocols/grpc/proto/dummy.proto b/protocols/grpc/proto/dummy.proto
new file mode 100644
index 0000000..d02b5d8
--- /dev/null
+++ b/protocols/grpc/proto/dummy.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+
+option java_package = "org.onosproject.grpc.ctl.dummy";
+
+package dummy;
+
+service DummyService {
+ rpc SayHello (DummyMessageThatNoOneWouldReallyUse) returns (DummyMessageThatNoOneWouldReallyUse) {}
+}
+
+message DummyMessageThatNoOneWouldReallyUse {
+}
\ No newline at end of file
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 3b8074f..ecdcb31 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -65,4 +65,14 @@
* @return true if client exists, false otherwise.
*/
boolean hasClient(DeviceId deviceId);
+
+ /**
+ * Returns true if the P4Runtime server running on the given device is reachable, i.e. the channel is open and the
+ * server is able to respond to RPCs, false otherwise. Reachability can be tested only if a client was previously
+ * created using {@link #createClient(DeviceId, int, ManagedChannelBuilder)}, otherwise this method returns false.
+ *
+ * @param deviceId device identifier.
+ * @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
+ */
+ boolean isReacheable(DeviceId deviceId);
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index e6603ea..35e9726 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -20,6 +20,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.TextFormat;
+import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -64,6 +65,7 @@
private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
private ExecutorService executorService;
private StreamObserver<StreamMessageRequest> streamRequestObserver;
+ private Context.CancellableContext streamContext;
P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
@@ -74,8 +76,7 @@
this.executorService = executorService;
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
.withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
- this.asyncStub = P4RuntimeGrpc.newStub(channel)
- .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+ this.asyncStub = P4RuntimeGrpc.newStub(channel);
}
@Override
@@ -85,21 +86,44 @@
private boolean doInitStreamChannel() {
if (this.streamRequestObserver == null) {
- this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
+
+ streamContext = Context.current().withCancellation();
+ streamContext.run(
+ () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
+
// To listen for packets and other events, we need to start the RPC.
- // Here we do it by sending an empty packet out.
- try {
- this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
- .setPacket(PacketOut.getDefaultInstance())
- .build());
- } catch (StatusRuntimeException e) {
- log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
+ // Here we do it by sending a master arbitration update.
+ if (!doArbitrationUpdate()) {
+ log.warn("Unable to initialize stream channel for {}", deviceId);
return false;
}
}
return true;
}
+ /**
+ * Sends a master arbitration update carrying this client's P4 device id on the stream channel.
+ *
+ * @return true if the request was handed to the request observer, false if there is no request
+ * observer or the send failed with a StatusRuntimeException
+ */
+ private boolean doArbitrationUpdate() {
+ if (streamRequestObserver != null) {
+ try {
+ MasterArbitrationUpdate arbitration = MasterArbitrationUpdate
+ .newBuilder()
+ .setDeviceId(p4DeviceId)
+ .build();
+ streamRequestObserver.onNext(StreamMessageRequest
+ .newBuilder()
+ .setArbitration(arbitration)
+ .build());
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.warn("Arbitration update failed for {}: {}", deviceId, e);
+ return false;
+ }
+ }
+
+ // Without a request observer there is no stream to write to.
+ log.error("Null request stream observer for {}", deviceId);
+ return false;
+ }
+
@Override
public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
@@ -168,12 +192,15 @@
@Override
public void shutdown() {
- if (this.streamRequestObserver != null) {
- this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
- this.streamRequestObserver.onCompleted();
+ log.info("Shutting down client for {}...", deviceId);
+
+ if (streamRequestObserver != null) {
+ streamRequestObserver.onCompleted();
+ streamContext.cancel(null);
+ streamContext = null;
}
- this.executorService.shutdownNow();
+ this.executorService.shutdown();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -216,12 +243,16 @@
@Override
public void onError(Throwable throwable) {
- log.warn("Error on stream channel for {}: {}", deviceId, throwable);
+ log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
+ // FIXME: we might want to recreate the channel.
+ // In general, we want to be robust against any transient error and, if the channel is open, make sure the
+ // stream channel is always on.
}
@Override
public void onCompleted() {
- // TODO: declare the device as disconnected?
+ log.warn("Stream channel for {} has completed", deviceId);
+ // FIXME: same concern as before.
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index cd4151c..06376ca 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -39,8 +39,8 @@
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
@@ -62,10 +62,11 @@
private final Logger log = getLogger(getClass());
private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
+ // These maps are shared by all devices but access is guarded only by per-device locks, so threads
+ // working on different devices can touch them at the same time: they must remain concurrent maps.
private final Map<DeviceId, P4RuntimeClient> clients = Maps.newConcurrentMap();
private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newConcurrentMap();
+
// TODO: should use a cache to delete unused locks.
- private final Map<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
+ private final Map<DeviceId, ReadWriteLock> deviceLocks = Maps.newConcurrentMap();
@Activate
public void activate() {
@@ -85,8 +86,8 @@
checkNotNull(deviceId);
checkNotNull(channelBuilder);
- deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
- deviceLocks.get(deviceId).lock();
+ deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+ deviceLocks.get(deviceId).writeLock().lock();
log.info("Creating client for {} (with internal device id {})...", deviceId, p4DeviceId);
@@ -97,11 +98,12 @@
return doCreateClient(deviceId, p4DeviceId, channelBuilder);
}
} finally {
- deviceLocks.get(deviceId).unlock();
+ deviceLocks.get(deviceId).writeLock().unlock();
}
}
private boolean doCreateClient(DeviceId deviceId, int p4DeviceId, ManagedChannelBuilder channelBuilder) {
+
GrpcChannelId channelId = GrpcChannelId.of(deviceId, "p4runtime");
// Channel defaults.
@@ -127,43 +129,62 @@
@Override
public P4RuntimeClient getClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
- deviceLocks.get(deviceId).lock();
+ deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+ deviceLocks.get(deviceId).readLock().lock();
try {
return clients.get(deviceId);
} finally {
- deviceLocks.get(deviceId).unlock();
+ deviceLocks.get(deviceId).readLock().unlock();
}
}
@Override
public void removeClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
- deviceLocks.get(deviceId).lock();
+ deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+ deviceLocks.get(deviceId).writeLock().lock();
try {
if (clients.containsKey(deviceId)) {
clients.get(deviceId).shutdown();
grpcController.disconnectChannel(channelIds.get(deviceId));
clients.remove(deviceId);
+ channelIds.remove(deviceId);
}
} finally {
- deviceLocks.get(deviceId).unlock();
+ deviceLocks.get(deviceId).writeLock().unlock();
}
}
@Override
public boolean hasClient(DeviceId deviceId) {
- deviceLocks.putIfAbsent(deviceId, new ReentrantLock());
- deviceLocks.get(deviceId).lock();
+ deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+ deviceLocks.get(deviceId).readLock().lock();
try {
return clients.containsKey(deviceId);
} finally {
- deviceLocks.get(deviceId).unlock();
+ deviceLocks.get(deviceId).readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isReacheable(DeviceId deviceId) {
+ // Reachability is checked while holding the device's read lock.
+ deviceLocks.putIfAbsent(deviceId, new ReentrantReadWriteLock());
+ ReadWriteLock deviceLock = deviceLocks.get(deviceId);
+ deviceLock.readLock().lock();
+ try {
+ if (clients.containsKey(deviceId)) {
+ // A client exists: reachability is whatever the gRPC controller reports for its channel.
+ return grpcController.isChannelOpen(channelIds.get(deviceId));
+ }
+ log.warn("No client for {}, can't check for reachability", deviceId);
+ return false;
+ } finally {
+ deviceLock.readLock().unlock();
+ }
}