/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.cluster.impl;
17
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import org.onlab.util.OrderedExecutor;
21import org.onlab.util.Tools;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.mastership.MastershipProxyFactory;
25import org.onosproject.mastership.MastershipProxyService;
26import org.onosproject.mastership.MastershipService;
27import org.onosproject.net.DeviceId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.serializers.KryoNamespaces;
30import org.onosproject.store.service.Serializer;
31import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Jordan Haltermand04e3442018-07-02 17:40:07 -070039import java.lang.reflect.Method;
40import java.lang.reflect.Proxy;
41import java.util.Map;
42import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.CompletableFuture;
45import java.util.concurrent.CompletionException;
46import java.util.concurrent.Executor;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.function.Function;
50
Jordan Haltermand04e3442018-07-02 17:40:07 -070051import static com.google.common.base.Preconditions.checkArgument;
52import static org.onlab.util.Tools.groupedThreads;
53
54/**
55 * Mastership proxy service implementation.
56 * <p>
57 * This implementation wraps both the proxy service and the generated proxy instance in additional proxies which check
58 * mastership and route calls to the appropriate proxy instances.
59 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070060@Component(immediate = true, service = MastershipProxyService.class)
Jordan Haltermand04e3442018-07-02 17:40:07 -070061public class MastershipProxyManager extends AbstractProxyManager implements MastershipProxyService {
62
63 private static final Serializer REQUEST_SERIALIZER =
64 Serializer.using(KryoNamespaces.API, MastershipProxyRequest.class);
65 private static final String MESSAGE_PREFIX = "mastership-proxy";
66
67 private final Logger log = LoggerFactory.getLogger(getClass());
68
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermand04e3442018-07-02 17:40:07 -070070 protected MastershipService mastershipService;
71
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermand04e3442018-07-02 17:40:07 -070073 protected ClusterService clusterService;
74
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermand04e3442018-07-02 17:40:07 -070076 protected ClusterCommunicationService clusterCommunicator;
77
78 private final ExecutorService proxyServiceExecutor =
79 Executors.newFixedThreadPool(8, groupedThreads("onos/proxy", "service-executor", log));
80
81 private final Map<Class, ProxyService> services = Maps.newConcurrentMap();
82 private NodeId localNodeId;
83
84 @Activate
85 public void activate() {
86 this.localNodeId = clusterService.getLocalNode().id();
87 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
92 proxyServiceExecutor.shutdownNow();
93 log.info("Stopped");
94 }
95
96 @Override
97 public <T> MastershipProxyFactory<T> getProxyFactory(Class<T> type, Serializer serializer) {
98 checkArgument(type.isInterface(), "proxy type must be an interface");
99 return new MastershipProxyManagerFactory<>(type, serializer);
100 }
101
102 @Override
103 public <T> void registerProxyService(Class<? super T> type, T instance, Serializer serializer) {
104 checkArgument(type.isInterface(), "proxy type must be an interface");
105 Executor executor = new OrderedExecutor(proxyServiceExecutor);
106 services.computeIfAbsent(type, t -> new ProxyService(instance, t, MESSAGE_PREFIX,
107 (i, m, o) -> new SyncOperationService(i, m, o, serializer, executor),
108 (i, m, o) -> new AsyncOperationService(i, m, o, serializer)));
109 }
110
111 @Override
112 public void unregisterProxyService(Class<?> type) {
113 ProxyService service = services.remove(type);
114 if (service != null) {
115 service.close();
116 }
117 }
118
119 /**
120 * Internal proxy factory.
121 */
122 private class MastershipProxyManagerFactory<T> implements MastershipProxyFactory<T> {
123 private final Class<T> type;
124 private final Serializer serializer;
125 private final Map<DeviceId, T> proxyInstances = Maps.newConcurrentMap();
126
127 MastershipProxyManagerFactory(Class<T> type, Serializer serializer) {
128 this.type = type;
129 this.serializer = serializer;
130 }
131
132 @Override
133 @SuppressWarnings("unchecked")
134 public T getProxyFor(DeviceId deviceId) {
135 // Avoid unnecessary locking of computeIfAbsent if possible.
136 T proxyInstance = proxyInstances.get(deviceId);
137 if (proxyInstance != null) {
138 return proxyInstance;
139 }
140 return proxyInstances.computeIfAbsent(deviceId, id -> (T) Proxy.newProxyInstance(
141 type.getClassLoader(),
142 new Class[]{type},
143 new ProxyInvocationHandler(type, MESSAGE_PREFIX,
144 o -> new SyncOperationHandler(o, deviceId, serializer),
145 o -> new AsyncOperationHandler(o, deviceId, serializer))));
146 }
147 }
148
149 /**
150 * Implementation of the operation service which handles synchronous method calls.
151 */
152 private class SyncOperationService
153 extends OperationService
154 implements Function<MastershipProxyRequest, Object> {
155
156 private final Serializer serializer;
157
158 SyncOperationService(
159 Object instance,
160 Method method,
161 Operation operation,
162 Serializer serializer,
163 Executor executor) {
164 super(instance, method, operation);
165 this.serializer = serializer;
166 clusterCommunicator.addSubscriber(
167 operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode, executor);
168 }
169
170 @Override
171 public Object apply(MastershipProxyRequest request) {
172 NodeId master = mastershipService.getMasterFor(request.deviceId());
173 if (master == null) {
174 throw new IllegalStateException("No master found for device " + request.deviceId());
175 } else if (!Objects.equals(master, localNodeId)) {
176 // If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
177 // communication. Reject the request.
178 if (request.visited().contains(localNodeId)) {
179 throw new IllegalStateException("Ambiguous master for device " + request.deviceId());
180 }
181
182 // If this node is being visited for the first time, update the request with the local node ID
183 // to prevent cyclic communication.
184 return clusterCommunicator.sendAndReceive(
185 request.visit(),
186 operation.subject(),
187 REQUEST_SERIALIZER::encode,
188 REQUEST_SERIALIZER::decode,
189 master);
190 } else {
191 // Unwrap the raw arguments and apply the method call to the registered proxy service.
192 return invoke(request.unwrap(serializer::decode));
193 }
194 }
195
196 @Override
197 void close() {
198 clusterCommunicator.removeSubscriber(operation.subject());
199 }
200 }
201
202 /**
203 * Implementation of the operation service which handles asynchronous method calls.
204 */
205 private class AsyncOperationService
206 extends OperationService
207 implements Function<MastershipProxyRequest, CompletableFuture<Object>> {
208
209 private final Serializer serializer;
210
211 AsyncOperationService(Object instance, Method method, Operation operation, Serializer serializer) {
212 super(instance, method, operation);
213 this.serializer = serializer;
214 clusterCommunicator.addSubscriber(
215 operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode);
216 }
217
218 @Override
219 public CompletableFuture<Object> apply(MastershipProxyRequest request) {
220 NodeId master = mastershipService.getMasterFor(request.deviceId());
221 if (master == null) {
222 return Tools.exceptionalFuture(
223 new IllegalStateException("No master found for device " + request.deviceId()));
224 } else if (!Objects.equals(master, localNodeId)) {
225 // If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
226 // communication. Reject the request.
227 if (request.visited().contains(localNodeId)) {
228 return Tools.exceptionalFuture(
229 new IllegalStateException("Ambiguous master for device " + request.deviceId()));
230 }
231
232 // If this node is being visited for the first time, update the request with the local node ID
233 // to prevent cyclic communication.
234 return clusterCommunicator.sendAndReceive(
235 request.visit(),
236 operation.subject(),
237 REQUEST_SERIALIZER::encode,
238 REQUEST_SERIALIZER::decode,
239 master);
240 } else {
241 // Unwrap the raw arguments and apply the method call to the registered proxy service.
242 return invoke(request.unwrap(serializer::decode));
243 }
244 }
245
246 @Override
247 void close() {
248 clusterCommunicator.removeSubscriber(operation.subject());
249 }
250 }
251
252 /**
253 * Handler for synchronous proxy operations which blocks on {@code ClusterCommunicationService} requests.
254 */
255 private class SyncOperationHandler extends OperationHandler {
256 private final DeviceId deviceId;
257 private final Serializer serializer;
258
259 SyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
260 super(operation);
261 this.deviceId = deviceId;
262 this.serializer = serializer;
263 }
264
265 @Override
266 public Object apply(Object[] args) {
267 NodeId master = mastershipService.getMasterFor(deviceId);
268 if (master == null) {
269 throw new IllegalStateException("No master found for device " + deviceId);
270 }
271
272 MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
273 try {
274 return clusterCommunicator.sendAndReceive(
275 request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master)
276 .join();
277 } catch (CompletionException e) {
278 if (e.getCause() instanceof RuntimeException) {
279 throw (RuntimeException) e.getCause();
280 } else {
281 throw new IllegalStateException(e.getCause());
282 }
283 }
284 }
285 }
286
287 /**
288 * Handler for asynchronous proxy operations which uses async {@code ClusterCommunicationService} requests.
289 */
290 private class AsyncOperationHandler extends OperationHandler {
291 private final DeviceId deviceId;
292 private final Serializer serializer;
293
294 AsyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
295 super(operation);
296 this.deviceId = deviceId;
297 this.serializer = serializer;
298 }
299
300 @Override
301 public Object apply(Object[] args) {
302 NodeId master = mastershipService.getMasterFor(deviceId);
303 if (master == null) {
304 return Tools.exceptionalFuture(new IllegalStateException("No master found for device " + deviceId));
305 }
306
307 MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
308 return clusterCommunicator.sendAndReceive(
309 request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master);
310 }
311 }
312
313 /**
314 * Internal arguments wrapper that contains the {@link DeviceId} for mastership checks.
315 */
316 private class MastershipProxyRequest {
317 private final DeviceId deviceId;
318 private final byte[] args;
319 private final Set<NodeId> visited;
320
321 MastershipProxyRequest(DeviceId deviceId, byte[] args) {
322 this(deviceId, args, ImmutableSet.of(localNodeId));
323 }
324
325 MastershipProxyRequest(DeviceId deviceId, byte[] args, Set<NodeId> visited) {
326 this.deviceId = deviceId;
327 this.args = args;
328 this.visited = visited;
329 }
330
331 /**
332 * Returns the device identifier.
333 *
334 * @return the device identifier
335 */
336 DeviceId deviceId() {
337 return deviceId;
338 }
339
340 /**
341 * Returns the function arguments.
342 *
343 * @return the function arguments
344 */
345 byte[] args() {
346 return args;
347 }
348
349 /**
350 * Decodes and returns the function arguments.
351 *
352 * @param decoder the arguments decoder
353 * @return the decoded function arguments
354 */
355 Object[] unwrap(Function<byte[], Object[]> decoder) {
356 return decoder.apply(args());
357 }
358
359 /**
360 * Returns the set of nodes visited by this request.
361 *
362 * @return the set of nodes visited by this request
363 */
364 Set<NodeId> visited() {
365 return visited;
366 }
367
368 /**
369 * Adds the local node to the set of visited nodes for this request, returning an updated request instance.
370 *
371 * @return a new request with the local node ID added to the visited set
372 */
373 MastershipProxyRequest visit() {
374 return new MastershipProxyRequest(deviceId, args, ImmutableSet.<NodeId>builder()
375 .addAll(visited)
376 .add(localNodeId)
377 .build());
378 }
379 }
380}