blob: 73bb807609f36e8fc0337c66ed04a7253062071c [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Carmelo Cascone3370c962019-02-07 18:24:19 -080017package org.onosproject.gnmi.ctl;
Yi Tsenga7f76c12018-12-14 14:19:18 -080018
19
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080020import com.google.common.util.concurrent.Futures;
Yi Tsenga7f76c12018-12-14 14:19:18 -080021import gnmi.Gnmi;
Yi Tsenga7f76c12018-12-14 14:19:18 -080022import io.grpc.StatusRuntimeException;
23import io.grpc.stub.ClientCallStreamObserver;
24import io.grpc.stub.StreamObserver;
25import org.onosproject.gnmi.api.GnmiEvent;
26import org.onosproject.gnmi.api.GnmiUpdate;
27import org.onosproject.net.DeviceId;
28import org.slf4j.Logger;
29
30import java.net.ConnectException;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080031import java.util.concurrent.Future;
Yi Tsenga7f76c12018-12-14 14:19:18 -080032import java.util.concurrent.ScheduledExecutorService;
33import java.util.concurrent.TimeUnit;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080034import java.util.concurrent.atomic.AtomicBoolean;
Yi Tsenga7f76c12018-12-14 14:19:18 -080035
36import static java.lang.String.format;
37import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
38import static org.onlab.util.Tools.groupedThreads;
39import static org.slf4j.LoggerFactory.getLogger;
40
41/**
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080042 * A manager for the gNMI Subscribe RPC that opportunistically starts new RPC
43 * (e.g. when one fails because of errors) and posts subscribe events via the
44 * gNMI controller.
Yi Tsenga7f76c12018-12-14 14:19:18 -080045 */
46final class GnmiSubscriptionManager {
47
Yi Tsenga7f76c12018-12-14 14:19:18 -080048 // FIXME: make this configurable
49 private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080050
Yi Tsenga7f76c12018-12-14 14:19:18 -080051 private static final Logger log = getLogger(GnmiSubscriptionManager.class);
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080052
53 private final GnmiClientImpl client;
Yi Tsenga7f76c12018-12-14 14:19:18 -080054 private final DeviceId deviceId;
55 private final GnmiControllerImpl controller;
Yi Tsenga7f76c12018-12-14 14:19:18 -080056 private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080057
58 private final ScheduledExecutorService streamCheckerExecutor =
59 newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-subscribe-check", "%d", log));
60 private Future<?> checkTask;
Yi Tsenga7f76c12018-12-14 14:19:18 -080061
62 private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
63 private Gnmi.SubscribeRequest existingSubscription;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080064 private AtomicBoolean active = new AtomicBoolean(false);
Yi Tsenga7f76c12018-12-14 14:19:18 -080065
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080066 GnmiSubscriptionManager(GnmiClientImpl client, DeviceId deviceId,
Yi Tsenga7f76c12018-12-14 14:19:18 -080067 GnmiControllerImpl controller) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080068 this.client = client;
Yi Tsenga7f76c12018-12-14 14:19:18 -080069 this.deviceId = deviceId;
70 this.controller = controller;
71 this.responseObserver = new InternalStreamResponseObserver();
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080072 }
73
74 void subscribe(Gnmi.SubscribeRequest request) {
75 synchronized (this) {
76 if (existingSubscription != null) {
77 if (existingSubscription.equals(request)) {
78 // Nothing to do. We are already subscribed for the same
79 // request.
80 log.debug("Ignoring re-subscription to same request",
81 deviceId);
82 return;
83 }
84 log.debug("Cancelling existing subscription for {} before " +
85 "starting a new one", deviceId);
86 complete();
87 }
88 existingSubscription = request;
89 sendSubscribeRequest();
90 if (checkTask != null) {
91 checkTask = streamCheckerExecutor.scheduleAtFixedRate(
92 this::checkSubscription, 0,
93 DEFAULT_RECONNECT_DELAY,
94 TimeUnit.SECONDS);
95 }
96 }
97 }
98
99 void unsubscribe() {
100 synchronized (this) {
101 if (checkTask != null) {
102 checkTask.cancel(false);
103 checkTask = null;
104 }
105 existingSubscription = null;
106 complete();
107 }
Yi Tsenga7f76c12018-12-14 14:19:18 -0800108 }
109
110 public void shutdown() {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800111 log.debug("Shutting down gNMI subscription manager for {}", deviceId);
112 unsubscribe();
113 streamCheckerExecutor.shutdownNow();
Yi Tsenga7f76c12018-12-14 14:19:18 -0800114 }
115
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800116 private void checkSubscription() {
117 synchronized (this) {
118 if (existingSubscription != null && !active.get()) {
119 if (client.isServerReachable() || Futures.getUnchecked(client.probeService())) {
120 log.info("Re-starting Subscribe RPC for {}...", deviceId);
121 sendSubscribeRequest();
122 } else {
123 log.debug("Not restarting Subscribe RPC for {}, server is NOT reachable",
124 deviceId);
125 }
126 }
127 }
128 }
129
130 private void sendSubscribeRequest() {
Yi Tsenga7f76c12018-12-14 14:19:18 -0800131 if (requestObserver == null) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800132 log.debug("Starting new Subscribe RPC for {}...", deviceId);
133 client.execRpcNoTimeout(
134 s -> requestObserver =
135 (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
136 s.subscribe(responseObserver)
137 );
Yi Tsenga7f76c12018-12-14 14:19:18 -0800138 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800139 requestObserver.onNext(existingSubscription);
140 active.set(true);
Yi Tsenga7f76c12018-12-14 14:19:18 -0800141 }
142
143 public void complete() {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800144 synchronized (this) {
145 active.set(false);
Yi Tsenga7f76c12018-12-14 14:19:18 -0800146 if (requestObserver != null) {
147 requestObserver.onCompleted();
148 requestObserver.cancel("Terminated", null);
149 requestObserver = null;
150 }
151 }
152 }
153
Yi Tsenga7f76c12018-12-14 14:19:18 -0800154 /**
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800155 * Handles messages received from the device on the Subscribe RPC.
Yi Tsenga7f76c12018-12-14 14:19:18 -0800156 */
157 private final class InternalStreamResponseObserver
158 implements StreamObserver<Gnmi.SubscribeResponse> {
159
160 @Override
161 public void onNext(Gnmi.SubscribeResponse message) {
162 try {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800163 if (log.isTraceEnabled()) {
164 log.trace("Received SubscribeResponse from {}: {}",
165 deviceId, message.toString());
166 }
167 controller.postEvent(new GnmiEvent(GnmiEvent.Type.UPDATE, new GnmiUpdate(
168 deviceId, message.getUpdate(), message.getSyncResponse())));
Yi Tsenga7f76c12018-12-14 14:19:18 -0800169 } catch (Throwable ex) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800170 log.error("Exception processing SubscribeResponse from " + deviceId,
171 ex);
Yi Tsenga7f76c12018-12-14 14:19:18 -0800172 }
173 }
174
175 @Override
176 public void onError(Throwable throwable) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800177 complete();
Yi Tsenga7f76c12018-12-14 14:19:18 -0800178 if (throwable instanceof StatusRuntimeException) {
179 StatusRuntimeException sre = (StatusRuntimeException) throwable;
180 if (sre.getStatus().getCause() instanceof ConnectException) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800181 log.warn("{} is unreachable ({})",
182 deviceId, sre.getCause().getMessage());
Yi Tsenga7f76c12018-12-14 14:19:18 -0800183 } else {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800184 log.warn("Error on Subscribe RPC for {}: {}",
185 deviceId, throwable.getMessage());
Yi Tsenga7f76c12018-12-14 14:19:18 -0800186 }
187 } else {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800188 log.error(format("Exception on Subscribe RPC for %s",
189 deviceId), throwable);
Yi Tsenga7f76c12018-12-14 14:19:18 -0800190 }
Yi Tsenga7f76c12018-12-14 14:19:18 -0800191 }
192
193 @Override
194 public void onCompleted() {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800195 complete();
196 log.warn("Subscribe RPC for {} has completed", deviceId);
Yi Tsenga7f76c12018-12-14 14:19:18 -0800197 }
198 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800199
200 @Override
201 protected void finalize() throws Throwable {
202 if (!streamCheckerExecutor.isShutdown()) {
203 log.error("Finalizing object but executor is still active! BUG? Shutting down...");
204 shutdown();
205 }
206 super.finalize();
207 }
Yi Tsenga7f76c12018-12-14 14:19:18 -0800208}
209
210