blob: 4ddca9fbe91ef5c65e2617ba1ec0e885522d5387 [file] [log] [blame]
Jordan Haltermanb8cace72018-07-02 17:39:21 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.cluster.impl;
17
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.collect.Maps;
19import org.onlab.util.OrderedExecutor;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.cluster.ProxyFactory;
22import org.onosproject.cluster.ProxyService;
23import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
24import org.onosproject.store.service.Serializer;
25import org.osgi.service.component.annotations.Activate;
26import org.osgi.service.component.annotations.Component;
27import org.osgi.service.component.annotations.Deactivate;
28import org.osgi.service.component.annotations.Reference;
29import org.osgi.service.component.annotations.ReferenceCardinality;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Jordan Haltermanb8cace72018-07-02 17:39:21 -070033import java.lang.reflect.Method;
34import java.lang.reflect.Proxy;
35import java.util.Map;
36import java.util.concurrent.CompletableFuture;
37import java.util.concurrent.CompletionException;
38import java.util.concurrent.Executor;
39import java.util.concurrent.ExecutorService;
40import java.util.concurrent.Executors;
41import java.util.function.Function;
42
Jordan Haltermanb8cace72018-07-02 17:39:21 -070043import static com.google.common.base.Preconditions.checkArgument;
44import static org.onlab.util.Tools.groupedThreads;
45
46/**
47 * Implementation of the proxy service.
48 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070049@Component(immediate = true, service = ProxyService.class)
Jordan Haltermanb8cace72018-07-02 17:39:21 -070050public class ProxyManager extends AbstractProxyManager implements ProxyService {
51
52 private final Logger log = LoggerFactory.getLogger(getClass());
53
54 private static final String MESSAGE_PREFIX = "proxy";
55
Ray Milkeyd84f89b2018-08-17 14:54:17 -070056 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermanb8cace72018-07-02 17:39:21 -070057 protected ClusterCommunicationService clusterCommunicator;
58
59 private final ExecutorService proxyServiceExecutor =
60 Executors.newFixedThreadPool(
61 Math.min(Math.max(Runtime.getRuntime().availableProcessors(), 4), 16),
62 groupedThreads("onos/proxy", "service-executor", log));
63
64 private final Map<Class, ProxyService> services = Maps.newConcurrentMap();
65
66 @Activate
67 public void activate() {
68 log.info("Started");
69 }
70
71 @Deactivate
72 public void deactivate() {
73 proxyServiceExecutor.shutdownNow();
74 log.info("Stopped");
75 }
76
77 @Override
78 public <T> ProxyFactory<T> getProxyFactory(Class<T> type, Serializer serializer) {
79 checkArgument(type.isInterface(), "proxy type must be an interface");
80 return new ProxyManagerFactory<>(type, serializer);
81 }
82
83 @Override
84 public <T> void registerProxyService(Class<? super T> type, T instance, Serializer serializer) {
85 checkArgument(type.isInterface(), "proxy type must be an interface");
86 Executor executor = new OrderedExecutor(proxyServiceExecutor);
87 services.computeIfAbsent(type, t -> new ProxyService(instance, t, MESSAGE_PREFIX,
88 (i, m, o) -> new SyncOperationService(i, m, o, serializer, executor),
89 (i, m, o) -> new AsyncOperationService(i, m, o, serializer)));
90 }
91
92 @Override
93 public void unregisterProxyService(Class<?> type) {
94 ProxyService service = services.remove(type);
95 if (service != null) {
96 service.close();
97 }
98 }
99
100 /**
101 * Internal proxy factory.
102 */
103 private class ProxyManagerFactory<T> implements ProxyFactory<T> {
104 private final Class<T> type;
105 private final Serializer serializer;
106 private final Map<NodeId, T> proxyInstances = Maps.newConcurrentMap();
107
108 ProxyManagerFactory(Class<T> type, Serializer serializer) {
109 this.type = type;
110 this.serializer = serializer;
111 }
112
113 @Override
114 @SuppressWarnings("unchecked")
115 public T getProxyFor(NodeId nodeId) {
116 // Avoid unnecessary locking of computeIfAbsent if possible.
117 T proxyInstance = proxyInstances.get(nodeId);
118 if (proxyInstance != null) {
119 return proxyInstance;
120 }
121 return proxyInstances.computeIfAbsent(nodeId, id -> (T) Proxy.newProxyInstance(
122 type.getClassLoader(),
123 new Class[]{type},
124 new ProxyInvocationHandler(type, MESSAGE_PREFIX,
125 o -> new SyncOperationHandler(o, nodeId, serializer),
126 o -> new AsyncOperationHandler(o, nodeId, serializer))));
127 }
128 }
129
130 /**
131 * Implementation of the operation service which handles synchronous method calls.
132 */
133 private class SyncOperationService
134 extends OperationService
135 implements Function<Object[], Object> {
136 SyncOperationService(
137 Object instance,
138 Method method,
139 Operation operation,
140 Serializer serializer,
141 Executor executor) {
142 super(instance, method, operation);
143 clusterCommunicator.addSubscriber(
144 operation.subject(), serializer::decode, this, serializer::encode, executor);
145 }
146
147 @Override
148 public Object apply(Object[] args) {
149 return invoke(args);
150 }
151
152 @Override
153 void close() {
154 clusterCommunicator.removeSubscriber(operation.subject());
155 }
156 }
157
158 /**
159 * Implementation of the operation service which handles asynchronous method calls.
160 */
161 private class AsyncOperationService
162 extends OperationService
163 implements Function<Object[], CompletableFuture<Object>> {
164 AsyncOperationService(Object instance, Method method, Operation operation, Serializer serializer) {
165 super(instance, method, operation);
166 clusterCommunicator.addSubscriber(
167 operation.subject(), serializer::decode, this, serializer::encode);
168 }
169
170 @Override
171 public CompletableFuture<Object> apply(Object[] args) {
172 return invoke(args);
173 }
174
175 @Override
176 void close() {
177 clusterCommunicator.removeSubscriber(operation.subject());
178 }
179 }
180
181 /**
182 * Handler for synchronous proxy operations which blocks on {@code ClusterCommunicationService} requests.
183 */
184 private class SyncOperationHandler extends OperationHandler {
185 private final NodeId nodeId;
186 private final Serializer serializer;
187
188 SyncOperationHandler(Operation operation, NodeId nodeId, Serializer serializer) {
189 super(operation);
190 this.nodeId = nodeId;
191 this.serializer = serializer;
192 }
193
194 @Override
195 public Object apply(Object[] args) {
196 try {
197 return clusterCommunicator.sendAndReceive(
198 args, operation.subject(), serializer::encode, serializer::decode, nodeId)
199 .join();
200 } catch (CompletionException e) {
201 if (e.getCause() instanceof RuntimeException) {
202 throw (RuntimeException) e.getCause();
203 } else {
204 throw new IllegalStateException(e.getCause());
205 }
206 }
207 }
208 }
209
210 /**
211 * Handler for asynchronous proxy operations which uses async {@code ClusterCommunicationService} requests.
212 */
213 private class AsyncOperationHandler extends OperationHandler {
214 private final NodeId nodeId;
215 private final Serializer serializer;
216
217 AsyncOperationHandler(Operation operation, NodeId nodeId, Serializer serializer) {
218 super(operation);
219 this.nodeId = nodeId;
220 this.serializer = serializer;
221 }
222
223 @Override
224 public Object apply(Object[] args) {
225 return clusterCommunicator.sendAndReceive(
226 args, operation.subject(), serializer::encode, serializer::decode, nodeId);
227 }
228 }
229}