Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least i believe it)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
- private ExecutorService futureService;
+ protected ExecutorService deviceInstallers =
+ Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+ protected ExecutorService operationsService =
+ Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+ private IdGenerator idGenerator;
+
+ private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+ ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -101,10 +111,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
@Activate
public void activate() {
- futureService =
- Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+ idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
@@ -112,8 +127,8 @@
@Deactivate
public void deactivate() {
- futureService.shutdownNow();
-
+ deviceInstallers.shutdownNow();
+ operationsService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
@@ -131,20 +146,20 @@
@Override
public void applyFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+ builder.add(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+ apply(builder.build());
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+ builder.remove(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+ apply(builder.build());
}
@Override
@@ -180,23 +195,38 @@
}
@Override
- public Future<CompletedBatchOperation> applyBatch(
- FlowRuleBatchOperation batch) {
- Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
- ArrayListMultimap.create();
- List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
- for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- final FlowRule f = fbe.target();
- perDeviceBatches.put(f.deviceId(), fbe);
- }
+ public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
- for (DeviceId deviceId : perDeviceBatches.keySet()) {
- FlowRuleBatchOperation b =
- new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
- Future<CompletedBatchOperation> future = store.storeBatch(b);
- futures.add(future);
- }
- return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+ FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+ batch.getOperations().stream().forEach(op -> {
+ switch (op.getOperator()) {
+ case ADD:
+ fopsBuilder.add(op.getTarget());
+ break;
+ case REMOVE:
+ fopsBuilder.remove(op.getTarget());
+ break;
+ case MODIFY:
+ fopsBuilder.modify(op.getTarget());
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+ }
+ }
+ );
+
+ apply(fopsBuilder.build());
+ return Futures.immediateFuture(
+ new CompletedBatchOperation(true,
+ Collections.emptySet(), null));
+
+ }
+
+ @Override
+ public void apply(FlowRuleOperations ops) {
+ operationsService.submit(new FlowOperationsProcessor(ops));
}
@Override
@@ -373,13 +403,19 @@
}
}
+
+ @Override
+ public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+ operation
+ ));
+ }
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
- // FIXME set appropriate default and make it configurable
- private static final int TIMEOUT_PER_OP = 500; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
// Request has been forwarded to MASTER Node, and was
- for (FlowRule entry : request.toAdd()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
- }
- for (FlowRule entry : request.toRemove()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
- }
- // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+ request.ops().stream().forEach(
+ op -> {
+ switch (op.getOperator()) {
- FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+ case ADD:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+ op.getTarget()));
+ break;
+ case REMOVE:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+ op.getTarget()));
+ break;
+ case MODIFY:
+ //TODO: do something here when the time comes.
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+ }
+ }
+ );
+
+ DeviceId deviceId = event.deviceId();
+
+ FlowRuleBatchOperation batchOperation =
+ request.asBatchOperation(deviceId);
FlowRuleProvider flowRuleProvider =
- getProvider(batchOperation.getOperations().get(0).target().deviceId());
- final Future<CompletedBatchOperation> result =
- flowRuleProvider.executeBatch(batchOperation);
- futureService.submit(new Runnable() {
- @Override
- public void run() {
- CompletedBatchOperation res;
- try {
- res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- log.warn("Something went wrong with the batch operation {}",
- request.batchId(), e);
+ getProvider(deviceId);
- Set<FlowRule> failures = new HashSet<>(batchOperation.size());
- for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
- failures.add(op.target());
- }
- res = new CompletedBatchOperation(false, failures);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- }
- }
- });
+ flowRuleProvider.executeBatch(batchOperation);
+
break;
case BATCH_OPERATION_COMPLETED:
- // MASTER Node has pushed the batch down to the Device
- // Note: RULE_ADDED will be posted
- // when Flow was actually confirmed by stats reply.
+ FlowOperationsProcessor fops = pendingFlowOperations.remove(
+ event.subject().batchId());
+ if (event.result().isSuccess()) {
+ if (fops != null) {
+ fops.satisfy(event.deviceId());
+ }
+ } else {
+ fops.fail(event.deviceId(), event.result().failedItems());
+ }
+
break;
default:
@@ -438,141 +482,100 @@
}
}
- private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+ private class FlowOperationsProcessor implements Runnable {
- private final List<Future<CompletedBatchOperation>> futures;
- private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
- private final AtomicReference<BatchState> state;
- private CompletedBatchOperation overall;
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext context;
+ private final FlowRuleOperations fops;
+ private final AtomicBoolean hasFailed = new AtomicBoolean(false);
- public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
- Multimap<DeviceId, FlowRuleBatchEntry> batches) {
- this.futures = futures;
- this.batches = batches;
- this.state = new AtomicReference<>(BatchState.STARTED);
- }
+ private Set<DeviceId> pendingDevices;
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (state.get() == BatchState.FINISHED) {
- return false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Cancelling FlowRuleBatchFuture",
- new RuntimeException("Just printing backtrace"));
- }
- if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
- return false;
- }
- cleanUpBatch();
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(mayInterruptIfRunning);
- }
- return true;
- }
+ public FlowOperationsProcessor(FlowRuleOperations ops) {
- @Override
- public boolean isCancelled() {
- return state.get() == BatchState.CANCELLED;
- }
+ this.stages = Lists.newArrayList(ops.stages());
+ this.context = ops.callback();
+ this.fops = ops;
+ pendingDevices = Sets.newConcurrentHashSet();
- @Override
- public boolean isDone() {
- return state.get() == BatchState.FINISHED;
- }
-
-
- @Override
- public CompletedBatchOperation get() throws InterruptedException,
- ExecutionException {
-
- if (isDone()) {
- return overall;
- }
-
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get();
- success = validateBatchOperation(failed, failedIds, completed);
- }
-
- return finalizeBatchOperation(success, failed, failedIds);
}
@Override
- public CompletedBatchOperation get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
-
- if (isDone()) {
- return overall;
- }
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get(timeout, unit);
- success = validateBatchOperation(failed, failedIds, completed);
- }
- return finalizeBatchOperation(success, failed, failedIds);
- }
-
- private boolean validateBatchOperation(Set<FlowRule> failed,
- Set<Long> failedIds,
- CompletedBatchOperation completed) {
-
- if (isCancelled()) {
- throw new CancellationException();
- }
- if (!completed.isSuccess()) {
- log.warn("FlowRuleBatch failed: {}", completed);
- failed.addAll(completed.failedItems());
- failedIds.addAll(completed.failedIds());
- cleanUpBatch();
- cancelAllSubBatches();
- return false;
- }
- return true;
- }
-
- private void cancelAllSubBatches() {
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(true);
+ public void run() {
+ if (stages.size() > 0) {
+ process(stages.remove(0));
+ } else if (!hasFailed.get() && context != null) {
+ context.onSuccess(fops);
}
}
- private CompletedBatchOperation finalizeBatchOperation(boolean success,
- Set<FlowRule> failed,
- Set<Long> failedIds) {
- synchronized (this) {
- if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
- if (state.get() == BatchState.FINISHED) {
- return overall;
+ private void process(Set<FlowRuleOperation> ops) {
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+ ArrayListMultimap.create();
+
+ FlowRuleBatchEntry fbe;
+ for (FlowRuleOperation flowRuleOperation : ops) {
+ switch (flowRuleOperation.type()) {
+ // FIXME: Brian needs imagination when creating class names.
+ case ADD:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+ break;
+ case MODIFY:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+ break;
+ case REMOVE:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+ }
+ pendingDevices.add(flowRuleOperation.rule().deviceId());
+ perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+ }
+
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
+ Long id = idGenerator.getNewId();
+ final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+ deviceId, id);
+ pendingFlowOperations.put(id, this);
+ deviceInstallers.submit(new Runnable() {
+ @Override
+ public void run() {
+ store.storeBatch(b);
}
- throw new CancellationException();
- }
- overall = new CompletedBatchOperation(success, failed, failedIds);
- return overall;
+ });
}
}
- private void cleanUpBatch() {
- log.debug("cleaning up batch");
- // TODO convert these into a batch?
- for (FlowRuleBatchEntry fbe : batches.values()) {
- if (fbe.operator() == FlowRuleOperation.ADD ||
- fbe.operator() == FlowRuleOperation.MODIFY) {
- store.deleteFlowRule(fbe.target());
- } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
- store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
- store.storeFlowRule(fbe.target());
- }
+ public void satisfy(DeviceId devId) {
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
}
}
+
+
+
+ public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+ hasFailed.set(true);
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
+ }
+
+ if (context != null) {
+ final FlowRuleOperations.Builder failedOpsBuilder =
+ FlowRuleOperations.builder();
+ failures.stream().forEach(failedOpsBuilder::add);
+
+ context.onError(failedOpsBuilder.build());
+ }
+ }
+
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 8b6d8ad..685fa70 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -1151,6 +1151,7 @@
*/
protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
//TODO test this. (also, maybe save this batch)
+
FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
if (batch.size() > 0) {
//FIXME apply batch might throw an exception
@@ -1165,7 +1166,7 @@
}
private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
for (CompletedIntentUpdate update : intentUpdates) {
FlowRuleBatchOperation currentBatch = update.currentBatch();
if (currentBatch != null) {
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
index 583d7e3..816fc12 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
@@ -98,6 +98,7 @@
outputPorts.put(egressPoint.deviceId(), egressPoint.port());
}
+ //FIXME change to new api
FlowRuleBatchOperation batchOperation =
new FlowRuleBatchOperation(outputPorts
.keys()
@@ -105,7 +106,7 @@
.map(deviceId -> createBatchEntry(operation,
intent, deviceId,
outputPorts.get(deviceId)))
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()), null, 0);
return Collections.singletonList(batchOperation);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
index e958c58..5b226a3 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
@@ -181,6 +181,7 @@
true);
rules.add(new FlowRuleBatchEntry(operation, rule));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ //FIXME change to new api
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
index 1c8ea7e..b90be50 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
@@ -108,7 +108,8 @@
intent.id().fingerprint()));
prev = link.dst();
}
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ //FIXME this should change to new api.
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
@@ -138,7 +139,8 @@
intent.id().fingerprint()));
prev = link.dst();
}
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ // FIXME this should change to new api
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
index c3f9ba0..87fc585 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
@@ -31,7 +31,7 @@
@Override
@SuppressWarnings("unchecked")
- public void post(Event event) {
+ public synchronized void post(Event event) {
EventSink sink = getSink(event.getClass());
checkState(sink != null, "No sink for event %s", event);
sink.process(event);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 5a79b43..3fd8b5a 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -20,12 +20,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.core.Version;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
@@ -36,7 +39,6 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
@@ -72,6 +74,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
@@ -97,12 +100,16 @@
protected TestListener listener = new TestListener();
private ApplicationId appId;
+
@Before
public void setUp() {
mgr = new FlowRuleManager();
mgr.store = new SimpleFlowRuleStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.deviceService = new TestDeviceService();
+ mgr.coreService = new TestCoreService();
+ mgr.operationsService = MoreExecutors.newDirectExecutorService();
+ mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
service = mgr;
registry = mgr;
@@ -246,14 +253,23 @@
@Test
public void flowRemoved() {
+
FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
+
+
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
service.removeFlowRules(f1);
+
fe1.setState(FlowEntryState.REMOVED);
+
+
+
providerService.flowRemoved(fe1);
+
+
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
@@ -263,11 +279,13 @@
FlowRule f3 = flowRule(3, 3);
FlowEntry fe3 = new DefaultFlowEntry(f3);
service.applyFlowRules(f3);
+
providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
providerService.flowRemoved(fe3);
validateEvents();
+
}
@Test
@@ -281,7 +299,6 @@
FlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
-
//FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
//FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
@@ -388,7 +405,7 @@
FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
- Lists.newArrayList(fbe1, fbe2));
+ Lists.newArrayList(fbe1, fbe2), null, 0);
Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
@@ -406,53 +423,6 @@
}
- @Test
- public void cancelBatch() {
- FlowRule f1 = flowRule(1, 1);
- FlowRule f2 = flowRule(2, 2);
-
-
- mgr.applyFlowRules(f1);
-
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f1, FlowEntryState.PENDING_ADD)));
-
- FlowEntry fe1 = new DefaultFlowEntry(f1);
- providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
-
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f1, FlowEntryState.ADDED)));
-
-
- FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
- FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
-
- FlowRuleBatchEntry fbe2 = new FlowRuleBatchEntry(
- FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
-
- FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
- Lists.newArrayList(fbe1, fbe2));
- Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
-
- future.cancel(true);
-
- assertTrue(flowCount() == 2);
-
- /*
- * Rule f1 should be re-added to the list and therefore be in a pending add
- * state.
- */
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f2, FlowEntryState.PENDING_REMOVE,
- f1, FlowEntryState.PENDING_ADD)));
-
-
- }
-
-
private static class TestListener implements FlowRuleListener {
final List<FlowRuleEvent> events = new ArrayList<>();
@@ -528,9 +498,8 @@
}
@Override
- public ListenableFuture<CompletedBatchOperation> executeBatch(
- BatchOperation<FlowRuleBatchEntry> batch) {
- return new TestInstallationFuture();
+ public void executeBatch(FlowRuleBatchOperation batch) {
+ // TODO: need to call batchOperationComplete
}
private class TestInstallationFuture
@@ -554,14 +523,14 @@
@Override
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
- return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException,
ExecutionException, TimeoutException {
- return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
@@ -644,4 +613,37 @@
}
}
+ private class TestCoreService implements CoreService {
+ @Override
+ public Version version() {
+ return null;
+ }
+
+ @Override
+ public Set<ApplicationId> getAppIds() {
+ return null;
+ }
+
+ @Override
+ public ApplicationId getAppId(Short id) {
+ return null;
+ }
+
+ @Override
+ public ApplicationId registerApplication(String identifier) {
+ return null;
+ }
+
+ @Override
+ public IdGenerator getIdGenerator(String topic) {
+ return new IdGenerator() {
+ private AtomicLong counter = new AtomicLong(0);
+ @Override
+ public long getNewId() {
+ return counter.getAndIncrement();
+ }
+ };
+ }
+ }
+
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 17a9498..3570c77 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -201,7 +201,7 @@
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
@@ -209,7 +209,7 @@
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
@@ -219,7 +219,7 @@
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
index e956575..72de4f0 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
@@ -27,6 +27,7 @@
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import com.google.common.collect.ImmutableSet;
@@ -45,11 +46,11 @@
public void setFuture(boolean success, long intentId) {
if (success) {
- future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
+ future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet(), null));
} else {
final Set<Long> failedIds = ImmutableSet.of(intentId);
future = Futures.immediateFuture(
- new CompletedBatchOperation(false, flows, failedIds));
+ new CompletedBatchOperation(false, flows, failedIds, null));
}
}
@@ -74,6 +75,11 @@
}
@Override
+ public void apply(FlowRuleOperations ops) {
+
+ }
+
+ @Override
public int getFlowRuleCount() {
return flows.size();
}