/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.cluster.impl;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Maps;
import org.onosproject.store.cluster.messaging.MessageSubject;

30/**
31 * Implementation of the proxy service.
32 */
33public abstract class AbstractProxyManager {
34
35 /**
36 * Wrapper for a proxy service which handles registration of proxy methods as {@code ClusterCommunicationService}
37 * subscribers.
38 */
39 class ProxyService {
40 private final Map<Method, OperationService> operations = Maps.newConcurrentMap();
41
42 ProxyService(
43 Object instance,
44 Class<?> type,
45 String prefix,
46 OperationServiceFactory syncServiceFactory,
47 OperationServiceFactory asyncServiceFactory) {
48 operations.putAll(getMethodMap(type, prefix).entrySet().stream()
49 .map(entry -> {
50 if (entry.getValue().type() == Operation.Type.SYNC) {
51 return Maps.immutableEntry(
52 entry.getKey(),
53 syncServiceFactory.create(instance, entry.getKey(), entry.getValue()));
54 } else {
55 return Maps.immutableEntry(
56 entry.getKey(),
57 asyncServiceFactory.create(instance, entry.getKey(), entry.getValue()));
58 }
59 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
60 }
61
62 /**
63 * Closes the proxy service.
64 */
65 void close() {
66 operations.values().forEach(operation -> operation.close());
67 }
68 }
69
70 /**
71 * Operation service factory.
72 */
73 @FunctionalInterface
74 interface OperationServiceFactory {
75 OperationService create(Object instance, Method method, Operation operation);
76 }
77
78 /**
79 * Wrapper for a single proxy service operation which handles registration of subscribers and invocation
80 * of service instance methods.
81 */
82 abstract class OperationService {
83 protected final Object instance;
84 protected final Method method;
85 protected final Operation operation;
86
87 OperationService(Object instance, Method method, Operation operation) {
88 this.instance = instance;
89 this.method = method;
90 this.operation = operation;
91 }
92
93 /**
94 * Invokes the method with the given arguments.
95 *
96 * @param args the arguments with which to invoke the operation
97 * @param <T> the operation return type
98 * @return the operation return value
99 */
100 @SuppressWarnings("unchecked")
101 <T> T invoke(Object[] args) {
102 try {
103 return (T) method.invoke(instance, args);
104 } catch (IllegalAccessException | InvocationTargetException e) {
105 throw new IllegalStateException(e);
106 }
107 }
108
109 /**
110 * Closes the operation service.
111 */
112 abstract void close();
113 }
114
115 /**
116 * Proxy invocation handler which routes proxy method calls to the correct node and subscriber via
117 * {@code ClusterCommunicationService}.
118 */
119 class ProxyInvocationHandler implements InvocationHandler {
120 private final Map<Method, OperationHandler> handlers = Maps.newConcurrentMap();
121
122 ProxyInvocationHandler(
123 Class<?> type,
124 String prefix,
125 OperationHandlerFactory syncHandlerFactory,
126 OperationHandlerFactory asyncHandlerFactory) {
127 handlers.putAll(getMethodMap(type, prefix).entrySet().stream()
128 .map(entry -> {
129 if (entry.getValue().type() == Operation.Type.SYNC) {
130 return Maps.immutableEntry(entry.getKey(), syncHandlerFactory.create(entry.getValue()));
131 } else {
132 return Maps.immutableEntry(entry.getKey(), asyncHandlerFactory.create(entry.getValue()));
133 }
134 }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
135 }
136
137 @Override
138 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
139 OperationHandler handler = handlers.get(method);
140 if (handler == null) {
141 throw new IllegalStateException("Unknown proxy operation " + method.getName());
142 }
143 return handler.apply(args);
144 }
145 }
146
147 /**
148 * Operation handler factory.
149 */
150 @FunctionalInterface
151 interface OperationHandlerFactory {
152 OperationHandler create(Operation operation);
153 }
154
155 /**
156 * Invocation handler for an individual proxy operation.
157 */
158 abstract class OperationHandler implements Function<Object[], Object> {
159 protected final Operation operation;
160
161 OperationHandler(Operation operation) {
162 this.operation = operation;
163 }
164 }
165
166 /**
167 * Recursively finds operations defined by the given type and its implemented interfaces.
168 *
169 * @param type the type for which to find operations
170 * @param prefix the prefix with which to generate message subjects
171 * @return the operations defined by the given type and its parent interfaces
172 */
173 Map<Method, Operation> getMethodMap(Class<?> type, String prefix) {
174 String service = type.getCanonicalName().replace(".", "-");
175 Map<Method, Operation> methods = new HashMap<>();
176 for (Method method : type.getDeclaredMethods()) {
177 String name = method.getName();
178 if (methods.values().stream().anyMatch(op -> op.name.equals(name))) {
179 throw new IllegalArgumentException("Method " + name + " is ambiguous");
180 }
181
182 Class<?> returnType = method.getReturnType();
183 if (CompletableFuture.class.isAssignableFrom(returnType)) {
184 methods.put(method, new Operation(Operation.Type.ASYNC, prefix, service, name, method));
185 } else {
186 methods.put(method, new Operation(Operation.Type.SYNC, prefix, service, name, method));
187 }
188 }
189 for (Class<?> iface : type.getInterfaces()) {
190 methods.putAll(getMethodMap(iface, prefix));
191 }
192 return methods;
193 }
194
195 /**
196 * Simple data class for proxy operation metadata.
197 */
198 static class Operation {
199
200 /**
201 * Operation type.
202 */
203 enum Type {
204 SYNC,
205 ASYNC,
206 }
207
208 private final Type type;
209 private final String service;
210 private final String name;
211 private final Method method;
212 private final MessageSubject subject;
213
214 Operation(Type type, String prefix, String service, String name, Method method) {
215 this.type = type;
216 this.service = service;
217 this.name = name;
218 this.method = method;
219 this.subject = new MessageSubject(String.format("%s-%s-%s", prefix, service, name));
220 }
221
222 /**
223 * Returns the operation type.
224 *
225 * @return the operation type
226 */
227 Type type() {
228 return type;
229 }
230
231 /**
232 * Returns the service name of the service to which this operation belongs.
233 *
234 * @return the service name of the service to which this operation belongs
235 */
236 String service() {
237 return service;
238 }
239
240 /**
241 * Returns the operation name.
242 *
243 * @return the operation name
244 */
245 String name() {
246 return name;
247 }
248
249 /**
250 * Returns the operation method.
251 *
252 * @return the operation method
253 */
254 Method method() {
255 return method;
256 }
257
258 /**
259 * Returns the operation message subject.
260 *
261 * @return the operation message subject
262 */
263 MessageSubject subject() {
264 return subject;
265 }
266 }
267}