Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least I believe it)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
index a17f7a3..fce7ee9 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
@@ -21,6 +21,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.DeviceId;
/**
* Representation of a completed flow rule batch operation.
@@ -30,19 +31,22 @@
private final boolean success;
private final Set<FlowRule> failures;
private final Set<Long> failedIds;
+ private final DeviceId deviceId;
/**
* Creates a new batch completion result.
*
- * @param success indicates whether the completion is successful.
+ * @param success indicates whether the completion is successful
* @param failures set of any failures encountered
* @param failedIds (optional) set of failed operation ids
+ * @param deviceId the device this operation completed for
*/
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
- Set<Long> failedIds) {
+ Set<Long> failedIds, DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = ImmutableSet.copyOf(failedIds);
+ this.deviceId = deviceId;
}
/**
@@ -51,10 +55,12 @@
* @param success indicates whether the completion is successful.
* @param failures set of any failures encountered
*/
- public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
+ public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
+ DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = Collections.emptySet();
+ this.deviceId = deviceId;
}
@@ -73,12 +79,17 @@
return failedIds;
}
+ public DeviceId deviceId() {
+ return this.deviceId;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("success?", success)
.add("failedItems", failures)
.add("failedIds", failedIds)
+ .add("deviceId", deviceId)
.toString();
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
index ab72d8b..9bcace2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
@@ -16,12 +16,14 @@
package org.onosproject.net.flow;
import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
/**
* Describes flow rule batch event.
*/
public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
+
/**
* Type of flow rule events.
*/
@@ -42,14 +44,17 @@
}
private final CompletedBatchOperation result;
+ private final DeviceId deviceId;
/**
* Constructs a new FlowRuleBatchEvent.
- * @param request batch operation request.
+ *
+ * @param request batch operation request
+ * @param deviceId the device this batch will be processed on
* @return event.
*/
- public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
- FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+ public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request, DeviceId deviceId) {
+ FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, deviceId);
return event;
}
@@ -73,13 +78,36 @@
}
/**
+ * Returns the deviceId for this batch.
+ * @return device id
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
* Creates an event of a given type and for the specified flow rule batch.
*
* @param type flow rule batch event type
- * @param batch event flow rule batch subject
+ * @param request event flow rule batch subject
+ * @param result the result of the batch operation
*/
private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
super(type, request);
this.result = result;
+ this.deviceId = result.deviceId();
+ }
+
+ /**
+ * Creates an event of a given type and for the specified flow rule batch.
+ *
+ * @param type flow rule batch event type
+ * @param request event flow rule batch subject
+ * @param deviceId the device id for this batch
+ */
+ private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, DeviceId deviceId) {
+ super(type, request);
+ this.result = null;
+ this.deviceId = deviceId;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
index f9bc632..e13acd7 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
@@ -15,12 +15,37 @@
*/
package org.onosproject.net.flow;
+import org.onosproject.net.DeviceId;
+
import java.util.Collection;
+/**
+ * Class used with the flow subsystem to process per-device
+ * batches.
+ */
public class FlowRuleBatchOperation
extends BatchOperation<FlowRuleBatchEntry> {
- public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+ /**
+ * This id is used to carry the id of the original
+ * FlowOperations and track where this batch operation
+ * came from. The id is unique cluster-wide.
+ */
+ private final long id;
+ private final DeviceId deviceId;
+
+ public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations,
+ DeviceId deviceId, long flowOperationId) {
super(operations);
+ this.id = flowOperationId;
+ this.deviceId = deviceId;
+ }
+
+ public DeviceId deviceId() {
+ return this.deviceId;
+ }
+
+ public long id() {
+ return id;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
index 35db171..5570a40 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
@@ -15,59 +15,43 @@
*/
package org.onosproject.net.flow;
+import com.google.common.collect.Lists;
+import org.onosproject.net.DeviceId;
+
import java.util.Collections;
import java.util.List;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-
-
-
-import com.google.common.collect.Lists;
+import java.util.Set;
public class FlowRuleBatchRequest {
- private final int batchId;
- private final List<FlowRuleBatchEntry> toAdd;
- private final List<FlowRuleBatchEntry> toRemove;
+ /**
+ * This id is used to carry the id of the original
+ * FlowOperations and track where this batch operation
+ * came from. The id is unique cluster-wide.
+ */
+ private final long batchId;
- public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
- List<FlowRuleBatchEntry> toRemove) {
+ private final Set<FlowRuleBatchEntry> ops;
+
+
+ public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
this.batchId = batchId;
- this.toAdd = Collections.unmodifiableList(toAdd);
- this.toRemove = Collections.unmodifiableList(toRemove);
+ this.ops = Collections.unmodifiableSet(ops);
+
+
}
- public List<FlowRule> toAdd() {
- return FluentIterable.from(toAdd).transform(
- new Function<FlowRuleBatchEntry, FlowRule>() {
-
- @Override
- public FlowRule apply(FlowRuleBatchEntry input) {
- return input.target();
- }
- }).toList();
+ public Set<FlowRuleBatchEntry> ops() {
+ return ops;
}
- public List<FlowRule> toRemove() {
- return FluentIterable.from(toRemove).transform(
- new Function<FlowRuleBatchEntry, FlowRule>() {
-
- @Override
- public FlowRule apply(FlowRuleBatchEntry input) {
- return input.target();
- }
- }).toList();
- }
-
- public FlowRuleBatchOperation asBatchOperation() {
+ public FlowRuleBatchOperation asBatchOperation(DeviceId deviceId) {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
- entries.addAll(toAdd);
- entries.addAll(toRemove);
- return new FlowRuleBatchOperation(entries);
+ entries.addAll(ops);
+ return new FlowRuleBatchOperation(entries, deviceId, batchId);
}
- public int batchId() {
+ public long batchId() {
return batchId;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
new file mode 100644
index 0000000..82d43be
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of an operation on a flow rule table.
+ */
+public class FlowRuleOperation {
+
+ /**
+ * Type of flow table operations.
+ */
+ public enum Type {
+ ADD,
+ MODIFY,
+ REMOVE
+ }
+
+ private final FlowRule rule;
+ private final Type type;
+
+ public FlowRuleOperation(FlowRule rule, Type type) {
+ this.rule = rule;
+ this.type = type;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return type
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the flow rule.
+ *
+ * @return flow rule
+ */
+ public FlowRule rule() {
+ return rule;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("rule", rule)
+ .add("type", type)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
new file mode 100644
index 0000000..498cc05
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+
+/**
+ * A batch of flow rule operations that are broken into stages.
+ * TODO move this up to parent's package
+ */
+public class FlowRuleOperations {
+
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext callback; // TODO consider Optional
+
+ private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
+ FlowRuleOperationsContext cb) {
+ this.stages = stages;
+ this.callback = cb;
+ }
+
+ // kryo-constructor
+ protected FlowRuleOperations() {
+ this.stages = Lists.newArrayList();
+ this.callback = null;
+ }
+
+ /**
+ * Returns the flow rule operations as sets of stages that should be
+ * executed sequentially.
+ *
+ * @return flow rule stages
+ */
+ public List<Set<FlowRuleOperation>> stages() {
+ return stages;
+ }
+
+ /**
+ * Returns the callback for this batch of operations.
+ *
+ * @return callback
+ */
+ public FlowRuleOperationsContext callback() {
+ return callback;
+ }
+
+ /**
+ * Returns a new builder.
+ *
+ * @return new builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("stages", stages)
+ .toString();
+ }
+
+ /**
+ * A builder for constructing flow rule operations.
+ */
+ public static final class Builder {
+
+ private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
+ private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+
+ // prevent use of the default constructor outside of this file; use the above method
+ private Builder() {}
+
+ /**
+ * Appends a flow rule add to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ public Builder add(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, ADD));
+ return this;
+ }
+
+ /**
+ * Appends a flow rule modify to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ public Builder modify(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, MODIFY));
+ return this;
+ }
+
+ /**
+ * Appends a flow rule remove to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ // FIXME this is confusing, consider renaming
+ public Builder remove(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, REMOVE));
+ return this;
+ }
+
+ /**
+ * Closes the current stage.
+ */
+ private void closeStage() {
+ ImmutableSet<FlowRuleOperation> stage = currentStage.build();
+ if (!stage.isEmpty()) {
+ listBuilder.add(stage);
+ }
+ }
+
+ /**
+ * Closes the current stage and starts a new one.
+ *
+ * @return this
+ */
+ public Builder newStage() {
+ closeStage();
+ currentStage = ImmutableSet.builder();
+ return this;
+ }
+
+ /**
+ * Builds the immutable flow rule operations.
+ *
+ * @return flow rule operations
+ */
+ public FlowRuleOperations build() {
+ return build(null);
+ }
+
+ /**
+ * Builds the immutable flow rule operations.
+ *
+ * @param cb the callback to call when this operation completes
+ * @return flow rule operations
+ */
+ public FlowRuleOperations build(FlowRuleOperationsContext cb) {
+ closeStage();
+ return new FlowRuleOperations(listBuilder.build(), cb);
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
new file mode 100644
index 0000000..c405b12
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+/**
+ * The context of a set of flow rule operations that will become the subject of
+ * the notification.
+ *
+ * Implementations of this interface must be serializable.
+ */
+public interface FlowRuleOperationsContext {
+ // TODO we might also want to execute a method on behalf of the app
+ default void onSuccess(FlowRuleOperations ops){}
+ default void onError(FlowRuleOperations ops){}
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index aae6902..066ac68 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -18,8 +18,6 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.net.provider.Provider;
-import java.util.concurrent.Future;
-
/**
* Abstraction of a flow rule provider.
*/
@@ -56,8 +54,7 @@
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
- * @return a future indicating the status of this execution
*/
- Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+ void executeBatch(FlowRuleBatchOperation batch);
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
index 9456719..4c9ebcb 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
@@ -40,4 +40,13 @@
*/
void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
+ /**
+ * Indicates to the core that the requested batch operation has
+ * been completed.
+ *
+ * @param batchId the batch which was processed
+ * @param operation the resulting outcome of the operation
+ */
+ void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78772c2..957ecf2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -15,11 +15,11 @@
*/
package org.onosproject.net.flow;
-import java.util.concurrent.Future;
-
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
+import java.util.concurrent.Future;
+
/**
* Service for injecting flow rules into the environment and for obtaining
* information about flow rules already in the environment. This implements
@@ -30,6 +30,11 @@
public interface FlowRuleService {
/**
+ * The topic used for obtaining globally unique ids.
+ */
+ static String FLOW_OP_TOPIC = "flow-ops-ids";
+
+ /**
* Returns the number of flow rules in the system.
*
* @return flow rule count
@@ -96,11 +101,20 @@
* Applies a batch operation of FlowRules.
*
* @param batch batch operation to apply
- * @return future indicating the state of the batch operation
+ * @return future indicating the state of the batch operation; due to the
+ * deprecation of this API, the future will return immediately
*/
+ @Deprecated
Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
+ * Applies a batch operation of FlowRules.
+ *
+ * @param ops batch operation to apply
+ */
+ void apply(FlowRuleOperations ops);
+
+ /**
* Adds the specified flow rule listener.
*
* @param listener flow rule listener
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 6fca2c3..d3aebba 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,8 +15,6 @@
*/
package org.onosproject.net.flow;
-import java.util.concurrent.Future;
-
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
@@ -54,6 +52,7 @@
*
* @param rule the flow rule to add
*/
+ @Deprecated
void storeFlowRule(FlowRule rule);
/**
@@ -61,10 +60,9 @@
*
* @param batchOperation batch of flow rules.
* A batch can contain flow rules for a single device only.
- * @return Future response indicating success/failure of the batch operation
- * all the way down to the device.
+ *
*/
- Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+ void storeBatch(FlowRuleBatchOperation batchOperation);
/**
* Invoked on the completion of a storeBatch operation.