blob: 050925f624243163d44bb02a057d2c97564a1614 [file] [log] [blame]
Yi Tsenga7f76c12018-12-14 14:19:18 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
18
19
20import gnmi.Gnmi;
21import gnmi.gNMIGrpc;
22import io.grpc.ManagedChannel;
23import io.grpc.StatusRuntimeException;
24import io.grpc.stub.ClientCallStreamObserver;
25import io.grpc.stub.StreamObserver;
26import org.onosproject.gnmi.api.GnmiEvent;
27import org.onosproject.gnmi.api.GnmiUpdate;
28import org.onosproject.net.DeviceId;
29import org.slf4j.Logger;
30
31import java.net.ConnectException;
32import java.util.concurrent.ScheduledExecutorService;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.atomic.AtomicReference;
35
36import static java.lang.String.format;
37import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
38import static org.onlab.util.Tools.groupedThreads;
39import static org.slf4j.LoggerFactory.getLogger;
40
41/**
42 * A manager for the gNMI stream channel that opportunistically creates
43 * new stream RCP stubs (e.g. when one fails because of errors) and posts
44 * subscribe events via the gNMI controller.
45 */
46final class GnmiSubscriptionManager {
47
48 /**
49 * The state of the subscription manager.
50 */
51 enum State {
52
53 /**
54 * Subscription not exists.
55 */
56 INIT,
57
58 /**
59 * Exists a subscription and channel opened.
60 */
61 SUBSCRIBED,
62
63 /**
64 * Exists a subscription, but the channel does not open.
65 */
66 RETRYING,
67 }
68
69 // FIXME: make this configurable
70 private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
71 private static final Logger log = getLogger(GnmiSubscriptionManager.class);
72 private final ManagedChannel channel;
73 private final DeviceId deviceId;
74 private final GnmiControllerImpl controller;
75
76 private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
77 private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
78
79 private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
80 private Gnmi.SubscribeRequest existingSubscription;
81 private final ScheduledExecutorService streamCheckerExecutor =
82 newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
83
84 GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
85 GnmiControllerImpl controller) {
86 this.channel = channel;
87 this.deviceId = deviceId;
88 this.controller = controller;
89 this.responseObserver = new InternalStreamResponseObserver();
90 streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
91 DEFAULT_RECONNECT_DELAY,
92 TimeUnit.SECONDS);
93 }
94
95 public void shutdown() {
96 log.info("gNMI subscription manager for device {} shutdown", deviceId);
97 streamCheckerExecutor.shutdown();
98 complete();
99 }
100
101 private void initIfRequired() {
102 if (requestObserver == null) {
103 log.debug("Creating new stream channel for {}...", deviceId);
104 requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
105 gNMIGrpc.newStub(channel).subscribe(responseObserver);
106
107 }
108 }
109
110 boolean subscribe(Gnmi.SubscribeRequest request) {
111 synchronized (state) {
112 if (state.get() == State.SUBSCRIBED) {
113 // Cancel subscription when we need to subscribe new thing
114 complete();
115 }
116
117 existingSubscription = request;
118 return send(request);
119 }
120 }
121
122 private boolean send(Gnmi.SubscribeRequest value) {
123 initIfRequired();
124 try {
125 requestObserver.onNext(value);
126 state.set(State.SUBSCRIBED);
127 return true;
128 } catch (Throwable ex) {
129 if (ex instanceof StatusRuntimeException) {
130 log.warn("Unable to send subscribe request to {}: {}",
131 deviceId, ex.getMessage());
132 } else {
133 log.warn("Exception while sending subscribe request to {}",
134 deviceId, ex);
135 }
136 state.set(State.RETRYING);
137 return false;
138 }
139 }
140
141 public void complete() {
142 synchronized (state) {
143 state.set(State.INIT);
144 if (requestObserver != null) {
145 requestObserver.onCompleted();
146 requestObserver.cancel("Terminated", null);
147 requestObserver = null;
148 }
149 }
150 }
151
152 private void checkGnmiStream() {
153 synchronized (state) {
154 if (state.get() != State.RETRYING) {
155 // No need to retry if the state is not RETRYING
156 return;
157 }
158 log.info("Try reconnecting gNMI stream to device {}", deviceId);
159
160 complete();
161 send(existingSubscription);
162 }
163 }
164
165 /**
166 * Handles messages received from the device on the stream channel.
167 */
168 private final class InternalStreamResponseObserver
169 implements StreamObserver<Gnmi.SubscribeResponse> {
170
171 @Override
172 public void onNext(Gnmi.SubscribeResponse message) {
173 try {
174 log.debug("Received message on stream channel from {}: {}",
175 deviceId, message.toString());
176 GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
177 GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
178 controller.postEvent(event);
179 } catch (Throwable ex) {
180 log.error("Exception while processing stream message from {}",
181 deviceId, ex);
182 }
183 }
184
185 @Override
186 public void onError(Throwable throwable) {
187 if (throwable instanceof StatusRuntimeException) {
188 StatusRuntimeException sre = (StatusRuntimeException) throwable;
189 if (sre.getStatus().getCause() instanceof ConnectException) {
190 log.warn("Device {} is unreachable ({})",
191 deviceId, sre.getCause().getMessage());
192 } else {
193 log.warn("Received error on stream channel for {}: {}",
194 deviceId, throwable.getMessage());
195 }
196 } else {
197 log.warn(format("Received exception on stream channel for %s",
198 deviceId), throwable);
199 }
200 state.set(State.RETRYING);
201 }
202
203 @Override
204 public void onCompleted() {
205 log.warn("Stream channel for {} has completed", deviceId);
206 state.set(State.RETRYING);
207 }
208 }
209}
210
211