blob: 2c2d8646fc61d0c4447cc4a250104f97f5ae535b [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.protobuf.registry;
18
19import com.google.common.collect.Maps;
20import io.grpc.BindableService;
21import io.grpc.Server;
22import io.grpc.ServerBuilder;
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Service;
29import org.onosproject.protobuf.api.GrpcServiceRegistry;
30import org.osgi.service.component.ComponentContext;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.io.IOException;
35import java.util.Dictionary;
36import java.util.Map;
37import java.util.concurrent.TimeUnit;
Jian Li10d99622017-08-26 00:58:24 +090038import java.util.concurrent.atomic.AtomicBoolean;
Aaron Kruglikovae7e3b82017-05-03 14:13:53 -070039
40import static org.onlab.util.Tools.get;
41
42/**
43 * A basic implementation of {@link GrpcServiceRegistry} designed for use with
44 * built in gRPC services.
45 *
46 * NOTE: this is an early implementation in which the addition of any new
47 * service forces a restart of the server, this is sufficient for testing but
48 * inappropriate for deployment.
49 */
50@Service
51@Component(immediate = false)
52public class GrpcServiceRegistryImpl implements GrpcServiceRegistry {
53
54 private static final int DEFAULT_SERVER_PORT = 64000;
55 private static final int DEFAULT_SHUTDOWN_TIME = 1;
Jian Li10d99622017-08-26 00:58:24 +090056 private static final AtomicBoolean SERVICES_MODIFIED_SINCE_START = new AtomicBoolean(false);
Aaron Kruglikovae7e3b82017-05-03 14:13:53 -070057
58 private static final String PORT_PROPERTY_NAME = "listeningPort";
59
60 private final Map<Class<? extends BindableService>, BindableService> registeredServices =
61 Maps.newHashMap();
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
64 private Server server;
65
66 /* It is currently the responsibility of the administrator to notify
67 clients of nonstandard port usage as there is no mechanism available to
68 discover the port hosting gRPC services.
69 */
70 @Property(name = PORT_PROPERTY_NAME, intValue = DEFAULT_SERVER_PORT,
71 label = "The port number which ONOS will use to host gRPC services.")
72 private int listeningPort = DEFAULT_SERVER_PORT;
73
74 @Activate
75 public void activate() {
76 log.info("Started");
77 }
78
79 @Deactivate
80 public void deactivate() {
81 attemptGracefulShutdownThenForce(DEFAULT_SHUTDOWN_TIME);
82 log.info("Stopped");
83 }
84
85 @Modified
86 public void modified(ComponentContext context) {
87 if (context != null) {
88 setProperties(context);
89 }
90 log.info("Connection was restarted to allow service to be added, " +
91 "this is a temporary workaround");
92 restartServer(listeningPort);
93 }
94
95 @Override
96 public boolean register(BindableService service) {
97 synchronized (registeredServices) {
98 if (!registeredServices.containsKey(service.getClass())) {
99 registeredServices.put(service.getClass(), service);
100 } else {
101 log.warn("The specified class \"{}\" was not added becuase an " +
102 "instance of the class is already registered.",
103 service.getClass().toString());
104 return false;
105 }
106 }
107 return restartServer(listeningPort);
108 }
109
110 @Override
111 public boolean unregister(BindableService service) {
112 synchronized (registeredServices) {
113 if (registeredServices.containsKey(service.getClass())) {
114 registeredServices.remove(service.getClass());
115 } else {
116 log.warn("The specified class \"{}\" was not removed because it " +
117 "was not present.", service.getClass().toString());
118 return false;
119 }
120 }
121 return restartServer(listeningPort);
122 }
123
124 @Override
125 public boolean containsService(Class<BindableService> serviceClass) {
126 return registeredServices.containsKey(serviceClass);
127 }
128
129 private void setProperties(ComponentContext context) {
130 Dictionary<String, Object> properties = context.getProperties();
131 String listeningPort = get(properties, PORT_PROPERTY_NAME);
132 this.listeningPort = listeningPort == null ? DEFAULT_SERVER_PORT :
133 Integer.parseInt(listeningPort.trim());
134 }
135
136 /**
137 * Attempts a graceful shutdown allowing {@code timeLimitSeconds} to elapse
138 * before forcing a shutdown.
139 *
140 * @param timeLimitSeconds time before a shutdown is forced in seconds
141 * @return true if the server is terminated, false otherwise
142 */
143 private boolean attemptGracefulShutdownThenForce(int timeLimitSeconds) {
144 if (!server.isShutdown()) {
145 server.shutdown();
146 }
147 try {
148 /*This is not conditional in case the server is shutdown but
149 handling requests submitted before shutdown was called.*/
150 server.awaitTermination(timeLimitSeconds, TimeUnit.SECONDS);
151 } catch (InterruptedException e) {
152 log.error("Awaiting server termination failed with error {}",
153 e.getMessage());
Ray Milkey5c7d4882018-02-05 14:50:39 -0800154 Thread.currentThread().interrupt();
Aaron Kruglikovae7e3b82017-05-03 14:13:53 -0700155 }
156 if (!server.isTerminated()) {
157 server.shutdownNow();
158 try {
159 server.awaitTermination(10, TimeUnit.MILLISECONDS);
160 } catch (InterruptedException e) {
161 log.error("Server failed to terminate as expected with error" +
162 " {}", e.getMessage());
Ray Milkey5c7d4882018-02-05 14:50:39 -0800163 Thread.currentThread().interrupt();
Aaron Kruglikovae7e3b82017-05-03 14:13:53 -0700164 }
165 }
166 return server.isTerminated();
167 }
168
169 private boolean restartServer(int port) {
170 if (!attemptGracefulShutdownThenForce(DEFAULT_SHUTDOWN_TIME)) {
171 log.error("Shutdown failed, the previous server may still be" +
172 " active.");
173 }
174 return createServerAndStart(port);
175 }
176
177 /**
178 * Creates a server with the set of registered services on the specified
179 * port.
180 *
181 * @param port the port on which this server will listen
182 * @return true if the server was started successfully, false otherwise
183 */
184 private boolean createServerAndStart(int port) {
185
186 ServerBuilder serverBuilder =
187 ServerBuilder.forPort(port);
188 synchronized (registeredServices) {
189 registeredServices.values().forEach(
190 service -> serverBuilder.addService(service));
191 }
192 server = serverBuilder.build();
193 try {
194 server.start();
195 } catch (IllegalStateException e) {
196 log.error("The server could not be started because an existing " +
197 "server is already running: {}", e.getMessage());
198 return false;
199 } catch (IOException e) {
200 log.error("The server could not be started due to a failure to " +
201 "bind: {} ", e.getMessage());
202 return false;
203 }
204 return true;
205 }
206}