blob: 4f43fa6533b3ad625c5d1ceea3792a076225c96b [file] [log] [blame]
HIGUCHI Yuta15653fd2015-11-09 11:05:09 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.rpc.grpc;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static java.util.stream.Collectors.toList;
20import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
21import static org.onosproject.net.DeviceId.deviceId;
22
23import java.io.IOException;
24import java.util.Set;
25import java.util.concurrent.CompletableFuture;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.TimeoutException;
29import java.util.concurrent.atomic.AtomicInteger;
30
31import org.apache.felix.scr.annotations.Activate;
32import org.apache.felix.scr.annotations.Component;
33import org.apache.felix.scr.annotations.Deactivate;
34import org.apache.felix.scr.annotations.Modified;
35import org.apache.felix.scr.annotations.Property;
36import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
38import org.onosproject.grpc.Device.DeviceConnected;
39import org.onosproject.grpc.Device.DeviceDisconnected;
40import org.onosproject.grpc.Device.DeviceProviderMsg;
41import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
42import org.onosproject.grpc.Device.IsReachableResponse;
43import org.onosproject.grpc.Device.PortStatusChanged;
44import org.onosproject.grpc.Device.ReceivedRoleReply;
45import org.onosproject.grpc.Device.RegisterProvider;
46import org.onosproject.grpc.Device.UpdatePortStatistics;
47import org.onosproject.grpc.Device.UpdatePorts;
48import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
49import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
50import org.onosproject.net.DeviceId;
51import org.onosproject.net.MastershipRole;
52import org.onosproject.net.device.DeviceProvider;
53import org.onosproject.net.device.DeviceProviderRegistry;
54import org.onosproject.net.device.DeviceProviderService;
55import org.onosproject.net.provider.ProviderId;
56import org.osgi.service.component.ComponentContext;
57import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
60import com.google.common.cache.Cache;
61import com.google.common.cache.CacheBuilder;
62import com.google.common.collect.Sets;
63
64import io.grpc.Server;
65import io.grpc.netty.NettyServerBuilder;
66import io.grpc.stub.StreamObserver;
67
68// gRPC Server on Metro-side
69// Translates request received on RPC channel, and calls corresponding Service on
70// Metro-ONOS cluster.
71/**
72 * Server side implementation of gRPC based RemoteService.
73 */
74@Component(immediate = true)
75public class GrpcRemoteServiceServer {
76
77 private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
78
79 // TODO pick a number
80 public static final int DEFAULT_LISTEN_PORT = 11984;
81
82 private final Logger log = LoggerFactory.getLogger(getClass());
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected DeviceProviderRegistry deviceProviderRegistry;
86
87
88 @Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
89 label = "Port to listen on")
90 protected int listenPort = DEFAULT_LISTEN_PORT;
91
92 private Server server;
93 private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
94
95 @Activate
96 protected void activate(ComponentContext context) throws IOException {
97 modified(context);
98
99 log.debug("Server starting on {}", listenPort);
100 try {
101 server = NettyServerBuilder.forPort(listenPort)
102 .addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
103 .build().start();
104 } catch (IOException e) {
105 log.error("Failed to start gRPC server", e);
106 throw e;
107 }
108
109 log.info("Started on {}", listenPort);
110 }
111
112 @Deactivate
113 protected void deactivate() {
114
115 registeredProviders.stream()
116 .forEach(deviceProviderRegistry::unregister);
117
118 server.shutdown();
119 // Should we wait for shutdown?
120 log.info("Stopped");
121 }
122
123 @Modified
124 public void modified(ComponentContext context) {
125 // TODO support dynamic reconfiguration and restarting server?
126 }
127
128 // RPC Server-side code
129 // RPC session Factory
130 /**
131 * Relays DeviceProviderRegistry calls from RPC client.
132 */
133 class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
134
135 @Override
136 public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
137 log.trace("DeviceProviderRegistryServerProxy#register called!");
138
139 DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
140
141 return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
142 }
143 }
144
145 // Lower -> Upper Controller message
146 // RPC Server-side code
147 // RPC session handler
148 private final class DeviceProviderServiceServerProxy
149 implements StreamObserver<DeviceProviderServiceMsg> {
150
151 // intentionally shadowing
152 private final Logger log = LoggerFactory.getLogger(getClass());
153
154 private final DeviceProviderServerProxy pairedProvider;
155 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
156
157 private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
158
159 // wrapped providerService
160 private DeviceProviderService deviceProviderService;
161
162
163 DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
164 StreamObserver<DeviceProviderMsg> toDeviceProvider) {
165 this.pairedProvider = provider;
166 this.toDeviceProvider = toDeviceProvider;
167 outstandingIsReachable = CacheBuilder.newBuilder()
168 .expireAfterWrite(1, TimeUnit.MINUTES)
169 .build();
170
171 // pair RPC session in other direction
172 provider.pair(this);
173 }
174
175 @Override
176 public void onNext(DeviceProviderServiceMsg msg) {
177 try {
178 log.trace("DeviceProviderServiceServerProxy received: {}", msg);
179 onMethod(msg);
180 } catch (Exception e) {
181 log.error("Exception thrown handling {}", msg, e);
182 onError(e);
183 throw e;
184 }
185 }
186
187 /**
188 * Translates received RPC message to {@link DeviceProviderService} method calls.
189 * @param msg DeviceProviderService message
190 */
191 private void onMethod(DeviceProviderServiceMsg msg) {
192 switch (msg.getMethodCase()) {
193 case REGISTER_PROVIDER:
194 RegisterProvider registerProvider = msg.getRegisterProvider();
195 // TODO Do we care about provider name?
196 pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
197 registeredProviders.add(pairedProvider);
198 deviceProviderService = deviceProviderRegistry.register(pairedProvider);
199 break;
200
201 case DEVICE_CONNECTED:
202 DeviceConnected deviceConnected = msg.getDeviceConnected();
203 deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
204 translate(deviceConnected.getDeviceDescription()));
205 break;
206 case DEVICE_DISCONNECTED:
207 DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
208 deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
209 break;
210 case UPDATE_PORTS:
211 UpdatePorts updatePorts = msg.getUpdatePorts();
212 deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
213 updatePorts.getPortDescriptionsList()
214 .stream()
215 .map(GrpcDeviceUtils::translate)
216 .collect(toList()));
217 break;
218 case PORT_STATUS_CHANGED:
219 PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
220 deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
221 translate(portStatusChanged.getPortDescription()));
222 break;
223 case RECEIVED_ROLE_REPLY:
224 ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
225 deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
226 translate(receivedRoleReply.getRequested()),
227 translate(receivedRoleReply.getResponse()));
228 break;
229 case UPDATE_PORT_STATISTICS:
230 UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
231 deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
232 updatePortStatistics.getPortStatisticsList()
233 .stream()
234 .map(GrpcDeviceUtils::translate)
235 .collect(toList()));
236 break;
237
238 // return value of DeviceProvider#isReachable
239 case IS_REACHABLE_RESPONSE:
240 IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
241 int xid = isReachableResponse.getXid();
242 boolean isReachable = isReachableResponse.getIsReachable();
243 CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
244 if (result != null) {
245 result.complete(isReachable);
246 }
247 break;
248
249 case METHOD_NOT_SET:
250 default:
251 log.warn("Unexpected message received {}", msg);
252 break;
253 }
254 }
255
256 @Override
257 public void onCompleted() {
258 log.info("DeviceProviderServiceServerProxy completed");
259 deviceProviderRegistry.unregister(pairedProvider);
260 registeredProviders.remove(pairedProvider);
261 toDeviceProvider.onCompleted();
262 }
263
264 @Override
265 public void onError(Throwable e) {
266 log.error("DeviceProviderServiceServerProxy#onError", e);
267 deviceProviderRegistry.unregister(pairedProvider);
268 registeredProviders.remove(pairedProvider);
269 // TODO What is the proper clean up for bi-di stream on error?
270 // sample suggests no-op
271 toDeviceProvider.onError(e);
272 }
273
274
275 /**
276 * Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
277 * @param xid IsReachable call ID.
278 * @param reply Future to
279 */
280 void register(int xid, CompletableFuture<Boolean> reply) {
281 outstandingIsReachable.put(xid, reply);
282 }
283
284 }
285
286 // Upper -> Lower Controller message
287 /**
288 * Relay DeviceProvider calls to RPC client.
289 */
290 private final class DeviceProviderServerProxy
291 implements DeviceProvider {
292
293 private final Logger log = LoggerFactory.getLogger(getClass());
294
295 // xid for isReachable calls
296 private final AtomicInteger xidPool = new AtomicInteger();
297 private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
298
299 private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
300 private ProviderId providerId;
301
302 DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
303 this.toDeviceProvider = toDeviceProvider;
304 }
305
306 void setProviderId(ProviderId pid) {
307 this.providerId = pid;
308 }
309
310 /**
311 * Registers RPC stream in other direction.
312 * @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
313 */
314 void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
315 this.deviceProviderServiceProxy = deviceProviderServiceProxy;
316 }
317
318 @Override
319 public void triggerProbe(DeviceId deviceId) {
320 log.trace("triggerProbe({})", deviceId);
321 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
322 msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
323 .setDeviceId(deviceId.toString())
324 .build());
325 DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
326 toDeviceProvider.onNext(triggerProbeMsg);
327 // TODO Catch Exceptions and call onError()
328 }
329
330 @Override
331 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
332 log.trace("roleChanged({}, {})", deviceId, newRole);
333 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
334 msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
335 .setDeviceId(deviceId.toString())
336 .setNewRole(translate(newRole))
337 .build());
338 toDeviceProvider.onNext(msgBuilder.build());
339 // TODO Catch Exceptions and call onError()
340 }
341
342 @Override
343 public boolean isReachable(DeviceId deviceId) {
344 log.trace("isReachable({})", deviceId);
345 CompletableFuture<Boolean> result = new CompletableFuture<>();
346 final int xid = xidPool.incrementAndGet();
347
348 DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
349 msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
350 .setXid(xid)
351 .setDeviceId(deviceId.toString())
352 .build());
353
354 // Associate xid and register above future some where
355 // in DeviceProviderService channel to receive reply
356 if (deviceProviderServiceProxy != null) {
357 deviceProviderServiceProxy.register(xid, result);
358 }
359
360 // send message down RPC
361 toDeviceProvider.onNext(msgBuilder.build());
362
363 // wait for reply
364 try {
365 return result.get(10, TimeUnit.SECONDS);
366 } catch (InterruptedException e) {
367 log.debug("isReachable({}) was Interrupted", deviceId, e);
368 Thread.currentThread().interrupt();
369 } catch (TimeoutException e) {
370 log.warn("isReachable({}) Timed out", deviceId, e);
371 } catch (ExecutionException e) {
372 log.error("isReachable({}) Execution failed", deviceId, e);
373 // close session?
374 }
375 return false;
376 // TODO Catch Exceptions and call onError()
377 }
378
379 @Override
380 public ProviderId id() {
381 return checkNotNull(providerId, "not initialized yet");
382 }
383
384 }
385}