Add gNMI device state subscriber
Also disable some gNMI/P4-related BUCK files that break the BUCK build
for ONOS 1.14
Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
diff --git a/modules.defs b/modules.defs
index 11f55a5..17d0c3a 100644
--- a/modules.defs
+++ b/modules.defs
@@ -76,7 +76,6 @@
'//providers/isis/topology:onos-providers-isis-topology',
'//providers/lisp/device:onos-providers-lisp-device',
'//providers/tl1/device:onos-providers-tl1-device',
- '//providers/general/device:onos-providers-general-device',
# '//providers/p4runtime/packet:onos-providers-p4runtime-packet',
'//web/api:onos-rest',
@@ -146,7 +145,6 @@
'//providers/link:onos-providers-link-oar',
'//providers/lisp:onos-providers-lisp-oar',
'//providers/tl1:onos-providers-tl1-oar',
- '//providers/general:onos-providers-general-oar',
# '//providers/p4runtime:onos-providers-p4runtime-oar',
# '//providers/ietfte:onos-providers-ietfte-oar',
'//providers/xmpp/device:onos-providers-xmpp-device-oar',
diff --git a/pipelines/fabric/BUCK b/pipelines/fabric/BUCK
deleted file mode 100644
index 5491289..0000000
--- a/pipelines/fabric/BUCK
+++ /dev/null
@@ -1,37 +0,0 @@
-COMPILE_DEPS = [
- '//lib:CORE_DEPS',
- '//lib:KRYO',
- '//protocols/p4runtime/model:onos-protocols-p4runtime-model',
- '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
- '//pipelines/basic:onos-pipelines-basic',
- '//core/store/serializers:onos-core-serializers',
- '//apps/inbandtelemetry/api:onos-apps-inbandtelemetry-api',
- '//providers/general/device:onos-providers-general-device',
-]
-
-TEST_DEPS = [
- '//lib:TEST_ADAPTERS',
-]
-
-BUNDLES = [
- '//pipelines/fabric:onos-pipelines-fabric',
- '//apps/inbandtelemetry/api:onos-apps-inbandtelemetry-api',
-]
-
-osgi_jar_with_tests (
- deps = COMPILE_DEPS,
- test_deps = TEST_DEPS,
-)
-
-onos_app(
- app_name = 'org.onosproject.pipelines.fabric',
- title = 'Fabric Pipeline',
- category = 'Pipeline',
- url = 'http://onosproject.org',
- description = 'Provides pipelines with CORD fabric underlay support.',
- included_bundles = BUNDLES,
- required_apps = [
- 'org.onosproject.drivers.p4runtime',
- 'org.onosproject.pipelines.basic',
- ]
-)
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 5beb238..6e65dd3 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -22,6 +22,7 @@
import gnmi.Gnmi.GetResponse;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
import org.onosproject.grpc.api.GrpcClient;
import java.util.concurrent.CompletableFuture;
@@ -56,12 +57,23 @@
CompletableFuture<SetResponse> set(SetRequest request);
/**
- * Check weather the gNMI service is available or not by sending a
- * dummy get request message.
+ * Subscribes to a given specific gNMI path.
+ *
+ * @param request the subscribe request
+ * @return true if subscribe successfully; false otherwise
+ */
+ boolean subscribe(SubscribeRequest request);
+
+ /**
+ * Terminates the subscription channel of this device.
+ */
+ void terminateSubscriptionChannel();
+
+ /**
+ * Check weather the gNMI service is available or not by sending a dummy get
+ * request message.
*
* @return true if gNMI service available; false otherwise
*/
CompletableFuture<Boolean> isServiceAvailable();
-
- // TODO: Support gNMI subscription
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
index f4964ed..b0e0071 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
@@ -17,6 +17,7 @@
package org.onosproject.gnmi.api;
import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
import org.onosproject.grpc.api.GrpcClientController;
/**
@@ -24,5 +25,6 @@
*/
@Beta
public interface GnmiController
- extends GrpcClientController<GnmiClientKey, GnmiClient> {
+ extends GrpcClientController<GnmiClientKey, GnmiClient>,
+ ListenerService<GnmiEvent, GnmiEventListener> {
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
index 5129926..84031d0 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
@@ -32,15 +32,10 @@
/**
* Update.
*/
- UPDATE,
-
- /**
- * Sync response.
- */
- SYNC_RESPONSE
+ UPDATE
}
- protected GnmiEvent(Type type, GnmiEventSubject subject) {
+ public GnmiEvent(Type type, GnmiEventSubject subject) {
super(type, subject);
}
}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
new file mode 100644
index 0000000..eeb7649
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.base.MoreObjects;
+import gnmi.Gnmi.Notification;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Event class for gNMI update.
+ */
+public class GnmiUpdate implements GnmiEventSubject {
+ private DeviceId deviceId;
+ private Notification update;
+ private boolean syncResponse;
+
+ /**
+ * Default constructor.
+ *
+ * @param deviceId the device id for this event
+ * @param update the update for this event
+ * @param syncResponse indicate target has sent all values associated with
+ * the subscription at least once.
+ */
+ public GnmiUpdate(DeviceId deviceId, Notification update, boolean syncResponse) {
+ this.deviceId = deviceId;
+ this.update = update;
+ this.syncResponse = syncResponse;
+ }
+
+ /**
+ * Gets the update data.
+ *
+ * @return the update data
+ */
+ public Notification update() {
+ return update;
+ }
+
+ /**
+ * indicate target has sent all values associated with the subscription at
+ * least once.
+ *
+ * @return true if all value from target has sent
+ */
+ public boolean syncResponse() {
+ return syncResponse;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("deviceId", deviceId)
+ .add("syncResponse", syncResponse)
+ .add("update", update)
+ .toString();
+ }
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
new file mode 100644
index 0000000..6a71a19
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import gnmi.Gnmi.Path;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for gNMI protocol.
+ */
+public final class GnmiUtils {
+
+ private GnmiUtils() {
+ // Hide default constructor
+ }
+
+ /**
+ * Convert gNMI path to human readable string.
+ *
+ * @param path the gNMI path
+ * @return readable string of the path
+ */
+ public static String pathToString(Path path) {
+ StringBuilder pathStringBuilder = new StringBuilder();
+
+ path.getElemList().forEach(elem -> {
+ pathStringBuilder.append("/").append(elem.getName());
+ if (elem.getKeyCount() > 0) {
+ pathStringBuilder.append("[");
+ List<String> keys = elem.getKeyMap().entrySet().stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.toList());
+ pathStringBuilder.append(String.join(", ", keys));
+ pathStringBuilder.append("]");
+ }
+ });
+ return pathStringBuilder.toString();
+ }
+}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 22b226b..117b27e 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -23,17 +23,26 @@
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
+import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,10 +54,14 @@
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
+ private StreamChannelManager streamChannelManager;
+ private GnmiControllerImpl controller;
- GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+ GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+ this.streamChannelManager = new StreamChannelManager(managedChannel);
+ this.controller = controller;
}
@Override
@@ -67,10 +80,26 @@
}
@Override
+ public boolean subscribe(SubscribeRequest request) {
+ return streamChannelManager.send(request);
+ }
+
+ @Override
+ public void terminateSubscriptionChannel() {
+ streamChannelManager.complete();
+ }
+
+ @Override
public CompletableFuture<Boolean> isServiceAvailable() {
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
}
+ @Override
+ protected Void doShutdown() {
+ streamChannelManager.complete();
+ return super.doShutdown();
+ }
+
private CapabilityResponse doCapability() {
CapabilityRequest request = CapabilityRequest.newBuilder().build();
try {
@@ -110,4 +139,121 @@
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
+
+
+
+ /**
+ * A manager for the gNMI stream channel that opportunistically creates
+ * new stream RCP stubs (e.g. when one fails because of errors) and posts
+ * subscribe events via the gNMI controller.
+ */
+ private final class StreamChannelManager {
+
+ private final ManagedChannel channel;
+ private final AtomicBoolean open;
+ private final StreamObserver<SubscribeResponse> responseObserver;
+ private ClientCallStreamObserver<SubscribeRequest> requestObserver;
+
+ private StreamChannelManager(ManagedChannel channel) {
+ this.channel = channel;
+ this.responseObserver = new InternalStreamResponseObserver(this);
+ this.open = new AtomicBoolean(false);
+ }
+
+ private void initIfRequired() {
+ if (requestObserver == null) {
+ log.debug("Creating new stream channel for {}...", deviceId);
+ requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
+ gNMIGrpc.newStub(channel).subscribe(responseObserver);
+ open.set(false);
+ }
+ }
+
+ public boolean send(SubscribeRequest value) {
+ synchronized (this) {
+ initIfRequired();
+ try {
+ requestObserver.onNext(value);
+ return true;
+ } catch (Throwable ex) {
+ if (ex instanceof StatusRuntimeException) {
+ log.warn("Unable to send subscribe request to {}: {}",
+ deviceId, ex.getMessage());
+ } else {
+ log.warn("Exception while sending subscribe request to {}",
+ deviceId, ex);
+ }
+ complete();
+ return false;
+ }
+ }
+ }
+
+ public void complete() {
+ synchronized (this) {
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ requestObserver.cancel("Terminated", null);
+ requestObserver = null;
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Handles messages received from the device on the stream channel.
+ */
+ private final class InternalStreamResponseObserver
+ implements StreamObserver<SubscribeResponse> {
+
+ private final StreamChannelManager streamChannelManager;
+
+ private InternalStreamResponseObserver(
+ StreamChannelManager streamChannelManager) {
+ this.streamChannelManager = streamChannelManager;
+ }
+
+ @Override
+ public void onNext(SubscribeResponse message) {
+ executorService.submit(() -> doNext(message));
+ }
+
+ private void doNext(SubscribeResponse message) {
+ try {
+ log.debug("Received message on stream channel from {}: {}",
+ deviceId, message.toString());
+ GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+ GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+ controller.postEvent(event);
+ } catch (Throwable ex) {
+ log.error("Exception while processing stream message from {}",
+ deviceId, ex);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (throwable instanceof StatusRuntimeException) {
+ StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ if (sre.getStatus().getCause() instanceof ConnectException) {
+ log.warn("Device {} is unreachable ({})",
+ deviceId, sre.getCause().getMessage());
+ } else {
+ log.warn("Received error on stream channel for {}: {}",
+ deviceId, throwable.getMessage());
+ }
+ } else {
+ log.warn(format("Received exception on stream channel for %s",
+ deviceId), throwable);
+ }
+ streamChannelManager.complete();
+ }
+
+ @Override
+ public void onCompleted() {
+ log.warn("Stream channel for {} has completed", deviceId);
+ streamChannelManager.complete();
+ }
+ }
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 7ddc3f0..88df1f4 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -43,6 +43,7 @@
@Activate
public void activate() {
super.activate();
+ eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
log.info("Started");
}
@@ -54,6 +55,15 @@
@Override
protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
- return new GnmiClientImpl(clientKey, channel);
+ return new GnmiClientImpl(clientKey, channel, this);
+ }
+
+ /**
+ * Handles event from gNMI client.
+ *
+ * @param event the gNMI event
+ */
+ void postEvent(GnmiEvent event) {
+ post(event);
}
}
diff --git a/providers/general/BUCK b/providers/general/BUCK
deleted file mode 100644
index 5d4973e..0000000
--- a/providers/general/BUCK
+++ /dev/null
@@ -1,15 +0,0 @@
-BUNDLES = [
- '//providers/general/device:onos-providers-general-device',
-]
-
-onos_app (
- app_name = "org.onosproject.generaldeviceprovider",
- title = 'General Device Provider',
- category = 'Provider',
- url = 'http://onosproject.org',
- included_bundles = BUNDLES,
- description = 'General device southbound providers.',
- required_apps = [],
-)
-
-
diff --git a/providers/general/BUILD b/providers/general/BUILD
index 0f4654f..2ac73a0 100644
--- a/providers/general/BUILD
+++ b/providers/general/BUILD
@@ -7,7 +7,7 @@
category = "Provider",
description = "General device southbound providers.",
included_bundles = BUNDLES,
- required_apps = [],
+ required_apps = ["org.onosproject.protocols.gnmi"],
title = "General Device Provider",
url = "http://onosproject.org",
)
diff --git a/providers/general/device/BUCK b/providers/general/device/BUCK
deleted file mode 100644
index 4c7bd78..0000000
--- a/providers/general/device/BUCK
+++ /dev/null
@@ -1,13 +0,0 @@
-COMPILE_DEPS = [
- '//lib:CORE_DEPS',
- '//lib:JACKSON',
-]
-
-TEST_DEPS = [
- '//lib:TEST_ADAPTERS',
-]
-
-osgi_jar_with_tests (
- deps = COMPILE_DEPS,
- test_deps = TEST_DEPS,
-)
diff --git a/providers/general/device/BUILD b/providers/general/device/BUILD
index 2807457..40957b1 100644
--- a/providers/general/device/BUILD
+++ b/providers/general/device/BUILD
@@ -1,4 +1,9 @@
-COMPILE_DEPS = CORE_DEPS + JACKSON
+COMPILE_DEPS = CORE_DEPS + JACKSON + [
+ "//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+ "//protocols/gnmi/api:onos-protocols-gnmi-api",
+ "@com_google_protobuf//:protobuf_java",
+ "//protocols/grpc/api:onos-protocols-grpc-api",
+]
osgi_jar_with_tests(
test_deps = TEST_ADAPTERS,
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 3782871..ff19783 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -32,6 +32,7 @@
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.CoreService;
+import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
@@ -159,6 +160,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private PiPipeconfWatchdogService pipeconfWatchdogService;
+ // FIXME: no longer general if we add a dependency to a protocol-specific
+ // service. Possible solutions are: rename this provider to
+ // StratumDeviceProvider, find a way to allow this provider to register for
+ // protocol specific events (e.g. port events) via drivers (similar to
+ // DeviceAgentListener).
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private GnmiController gnmiController;
+
+ private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
+
private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
@Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
@@ -224,6 +235,9 @@
pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
rescheduleProbeTask(false);
modified(context);
+ gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController,
+ deviceService, mastershipService, providerService);
+ gnmiDeviceStateSubscriber.activate();
log.info("Started");
}
@@ -312,6 +326,8 @@
providerRegistry.unregister(this);
providerService = null;
cfgService.unregisterConfigFactory(factory);
+ gnmiDeviceStateSubscriber.deactivate();
+ gnmiDeviceStateSubscriber = null;
log.info("Stopped");
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
new file mode 100644
index 0000000..b253f3e
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import gnmi.Gnmi.Notification;
+import gnmi.Gnmi.Path;
+import gnmi.Gnmi.PathElem;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.Subscription;
+import gnmi.Gnmi.SubscriptionList;
+import gnmi.Gnmi.SubscriptionMode;
+import gnmi.Gnmi.Update;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiEventListener;
+import org.onosproject.gnmi.api.GnmiUpdate;
+import org.onosproject.gnmi.api.GnmiUtils;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Entity that manages gNMI subscription for devices using OpenConfig models and
+ * that reports relevant events to the core.
+ */
+@Beta
+class GnmiDeviceStateSubscriber {
+
+ private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
+
+ private final GnmiController gnmiController;
+ private final DeviceService deviceService;
+ private final DeviceProviderService providerService;
+ private final MastershipService mastershipService;
+
+ private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
+
+ private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
+ private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
+ private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
+ private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
+
+ private final Striped<Lock> deviceLocks = Striped.lock(30);
+
+ GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
+ MastershipService mastershipService,
+ DeviceProviderService providerService) {
+ this.gnmiController = gnmiController;
+ this.deviceService = deviceService;
+ this.mastershipService = mastershipService;
+ this.providerService = providerService;
+ }
+
+ public void activate() {
+ deviceService.addListener(deviceEventListener);
+ mastershipService.addListener(mastershipListener);
+ gnmiController.addListener(gnmiEventListener);
+ // Subscribe to existing devices.
+ deviceService.getDevices().forEach(d -> executorService.execute(
+ () -> checkDeviceSubscription(d.id())));
+ }
+
+ public void deactivate() {
+ deviceSubscribed.forEach(this::unsubscribeIfNeeded);
+ deviceService.removeListener(deviceEventListener);
+ mastershipService.removeListener(mastershipListener);
+ gnmiController.removeListener(gnmiEventListener);
+ }
+
+ private void checkDeviceSubscription(DeviceId deviceId) {
+ deviceLocks.get(deviceId).lock();
+ try {
+ if (!deviceService.isAvailable(deviceId)
+ || deviceService.getDevice(deviceId) == null
+ || !mastershipService.isLocalMaster(deviceId)) {
+ // Device not available/removed or this instance is no longer
+ // master.
+ unsubscribeIfNeeded(deviceId);
+ } else {
+ subscribeIfNeeded(deviceId);
+ }
+ } finally {
+ deviceLocks.get(deviceId).unlock();
+ }
+ }
+
+ private Path interfaceOperStatusPath(String interfaceName) {
+ return Path.newBuilder()
+ .addElem(PathElem.newBuilder().setName("interfaces").build())
+ .addElem(PathElem.newBuilder()
+ .setName("interface").putKey("name", interfaceName).build())
+ .addElem(PathElem.newBuilder().setName("state").build())
+ .addElem(PathElem.newBuilder().setName("oper-status").build())
+ .build();
+ }
+
+ private void unsubscribeIfNeeded(DeviceId deviceId) {
+ if (!deviceSubscribed.contains(deviceId)) {
+ // Not subscribed.
+ return;
+ }
+ GnmiClient client = gnmiController.getClient(deviceId);
+ if (client == null) {
+ log.debug("Cannot find gNMI client for device {}", deviceId);
+ } else {
+ client.terminateSubscriptionChannel();
+ }
+ deviceSubscribed.remove(deviceId);
+ }
+
+ private void subscribeIfNeeded(DeviceId deviceId) {
+ if (deviceSubscribed.contains(deviceId)) {
+ // Already subscribed.
+ // FIXME: if a new port is added after the first subscription we are
+ // not subscribing to the new port.
+ return;
+ }
+
+ GnmiClient client = gnmiController.getClient(deviceId);
+ if (client == null) {
+ log.warn("Cannot find gNMI client for device {}", deviceId);
+ return;
+ }
+
+ List<Port> ports = deviceService.getPorts(deviceId);
+ SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
+ subscriptionList.setMode(SubscriptionList.Mode.STREAM);
+ subscriptionList.setUpdatesOnly(true);
+
+ ports.forEach(port -> {
+ String portName = port.number().name();
+ // Subscribe /interface/interface[name=port-name]/state/oper-status
+ Path subscribePath = interfaceOperStatusPath(portName);
+ Subscription interfaceOperStatusSub =
+ Subscription.newBuilder()
+ .setPath(subscribePath)
+ .setMode(SubscriptionMode.ON_CHANGE)
+ .build();
+ // TODO: more state subscription
+ subscriptionList.addSubscription(interfaceOperStatusSub);
+ });
+
+ SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
+ .setSubscribe(subscriptionList.build())
+ .build();
+
+ client.subscribe(subscribeRequest);
+
+ deviceSubscribed.add(deviceId);
+ }
+
+ private void handleGnmiUpdate(GnmiUpdate eventSubject) {
+ Notification notification = eventSubject.update();
+ if (notification == null) {
+ log.warn("Cannot handle gNMI event without update data, abort");
+ log.debug("gNMI update:\n{}", eventSubject);
+ return;
+ }
+
+ List<Update> updateList = notification.getUpdateList();
+ updateList.forEach(update -> {
+ Path path = update.getPath();
+ PathElem lastElem = path.getElem(path.getElemCount() - 1);
+
+ // Use last element to identify which state updated
+ if ("oper-status".equals(lastElem.getName())) {
+ handleOperStatusUpdate(eventSubject.deviceId(), update);
+ } else {
+ log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
+ }
+ });
+ }
+
+ private void handleOperStatusUpdate(DeviceId deviceId, Update update) {
+ Path path = update.getPath();
+ // first element should be "interface"
+ String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
+ if (interfaceName == null) {
+ log.error("No interface present in gNMI update, abort");
+ log.debug("gNMI update:\n{}", update);
+ return;
+ }
+
+ List<Port> portsFromDevice = deviceService.getPorts(deviceId);
+ portsFromDevice.forEach(port -> {
+ if (!port.number().name().equals(interfaceName)) {
+ return;
+ }
+
+ // Port/Interface name is identical in OpenConfig model, but not in ONOS
+ // This might cause some problem if we use one name to different port
+ PortDescription portDescription = DefaultPortDescription.builder()
+ .portSpeed(port.portSpeed())
+ .withPortNumber(port.number())
+ .isEnabled(update.getVal().getStringVal().equals("UP"))
+ .type(port.type())
+ .annotations((SparseAnnotations) port.annotations())
+ .build();
+ providerService.portStatusChanged(deviceId, portDescription);
+ });
+ }
+
+ class InternalGnmiEventListener implements GnmiEventListener {
+
+ @Override
+ public void event(GnmiEvent event) {
+ if (!deviceSubscribed.contains(event.subject().deviceId())) {
+ log.warn("Received gNMI event from {}, but we are not subscribed to it",
+ event.subject().deviceId());
+ }
+ log.debug("Received gNMI event {}", event.toString());
+ if (event.type() == GnmiEvent.Type.UPDATE) {
+ executorService.execute(
+ () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
+ } else {
+ log.debug("Unsupported gNMI event type: {}", event.type());
+ }
+ }
+ }
+
+ class InternalMastershipListener implements MastershipListener {
+
+ @Override
+ public void event(MastershipEvent event) {
+ executorService.execute(() -> checkDeviceSubscription(event.subject()));
+ }
+ }
+
+ class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_UPDATED:
+ case DEVICE_REMOVED:
+ executorService.execute(
+ () -> checkDeviceSubscription(event.subject().id()));
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}