Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least i believe it)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java b/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
index a5fc611..9c4700c 100644
--- a/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
@@ -19,29 +19,30 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.packet.MacAddress;
import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
-import org.onlab.packet.MacAddress;
import java.util.ArrayList;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Installs many many flows.
@@ -50,6 +51,8 @@
description = "Installs a number of test flow rules - for testing only")
public class AddFlowsCommand extends AbstractShellCommand {
+ private CountDownLatch latch;
+
@Argument(index = 0, name = "flowPerDevice", description = "Number of flows to add per device",
required = true, multiValued = false)
String flows = null;
@@ -63,6 +66,9 @@
FlowRuleService flowService = get(FlowRuleService.class);
DeviceService deviceService = get(DeviceService.class);
+ CoreService coreService = get(CoreService.class);
+
+ ApplicationId appId = coreService.registerApplication("onos.test.flow.installer");
int flowsPerDevice = Integer.parseInt(flows);
int num = Integer.parseInt(numOfRuns);
@@ -70,49 +76,73 @@
ArrayList<Long> results = Lists.newArrayList();
Iterable<Device> devices = deviceService.getDevices();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
- .setOutput(PortNumber.portNumber(1)).build();
+ .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
TrafficSelector.Builder sbuilder;
- Set<FlowRuleBatchEntry> rules = Sets.newHashSet();
- Set<FlowRuleBatchEntry> remove = Sets.newHashSet();
+ FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
+ FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
+
for (Device d : devices) {
for (int i = 0; i < flowsPerDevice; i++) {
sbuilder = DefaultTrafficSelector.builder();
- sbuilder.matchEthSrc(MacAddress.valueOf(i))
- .matchEthDst(MacAddress.valueOf(Integer.MAX_VALUE - i));
- rules.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD,
- new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
- 100, (long) 0, 10, false)));
- remove.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
- new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
- 100, (long) 0, 10, false)));
+
+ sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
+ .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
+
+
+ int randomPriority = RandomUtils.nextInt();
+ rules.add(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+ randomPriority, appId, 10, false));
+ remove.remove(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+ randomPriority, appId, 10, false));
}
}
- boolean isSuccess = true;
+
for (int i = 0; i < num; i++) {
- long startTime = System.currentTimeMillis();
- Future<CompletedBatchOperation> op = flowService.applyBatch(
- new FlowRuleBatchOperation(rules));
+
+ latch = new CountDownLatch(2);
+ flowService.apply(rules.build(new FlowRuleOperationsContext() {
+
+ private final Stopwatch timer = Stopwatch.createStarted();
+
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+
+ timer.stop();
+ results.add(timer.elapsed(TimeUnit.MILLISECONDS));
+ if (results.size() == num) {
+ if (outputJson()) {
+ print("%s", json(new ObjectMapper(), true, results));
+ } else {
+ printTime(true, results);
+ }
+ }
+ latch.countDown();
+ }
+ }));
+
+
+ flowService.apply(remove.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ latch.countDown();
+ }
+ }));
try {
- isSuccess &= op.get().isSuccess();
- } catch (InterruptedException | ExecutionException e) {
+ latch.await();
+ } catch (InterruptedException e) {
e.printStackTrace();
}
- long endTime = System.currentTimeMillis();
- results.add(endTime - startTime);
- flowService.applyBatch(
- new FlowRuleBatchOperation(remove));
+
}
- if (outputJson()) {
- print("%s", json(new ObjectMapper(), isSuccess, results));
- } else {
- printTime(isSuccess, results);
- }
+
}
+
+
private Object json(ObjectMapper mapper, boolean isSuccess, ArrayList<Long> elapsed) {
ObjectNode result = mapper.createObjectNode();
result.put("Success", isSuccess);
diff --git a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
index a17f7a3..fce7ee9 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
@@ -21,6 +21,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.DeviceId;
/**
* Representation of a completed flow rule batch operation.
@@ -30,19 +31,22 @@
private final boolean success;
private final Set<FlowRule> failures;
private final Set<Long> failedIds;
+ private final DeviceId deviceId;
/**
* Creates a new batch completion result.
*
- * @param success indicates whether the completion is successful.
+ * @param success indicates whether the completion is successful
* @param failures set of any failures encountered
* @param failedIds (optional) set of failed operation ids
+ * @param deviceId the device this operation completed for
*/
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
- Set<Long> failedIds) {
+ Set<Long> failedIds, DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = ImmutableSet.copyOf(failedIds);
+ this.deviceId = deviceId;
}
/**
@@ -51,10 +55,12 @@
* @param success indicates whether the completion is successful.
* @param failures set of any failures encountered
*/
- public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
+ public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
+ DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = Collections.emptySet();
+ this.deviceId = deviceId;
}
@@ -73,12 +79,17 @@
return failedIds;
}
+ public DeviceId deviceId() {
+ return this.deviceId;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("success?", success)
.add("failedItems", failures)
.add("failedIds", failedIds)
+ .add("deviceId", deviceId)
.toString();
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
index ab72d8b..9bcace2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
@@ -16,12 +16,14 @@
package org.onosproject.net.flow;
import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
/**
* Describes flow rule batch event.
*/
public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
+
/**
* Type of flow rule events.
*/
@@ -42,14 +44,17 @@
}
private final CompletedBatchOperation result;
+ private final DeviceId deviceId;
/**
* Constructs a new FlowRuleBatchEvent.
- * @param request batch operation request.
+ *
+ * @param request batch operation request
+ * @param deviceId the device this batch will be processed on
* @return event.
*/
- public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
- FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+ public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request, DeviceId deviceId) {
+ FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, deviceId);
return event;
}
@@ -73,13 +78,36 @@
}
/**
+ * Returns the deviceId for this batch.
+ * @return device id
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
* Creates an event of a given type and for the specified flow rule batch.
*
* @param type flow rule batch event type
- * @param batch event flow rule batch subject
+ * @param request event flow rule batch subject
+ * @param result the result of the batch operation
*/
private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
super(type, request);
this.result = result;
+ this.deviceId = result.deviceId();
+ }
+
+ /**
+ * Creates an event of a given type and for the specified flow rule batch.
+ *
+ * @param type flow rule batch event type
+ * @param request event flow rule batch subject
+ * @param deviceId the device id for this batch
+ */
+ private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, DeviceId deviceId) {
+ super(type, request);
+ this.result = null;
+ this.deviceId = deviceId;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
index f9bc632..e13acd7 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
@@ -15,12 +15,37 @@
*/
package org.onosproject.net.flow;
+import org.onosproject.net.DeviceId;
+
import java.util.Collection;
+/**
+ * Class used with the flow subsystem to process per device
+ * batches.
+ */
public class FlowRuleBatchOperation
extends BatchOperation<FlowRuleBatchEntry> {
- public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+ /**
+ * This id is used to cary to id of the original
+ * FlowOperations and track where this batch operation
+ * came from. The id is unique cluster wide.
+ */
+ private final long id;
+ private final DeviceId deviceId;
+
+ public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations,
+ DeviceId deviceId, long flowOperationId) {
super(operations);
+ this.id = flowOperationId;
+ this.deviceId = deviceId;
+ }
+
+ public DeviceId deviceId() {
+ return this.deviceId;
+ }
+
+ public long id() {
+ return id;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
index 35db171..5570a40 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
@@ -15,59 +15,43 @@
*/
package org.onosproject.net.flow;
+import com.google.common.collect.Lists;
+import org.onosproject.net.DeviceId;
+
import java.util.Collections;
import java.util.List;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-
-
-
-import com.google.common.collect.Lists;
+import java.util.Set;
public class FlowRuleBatchRequest {
- private final int batchId;
- private final List<FlowRuleBatchEntry> toAdd;
- private final List<FlowRuleBatchEntry> toRemove;
+ /**
+ * This id is used to cary to id of the original
+ * FlowOperations and track where this batch operation
+ * came from. The id is unique cluster wide.
+ */
+ private final long batchId;
- public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
- List<FlowRuleBatchEntry> toRemove) {
+ private final Set<FlowRuleBatchEntry> ops;
+
+
+ public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
this.batchId = batchId;
- this.toAdd = Collections.unmodifiableList(toAdd);
- this.toRemove = Collections.unmodifiableList(toRemove);
+ this.ops = Collections.unmodifiableSet(ops);
+
+
}
- public List<FlowRule> toAdd() {
- return FluentIterable.from(toAdd).transform(
- new Function<FlowRuleBatchEntry, FlowRule>() {
-
- @Override
- public FlowRule apply(FlowRuleBatchEntry input) {
- return input.target();
- }
- }).toList();
+ public Set<FlowRuleBatchEntry> ops() {
+ return ops;
}
- public List<FlowRule> toRemove() {
- return FluentIterable.from(toRemove).transform(
- new Function<FlowRuleBatchEntry, FlowRule>() {
-
- @Override
- public FlowRule apply(FlowRuleBatchEntry input) {
- return input.target();
- }
- }).toList();
- }
-
- public FlowRuleBatchOperation asBatchOperation() {
+ public FlowRuleBatchOperation asBatchOperation(DeviceId deviceId) {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
- entries.addAll(toAdd);
- entries.addAll(toRemove);
- return new FlowRuleBatchOperation(entries);
+ entries.addAll(ops);
+ return new FlowRuleBatchOperation(entries, deviceId, batchId);
}
- public int batchId() {
+ public long batchId() {
return batchId;
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
new file mode 100644
index 0000000..82d43be
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of an operation on a flow rule table.
+ */
+public class FlowRuleOperation {
+
+ /**
+ * Type of flow table operations.
+ */
+ public enum Type {
+ ADD,
+ MODIFY,
+ REMOVE
+ }
+
+ private final FlowRule rule;
+ private final Type type;
+
+ public FlowRuleOperation(FlowRule rule, Type type) {
+ this.rule = rule;
+ this.type = type;
+ }
+
+ /**
+ * Returns the type of operation.
+ *
+ * @return type
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the flow rule.
+ *
+ * @return flow rule
+ */
+ public FlowRule rule() {
+ return rule;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("rule", rule)
+ .add("type", type)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
new file mode 100644
index 0000000..498cc05
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+
+/**
+ * A batch of flow rule operations that are broken into stages.
+ * TODO move this up to parent's package
+ */
+public class FlowRuleOperations {
+
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext callback; // TODO consider Optional
+
+ private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
+ FlowRuleOperationsContext cb) {
+ this.stages = stages;
+ this.callback = cb;
+ }
+
+ // kryo-constructor
+ protected FlowRuleOperations() {
+ this.stages = Lists.newArrayList();
+ this.callback = null;
+ }
+
+ /**
+ * Returns the flow rule operations as sets of stages that should be
+ * executed sequentially.
+ *
+ * @return flow rule stages
+ */
+ public List<Set<FlowRuleOperation>> stages() {
+ return stages;
+ }
+
+ /**
+ * Returns the callback for this batch of operations.
+ *
+ * @return callback
+ */
+ public FlowRuleOperationsContext callback() {
+ return callback;
+ }
+
+ /**
+ * Returns a new builder.
+ *
+ * @return new builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("stages", stages)
+ .toString();
+ }
+
+ /**
+ * A builder for constructing flow rule operations.
+ */
+ public static final class Builder {
+
+ private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
+ private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+
+ // prevent use of the default constructor outside of this file; use the above method
+ private Builder() {}
+
+ /**
+ * Appends a flow rule add to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ public Builder add(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, ADD));
+ return this;
+ }
+
+ /**
+ * Appends a flow rule modify to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ public Builder modify(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, MODIFY));
+ return this;
+ }
+
+ /**
+ * Appends a flow rule remove to the current stage.
+ *
+ * @param flowRule flow rule
+ * @return this
+ */
+ // FIXME this is confusing, consider renaming
+ public Builder remove(FlowRule flowRule) {
+ currentStage.add(new FlowRuleOperation(flowRule, REMOVE));
+ return this;
+ }
+
+ /**
+ * Closes the current stage.
+ */
+ private void closeStage() {
+ ImmutableSet<FlowRuleOperation> stage = currentStage.build();
+ if (!stage.isEmpty()) {
+ listBuilder.add(stage);
+ }
+ }
+
+ /**
+ * Closes the current stage and starts a new one.
+ *
+ * @return this
+ */
+ public Builder newStage() {
+ closeStage();
+ currentStage = ImmutableSet.builder();
+ return this;
+ }
+
+ /**
+ * Builds the immutable flow rule operations.
+ *
+ * @return flow rule operations
+ */
+ public FlowRuleOperations build() {
+ return build(null);
+ }
+
+ /**
+ * Builds the immutable flow rule operations.
+ *
+ * @param cb the callback to call when this operation completes
+ * @return flow rule operations
+ */
+ public FlowRuleOperations build(FlowRuleOperationsContext cb) {
+ closeStage();
+ return new FlowRuleOperations(listBuilder.build(), cb);
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
new file mode 100644
index 0000000..c405b12
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+/**
+ * The context of a flow rule operations that will become the subject of
+ * the notification.
+ *
+ * Implementations of this class must be serializable.
+ */
+public interface FlowRuleOperationsContext {
+ // TODO we might also want to execute a method on behalf of the app
+ default void onSuccess(FlowRuleOperations ops){}
+ default void onError(FlowRuleOperations ops){}
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index aae6902..066ac68 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -18,8 +18,6 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.net.provider.Provider;
-import java.util.concurrent.Future;
-
/**
* Abstraction of a flow rule provider.
*/
@@ -56,8 +54,7 @@
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
- * @return a future indicating the status of this execution
*/
- Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+ void executeBatch(FlowRuleBatchOperation batch);
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
index 9456719..4c9ebcb 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
@@ -40,4 +40,13 @@
*/
void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
+ /**
+ * Indicates to the core that the requested batch operation has
+ * been completed.
+ *
+ * @param batchId the batch which was processed
+ * @param operation the resulting outcome of the operation
+ */
+ void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78772c2..957ecf2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -15,11 +15,11 @@
*/
package org.onosproject.net.flow;
-import java.util.concurrent.Future;
-
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
+import java.util.concurrent.Future;
+
/**
* Service for injecting flow rules into the environment and for obtaining
* information about flow rules already in the environment. This implements
@@ -30,6 +30,11 @@
public interface FlowRuleService {
/**
+ * The topic used for obtaining globally unique ids.
+ */
+ static String FLOW_OP_TOPIC = "flow-ops-ids";
+
+ /**
* Returns the number of flow rules in the system.
*
* @return flow rule count
@@ -96,11 +101,20 @@
* Applies a batch operation of FlowRules.
*
* @param batch batch operation to apply
- * @return future indicating the state of the batch operation
+ * @return future indicating the state of the batch operation, due to the
+ * deprecation of this api the future will immediately return
*/
+ @Deprecated
Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
+ * Applies a batch operation of FlowRules.
+ *
+ * @param ops batch operation to apply
+ */
+ void apply(FlowRuleOperations ops);
+
+ /**
* Adds the specified flow rule listener.
*
* @param listener flow rule listener
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 6fca2c3..d3aebba 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,8 +15,6 @@
*/
package org.onosproject.net.flow;
-import java.util.concurrent.Future;
-
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
@@ -54,6 +52,7 @@
*
* @param rule the flow rule to add
*/
+ @Deprecated
void storeFlowRule(FlowRule rule);
/**
@@ -61,10 +60,9 @@
*
* @param batchOperation batch of flow rules.
* A batch can contain flow rules for a single device only.
- * @return Future response indicating success/failure of the batch operation
- * all the way down to the device.
+ *
*/
- Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+ void storeBatch(FlowRuleBatchOperation batchOperation);
/**
* Invoked on the completion of a storeBatch operation.
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
index 07b60fa..70fe077 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
@@ -46,10 +46,10 @@
final LinkedList<FlowRuleBatchEntry> ops3 = new LinkedList<>();
ops3.add(entry3);
- final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1);
- final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1);
- final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2);
- final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3);
+ final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1, null, 0);
+ final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1, null, 0);
+ final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2, null, 0);
+ final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3, null, 0);
new EqualsTester()
.addEqualityGroup(operation1, sameAsOperation1)
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
index d7ad903..e6864e1 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
@@ -15,17 +15,18 @@
*/
package org.onosproject.net.flow;
-import java.util.LinkedList;
-import java.util.List;
-
import org.junit.Test;
import org.onosproject.net.intent.IntentTestsMocks;
-import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.ADD;
+import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
/**
* Unit tests for the FlowRuleBatchRequest class.
@@ -40,22 +41,19 @@
public void testConstruction() {
final FlowRule rule1 = new IntentTestsMocks.MockFlowRule(1);
final FlowRule rule2 = new IntentTestsMocks.MockFlowRule(2);
- final List<FlowRuleBatchEntry> toAdd = new LinkedList<>();
- toAdd.add(new FlowRuleBatchEntry(ADD, rule1));
- final List<FlowRuleBatchEntry> toRemove = new LinkedList<>();
- toRemove.add(new FlowRuleBatchEntry(REMOVE, rule2));
+ final Set<FlowRuleBatchEntry> batch = new HashSet<>();
+ batch.add(new FlowRuleBatchEntry(ADD, rule1));
+
+ batch.add(new FlowRuleBatchEntry(REMOVE, rule2));
final FlowRuleBatchRequest request =
- new FlowRuleBatchRequest(1, toAdd, toRemove);
+ new FlowRuleBatchRequest(1, batch);
- assertThat(request.toAdd(), hasSize(1));
- assertThat(request.toAdd().get(0), is(rule1));
- assertThat(request.toRemove(), hasSize(1));
- assertThat(request.toRemove().get(0), is(rule2));
- assertThat(request.batchId(), is(1));
+ assertThat(request.ops(), hasSize(2));
+ assertThat(request.batchId(), is(1L));
- final FlowRuleBatchOperation op = request.asBatchOperation();
+ final FlowRuleBatchOperation op = request.asBatchOperation(rule1.deviceId());
assertThat(op.size(), is(2));
final List<FlowRuleBatchEntry> ops = op.getOperations();
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
index 8f54e85..393f3cf 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
@@ -66,6 +66,11 @@
}
@Override
+ public void apply(FlowRuleOperations ops) {
+
+ }
+
+ @Override
public void addListener(FlowRuleListener listener) {
}
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
- private ExecutorService futureService;
+ protected ExecutorService deviceInstallers =
+ Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+ protected ExecutorService operationsService =
+ Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+ private IdGenerator idGenerator;
+
+ private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+ ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -101,10 +111,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
@Activate
public void activate() {
- futureService =
- Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+ idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
@@ -112,8 +127,8 @@
@Deactivate
public void deactivate() {
- futureService.shutdownNow();
-
+ deviceInstallers.shutdownNow();
+ operationsService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
@@ -131,20 +146,20 @@
@Override
public void applyFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+ builder.add(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+ apply(builder.build());
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
- Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
- toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+ builder.remove(flowRules[i]);
}
- applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+ apply(builder.build());
}
@Override
@@ -180,23 +195,38 @@
}
@Override
- public Future<CompletedBatchOperation> applyBatch(
- FlowRuleBatchOperation batch) {
- Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
- ArrayListMultimap.create();
- List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
- for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- final FlowRule f = fbe.target();
- perDeviceBatches.put(f.deviceId(), fbe);
- }
+ public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
- for (DeviceId deviceId : perDeviceBatches.keySet()) {
- FlowRuleBatchOperation b =
- new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
- Future<CompletedBatchOperation> future = store.storeBatch(b);
- futures.add(future);
- }
- return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+ FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+ batch.getOperations().stream().forEach(op -> {
+ switch (op.getOperator()) {
+ case ADD:
+ fopsBuilder.add(op.getTarget());
+ break;
+ case REMOVE:
+ fopsBuilder.remove(op.getTarget());
+ break;
+ case MODIFY:
+ fopsBuilder.modify(op.getTarget());
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+ }
+ }
+ );
+
+ apply(fopsBuilder.build());
+ return Futures.immediateFuture(
+ new CompletedBatchOperation(true,
+ Collections.emptySet(), null));
+
+ }
+
+ @Override
+ public void apply(FlowRuleOperations ops) {
+ operationsService.submit(new FlowOperationsProcessor(ops));
}
@Override
@@ -373,13 +403,19 @@
}
}
+
+ @Override
+ public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+ operation
+ ));
+ }
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
- // FIXME set appropriate default and make it configurable
- private static final int TIMEOUT_PER_OP = 500; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
// Request has been forwarded to MASTER Node, and was
- for (FlowRule entry : request.toAdd()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
- }
- for (FlowRule entry : request.toRemove()) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
- }
- // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+ request.ops().stream().forEach(
+ op -> {
+ switch (op.getOperator()) {
- FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+ case ADD:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+ op.getTarget()));
+ break;
+ case REMOVE:
+ eventDispatcher.post(
+ new FlowRuleEvent(
+ FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+ op.getTarget()));
+ break;
+ case MODIFY:
+ //TODO: do something here when the time comes.
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
+ }
+ }
+ );
+
+ DeviceId deviceId = event.deviceId();
+
+ FlowRuleBatchOperation batchOperation =
+ request.asBatchOperation(deviceId);
FlowRuleProvider flowRuleProvider =
- getProvider(batchOperation.getOperations().get(0).target().deviceId());
- final Future<CompletedBatchOperation> result =
- flowRuleProvider.executeBatch(batchOperation);
- futureService.submit(new Runnable() {
- @Override
- public void run() {
- CompletedBatchOperation res;
- try {
- res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- log.warn("Something went wrong with the batch operation {}",
- request.batchId(), e);
+ getProvider(deviceId);
- Set<FlowRule> failures = new HashSet<>(batchOperation.size());
- for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
- failures.add(op.target());
- }
- res = new CompletedBatchOperation(false, failures);
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
- }
- }
- });
+ flowRuleProvider.executeBatch(batchOperation);
+
break;
case BATCH_OPERATION_COMPLETED:
- // MASTER Node has pushed the batch down to the Device
- // Note: RULE_ADDED will be posted
- // when Flow was actually confirmed by stats reply.
+ FlowOperationsProcessor fops = pendingFlowOperations.remove(
+ event.subject().batchId());
+ if (event.result().isSuccess()) {
+ if (fops != null) {
+ fops.satisfy(event.deviceId());
+ }
+ } else {
+ fops.fail(event.deviceId(), event.result().failedItems());
+ }
+
break;
default:
@@ -438,141 +482,100 @@
}
}
- private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+ private class FlowOperationsProcessor implements Runnable {
- private final List<Future<CompletedBatchOperation>> futures;
- private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
- private final AtomicReference<BatchState> state;
- private CompletedBatchOperation overall;
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext context;
+ private final FlowRuleOperations fops;
+ private final AtomicBoolean hasFailed = new AtomicBoolean(false);
- public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
- Multimap<DeviceId, FlowRuleBatchEntry> batches) {
- this.futures = futures;
- this.batches = batches;
- this.state = new AtomicReference<>(BatchState.STARTED);
- }
+ private Set<DeviceId> pendingDevices;
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (state.get() == BatchState.FINISHED) {
- return false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Cancelling FlowRuleBatchFuture",
- new RuntimeException("Just printing backtrace"));
- }
- if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
- return false;
- }
- cleanUpBatch();
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(mayInterruptIfRunning);
- }
- return true;
- }
+ public FlowOperationsProcessor(FlowRuleOperations ops) {
- @Override
- public boolean isCancelled() {
- return state.get() == BatchState.CANCELLED;
- }
+ this.stages = Lists.newArrayList(ops.stages());
+ this.context = ops.callback();
+ this.fops = ops;
+ pendingDevices = Sets.newConcurrentHashSet();
- @Override
- public boolean isDone() {
- return state.get() == BatchState.FINISHED;
- }
-
-
- @Override
- public CompletedBatchOperation get() throws InterruptedException,
- ExecutionException {
-
- if (isDone()) {
- return overall;
- }
-
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get();
- success = validateBatchOperation(failed, failedIds, completed);
- }
-
- return finalizeBatchOperation(success, failed, failedIds);
}
@Override
- public CompletedBatchOperation get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
-
- if (isDone()) {
- return overall;
- }
- boolean success = true;
- Set<FlowRule> failed = Sets.newHashSet();
- Set<Long> failedIds = Sets.newHashSet();
- CompletedBatchOperation completed;
- for (Future<CompletedBatchOperation> future : futures) {
- completed = future.get(timeout, unit);
- success = validateBatchOperation(failed, failedIds, completed);
- }
- return finalizeBatchOperation(success, failed, failedIds);
- }
-
- private boolean validateBatchOperation(Set<FlowRule> failed,
- Set<Long> failedIds,
- CompletedBatchOperation completed) {
-
- if (isCancelled()) {
- throw new CancellationException();
- }
- if (!completed.isSuccess()) {
- log.warn("FlowRuleBatch failed: {}", completed);
- failed.addAll(completed.failedItems());
- failedIds.addAll(completed.failedIds());
- cleanUpBatch();
- cancelAllSubBatches();
- return false;
- }
- return true;
- }
-
- private void cancelAllSubBatches() {
- for (Future<CompletedBatchOperation> f : futures) {
- f.cancel(true);
+ public void run() {
+ if (stages.size() > 0) {
+ process(stages.remove(0));
+ } else if (!hasFailed.get() && context != null) {
+ context.onSuccess(fops);
}
}
- private CompletedBatchOperation finalizeBatchOperation(boolean success,
- Set<FlowRule> failed,
- Set<Long> failedIds) {
- synchronized (this) {
- if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
- if (state.get() == BatchState.FINISHED) {
- return overall;
+ private void process(Set<FlowRuleOperation> ops) {
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+ ArrayListMultimap.create();
+
+ FlowRuleBatchEntry fbe;
+ for (FlowRuleOperation flowRuleOperation : ops) {
+ switch (flowRuleOperation.type()) {
+ // FIXME: Brian needs imagination when creating class names.
+ case ADD:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+ break;
+ case MODIFY:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+ break;
+ case REMOVE:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+ }
+ pendingDevices.add(flowRuleOperation.rule().deviceId());
+ perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+ }
+
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
+ Long id = idGenerator.getNewId();
+ final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+ deviceId, id);
+ pendingFlowOperations.put(id, this);
+ deviceInstallers.submit(new Runnable() {
+ @Override
+ public void run() {
+ store.storeBatch(b);
}
- throw new CancellationException();
- }
- overall = new CompletedBatchOperation(success, failed, failedIds);
- return overall;
+ });
}
}
- private void cleanUpBatch() {
- log.debug("cleaning up batch");
- // TODO convert these into a batch?
- for (FlowRuleBatchEntry fbe : batches.values()) {
- if (fbe.operator() == FlowRuleOperation.ADD ||
- fbe.operator() == FlowRuleOperation.MODIFY) {
- store.deleteFlowRule(fbe.target());
- } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
- store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
- store.storeFlowRule(fbe.target());
- }
+ public void satisfy(DeviceId devId) {
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
}
}
+
+
+
+ public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+ hasFailed.set(true);
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
+ }
+
+ if (context != null) {
+ final FlowRuleOperations.Builder failedOpsBuilder =
+ FlowRuleOperations.builder();
+ failures.stream().forEach(failedOpsBuilder::add);
+
+ context.onError(failedOpsBuilder.build());
+ }
+ }
+
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 8b6d8ad..685fa70 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -1151,6 +1151,7 @@
*/
protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
//TODO test this. (also, maybe save this batch)
+
FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
if (batch.size() > 0) {
//FIXME apply batch might throw an exception
@@ -1165,7 +1166,7 @@
}
private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
for (CompletedIntentUpdate update : intentUpdates) {
FlowRuleBatchOperation currentBatch = update.currentBatch();
if (currentBatch != null) {
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
index 583d7e3..816fc12 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
@@ -98,6 +98,7 @@
outputPorts.put(egressPoint.deviceId(), egressPoint.port());
}
+ //FIXME change to new api
FlowRuleBatchOperation batchOperation =
new FlowRuleBatchOperation(outputPorts
.keys()
@@ -105,7 +106,7 @@
.map(deviceId -> createBatchEntry(operation,
intent, deviceId,
outputPorts.get(deviceId)))
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()), null, 0);
return Collections.singletonList(batchOperation);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
index e958c58..5b226a3 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
@@ -181,6 +181,7 @@
true);
rules.add(new FlowRuleBatchEntry(operation, rule));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ //FIXME change to new api
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
index 1c8ea7e..b90be50 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
@@ -108,7 +108,8 @@
intent.id().fingerprint()));
prev = link.dst();
}
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ //FIXME this should change to new api.
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
@@ -138,7 +139,8 @@
intent.id().fingerprint()));
prev = link.dst();
}
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ // FIXME this should change to new api
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
index c3f9ba0..87fc585 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
@@ -31,7 +31,7 @@
@Override
@SuppressWarnings("unchecked")
- public void post(Event event) {
+ public synchronized void post(Event event) {
EventSink sink = getSink(event.getClass());
checkState(sink != null, "No sink for event %s", event);
sink.process(event);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 5a79b43..3fd8b5a 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -20,12 +20,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.core.Version;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
@@ -36,7 +39,6 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
@@ -72,6 +74,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
@@ -97,12 +100,16 @@
protected TestListener listener = new TestListener();
private ApplicationId appId;
+
@Before
public void setUp() {
mgr = new FlowRuleManager();
mgr.store = new SimpleFlowRuleStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.deviceService = new TestDeviceService();
+ mgr.coreService = new TestCoreService();
+ mgr.operationsService = MoreExecutors.newDirectExecutorService();
+ mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
service = mgr;
registry = mgr;
@@ -246,14 +253,23 @@
@Test
public void flowRemoved() {
+
FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
+
+
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
service.removeFlowRules(f1);
+
fe1.setState(FlowEntryState.REMOVED);
+
+
+
providerService.flowRemoved(fe1);
+
+
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
@@ -263,11 +279,13 @@
FlowRule f3 = flowRule(3, 3);
FlowEntry fe3 = new DefaultFlowEntry(f3);
service.applyFlowRules(f3);
+
providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
providerService.flowRemoved(fe3);
validateEvents();
+
}
@Test
@@ -281,7 +299,6 @@
FlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
-
//FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
//FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
@@ -388,7 +405,7 @@
FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
- Lists.newArrayList(fbe1, fbe2));
+ Lists.newArrayList(fbe1, fbe2), null, 0);
Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
@@ -406,53 +423,6 @@
}
- @Test
- public void cancelBatch() {
- FlowRule f1 = flowRule(1, 1);
- FlowRule f2 = flowRule(2, 2);
-
-
- mgr.applyFlowRules(f1);
-
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f1, FlowEntryState.PENDING_ADD)));
-
- FlowEntry fe1 = new DefaultFlowEntry(f1);
- providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
-
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f1, FlowEntryState.ADDED)));
-
-
- FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
- FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
-
- FlowRuleBatchEntry fbe2 = new FlowRuleBatchEntry(
- FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
-
- FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
- Lists.newArrayList(fbe1, fbe2));
- Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
-
- future.cancel(true);
-
- assertTrue(flowCount() == 2);
-
- /*
- * Rule f1 should be re-added to the list and therefore be in a pending add
- * state.
- */
- assertTrue("Entries in wrong state",
- validateState(ImmutableMap.of(
- f2, FlowEntryState.PENDING_REMOVE,
- f1, FlowEntryState.PENDING_ADD)));
-
-
- }
-
-
private static class TestListener implements FlowRuleListener {
final List<FlowRuleEvent> events = new ArrayList<>();
@@ -528,9 +498,8 @@
}
@Override
- public ListenableFuture<CompletedBatchOperation> executeBatch(
- BatchOperation<FlowRuleBatchEntry> batch) {
- return new TestInstallationFuture();
+ public void executeBatch(FlowRuleBatchOperation batch) {
+ // TODO: need to call batchOperationComplete
}
private class TestInstallationFuture
@@ -554,14 +523,14 @@
@Override
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
- return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException,
ExecutionException, TimeoutException {
- return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
@@ -644,4 +613,37 @@
}
}
+ private class TestCoreService implements CoreService {
+ @Override
+ public Version version() {
+ return null;
+ }
+
+ @Override
+ public Set<ApplicationId> getAppIds() {
+ return null;
+ }
+
+ @Override
+ public ApplicationId getAppId(Short id) {
+ return null;
+ }
+
+ @Override
+ public ApplicationId registerApplication(String identifier) {
+ return null;
+ }
+
+ @Override
+ public IdGenerator getIdGenerator(String topic) {
+ return new IdGenerator() {
+ private AtomicLong counter = new AtomicLong(0);
+ @Override
+ public long getNewId() {
+ return counter.getAndIncrement();
+ }
+ };
+ }
+ }
+
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 17a9498..3570c77 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -201,7 +201,7 @@
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
@@ -209,7 +209,7 @@
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
@@ -219,7 +219,7 @@
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
- return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
}
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
index e956575..72de4f0 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
@@ -27,6 +27,7 @@
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import com.google.common.collect.ImmutableSet;
@@ -45,11 +46,11 @@
public void setFuture(boolean success, long intentId) {
if (success) {
- future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
+ future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet(), null));
} else {
final Set<Long> failedIds = ImmutableSet.of(intentId);
future = Futures.immediateFuture(
- new CompletedBatchOperation(false, flows, failedIds));
+ new CompletedBatchOperation(false, flows, failedIds, null));
}
}
@@ -74,6 +75,11 @@
}
@Override
+ public void apply(FlowRuleOperations ops) {
+
+ }
+
+ @Override
public int getFlowRuleCount() {
return flows.size();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 09aa401..3d66386 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,33 +15,15 @@
*/
package org.onosproject.store.flow.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.namedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -49,8 +31,11 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
@@ -67,6 +52,7 @@
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
@@ -79,27 +65,37 @@
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.hazelcast.core.IMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -112,12 +108,10 @@
private final Logger log = getLogger(getClass());
- // primary data:
- // read/write needs to be locked
- private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
- // store entries as a pile of rules, no info about device tables
- private final Multimap<DeviceId, StoredFlowEntry> flowEntries
- = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
+ private InternalFlowTable flowTable = new InternalFlowTable();
+
+ /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();*/
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -131,23 +125,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
- private final AtomicInteger localBatchIdGen = new AtomicInteger();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
- private int pendingFutureTimeoutMinutes = 5;
-
- private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
- CacheBuilder.newBuilder()
- .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
- .removalListener(new TimeoutFuture())
- .build();
+ private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
- private final ExecutorService futureListeners =
- Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
-
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
@@ -169,6 +155,8 @@
private ReplicaInfoEventListener replicaInfoEventListener;
+ private IdGenerator idGenerator;
+
@Override
@Activate
public void activate() {
@@ -176,22 +164,33 @@
super.serializer = SERIALIZER;
super.theInstance = storeService.getHazelcastInstance();
+ idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+
// Cache to create SMap on demand
smaps = CacheBuilder.newBuilder()
- .softValues()
- .build(new SMapLoader());
+ .softValues()
+ .build(new SMapLoader());
final NodeId local = clusterService.getLocalNode().id();
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+ clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+ log.trace("received completed notification for {}", event);
+ notifyDelegate(event);
+ }
+ });
+
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.trace("received get flow entry request for {}", rule);
- FlowEntry flowEntry = getFlowEntryInternal(rule);
+ FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
} catch (IOException e) {
@@ -206,7 +205,7 @@
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
- Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+ Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
@@ -272,11 +271,11 @@
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return getFlowEntryInternal(rule);
+ return flowTable.getFlowEntry(rule);
}
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), rule.deviceId());
+ replicaInfo.master().orNull(), rule.deviceId());
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -292,19 +291,7 @@
return null;
}
- private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- flowEntriesLock.readLock().lock();
- try {
- for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
- if (f.equals(rule)) {
- return f;
- }
- }
- } finally {
- flowEntriesLock.readLock().unlock();
- }
- return null;
- }
+
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -317,11 +304,11 @@
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return getFlowEntriesInternal(deviceId);
+ return flowTable.getFlowEntries(deviceId);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -337,30 +324,26 @@
return Collections.emptyList();
}
- private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- flowEntriesLock.readLock().lock();
- try {
- Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
- if (rules == null) {
- return Collections.emptySet();
- }
- return ImmutableSet.copyOf(rules);
- } finally {
- flowEntriesLock.readLock().unlock();
- }
- }
+
@Override
public void storeFlowRule(FlowRule rule) {
- storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+ storeBatch(new FlowRuleBatchOperation(
+ Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+ rule.deviceId(), idGenerator.getNewId()));
}
@Override
- public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+ public void storeBatch(FlowRuleBatchOperation operation) {
+
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true,
- Collections.<FlowRule>emptySet()));
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(),
+ operation.deviceId())));
+ return;
}
DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
@@ -369,110 +352,129 @@
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to storeBatch: No master for {}", deviceId);
- // TODO: revisit if this should be "success" from Future point of view
- // with every FlowEntry failed
- return Futures.immediateFailedFuture(new IOException("Failed to storeBatch: No master for " + deviceId));
+
+ Set<FlowRule> allFailures = operation.getOperations().stream()
+ .map(op -> op.getTarget())
+ .collect(Collectors.toSet());
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+ return;
}
final NodeId local = clusterService.getLocalNode().id();
if (replicaInfo.master().get().equals(local)) {
- return storeBatchInternal(operation);
+ storeBatchInternal(operation);
+ return;
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
local,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
+ //CompletedBatchOperation response;
try {
ListenableFuture<byte[]> responseFuture =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
+ /*response =
+ Futures.transform(responseFuture,
+ new DecodeTo<CompletedBatchOperation>(SERIALIZER))
+ .get(500 * operation.size(), TimeUnit.MILLISECONDS);
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+
+ } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+ log.warn("Failed to storeBatch: {}", e.getMessage());
+
+ Set<FlowRule> allFailures = operation.getOperations().stream()
+ .map(op -> op.getTarget())
+ .collect(Collectors.toSet());
+
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
+ return;
}
+
}
- private ListenableFuture<CompletedBatchOperation>
- storeBatchInternal(FlowRuleBatchOperation operation) {
+ private void storeBatchInternal(FlowRuleBatchOperation operation) {
- final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
- final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
- DeviceId did = null;
+ final DeviceId did = operation.deviceId();
+ //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
+ Set<FlowRuleBatchEntry> currentOps;
- flowEntriesLock.writeLock().lock();
- try {
- for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
- FlowRule flowRule = batchEntry.target();
- FlowRuleOperation op = batchEntry.operator();
- if (did == null) {
- did = flowRule.deviceId();
- }
- if (op.equals(FlowRuleOperation.REMOVE)) {
- StoredFlowEntry entry = getFlowEntryInternal(flowRule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- toRemove.add(batchEntry);
+ currentOps = operation.getOperations().stream().map(
+ op -> {
+ StoredFlowEntry entry;
+ switch (op.getOperator()) {
+ case ADD:
+ entry = new DefaultFlowEntry(op.getTarget());
+ // always add requested FlowRule

+ // Note: 2 equal FlowEntry may have different treatment
+ flowTable.remove(entry.deviceId(), entry);
+ flowTable.add(entry);
+
+ return op;
+ case REMOVE:
+ entry = flowTable.getFlowEntry(op.target());
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ return op;
+ }
+ break;
+ case MODIFY:
+ //TODO: figure this out at some point
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.getOperator());
}
- } else if (op.equals(FlowRuleOperation.ADD)) {
- StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
- DeviceId deviceId = flowRule.deviceId();
- Collection<StoredFlowEntry> ft = flowEntries.get(deviceId);
-
- // always add requested FlowRule
- // Note: 2 equal FlowEntry may have different treatment
- ft.remove(flowEntry);
- ft.add(flowEntry);
- toAdd.add(batchEntry);
+ return null;
}
- }
- if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
- }
-
- // create remote backup copies
- updateBackup(did, toAdd, toRemove);
- } finally {
- flowEntriesLock.writeLock().unlock();
+ ).filter(op -> op != null).collect(Collectors.toSet());
+ if (currentOps.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), did)));
+ return;
}
+ updateBackup(did, currentOps);
- SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
- final int batchId = localBatchIdGen.incrementAndGet();
- pendingFutures.put(batchId, r);
- notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+ notifyDelegate(FlowRuleBatchEvent.requested(new
+ FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
- return r;
+
}
- private void updateBackup(final DeviceId deviceId,
- final List<FlowRuleBatchEntry> toAdd,
- final List<FlowRuleBatchEntry> list) {
-
- Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+ private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
+ Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
if (syncBackup) {
// wait for backup to complete
try {
- submit.get();
+ backup.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to create backups", e);
}
}
}
- private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
-
- updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
- }
-
@Override
public void deleteFlowRule(FlowRule rule) {
- storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
+ storeBatch(
+ new FlowRuleBatchOperation(
+ Arrays.asList(
+ new FlowRuleBatchEntry(
+ FlowRuleOperation.REMOVE,
+ rule)), rule.deviceId(), idGenerator.getNewId()));
}
@Override
@@ -484,37 +486,35 @@
}
log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
+ + " while the Node was not the master.", rule);
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
- flowEntriesLock.writeLock().lock();
- try {
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = getFlowEntryInternal(rule);
- if (stored != null) {
- stored.setBytes(rule.bytes());
- stored.setLife(rule.life());
- stored.setPackets(rule.packets());
- if (stored.state() == FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.ADDED);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
- updateBackup(did, Arrays.asList(entry));
- return new FlowRuleEvent(Type.RULE_ADDED, rule);
- }
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
- // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
- // TODO: also update backup if the behavior is correct.
- flowEntries.put(did, new DefaultFlowEntry(rule));
- } finally {
- flowEntriesLock.writeLock().unlock();
+ // check if this new rule is an update to an existing entry
+ StoredFlowEntry stored = flowTable.getFlowEntry(rule);
+ if (stored != null) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+ updateBackup(did, Sets.newHashSet(entry));
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
+ return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
+
+ // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+ // TODO: also update backup if the behavior is correct.
+ flowTable.add(rule);
+
+
return null;
}
@@ -540,9 +540,9 @@
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOVE_FLOW_ENTRY,
- SERIALIZER.encode(rule));
+ clusterService.getLocalNode().id(),
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER.encode(rule));
try {
Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -555,38 +555,42 @@
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
- flowEntriesLock.writeLock().lock();
- try {
- // This is where one could mark a rule as removed and still keep it in the store.
- final boolean removed = flowEntries.remove(deviceId, rule);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
- updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
- if (removed) {
- return new FlowRuleEvent(RULE_REMOVED, rule);
- } else {
- return null;
- }
- } finally {
- flowEntriesLock.writeLock().unlock();
+ // This is where one could mark a rule as removed and still keep it in the store.
+ final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+ updateBackup(deviceId, Sets.newHashSet(entry));
+ if (removed) {
+ return new FlowRuleEvent(RULE_REMOVED, rule);
+ } else {
+ return null;
}
+
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
- final Integer batchId = event.subject().batchId();
- SettableFuture<CompletedBatchOperation> future
- = pendingFutures.getIfPresent(batchId);
- if (future != null) {
- future.set(event.result());
- pendingFutures.invalidate(batchId);
+ //FIXME: need a per device pending response
+
+ NodeId nodeId = pendingResponses.remove(event.subject().batchId());
+ if (nodeId == null) {
+ notifyDelegate(event);
+ } else {
+ try {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ REMOTE_APPLY_COMPLETED,
+ SERIALIZER.encode(event));
+ clusterCommunicator.sendAndReceive(message, nodeId);
+ } catch (IOException e) {
+ log.warn("Failed to respond to peer for batch operation result");
+ }
}
- notifyDelegate(event);
}
private void loadFromBackup(final DeviceId did) {
- flowEntriesLock.writeLock().lock();
+
try {
log.debug("Loading FlowRules for {} from backups", did);
SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -595,38 +599,21 @@
log.trace("loading {}", e.getValue());
for (StoredFlowEntry entry : e.getValue()) {
- flowEntries.remove(did, entry);
- flowEntries.put(did, entry);
+ flowTable.getFlowEntriesById(entry).remove(entry);
+ flowTable.getFlowEntriesById(entry).add(entry);
+
+
}
}
} catch (ExecutionException e) {
log.error("Failed to load backup flowtable for {}", did, e);
- } finally {
- flowEntriesLock.writeLock().unlock();
}
}
private void removeFromPrimary(final DeviceId did) {
- Collection<StoredFlowEntry> removed = null;
- flowEntriesLock.writeLock().lock();
- try {
- removed = flowEntries.removeAll(did);
- } finally {
- flowEntriesLock.writeLock().unlock();
- }
- log.trace("removedFromPrimary {}", removed);
+ flowTable.clearDevice(did);
}
- private static final class TimeoutFuture
- implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
- @Override
- public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
- // wrapping in ExecutionException to support Future.get
- notification.getValue()
- .setException(new ExecutionException("Timed out",
- new TimeoutException()));
- }
- }
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
@@ -640,7 +627,7 @@
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.debug("received batch request {}", operation);
- final DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+ final DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
@@ -648,7 +635,7 @@
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
}
- CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+ CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
// This node is no longer the master, respond as all failed.
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
@@ -661,36 +648,15 @@
return;
}
- final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
- f.addListener(new Runnable() {
+ pendingResponses.put(operation.id(), message.sender());
+ storeBatchInternal(operation);
- @Override
- public void run() {
- CompletedBatchOperation result;
- try {
- result = f.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Batch operation failed", e);
- // create everything failed response
- Set<FlowRule> failures = new HashSet<>(operation.size());
- for (FlowRuleBatchEntry op : operation.getOperations()) {
- failures.add(op.target());
- }
- result = new CompletedBatchOperation(false, failures);
- }
- try {
- message.respond(SERIALIZER.encode(result));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- }, futureListeners);
}
}
private final class SMapLoader
- extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+ extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
@Override
public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
@@ -701,7 +667,7 @@
}
private final class InternalReplicaInfoEventListener
- implements ReplicaInfoEventListener {
+ implements ReplicaInfoEventListener {
@Override
public void event(ReplicaInfoEvent event) {
@@ -710,98 +676,166 @@
final ReplicaInfo rInfo = event.replicaInfo();
switch (event.type()) {
- case MASTER_CHANGED:
- if (local.equals(rInfo.master().orNull())) {
- // This node is the new master, populate local structure
- // from backup
- loadFromBackup(did);
- } else {
- // This node is no longer the master holder,
- // clean local structure
- removeFromPrimary(did);
- // TODO: probably should stop pending backup activities in
- // executors to avoid overwriting with old value
- }
- break;
- default:
- break;
+ case MASTER_CHANGED:
+ if (local.equals(rInfo.master().orNull())) {
+ // This node is the new master, populate local structure
+ // from backup
+ loadFromBackup(did);
+ } else {
+ // This node is no longer the master holder,
+ // clean local structure
+ removeFromPrimary(did);
+ // TODO: probably should stop pending backup activities in
+ // executors to avoid overwriting with old value
+ }
+ break;
+ default:
+ break;
}
}
}
// Task to update FlowEntries in backup HZ store
- // TODO: Should be refactored to contain only one list and not
- // toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
- private final List<FlowRuleBatchEntry> toAdd;
- private final List<FlowRuleBatchEntry> toRemove;
+ private final Set<FlowRuleBatchEntry> ops;
+
public UpdateBackup(DeviceId deviceId,
- List<FlowRuleBatchEntry> toAdd,
- List<FlowRuleBatchEntry> list) {
+ Set<FlowRuleBatchEntry> ops) {
this.deviceId = checkNotNull(deviceId);
- this.toAdd = checkNotNull(toAdd);
- this.toRemove = checkNotNull(list);
+ this.ops = checkNotNull(ops);
+
}
@Override
public void run() {
try {
- log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+ log.trace("update backup {} {}", deviceId, ops
+ );
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
- // Following should be rewritten using async APIs
- for (FlowRuleBatchEntry bEntry : toAdd) {
- final FlowRule entry = bEntry.target();
- final FlowId id = entry.id();
- ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
- List<StoredFlowEntry> list = new ArrayList<>();
- if (original != null) {
- list.addAll(original);
- }
- list.remove(bEntry.target());
- list.add((StoredFlowEntry) entry);
- ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
- boolean success;
- if (original == null) {
- success = (backupFlowTable.putIfAbsent(id, newValue) == null);
- } else {
- success = backupFlowTable.replace(id, original, newValue);
- }
- if (!success) {
- log.error("Updating backup failed.");
- }
- }
- for (FlowRuleBatchEntry bEntry : toRemove) {
- final FlowRule entry = bEntry.target();
- final FlowId id = entry.id();
- ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
- List<StoredFlowEntry> list = new ArrayList<>();
- if (original != null) {
- list.addAll(original);
- }
+ ops.stream().forEach(
+ op -> {
+ final FlowRule entry = op.getTarget();
+ final FlowId id = entry.id();
+ ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+ List<StoredFlowEntry> list = new ArrayList<>();
+ if (original != null) {
+ list.addAll(original);
+ }
+ list.remove(op.getTarget());
+ if (op.getOperator() == FlowRuleOperation.ADD) {
+ list.add((StoredFlowEntry) entry);
+ }
- list.remove(bEntry.target());
+ ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+ boolean success;
+ if (original == null) {
+ success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+ } else {
+ success = backupFlowTable.replace(id, original, newValue);
+ }
+ if (!success) {
+ log.error("Updating backup failed.");
+ }
- ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
- boolean success;
- if (original == null) {
- success = (backupFlowTable.putIfAbsent(id, newValue) == null);
- } else {
- success = backupFlowTable.replace(id, original, newValue);
- }
- if (!success) {
- log.error("Updating backup failed.");
- }
- }
+ }
+ );
} catch (ExecutionException e) {
log.error("Failed to write to backups", e);
}
}
}
+
+ private class InternalFlowTable {
+
+ /*
+ TODO: This needs to be cleaned up. Perhaps using the eventually consistent
+ map when it supports distributed to a sequence of instances.
+ */
+
+
+ private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();
+
+
+ private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
+ return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+ }
+
+ /**
+ * Returns the flow table for specified device.
+ *
+ * @param deviceId identifier of the device
+ * @return Map representing Flow Table of given device.
+ */
+ private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
+ return createIfAbsentUnchecked(flowEntries,
+ deviceId, lazyEmptyFlowTable());
+ }
+
+ private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
+ final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+ Set<StoredFlowEntry> r = flowTable.get(flowId);
+ if (r == null) {
+ final Set<StoredFlowEntry> concurrentlyAdded;
+ r = new CopyOnWriteArraySet<>();
+ concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return r;
+ }
+
+ private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+ for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
+ if (f.equals(rule)) {
+ return f;
+ }
+ }
+ return null;
+ }
+
+ private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+ return getFlowTable(deviceId).values().stream()
+ .flatMap((list -> list.stream())).collect(Collectors.toSet());
+
+ }
+
+
+ public StoredFlowEntry getFlowEntry(FlowRule rule) {
+ return getFlowEntryInternal(rule);
+ }
+
+ public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ return getFlowEntriesInternal(deviceId);
+ }
+
+ public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
+ return getFlowEntriesInternal(entry.deviceId(), entry.id());
+ }
+
+ public void add(FlowEntry rule) {
+ ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
+ }
+
+ public boolean remove(DeviceId deviceId, FlowEntry rule) {
+ return ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
+ //return flowEntries.remove(deviceId, rule);
+ }
+
+ public void clearDevice(DeviceId did) {
+ flowEntries.remove(did);
+ }
+ }
+
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
index a21d73a..79df272 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
@@ -34,4 +34,7 @@
public static final MessageSubject REMOVE_FLOW_ENTRY
= new MessageSubject("peer-forward-remove-flow-entry");
+
+ public static final MessageSubject REMOTE_APPLY_COMPLETED
+ = new MessageSubject("peer-apply-completed");
}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 289efe3..7c18820 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -59,15 +59,17 @@
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.Criterion;
@@ -162,6 +164,7 @@
.register(Collections.emptySet().getClass())
.register(Optional.class)
.register(Collections.emptyList().getClass())
+ .register(Collections.unmodifiableSet(Collections.emptySet()).getClass())
.build();
/**
@@ -255,6 +258,9 @@
L3ModificationInstruction.L3SubType.class,
L3ModificationInstruction.ModIPInstruction.class,
RoleInfo.class,
+ FlowRuleBatchEvent.class,
+ FlowRuleBatchEvent.Type.class,
+ FlowRuleBatchRequest.class,
FlowRuleBatchOperation.class,
CompletedBatchOperation.class,
FlowRuleBatchEntry.class,
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
index a8e15a2..177618d 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
@@ -21,13 +21,13 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.FluentIterable;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -46,7 +46,6 @@
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.AbstractStore;
-import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -56,7 +55,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -261,13 +259,14 @@
}
@Override
- public Future<CompletedBatchOperation> storeBatch(
- FlowRuleBatchOperation batchOperation) {
+ public void storeBatch(
+ FlowRuleBatchOperation operation) {
List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
- for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
- final FlowRule flowRule = entry.target();
- if (entry.operator().equals(FlowRuleOperation.ADD)) {
+
+ for (FlowRuleBatchEntry entry : operation.getOperations()) {
+ final FlowRule flowRule = entry.getTarget();
+ if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
toAdd.add(entry);
@@ -283,21 +282,27 @@
}
if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(),
+ operation.deviceId())));
+ return;
}
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
- notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
- return r;
+ toAdd.addAll(toRemove);
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest(batchId, Sets.newHashSet(toAdd)), operation.deviceId()));
+
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
- final Integer batchId = event.subject().batchId();
+ final Long batchId = event.subject().batchId();
SettableFuture<CompletedBatchOperation> future
= pendingFutures.getIfPresent(batchId);
if (future != null) {
diff --git a/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java b/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
index b2b0b51..fb17bf9 100644
--- a/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
+++ b/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
@@ -116,7 +116,7 @@
@Activate
public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
- if (modified(context)) {
+ if (!modified(context)) {
deviceBuilder.submit(new DeviceCreator(true));
}
log.info("Started");
@@ -173,6 +173,9 @@
chgd |= true;
}
log.info("Using settings numDevices={}, numPorts={}", numDevices, numPorts);
+ if (chgd) {
+ deviceBuilder.submit(new DeviceCreator(true));
+ }
return chgd;
}
diff --git a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
index e882c76..0686349 100644
--- a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
+++ b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
@@ -15,9 +15,7 @@
*/
package org.onosproject.provider.nil.flow.impl;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,12 +27,12 @@
import org.onlab.util.Timer;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
@@ -43,7 +41,9 @@
import org.slf4j.Logger;
import java.util.Collections;
-import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
@@ -59,7 +59,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleProviderRegistry providerRegistry;
- private Multimap<DeviceId, FlowEntry> flowTable = HashMultimap.create();
+ private ConcurrentMap<DeviceId, Set<FlowEntry>> flowTable = new ConcurrentHashMap<>();
private FlowRuleProviderService providerService;
@@ -88,18 +88,10 @@
}
@Override
- public void applyFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- flowTable.put(flowRules[i].deviceId(), new DefaultFlowEntry(flowRules[i]));
- }
- }
+ public void applyFlowRule(FlowRule... flowRules) {}
@Override
- public void removeFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- flowTable.remove(flowRules[i].deviceId(), flowRules[i]);
- }
- }
+ public void removeFlowRule(FlowRule... flowRules) {}
@Override
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
@@ -107,26 +99,32 @@
}
@Override
- public Future<CompletedBatchOperation> executeBatch(
- BatchOperation<FlowRuleBatchEntry> batch) {
+ public void executeBatch(
+ FlowRuleBatchOperation batch) {
+ Set<FlowEntry> flowRules = flowTable.getOrDefault(batch.deviceId(), Sets.newConcurrentHashSet());
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
switch (fbe.operator()) {
case ADD:
- applyFlowRule(fbe.target());
+ flowRules.add(new DefaultFlowEntry(fbe.target()));
break;
case REMOVE:
- removeFlowRule(fbe.target());
+ flowRules.remove(new DefaultFlowEntry(fbe.target()));
break;
case MODIFY:
- removeFlowRule(fbe.target());
- applyFlowRule(fbe.target());
+ FlowEntry entry = new DefaultFlowEntry(fbe.target());
+ flowRules.remove(entry);
+ flowRules.add(entry);
break;
default:
log.error("Unknown flow operation: {}", fbe);
}
}
- return Futures.immediateFuture(
- new CompletedBatchOperation(true, Collections.emptySet()));
+ flowTable.put(batch.deviceId(), flowRules);
+ providerService.batchOperationCompleted(batch.id(),
+ new CompletedBatchOperation(
+ true,
+ Collections.emptySet(),
+ batch.deviceId()));
}
private class StatisticTask implements TimerTask {
@@ -134,10 +132,11 @@
@Override
public void run(Timeout to) throws Exception {
for (DeviceId devId : flowTable.keySet()) {
- providerService.pushFlowMetrics(devId, flowTable.get(devId));
+ providerService.pushFlowMetrics(devId,
+ flowTable.getOrDefault(devId, Collections.emptySet()));
}
-
timeout = timer.newTimeout(to.getTask(), 5, TimeUnit.SECONDS);
+
}
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
index 04ff897..8f2b5f4 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
@@ -35,6 +35,7 @@
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.openflow.controller.Dpid;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFInstructionType;
@@ -74,13 +75,16 @@
private final OFFlowStatsEntry stat;
private final OFFlowRemoved removed;
+ private final OFFlowMod flowMod;
private final Match match;
private final List<OFAction> actions;
private final Dpid dpid;
- private final boolean addedRule;
+ public enum FlowType { STAT, REMOVED, MOD }
+
+ private final FlowType type;
public FlowEntryBuilder(Dpid dpid, OFFlowStatsEntry entry) {
@@ -89,7 +93,8 @@
this.actions = getActions(entry);
this.dpid = dpid;
this.removed = null;
- this.addedRule = true;
+ this.flowMod = null;
+ this.type = FlowType.STAT;
}
public FlowEntryBuilder(Dpid dpid, OFFlowRemoved removed) {
@@ -99,26 +104,48 @@
this.dpid = dpid;
this.actions = null;
this.stat = null;
- this.addedRule = false;
+ this.flowMod = null;
+ this.type = FlowType.REMOVED;
}
- public FlowEntry build() {
- if (addedRule) {
- FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
- buildSelector(), buildTreatment(), stat.getPriority(),
- stat.getCookie().getValue(), stat.getIdleTimeout(), false);
- return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
- stat.getDurationSec(), stat.getPacketCount().getValue(),
- stat.getByteCount().getValue());
+ public FlowEntryBuilder(Dpid dpid, OFFlowMod fm) {
+ this.match = fm.getMatch();
+ this.dpid = dpid;
+ this.actions = fm.getActions();
+ this.type = FlowType.MOD;
+ this.flowMod = fm;
+ this.stat = null;
+ this.removed = null;
+ }
- } else {
- FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
- buildSelector(), null, removed.getPriority(),
- removed.getCookie().getValue(), removed.getIdleTimeout(), false);
- return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
- removed.getPacketCount().getValue(), removed.getByteCount().getValue());
+ public FlowEntry build(FlowEntryState... state) {
+ FlowRule rule;
+ switch (this.type) {
+ case STAT:
+ rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+ buildSelector(), buildTreatment(), stat.getPriority(),
+ stat.getCookie().getValue(), stat.getIdleTimeout(), false);
+ return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
+ stat.getDurationSec(), stat.getPacketCount().getValue(),
+ stat.getByteCount().getValue());
+ case REMOVED:
+ rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+ buildSelector(), null, removed.getPriority(),
+ removed.getCookie().getValue(), removed.getIdleTimeout(), false);
+ return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
+ removed.getPacketCount().getValue(), removed.getByteCount().getValue());
+ case MOD:
+ FlowEntryState flowState = state.length > 0 ? state[0] : FlowEntryState.FAILED;
+ rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+ buildSelector(), buildTreatment(), flowMod.getPriority(),
+ flowMod.getCookie().getValue(), flowMod.getIdleTimeout(), false);
+ return new DefaultFlowEntry(rule, flowState, 0, 0, 0);
+ default:
+ log.error("Unknown flow type : {}", this.type);
+ return null;
}
+
}
private List<OFAction> getActions(OFFlowStatsEntry entry) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index dd0bcaa..70d35aa 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,25 +15,13 @@
*/
package org.onosproject.provider.of.flow.impl;
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,19 +29,16 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.net.topology.TopologyService;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -63,6 +48,7 @@
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
@@ -75,21 +61,22 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.slf4j.Logger;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Provider which uses an OpenFlow controller to detect network
@@ -98,8 +85,6 @@
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
- enum BatchState { STARTED, FINISHED, CANCELLED }
-
private static final int LOWEST_PRIORITY = 0;
private final Logger log = getLogger(getClass());
@@ -110,22 +95,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenFlowController controller;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected TopologyService topologyService;
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
- // FIXME: This should be an expiring map to ensure futures that don't have
- // a future eventually get garbage collected.
- private final Map<Long, InstallationFuture> pendingFutures = new ConcurrentHashMap<>();
-
- private final Map<Long, InstallationFuture> pendingFMs = new ConcurrentHashMap<>();
+ private Cache<Long, InternalCacheEntry> pendingBatches;
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
- private final AtomicLong xidCounter = new AtomicLong(1);
/**
* Creates an OpenFlow host provider.
@@ -140,6 +118,16 @@
controller.addListener(listener);
controller.addEventListener(listener);
+ pendingBatches = CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ providerService.batchOperationCompleted(notification.getKey(),
+ notification.getValue().failedCompletion());
+ }
+ }).build();
+
+
for (OpenFlowSwitch sw : controller.getSwitches()) {
FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
fsc.start();
@@ -160,8 +148,8 @@
@Override
public void applyFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- applyRule(flowRules[i]);
+ for (FlowRule flowRule : flowRules) {
+ applyRule(flowRule);
}
}
@@ -179,8 +167,8 @@
@Override
public void removeFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- removeRule(flowRules[i]);
+ for (FlowRule flowRule : flowRules) {
+ removeRule(flowRule);
}
}
@@ -203,36 +191,20 @@
}
@Override
- public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
- final Set<Dpid> sws = Sets.newConcurrentHashSet();
- final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
- /*
- * Use identity hash map for reference equality as we could have equal
- * flow mods for different switches.
- */
- Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
- Map<OFFlowMod, OpenFlowSwitch.TableType> modTypes = Maps.newIdentityHashMap();
+ public void executeBatch(FlowRuleBatchOperation batch) {
+
+ pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
+
+
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
+ OFFlowMod mod;
+
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- FlowRule flowRule = fbe.target();
- final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
- OpenFlowSwitch sw = controller.getSwitch(dpid);
- if (sw == null) {
- /*
- * if a switch we are supposed to install to is gone then
- * cancel (ie. rollback) the work that has been done so far
- * and return the associated future.
- */
- InstallationFuture failed = new InstallationFuture(sws, fmXids);
- failed.cancel(true);
- return failed;
- }
- sws.add(dpid);
- final Long flowModXid = xidCounter.getAndIncrement();
+
FlowModBuilder builder =
- FlowModBuilder.builder(flowRule, sw.factory(),
- Optional.of(flowModXid));
- OFFlowMod mod = null;
+ FlowModBuilder.builder(fbe.target(), sw.factory(),
+ Optional.of(batch.id()));
switch (fbe.operator()) {
case ADD:
mod = builder.buildFlowAdd();
@@ -244,34 +216,16 @@
mod = builder.buildFlowMod();
break;
default:
- log.error("Unsupported batch operation {}", fbe.operator());
- }
- if (mod != null) {
- mods.put(mod, sw);
- modTypes.put(mod, getTableType(flowRule.type()));
- fmXids.put(flowModXid, fbe);
- } else {
- log.error("Conversion of flowrule {} failed.", flowRule);
- }
+ log.error("Unsupported batch operation {}; skipping flowmod {}",
+ fbe.operator(), fbe);
+ continue;
+ }
+ sw.sendMsg(mod);
}
- InstallationFuture installation = new InstallationFuture(sws, fmXids);
- for (Long xid : fmXids.keySet()) {
- pendingFMs.put(xid, installation);
- }
-
- pendingFutures.put(installation.xid(), installation);
- for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
- OpenFlowSwitch sw = entry.getValue();
- OFFlowMod mod = entry.getKey();
- OpenFlowSwitch.TableType tableType = modTypes.get(mod);
- if (tableType == OpenFlowSwitch.TableType.NONE) {
- sw.sendMsg(mod);
- } else {
- sw.sendMsg(mod, tableType);
- }
- }
- installation.verify();
- return installation;
+ OFBarrierRequest.Builder builder = sw.factory()
+ .buildBarrierRequest()
+ .setXid(batch.id());
+ sw.sendMsg(builder.build());
}
private OpenFlowSwitch.TableType getTableType(FlowRule.Type type) {
@@ -287,13 +241,12 @@
}
}
+
+
+
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
-
- private final Multimap<DeviceId, FlowEntry> completeEntries =
- ArrayListMultimap.create();
-
@Override
public void switchAdded(Dpid dpid) {
FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
@@ -320,7 +273,6 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
- InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
OFFlowRemoved removed = (OFFlowRemoved) msg;
@@ -334,22 +286,42 @@
}
break;
case BARRIER_REPLY:
- future = pendingFutures.get(msg.getXid());
- if (future != null) {
- future.satisfyRequirement(dpid);
- } else {
- log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+ try {
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ providerService.batchOperationCompleted(msg.getXid(), entry.completed());
+ } else {
+ log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+ }
+ } finally {
+ pendingBatches.invalidate(msg.getXid());
}
break;
case ERROR:
log.warn("received Error message {} from {}", msg, dpid);
- future = pendingFMs.get(msg.getXid());
- if (future != null) {
- future.fail((OFErrorMsg) msg, dpid);
+
+ OFErrorMsg error = (OFErrorMsg) msg;
+ if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+ OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+ if (fmFailed.getData().getParsedMessage().isPresent()) {
+ OFMessage m = fmFailed.getData().getParsedMessage().get();
+ OFFlowMod fm = (OFFlowMod) m;
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
+ } else {
+ log.error("No matching batch for this error: {}", error);
+ }
+ } else {
+ //FIXME: Potentially add flowtracking to avoid this message.
+ log.error("Flow installation failed but switch didn't" +
+ " tell us which one.");
+ }
} else {
- log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
+ log.warn("Received error {}", error);
}
- break;
+
+
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -402,198 +374,50 @@
}
- private class InstallationFuture implements Future<CompletedBatchOperation> {
+ /**
+ * The internal cache entry holding the original request as well
+ * as accumulating the any failures along the way.
+ *
+ * If this entry is evicted from the cache then the entire operation
+ * is considered failed. Otherwise, only the failures reported by the device
+ * will be propagated up.
+ */
+ private class InternalCacheEntry {
- // barrier xid
- private final Long xid;
- // waiting for barrier reply from...
- private final Set<Dpid> sws;
- private final AtomicBoolean ok = new AtomicBoolean(true);
- // FlowMod xid ->
- private final Map<Long, FlowRuleBatchEntry> fms;
+ private final FlowRuleBatchOperation operation;
+ private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
-
- private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
- // Failed batch operation id
- private Long failedId;
-
- private final CountDownLatch countDownLatch;
- private BatchState state;
-
- public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
- this.xid = xidCounter.getAndIncrement();
- this.state = BatchState.STARTED;
- this.sws = sws;
- this.fms = fmXids;
- countDownLatch = new CountDownLatch(sws.size());
+ public InternalCacheEntry(FlowRuleBatchOperation operation) {
+ this.operation = operation;
}
- public Long xid() {
- return xid;
+ /**
+ * Appends a failed rule to the set of failed items.
+ * @param rule the failed rule
+ */
+ public void appendFailure(FlowRule rule) {
+ failures.add(rule);
}
- public void fail(OFErrorMsg msg, Dpid dpid) {
-
- ok.set(false);
- FlowEntry fe = null;
- FlowRuleBatchEntry fbe = fms.get(msg.getXid());
- failedId = fbe.id();
- FlowRule offending = fbe.target();
- //TODO handle specific error msgs
- switch (msg.getErrType()) {
- case BAD_ACTION:
- OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
- bad.getCode().ordinal());
- break;
- case BAD_INSTRUCTION:
- OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
- badins.getCode().ordinal());
- break;
- case BAD_MATCH:
- OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
- badMatch.getCode().ordinal());
- break;
- case BAD_REQUEST:
- OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
- badReq.getCode().ordinal());
- break;
- case FLOW_MOD_FAILED:
- OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
- fmFail.getCode().ordinal());
- break;
- case EXPERIMENTER:
- case GROUP_MOD_FAILED:
- case HELLO_FAILED:
- case METER_MOD_FAILED:
- case PORT_MOD_FAILED:
- case QUEUE_OP_FAILED:
- case ROLE_REQUEST_FAILED:
- case SWITCH_CONFIG_FAILED:
- case TABLE_FEATURES_FAILED:
- case TABLE_MOD_FAILED:
- fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
- break;
- default:
- log.error("Unknown error type {}", msg.getErrType());
-
- }
- offendingFlowMods.add(fe);
-
- removeRequirement(dpid);
+ /**
+ * Fails the entire batch and returns the failed operation.
+ * @return the failed operation
+ */
+ public CompletedBatchOperation failedCompletion() {
+ Set<FlowRule> fails = operation.getOperations().stream()
+ .map(op -> op.target()).collect(Collectors.toSet());
+ return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
}
-
- public void satisfyRequirement(Dpid dpid) {
- log.debug("Satisfaction from switch {}", dpid);
- removeRequirement(dpid);
+ /**
+ * Returns the completed operation and whether the batch suceeded.
+ * @return the completed operation
+ */
+ public CompletedBatchOperation completed() {
+ return new CompletedBatchOperation(failures.isEmpty(),
+ Collections.unmodifiableSet(failures), operation.deviceId());
}
-
- public void verify() {
- checkState(!sws.isEmpty());
- for (Dpid dpid : sws) {
- OpenFlowSwitch sw = controller.getSwitch(dpid);
- OFBarrierRequest.Builder builder = sw.factory()
- .buildBarrierRequest()
- .setXid(xid);
- sw.sendMsg(builder.build());
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
- ok.set(false);
- this.state = BatchState.CANCELLED;
- cleanUp();
- for (FlowRuleBatchEntry fbe : fms.values()) {
- if (fbe.operator() == FlowRuleOperation.ADD ||
- fbe.operator() == FlowRuleOperation.MODIFY) {
- removeFlowRule(fbe.target());
- } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
- applyRule(fbe.target());
- }
-
- }
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return this.state == BatchState.CANCELLED;
- }
-
- @Override
- public boolean isDone() {
- return this.state == BatchState.FINISHED || isCancelled();
- }
-
- @Override
- public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
- countDownLatch.await();
- this.state = BatchState.FINISHED;
- Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
- CompletedBatchOperation result =
- new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
- //FIXME do cleanup here (moved by BOC)
- cleanUp();
- return result;
- }
-
- @Override
- public CompletedBatchOperation get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
- if (countDownLatch.await(timeout, unit)) {
- this.state = BatchState.FINISHED;
- Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
- CompletedBatchOperation result =
- new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
- // FIXME do cleanup here (moved by BOC)
- cleanUp();
- return result;
- }
- throw new TimeoutException(this.toString());
- }
-
- private void cleanUp() {
- if (isDone() || isCancelled()) {
- pendingFutures.remove(xid);
- for (Long xid : fms.keySet()) {
- pendingFMs.remove(xid);
- }
- }
- }
-
- private void removeRequirement(Dpid dpid) {
- countDownLatch.countDown();
- sws.remove(dpid);
- //FIXME don't do cleanup here (moved by BOC)
- //cleanUp();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("xid", xid)
- .add("pending devices", sws)
- .add("devices in batch",
- fms.values().stream()
- .map((fbe) -> fbe.target().deviceId())
- .distinct().collect(Collectors.toList()))
- .add("failedId", failedId)
- .add("latchCount", countDownLatch.getCount())
- .add("state", state)
- .add("no error?", ok.get())
- .toString();
- }
}
}
diff --git a/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg b/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
index 375ed48..1c3a009 100644
--- a/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
+++ b/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
@@ -2,7 +2,7 @@
# Instance-specific configurations, in this case, the number of
# devices per node.
#
-devConfigs = 192.168.56.30:5,192.168.56.40:7
+devConfigs = 192.168.97.132:5,192.168.97.131:5
#
# Number of ports per device. This is global to all devices
diff --git a/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg b/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
index afc3fd9..a1b1d16 100644
--- a/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
+++ b/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
@@ -9,4 +9,4 @@
#
# Set order of islands to chain together, in a line.
#
-neighbors = 192.168.56.20,192.168.56.30,192.168.56.40
+neighbors = 192.168.97.132,192.168.97.131