Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least i believe it)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
- private ExecutorService futureService;
+ protected ExecutorService deviceInstallers =
+ Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+ protected ExecutorService operationsService =
+ Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+ private IdGenerator idGenerator;
+
+ private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+ ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -101,10 +111,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
@Activate
public void activate() {
- futureService =
- Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+ idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
@@ -112,8 +127,8 @@
@Deactivate
public void deactivate() {
- futureService.shutdownNow();
-
+ deviceInstallers.shutdownNow();
+ operationsService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
@@ -131,20 +146,20 @@
@Override
public void applyFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+ builder.add(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+ apply(builder.build());
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+ builder.remove(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+ apply(builder.build());
}
@Override
@@ -180,23 +195,38 @@
}
@Override
- public Future<CompletedBatchOperation> applyBatch(
- FlowRuleBatchOperation batch) {
- Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
- ArrayListMultimap.create();
- List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
- for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- final FlowRule f = fbe.target();
- perDeviceBatches.put(f.deviceId(), fbe);
- }
+ public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
- for (DeviceId deviceId : perDeviceBatches.keySet()) {
- FlowRuleBatchOperation b =
- new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
- Future<CompletedBatchOperation> future = store.storeBatch(b);
- futures.add(future);
- }
- return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+ FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+ batch.getOperations().stream().forEach(op -> {
+ switch (op.getOperator()) {
+ case ADD:
+ fopsBuilder.add(op.getTarget());
+ break;
+ case REMOVE:
+ fopsBuilder.remove(op.getTarget());
+ break;
+ case MODIFY:
+ fopsBuilder.modify(op.getTarget());
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+ }
+ }
+ );
+
+ apply(fopsBuilder.build());
+ return Futures.immediateFuture(
+ new CompletedBatchOperation(true,
+ Collections.emptySet(), null));
+
+ }
+
+ @Override
+ public void apply(FlowRuleOperations ops) {
+ operationsService.submit(new FlowOperationsProcessor(ops));
}
@Override
@@ -373,13 +403,19 @@
}
}
+
+ @Override
+ public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+ operation
+ ));
+ }
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
- // FIXME set appropriate default and make it configurable
- private static final int TIMEOUT_PER_OP = 500; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
// Request has been forwarded to MASTER Node, and was
- for (FlowRule entry : request.toAdd()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
- }
- for (FlowRule entry : request.toRemove()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
- }
- // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+ request.ops().stream().forEach(
+ op -> {
+ switch (op.getOperator()) {
- FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+ case ADD:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+ op.getTarget()));
+ break;
+ case REMOVE:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+ op.getTarget()));
+ break;
+ case MODIFY:
+ //TODO: do something here when the time comes.
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+ }
+ }
+ );
+
+ DeviceId deviceId = event.deviceId();
+
+ FlowRuleBatchOperation batchOperation =
+ request.asBatchOperation(deviceId);
FlowRuleProvider flowRuleProvider =
- getProvider(batchOperation.getOperations().get(0).target().deviceId());
- final Future<CompletedBatchOperation> result =
- flowRuleProvider.executeBatch(batchOperation);
- futureService.submit(new Runnable() {
- @Override
- public void run() {
- CompletedBatchOperation res;
- try {
- res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- log.warn("Something went wrong with the batch operation {}",
- request.batchId(), e);
+ getProvider(deviceId);
- Set<FlowRule> failures = new HashSet<>(batchOperation.size());
- for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
- failures.add(op.target());
- }
- res = new CompletedBatchOperation(false, failures);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- }
- }
- });
+ flowRuleProvider.executeBatch(batchOperation);
+
break;
case BATCH_OPERATION_COMPLETED:
- // MASTER Node has pushed the batch down to the Device
- // Note: RULE_ADDED will be posted
- // when Flow was actually confirmed by stats reply.
+ FlowOperationsProcessor fops = pendingFlowOperations.remove(
+ event.subject().batchId());
+ if (event.result().isSuccess()) {
+ if (fops != null) {
+ fops.satisfy(event.deviceId());
+ }
+ } else {
+ fops.fail(event.deviceId(), event.result().failedItems());
+ }
+
break;
default:
@@ -438,141 +482,100 @@
}
}
- private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+ private class FlowOperationsProcessor implements Runnable {
- private final List<Future<CompletedBatchOperation>> futures;
- private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
- private final AtomicReference<BatchState> state;
- private CompletedBatchOperation overall;
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext context;
+ private final FlowRuleOperations fops;
+ private final AtomicBoolean hasFailed = new AtomicBoolean(false);
- public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
- Multimap<DeviceId, FlowRuleBatchEntry> batches) {
- this.futures = futures;
- this.batches = batches;
- this.state = new AtomicReference<>(BatchState.STARTED);
- }
+ private Set<DeviceId> pendingDevices;
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (state.get() == BatchState.FINISHED) {
- return false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Cancelling FlowRuleBatchFuture",
- new RuntimeException("Just printing backtrace"));
- }
- if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
- return false;
- }
- cleanUpBatch();
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(mayInterruptIfRunning);
- }
- return true;
- }
+ public FlowOperationsProcessor(FlowRuleOperations ops) {
- @Override
- public boolean isCancelled() {
- return state.get() == BatchState.CANCELLED;
- }
+ this.stages = Lists.newArrayList(ops.stages());
+ this.context = ops.callback();
+ this.fops = ops;
+ pendingDevices = Sets.newConcurrentHashSet();
- @Override
- public boolean isDone() {
- return state.get() == BatchState.FINISHED;
- }
-
-
- @Override
- public CompletedBatchOperation get() throws InterruptedException,
- ExecutionException {
-
- if (isDone()) {
- return overall;
- }
-
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get();
- success = validateBatchOperation(failed, failedIds, completed);
- }
-
- return finalizeBatchOperation(success, failed, failedIds);
}
@Override
- public CompletedBatchOperation get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
-
- if (isDone()) {
- return overall;
- }
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get(timeout, unit);
- success = validateBatchOperation(failed, failedIds, completed);
- }
- return finalizeBatchOperation(success, failed, failedIds);
- }
-
- private boolean validateBatchOperation(Set<FlowRule> failed,
- Set<Long> failedIds,
- CompletedBatchOperation completed) {
-
- if (isCancelled()) {
- throw new CancellationException();
- }
- if (!completed.isSuccess()) {
- log.warn("FlowRuleBatch failed: {}", completed);
- failed.addAll(completed.failedItems());
- failedIds.addAll(completed.failedIds());
- cleanUpBatch();
- cancelAllSubBatches();
- return false;
- }
- return true;
- }
-
- private void cancelAllSubBatches() {
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(true);
+ public void run() {
+ if (stages.size() > 0) {
+ process(stages.remove(0));
+ } else if (!hasFailed.get() && context != null) {
+ context.onSuccess(fops);
}
}
- private CompletedBatchOperation finalizeBatchOperation(boolean success,
- Set<FlowRule> failed,
- Set<Long> failedIds) {
- synchronized (this) {
- if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
- if (state.get() == BatchState.FINISHED) {
- return overall;
+ private void process(Set<FlowRuleOperation> ops) {
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+ ArrayListMultimap.create();
+
+ FlowRuleBatchEntry fbe;
+ for (FlowRuleOperation flowRuleOperation : ops) {
+ switch (flowRuleOperation.type()) {
+ // FIXME: Brian needs imagination when creating class names.
+ case ADD:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+ break;
+ case MODIFY:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+ break;
+ case REMOVE:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+ }
+ pendingDevices.add(flowRuleOperation.rule().deviceId());
+ perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+ }
+
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
+ Long id = idGenerator.getNewId();
+ final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+ deviceId, id);
+ pendingFlowOperations.put(id, this);
+ deviceInstallers.submit(new Runnable() {
+ @Override
+ public void run() {
+ store.storeBatch(b);
}
- throw new CancellationException();
- }
- overall = new CompletedBatchOperation(success, failed, failedIds);
- return overall;
+ });
}
}
- private void cleanUpBatch() {
- log.debug("cleaning up batch");
- // TODO convert these into a batch?
- for (FlowRuleBatchEntry fbe : batches.values()) {
- if (fbe.operator() == FlowRuleOperation.ADD ||
- fbe.operator() == FlowRuleOperation.MODIFY) {
- store.deleteFlowRule(fbe.target());
- } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
- store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
- store.storeFlowRule(fbe.target());
- }
+ public void satisfy(DeviceId devId) {
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
}
}
+
+
+
+ public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+ hasFailed.set(true);
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
+ }
+
+ if (context != null) {
+ final FlowRuleOperations.Builder failedOpsBuilder =
+ FlowRuleOperations.builder();
+ failures.stream().forEach(failedOpsBuilder::add);
+
+ context.onError(failedOpsBuilder.build());
+ }
+ }
+
}
}