blob: 033e0b8b9b244bc01333d029259c27436533151f [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.flowext.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flowext.DefaultFlowRuleExt;
import org.onosproject.net.flowext.DownStreamFlowEntry;
import org.onosproject.net.flowext.FlowExtCompletedOperation;
import org.onosproject.net.flowext.FlowRuleExtRouter;
import org.onosproject.net.flowext.FlowRuleExtRouterListener;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Experimental extension to the flow rule subsystem; still under development.
* Implement a simple routing-like mechanism to directly send service data to its master and push to device.
* This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
*/
@Component(immediate = false)
@Service
public class DefaultFlowRuleExtRouter
implements FlowRuleExtRouter {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private int pendingFutureTimeoutMinutes = 5;
protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
// .removalListener(new TimeoutFuture())
.build();
private final ExecutorService futureListeners = Executors
.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(FlowExtCompletedOperation.class)
.register(FlowRuleBatchRequest.class)
.register(DownStreamFlowEntry.class)
.register(DefaultFlowRuleExt.class)
.build();
}
};
private ReplicaInfoEventListener replicaInfoEventListener;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
// decode the extended flow entry and store them in memory.
FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
log.info("received batch request {}", operation);
final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
f.addListener(new Runnable() {
@Override
public void run() {
FlowExtCompletedOperation result = Futures.getUnchecked(f);
try {
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
}, futureListeners);
}
});
replicaInfoManager.addListener(replicaInfoEventListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
/**
* apply the sub batch of flow extension rules.
*
* @param batchOperation batch of flow rules.
* A batch can contain flow rules for a single device only.
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*/
@Override
public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
// TODO Auto-generated method stub
if (batchOperation.ops().isEmpty()) {
return Futures.immediateFuture(new FlowExtCompletedOperation(
batchOperation.batchId(), true, Collections.emptySet()));
}
// get the deviceId all the collection belongs to
DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
if (deviceId == null) {
log.error("This Batch exists more than two deviceId");
return null;
}
ReplicaInfo replicaInfo = replicaInfoManager
.getReplicaInfoFor(deviceId);
if (replicaInfo.master().get()
.equals(clusterService.getLocalNode().id())) {
return applyBatchInternal(batchOperation);
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(clusterService
.getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
try {
ListenableFuture<byte[]> responseFuture = clusterCommunicator
.sendAndReceive(message, replicaInfo.master().get());
// here should add another decode process
return Futures.transform(responseFuture,
new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
}
/**
* apply the batch in local node.
* It means this instance is master of the device the flow entry belongs to.
*
* @param batchOperation a collection of flow entry, all they should send down to one device
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*/
private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
pendingExtendFutures.put(batchOperation.batchId(), r);
// here should notify manager to complete
notify(batchOperation);
return r;
}
/**
* Get the deviceId of this batch.
* The whole Batch should belong to one deviceId.
*
* @param batchOperation a collection of flow entry, all they should send down to one device
* @return the deviceId the whole batch belongs to
*/
private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
FlowRuleBatchEntry headOp = head.next();
boolean sameId = true;
for (FlowRuleBatchEntry operation : batchOperation) {
if (operation.target().deviceId() != headOp.target().deviceId()) {
log.warn("this batch does not apply on one device Id ");
sameId = false;
break;
}
}
return sameId ? headOp.target().deviceId() : null;
}
/**
* Notify the listener of Router to do some reaction.
*
* @param request the requested operation to do
*/
public void notify(FlowRuleBatchRequest request) {
for (FlowRuleExtRouterListener listener : routerListener) {
listener.notify(FlowRuleBatchEvent
// TODO fill in the deviceId
.requested(request, null));
}
}
/**
* Invoked on the completion of a storeBatch operation.
*
* @param event flow rule batch event
*/
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
// TODO Auto-generated method stub
final Long batchId = event.subject().batchId();
SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
.getIfPresent(batchId);
if (future != null) {
FlowRuleBatchRequest request = event.subject();
CompletedBatchOperation result = event.result();
FlowExtCompletedOperation completed =
new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
future.set(completed);
pendingExtendFutures.invalidate(batchId);
}
}
/**
* Register the listener to monitor Router,
* The Router find master to send downStream.
*
* @param listener the listener to register
*/
@Override
public void addListener(FlowRuleExtRouterListener listener) {
routerListener.add(listener);
}
/**
* Remove the listener of Router.
*
* @param listener the listener to remove
*/
@Override
public void removeListener(FlowRuleExtRouterListener listener) {
routerListener.remove(listener);
}
}