Add gNMI device state subscriber
Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 5beb238..6e65dd3 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -22,6 +22,7 @@
import gnmi.Gnmi.GetResponse;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
import org.onosproject.grpc.api.GrpcClient;
import java.util.concurrent.CompletableFuture;
@@ -56,12 +57,23 @@
CompletableFuture<SetResponse> set(SetRequest request);
/**
- * Check weather the gNMI service is available or not by sending a
- * dummy get request message.
+ * Subscribes to a given specific gNMI path.
+ *
+ * @param request the subscribe request
+ * @return true if the subscribe request was sent successfully; false otherwise
+ */
+ boolean subscribe(SubscribeRequest request);
+
+ /**
+ * Terminates the subscription channel of this device.
+ */
+ void terminateSubscriptionChannel();
+
+ /**
+ * Checks whether the gNMI service is available or not by sending a dummy get
+ * request message.
*
* @return true if gNMI service available; false otherwise
*/
CompletableFuture<Boolean> isServiceAvailable();
-
- // TODO: Support gNMI subscription
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
index f4964ed..b0e0071 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
@@ -17,6 +17,7 @@
package org.onosproject.gnmi.api;
import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
import org.onosproject.grpc.api.GrpcClientController;
/**
@@ -24,5 +25,6 @@
*/
@Beta
public interface GnmiController
- extends GrpcClientController<GnmiClientKey, GnmiClient> {
+ extends GrpcClientController<GnmiClientKey, GnmiClient>,
+ ListenerService<GnmiEvent, GnmiEventListener> {
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
index 5129926..84031d0 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
@@ -32,15 +32,10 @@
/**
* Update.
*/
- UPDATE,
-
- /**
- * Sync response.
- */
- SYNC_RESPONSE
+ UPDATE
}
- protected GnmiEvent(Type type, GnmiEventSubject subject) {
+ public GnmiEvent(Type type, GnmiEventSubject subject) {
super(type, subject);
}
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
new file mode 100644
index 0000000..eeb7649
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.base.MoreObjects;
+import gnmi.Gnmi.Notification;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Subject of a gNMI update event: a notification received from a device.
+ */
+public class GnmiUpdate implements GnmiEventSubject {
+    private final DeviceId deviceId;
+    private final Notification update;
+    private final boolean syncResponse;
+
+    /**
+     * Default constructor.
+     *
+     * @param deviceId the device id for this event
+     * @param update the update for this event
+     * @param syncResponse indicates that the target has sent all values
+     *                     associated with the subscription at least once
+     */
+    public GnmiUpdate(DeviceId deviceId, Notification update, boolean syncResponse) {
+        this.deviceId = deviceId;
+        this.update = update;
+        this.syncResponse = syncResponse;
+    }
+
+    /**
+     * Gets the update data.
+     *
+     * @return the update data
+     */
+    public Notification update() {
+        return update;
+    }
+
+    /**
+     * Indicates whether the target has sent all values associated with the
+     * subscription at least once.
+     *
+     * @return true if the target has sent all values at least once
+     */
+    public boolean syncResponse() {
+        return syncResponse;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("deviceId", deviceId)
+                .add("syncResponse", syncResponse)
+                .add("update", update)
+                .toString();
+    }
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
new file mode 100644
index 0000000..6a71a19
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import gnmi.Gnmi.Path;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for the gNMI protocol.
+ */
+public final class GnmiUtils {
+
+ private GnmiUtils() {
+ // Static utility holder; hide the default constructor
+ }
+
+ /**
+ * Converts a gNMI path to a human-readable string such as /a/b[k1=v1, k2=v2].
+ *
+ * @param path the gNMI path
+ * @return readable string of the path; empty if the path has no elements
+ */
+ public static String pathToString(Path path) {
+ StringBuilder pathStringBuilder = new StringBuilder();
+
+ path.getElemList().forEach(elem -> {
+ pathStringBuilder.append("/").append(elem.getName());
+ if (elem.getKeyCount() > 0) { // append keys as [k1=v1, k2=v2]
+ pathStringBuilder.append("[");
+ List<String> keys = elem.getKeyMap().entrySet().stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.toList());
+ pathStringBuilder.append(String.join(", ", keys));
+ pathStringBuilder.append("]");
+ }
+ });
+ return pathStringBuilder.toString();
+ }
+}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 22b226b..117b27e 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -23,17 +23,26 @@
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
+import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,10 +54,14 @@
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
+ private StreamChannelManager streamChannelManager;
+ private GnmiControllerImpl controller;
- GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+ GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+ this.streamChannelManager = new StreamChannelManager(managedChannel);
+ this.controller = controller;
}
@Override
@@ -67,10 +80,26 @@
}
@Override
+ public boolean subscribe(SubscribeRequest request) {
+ return streamChannelManager.send(request);
+ }
+
+ @Override
+ public void terminateSubscriptionChannel() {
+ streamChannelManager.complete();
+ }
+
+ @Override
public CompletableFuture<Boolean> isServiceAvailable() {
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
}
+ @Override
+ protected Void doShutdown() {
+ streamChannelManager.complete();
+ return super.doShutdown();
+ }
+
private CapabilityResponse doCapability() {
CapabilityRequest request = CapabilityRequest.newBuilder().build();
try {
@@ -110,4 +139,121 @@
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
+
+
+
+ /**
+ * A manager for the gNMI stream channel that opportunistically creates
+ * new stream RPC stubs (e.g. after one was completed because of errors);
+ * responses are handed to the observer that posts events via the controller.
+ */
+ private final class StreamChannelManager {
+
+ private final ManagedChannel channel;
+ private final AtomicBoolean open; // NOTE(review): never read in the visible code - confirm it is needed
+ private final StreamObserver<SubscribeResponse> responseObserver;
+ private ClientCallStreamObserver<SubscribeRequest> requestObserver; // null while no stream is open
+
+ private StreamChannelManager(ManagedChannel channel) {
+ this.channel = channel;
+ this.responseObserver = new InternalStreamResponseObserver(this);
+ this.open = new AtomicBoolean(false);
+ }
+
+ private void initIfRequired() {
+ if (requestObserver == null) { // first use, or complete() cleared the previous stream
+ log.debug("Creating new stream channel for {}...", deviceId);
+ requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
+ gNMIGrpc.newStub(channel).subscribe(responseObserver);
+ open.set(false);
+ }
+ }
+
+ public boolean send(SubscribeRequest value) {
+ synchronized (this) { // same monitor as complete(), guards requestObserver
+ initIfRequired();
+ try {
+ requestObserver.onNext(value);
+ return true;
+ } catch (Throwable ex) {
+ if (ex instanceof StatusRuntimeException) {
+ log.warn("Unable to send subscribe request to {}: {}",
+ deviceId, ex.getMessage());
+ } else {
+ log.warn("Exception while sending subscribe request to {}",
+ deviceId, ex);
+ }
+ complete(); // drop the stream so the next send() opens a new one
+ return false;
+ }
+ }
+ }
+
+ public void complete() {
+ synchronized (this) {
+ if (requestObserver != null) { // no-op when no stream is open
+ requestObserver.onCompleted();
+ requestObserver.cancel("Terminated", null);
+ requestObserver = null;
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Handles messages received from the device on the stream channel, posting each as a gNMI UPDATE event.
+ */
+ private final class InternalStreamResponseObserver
+ implements StreamObserver<SubscribeResponse> {
+
+ private final StreamChannelManager streamChannelManager;
+
+ private InternalStreamResponseObserver(
+ StreamChannelManager streamChannelManager) {
+ this.streamChannelManager = streamChannelManager;
+ }
+
+ @Override
+ public void onNext(SubscribeResponse message) {
+ executorService.submit(() -> doNext(message)); // hand processing off to executorService
+ }
+
+ private void doNext(SubscribeResponse message) {
+ try {
+ log.debug("Received message on stream channel from {}: {}",
+ deviceId, message.toString());
+ GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+ GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+ controller.postEvent(event);
+ } catch (Throwable ex) { // log and swallow so a failure here does not escape the submitted task
+ log.error("Exception while processing stream message from {}",
+ deviceId, ex);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (throwable instanceof StatusRuntimeException) {
+ StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ if (sre.getStatus().getCause() instanceof ConnectException) {
+ log.warn("Device {} is unreachable ({})",
+ deviceId, sre.getCause().getMessage());
+ } else {
+ log.warn("Received error on stream channel for {}: {}",
+ deviceId, throwable.getMessage());
+ }
+ } else {
+ log.warn(format("Received exception on stream channel for %s",
+ deviceId), throwable);
+ }
+ streamChannelManager.complete(); // discard the failed stream
+ }
+
+ @Override
+ public void onCompleted() {
+ log.warn("Stream channel for {} has completed", deviceId);
+ streamChannelManager.complete(); // clear the finished stream
+ }
+ }
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 8392c4a..cf14712 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -41,6 +41,7 @@
@Activate
public void activate() {
super.activate();
+ eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
log.info("Started");
}
@@ -52,6 +53,15 @@
@Override
protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
- return new GnmiClientImpl(clientKey, channel);
+ return new GnmiClientImpl(clientKey, channel, this);
+ }
+
+ /**
+ * Handles event from gNMI client.
+ *
+ * @param event the gNMI event
+ */
+ void postEvent(GnmiEvent event) {
+ post(event);
}
}