blob: 96cbe3a06d62ff86ee942043a5edc2ac6d7a24b8 [file] [log] [blame]
Jordan Haltermand04e3442018-07-02 17:40:07 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.cluster.impl;
17
18import java.lang.reflect.Method;
19import java.lang.reflect.Proxy;
20import java.util.Map;
21import java.util.Objects;
22import java.util.Set;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.CompletionException;
25import java.util.concurrent.Executor;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.function.Function;
29
30import com.google.common.collect.ImmutableSet;
31import com.google.common.collect.Maps;
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
37import org.apache.felix.scr.annotations.Service;
38import org.onlab.util.OrderedExecutor;
39import org.onlab.util.Tools;
40import org.onosproject.cluster.ClusterService;
41import org.onosproject.cluster.NodeId;
42import org.onosproject.mastership.MastershipProxyFactory;
43import org.onosproject.mastership.MastershipProxyService;
44import org.onosproject.mastership.MastershipService;
45import org.onosproject.net.DeviceId;
46import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
47import org.onosproject.store.serializers.KryoNamespaces;
48import org.onosproject.store.service.Serializer;
49import org.slf4j.Logger;
50import org.slf4j.LoggerFactory;
51
52import static com.google.common.base.Preconditions.checkArgument;
53import static org.onlab.util.Tools.groupedThreads;
54
55/**
56 * Mastership proxy service implementation.
57 * <p>
58 * This implementation wraps both the proxy service and the generated proxy instance in additional proxies which check
59 * mastership and route calls to the appropriate proxy instances.
60 */
61@Component(immediate = true)
62@Service
63public class MastershipProxyManager extends AbstractProxyManager implements MastershipProxyService {
64
65 private static final Serializer REQUEST_SERIALIZER =
66 Serializer.using(KryoNamespaces.API, MastershipProxyRequest.class);
67 private static final String MESSAGE_PREFIX = "mastership-proxy";
68
69 private final Logger log = LoggerFactory.getLogger(getClass());
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected MastershipService mastershipService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService clusterCommunicator;
79
80 private final ExecutorService proxyServiceExecutor =
81 Executors.newFixedThreadPool(8, groupedThreads("onos/proxy", "service-executor", log));
82
83 private final Map<Class, ProxyService> services = Maps.newConcurrentMap();
84 private NodeId localNodeId;
85
86 @Activate
87 public void activate() {
88 this.localNodeId = clusterService.getLocalNode().id();
89 log.info("Started");
90 }
91
92 @Deactivate
93 public void deactivate() {
94 proxyServiceExecutor.shutdownNow();
95 log.info("Stopped");
96 }
97
98 @Override
99 public <T> MastershipProxyFactory<T> getProxyFactory(Class<T> type, Serializer serializer) {
100 checkArgument(type.isInterface(), "proxy type must be an interface");
101 return new MastershipProxyManagerFactory<>(type, serializer);
102 }
103
104 @Override
105 public <T> void registerProxyService(Class<? super T> type, T instance, Serializer serializer) {
106 checkArgument(type.isInterface(), "proxy type must be an interface");
107 Executor executor = new OrderedExecutor(proxyServiceExecutor);
108 services.computeIfAbsent(type, t -> new ProxyService(instance, t, MESSAGE_PREFIX,
109 (i, m, o) -> new SyncOperationService(i, m, o, serializer, executor),
110 (i, m, o) -> new AsyncOperationService(i, m, o, serializer)));
111 }
112
113 @Override
114 public void unregisterProxyService(Class<?> type) {
115 ProxyService service = services.remove(type);
116 if (service != null) {
117 service.close();
118 }
119 }
120
121 /**
122 * Internal proxy factory.
123 */
124 private class MastershipProxyManagerFactory<T> implements MastershipProxyFactory<T> {
125 private final Class<T> type;
126 private final Serializer serializer;
127 private final Map<DeviceId, T> proxyInstances = Maps.newConcurrentMap();
128
129 MastershipProxyManagerFactory(Class<T> type, Serializer serializer) {
130 this.type = type;
131 this.serializer = serializer;
132 }
133
134 @Override
135 @SuppressWarnings("unchecked")
136 public T getProxyFor(DeviceId deviceId) {
137 // Avoid unnecessary locking of computeIfAbsent if possible.
138 T proxyInstance = proxyInstances.get(deviceId);
139 if (proxyInstance != null) {
140 return proxyInstance;
141 }
142 return proxyInstances.computeIfAbsent(deviceId, id -> (T) Proxy.newProxyInstance(
143 type.getClassLoader(),
144 new Class[]{type},
145 new ProxyInvocationHandler(type, MESSAGE_PREFIX,
146 o -> new SyncOperationHandler(o, deviceId, serializer),
147 o -> new AsyncOperationHandler(o, deviceId, serializer))));
148 }
149 }
150
151 /**
152 * Implementation of the operation service which handles synchronous method calls.
153 */
154 private class SyncOperationService
155 extends OperationService
156 implements Function<MastershipProxyRequest, Object> {
157
158 private final Serializer serializer;
159
160 SyncOperationService(
161 Object instance,
162 Method method,
163 Operation operation,
164 Serializer serializer,
165 Executor executor) {
166 super(instance, method, operation);
167 this.serializer = serializer;
168 clusterCommunicator.addSubscriber(
169 operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode, executor);
170 }
171
172 @Override
173 public Object apply(MastershipProxyRequest request) {
174 NodeId master = mastershipService.getMasterFor(request.deviceId());
175 if (master == null) {
176 throw new IllegalStateException("No master found for device " + request.deviceId());
177 } else if (!Objects.equals(master, localNodeId)) {
178 // If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
179 // communication. Reject the request.
180 if (request.visited().contains(localNodeId)) {
181 throw new IllegalStateException("Ambiguous master for device " + request.deviceId());
182 }
183
184 // If this node is being visited for the first time, update the request with the local node ID
185 // to prevent cyclic communication.
186 return clusterCommunicator.sendAndReceive(
187 request.visit(),
188 operation.subject(),
189 REQUEST_SERIALIZER::encode,
190 REQUEST_SERIALIZER::decode,
191 master);
192 } else {
193 // Unwrap the raw arguments and apply the method call to the registered proxy service.
194 return invoke(request.unwrap(serializer::decode));
195 }
196 }
197
198 @Override
199 void close() {
200 clusterCommunicator.removeSubscriber(operation.subject());
201 }
202 }
203
204 /**
205 * Implementation of the operation service which handles asynchronous method calls.
206 */
207 private class AsyncOperationService
208 extends OperationService
209 implements Function<MastershipProxyRequest, CompletableFuture<Object>> {
210
211 private final Serializer serializer;
212
213 AsyncOperationService(Object instance, Method method, Operation operation, Serializer serializer) {
214 super(instance, method, operation);
215 this.serializer = serializer;
216 clusterCommunicator.addSubscriber(
217 operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode);
218 }
219
220 @Override
221 public CompletableFuture<Object> apply(MastershipProxyRequest request) {
222 NodeId master = mastershipService.getMasterFor(request.deviceId());
223 if (master == null) {
224 return Tools.exceptionalFuture(
225 new IllegalStateException("No master found for device " + request.deviceId()));
226 } else if (!Objects.equals(master, localNodeId)) {
227 // If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
228 // communication. Reject the request.
229 if (request.visited().contains(localNodeId)) {
230 return Tools.exceptionalFuture(
231 new IllegalStateException("Ambiguous master for device " + request.deviceId()));
232 }
233
234 // If this node is being visited for the first time, update the request with the local node ID
235 // to prevent cyclic communication.
236 return clusterCommunicator.sendAndReceive(
237 request.visit(),
238 operation.subject(),
239 REQUEST_SERIALIZER::encode,
240 REQUEST_SERIALIZER::decode,
241 master);
242 } else {
243 // Unwrap the raw arguments and apply the method call to the registered proxy service.
244 return invoke(request.unwrap(serializer::decode));
245 }
246 }
247
248 @Override
249 void close() {
250 clusterCommunicator.removeSubscriber(operation.subject());
251 }
252 }
253
254 /**
255 * Handler for synchronous proxy operations which blocks on {@code ClusterCommunicationService} requests.
256 */
257 private class SyncOperationHandler extends OperationHandler {
258 private final DeviceId deviceId;
259 private final Serializer serializer;
260
261 SyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
262 super(operation);
263 this.deviceId = deviceId;
264 this.serializer = serializer;
265 }
266
267 @Override
268 public Object apply(Object[] args) {
269 NodeId master = mastershipService.getMasterFor(deviceId);
270 if (master == null) {
271 throw new IllegalStateException("No master found for device " + deviceId);
272 }
273
274 MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
275 try {
276 return clusterCommunicator.sendAndReceive(
277 request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master)
278 .join();
279 } catch (CompletionException e) {
280 if (e.getCause() instanceof RuntimeException) {
281 throw (RuntimeException) e.getCause();
282 } else {
283 throw new IllegalStateException(e.getCause());
284 }
285 }
286 }
287 }
288
289 /**
290 * Handler for asynchronous proxy operations which uses async {@code ClusterCommunicationService} requests.
291 */
292 private class AsyncOperationHandler extends OperationHandler {
293 private final DeviceId deviceId;
294 private final Serializer serializer;
295
296 AsyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
297 super(operation);
298 this.deviceId = deviceId;
299 this.serializer = serializer;
300 }
301
302 @Override
303 public Object apply(Object[] args) {
304 NodeId master = mastershipService.getMasterFor(deviceId);
305 if (master == null) {
306 return Tools.exceptionalFuture(new IllegalStateException("No master found for device " + deviceId));
307 }
308
309 MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
310 return clusterCommunicator.sendAndReceive(
311 request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master);
312 }
313 }
314
315 /**
316 * Internal arguments wrapper that contains the {@link DeviceId} for mastership checks.
317 */
318 private class MastershipProxyRequest {
319 private final DeviceId deviceId;
320 private final byte[] args;
321 private final Set<NodeId> visited;
322
323 MastershipProxyRequest(DeviceId deviceId, byte[] args) {
324 this(deviceId, args, ImmutableSet.of(localNodeId));
325 }
326
327 MastershipProxyRequest(DeviceId deviceId, byte[] args, Set<NodeId> visited) {
328 this.deviceId = deviceId;
329 this.args = args;
330 this.visited = visited;
331 }
332
333 /**
334 * Returns the device identifier.
335 *
336 * @return the device identifier
337 */
338 DeviceId deviceId() {
339 return deviceId;
340 }
341
342 /**
343 * Returns the function arguments.
344 *
345 * @return the function arguments
346 */
347 byte[] args() {
348 return args;
349 }
350
351 /**
352 * Decodes and returns the function arguments.
353 *
354 * @param decoder the arguments decoder
355 * @return the decoded function arguments
356 */
357 Object[] unwrap(Function<byte[], Object[]> decoder) {
358 return decoder.apply(args());
359 }
360
361 /**
362 * Returns the set of nodes visited by this request.
363 *
364 * @return the set of nodes visited by this request
365 */
366 Set<NodeId> visited() {
367 return visited;
368 }
369
370 /**
371 * Adds the local node to the set of visited nodes for this request, returning an updated request instance.
372 *
373 * @return a new request with the local node ID added to the visited set
374 */
375 MastershipProxyRequest visit() {
376 return new MastershipProxyRequest(deviceId, args, ImmutableSet.<NodeId>builder()
377 .addAll(visited)
378 .add(localNodeId)
379 .build());
380 }
381 }
382}