blob: 945172a1ae36d09a81a31607050f483def2f2b8a [file] [log] [blame]
Jordan Haltermanb8cace72018-07-02 17:39:21 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.cluster.impl;
17
18import java.lang.reflect.Method;
19import java.lang.reflect.Proxy;
20import java.util.Map;
21import java.util.concurrent.CompletableFuture;
22import java.util.concurrent.CompletionException;
23import java.util.concurrent.Executor;
24import java.util.concurrent.ExecutorService;
25import java.util.concurrent.Executors;
26import java.util.function.Function;
27
28import com.google.common.collect.Maps;
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Deactivate;
32import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35import org.onlab.util.OrderedExecutor;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.cluster.ProxyFactory;
38import org.onosproject.cluster.ProxyService;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40import org.onosproject.store.service.Serializer;
41import org.slf4j.Logger;
42import org.slf4j.LoggerFactory;
43
44import static com.google.common.base.Preconditions.checkArgument;
45import static org.onlab.util.Tools.groupedThreads;
46
47/**
48 * Implementation of the proxy service.
49 */
50@Component(immediate = true)
51@Service
52public class ProxyManager extends AbstractProxyManager implements ProxyService {
53
54 private final Logger log = LoggerFactory.getLogger(getClass());
55
56 private static final String MESSAGE_PREFIX = "proxy";
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected ClusterCommunicationService clusterCommunicator;
60
61 private final ExecutorService proxyServiceExecutor =
62 Executors.newFixedThreadPool(
63 Math.min(Math.max(Runtime.getRuntime().availableProcessors(), 4), 16),
64 groupedThreads("onos/proxy", "service-executor", log));
65
66 private final Map<Class, ProxyService> services = Maps.newConcurrentMap();
67
68 @Activate
69 public void activate() {
70 log.info("Started");
71 }
72
73 @Deactivate
74 public void deactivate() {
75 proxyServiceExecutor.shutdownNow();
76 log.info("Stopped");
77 }
78
79 @Override
80 public <T> ProxyFactory<T> getProxyFactory(Class<T> type, Serializer serializer) {
81 checkArgument(type.isInterface(), "proxy type must be an interface");
82 return new ProxyManagerFactory<>(type, serializer);
83 }
84
85 @Override
86 public <T> void registerProxyService(Class<? super T> type, T instance, Serializer serializer) {
87 checkArgument(type.isInterface(), "proxy type must be an interface");
88 Executor executor = new OrderedExecutor(proxyServiceExecutor);
89 services.computeIfAbsent(type, t -> new ProxyService(instance, t, MESSAGE_PREFIX,
90 (i, m, o) -> new SyncOperationService(i, m, o, serializer, executor),
91 (i, m, o) -> new AsyncOperationService(i, m, o, serializer)));
92 }
93
94 @Override
95 public void unregisterProxyService(Class<?> type) {
96 ProxyService service = services.remove(type);
97 if (service != null) {
98 service.close();
99 }
100 }
101
102 /**
103 * Internal proxy factory.
104 */
105 private class ProxyManagerFactory<T> implements ProxyFactory<T> {
106 private final Class<T> type;
107 private final Serializer serializer;
108 private final Map<NodeId, T> proxyInstances = Maps.newConcurrentMap();
109
110 ProxyManagerFactory(Class<T> type, Serializer serializer) {
111 this.type = type;
112 this.serializer = serializer;
113 }
114
115 @Override
116 @SuppressWarnings("unchecked")
117 public T getProxyFor(NodeId nodeId) {
118 // Avoid unnecessary locking of computeIfAbsent if possible.
119 T proxyInstance = proxyInstances.get(nodeId);
120 if (proxyInstance != null) {
121 return proxyInstance;
122 }
123 return proxyInstances.computeIfAbsent(nodeId, id -> (T) Proxy.newProxyInstance(
124 type.getClassLoader(),
125 new Class[]{type},
126 new ProxyInvocationHandler(type, MESSAGE_PREFIX,
127 o -> new SyncOperationHandler(o, nodeId, serializer),
128 o -> new AsyncOperationHandler(o, nodeId, serializer))));
129 }
130 }
131
132 /**
133 * Implementation of the operation service which handles synchronous method calls.
134 */
135 private class SyncOperationService
136 extends OperationService
137 implements Function<Object[], Object> {
138 SyncOperationService(
139 Object instance,
140 Method method,
141 Operation operation,
142 Serializer serializer,
143 Executor executor) {
144 super(instance, method, operation);
145 clusterCommunicator.addSubscriber(
146 operation.subject(), serializer::decode, this, serializer::encode, executor);
147 }
148
149 @Override
150 public Object apply(Object[] args) {
151 return invoke(args);
152 }
153
154 @Override
155 void close() {
156 clusterCommunicator.removeSubscriber(operation.subject());
157 }
158 }
159
160 /**
161 * Implementation of the operation service which handles asynchronous method calls.
162 */
163 private class AsyncOperationService
164 extends OperationService
165 implements Function<Object[], CompletableFuture<Object>> {
166 AsyncOperationService(Object instance, Method method, Operation operation, Serializer serializer) {
167 super(instance, method, operation);
168 clusterCommunicator.addSubscriber(
169 operation.subject(), serializer::decode, this, serializer::encode);
170 }
171
172 @Override
173 public CompletableFuture<Object> apply(Object[] args) {
174 return invoke(args);
175 }
176
177 @Override
178 void close() {
179 clusterCommunicator.removeSubscriber(operation.subject());
180 }
181 }
182
183 /**
184 * Handler for synchronous proxy operations which blocks on {@code ClusterCommunicationService} requests.
185 */
186 private class SyncOperationHandler extends OperationHandler {
187 private final NodeId nodeId;
188 private final Serializer serializer;
189
190 SyncOperationHandler(Operation operation, NodeId nodeId, Serializer serializer) {
191 super(operation);
192 this.nodeId = nodeId;
193 this.serializer = serializer;
194 }
195
196 @Override
197 public Object apply(Object[] args) {
198 try {
199 return clusterCommunicator.sendAndReceive(
200 args, operation.subject(), serializer::encode, serializer::decode, nodeId)
201 .join();
202 } catch (CompletionException e) {
203 if (e.getCause() instanceof RuntimeException) {
204 throw (RuntimeException) e.getCause();
205 } else {
206 throw new IllegalStateException(e.getCause());
207 }
208 }
209 }
210 }
211
212 /**
213 * Handler for asynchronous proxy operations which uses async {@code ClusterCommunicationService} requests.
214 */
215 private class AsyncOperationHandler extends OperationHandler {
216 private final NodeId nodeId;
217 private final Serializer serializer;
218
219 AsyncOperationHandler(Operation operation, NodeId nodeId, Serializer serializer) {
220 super(operation);
221 this.nodeId = nodeId;
222 this.serializer = serializer;
223 }
224
225 @Override
226 public Object apply(Object[] args) {
227 return clusterCommunicator.sendAndReceive(
228 args, operation.subject(), serializer::encode, serializer::decode, nodeId);
229 }
230 }
231}