blob: 96cbe3a06d62ff86ee942043a5edc2ac6d7a24b8 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.OrderedExecutor;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipProxyFactory;
import org.onosproject.mastership.MastershipProxyService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
/**
* Mastership proxy service implementation.
* <p>
* This implementation wraps both the proxy service and the generated proxy instance in additional proxies which check
* mastership and route calls to the appropriate proxy instances.
*/
@Component(immediate = true)
@Service
public class MastershipProxyManager extends AbstractProxyManager implements MastershipProxyService {
private static final Serializer REQUEST_SERIALIZER =
Serializer.using(KryoNamespaces.API, MastershipProxyRequest.class);
private static final String MESSAGE_PREFIX = "mastership-proxy";
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
private final ExecutorService proxyServiceExecutor =
Executors.newFixedThreadPool(8, groupedThreads("onos/proxy", "service-executor", log));
private final Map<Class, ProxyService> services = Maps.newConcurrentMap();
private NodeId localNodeId;
@Activate
public void activate() {
this.localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
@Deactivate
public void deactivate() {
proxyServiceExecutor.shutdownNow();
log.info("Stopped");
}
@Override
public <T> MastershipProxyFactory<T> getProxyFactory(Class<T> type, Serializer serializer) {
checkArgument(type.isInterface(), "proxy type must be an interface");
return new MastershipProxyManagerFactory<>(type, serializer);
}
@Override
public <T> void registerProxyService(Class<? super T> type, T instance, Serializer serializer) {
checkArgument(type.isInterface(), "proxy type must be an interface");
Executor executor = new OrderedExecutor(proxyServiceExecutor);
services.computeIfAbsent(type, t -> new ProxyService(instance, t, MESSAGE_PREFIX,
(i, m, o) -> new SyncOperationService(i, m, o, serializer, executor),
(i, m, o) -> new AsyncOperationService(i, m, o, serializer)));
}
@Override
public void unregisterProxyService(Class<?> type) {
ProxyService service = services.remove(type);
if (service != null) {
service.close();
}
}
/**
* Internal proxy factory.
*/
private class MastershipProxyManagerFactory<T> implements MastershipProxyFactory<T> {
private final Class<T> type;
private final Serializer serializer;
private final Map<DeviceId, T> proxyInstances = Maps.newConcurrentMap();
MastershipProxyManagerFactory(Class<T> type, Serializer serializer) {
this.type = type;
this.serializer = serializer;
}
@Override
@SuppressWarnings("unchecked")
public T getProxyFor(DeviceId deviceId) {
// Avoid unnecessary locking of computeIfAbsent if possible.
T proxyInstance = proxyInstances.get(deviceId);
if (proxyInstance != null) {
return proxyInstance;
}
return proxyInstances.computeIfAbsent(deviceId, id -> (T) Proxy.newProxyInstance(
type.getClassLoader(),
new Class[]{type},
new ProxyInvocationHandler(type, MESSAGE_PREFIX,
o -> new SyncOperationHandler(o, deviceId, serializer),
o -> new AsyncOperationHandler(o, deviceId, serializer))));
}
}
/**
* Implementation of the operation service which handles synchronous method calls.
*/
private class SyncOperationService
extends OperationService
implements Function<MastershipProxyRequest, Object> {
private final Serializer serializer;
SyncOperationService(
Object instance,
Method method,
Operation operation,
Serializer serializer,
Executor executor) {
super(instance, method, operation);
this.serializer = serializer;
clusterCommunicator.addSubscriber(
operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode, executor);
}
@Override
public Object apply(MastershipProxyRequest request) {
NodeId master = mastershipService.getMasterFor(request.deviceId());
if (master == null) {
throw new IllegalStateException("No master found for device " + request.deviceId());
} else if (!Objects.equals(master, localNodeId)) {
// If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
// communication. Reject the request.
if (request.visited().contains(localNodeId)) {
throw new IllegalStateException("Ambiguous master for device " + request.deviceId());
}
// If this node is being visited for the first time, update the request with the local node ID
// to prevent cyclic communication.
return clusterCommunicator.sendAndReceive(
request.visit(),
operation.subject(),
REQUEST_SERIALIZER::encode,
REQUEST_SERIALIZER::decode,
master);
} else {
// Unwrap the raw arguments and apply the method call to the registered proxy service.
return invoke(request.unwrap(serializer::decode));
}
}
@Override
void close() {
clusterCommunicator.removeSubscriber(operation.subject());
}
}
/**
* Implementation of the operation service which handles asynchronous method calls.
*/
private class AsyncOperationService
extends OperationService
implements Function<MastershipProxyRequest, CompletableFuture<Object>> {
private final Serializer serializer;
AsyncOperationService(Object instance, Method method, Operation operation, Serializer serializer) {
super(instance, method, operation);
this.serializer = serializer;
clusterCommunicator.addSubscriber(
operation.subject(), REQUEST_SERIALIZER::decode, this, REQUEST_SERIALIZER::encode);
}
@Override
public CompletableFuture<Object> apply(MastershipProxyRequest request) {
NodeId master = mastershipService.getMasterFor(request.deviceId());
if (master == null) {
return Tools.exceptionalFuture(
new IllegalStateException("No master found for device " + request.deviceId()));
} else if (!Objects.equals(master, localNodeId)) {
// If the given node has already been visited by this node, that indicates a cycle in the intra-cluster
// communication. Reject the request.
if (request.visited().contains(localNodeId)) {
return Tools.exceptionalFuture(
new IllegalStateException("Ambiguous master for device " + request.deviceId()));
}
// If this node is being visited for the first time, update the request with the local node ID
// to prevent cyclic communication.
return clusterCommunicator.sendAndReceive(
request.visit(),
operation.subject(),
REQUEST_SERIALIZER::encode,
REQUEST_SERIALIZER::decode,
master);
} else {
// Unwrap the raw arguments and apply the method call to the registered proxy service.
return invoke(request.unwrap(serializer::decode));
}
}
@Override
void close() {
clusterCommunicator.removeSubscriber(operation.subject());
}
}
/**
* Handler for synchronous proxy operations which blocks on {@code ClusterCommunicationService} requests.
*/
private class SyncOperationHandler extends OperationHandler {
private final DeviceId deviceId;
private final Serializer serializer;
SyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
super(operation);
this.deviceId = deviceId;
this.serializer = serializer;
}
@Override
public Object apply(Object[] args) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
throw new IllegalStateException("No master found for device " + deviceId);
}
MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
try {
return clusterCommunicator.sendAndReceive(
request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master)
.join();
} catch (CompletionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalStateException(e.getCause());
}
}
}
}
/**
* Handler for asynchronous proxy operations which uses async {@code ClusterCommunicationService} requests.
*/
private class AsyncOperationHandler extends OperationHandler {
private final DeviceId deviceId;
private final Serializer serializer;
AsyncOperationHandler(Operation operation, DeviceId deviceId, Serializer serializer) {
super(operation);
this.deviceId = deviceId;
this.serializer = serializer;
}
@Override
public Object apply(Object[] args) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
return Tools.exceptionalFuture(new IllegalStateException("No master found for device " + deviceId));
}
MastershipProxyRequest request = new MastershipProxyRequest(deviceId, serializer.encode(args));
return clusterCommunicator.sendAndReceive(
request, operation.subject(), REQUEST_SERIALIZER::encode, REQUEST_SERIALIZER::decode, master);
}
}
/**
* Internal arguments wrapper that contains the {@link DeviceId} for mastership checks.
*/
private class MastershipProxyRequest {
private final DeviceId deviceId;
private final byte[] args;
private final Set<NodeId> visited;
MastershipProxyRequest(DeviceId deviceId, byte[] args) {
this(deviceId, args, ImmutableSet.of(localNodeId));
}
MastershipProxyRequest(DeviceId deviceId, byte[] args, Set<NodeId> visited) {
this.deviceId = deviceId;
this.args = args;
this.visited = visited;
}
/**
* Returns the device identifier.
*
* @return the device identifier
*/
DeviceId deviceId() {
return deviceId;
}
/**
* Returns the function arguments.
*
* @return the function arguments
*/
byte[] args() {
return args;
}
/**
* Decodes and returns the function arguments.
*
* @param decoder the arguments decoder
* @return the decoded function arguments
*/
Object[] unwrap(Function<byte[], Object[]> decoder) {
return decoder.apply(args());
}
/**
* Returns the set of nodes visited by this request.
*
* @return the set of nodes visited by this request
*/
Set<NodeId> visited() {
return visited;
}
/**
* Adds the local node to the set of visited nodes for this request, returning an updated request instance.
*
* @return a new request with the local node ID added to the visited set
*/
MastershipProxyRequest visit() {
return new MastershipProxyRequest(deviceId, args, ImmutableSet.<NodeId>builder()
.addAll(visited)
.add(localNodeId)
.build());
}
}
}