DistributedFlowRuleStore-related fixes
- Handle the case where a Device has no master
- Change the failed item type from FlowEntry to FlowRule
Change-Id: If6c85751759cf6ba9ab0ed0384cbe1bf08a5d572
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index e78ebe0..3758ea9 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -19,13 +19,13 @@
import com.google.common.collect.ImmutableSet;
-public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
+public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
private final boolean success;
- private final Set<FlowEntry> failures;
+ private final Set<FlowRule> failures;
- public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
+ public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
}
@@ -36,7 +36,7 @@
}
@Override
- public Set<FlowEntry> failedItems() {
+ public Set<FlowRule> failedItems() {
return failures;
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3791296..4fe9022 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -395,7 +395,7 @@
break;
case BATCH_OPERATION_COMPLETED:
- Set<FlowEntry> failedItems = event.result().failedItems();
+ Set<FlowRule> failedItems = event.result().failedItems();
for (FlowEntry entry : request.toAdd()) {
if (!failedItems.contains(entry)) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
@@ -463,7 +463,7 @@
}
boolean success = true;
- Set<FlowEntry> failed = Sets.newHashSet();
+ Set<FlowRule> failed = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
@@ -483,7 +483,7 @@
return overall;
}
boolean success = true;
- Set<FlowEntry> failed = Sets.newHashSet();
+ Set<FlowRule> failed = Sets.newHashSet();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
@@ -497,7 +497,7 @@
return finalizeBatchOperation(success, failed);
}
- private boolean validateBatchOperation(Set<FlowEntry> failed,
+ private boolean validateBatchOperation(Set<FlowRule> failed,
CompletedBatchOperation completed) {
if (isCancelled()) {
@@ -519,7 +519,7 @@
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
- Set<FlowEntry> failed) {
+ Set<FlowRule> failed) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 2bb0965..b986d6d 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -555,7 +555,7 @@
@Override
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
- return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 18d5f1d..7a917a7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -172,12 +173,36 @@
.softValues()
.build(new SMapLoader());
+ final NodeId local = clusterService.getLocalNode().id();
+
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
public void handle(final ClusterMessage message) {
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.info("received batch request {}", operation);
+
+ final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (!local.equals(replicaInfo.master().orNull())) {
+
+ Set<FlowRule> failures = new HashSet<>(operation.size());
+ for (FlowRuleBatchEntry op : operation.getOperations()) {
+ failures.add(op.getTarget());
+ }
+ CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+ // This node is not (or is no longer) the master; respond with every operation failed.
+ // TODO: we might want to wrap the response in an envelope
+ // to distinguish a switch programming failure from a mastership handover;
+ // in the latter case it makes sense to retry immediately.
+ try {
+ message.respond(SERIALIZER.encode(allFailed));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ return;
+ }
+
final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
f.addListener(new Runnable() {
@@ -256,6 +281,14 @@
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", rule);
+ // TODO: revisit if this should be returning null.
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException("No master for " + rule);
+ }
+
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntryInternal(rule);
}
@@ -290,6 +323,14 @@
public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit if this should be returning empty collection.
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException("No master for " + deviceId);
+ }
+
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntriesInternal(deviceId);
}
@@ -329,14 +370,22 @@
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit whether this should be "success" from the Future's point of view
+ // with every FlowRule reported as failed
+ return Futures.immediateFailedFuture(new IOException("No master to forward to"));
+ }
+
+ final NodeId local = clusterService.getLocalNode().id();
+ if (replicaInfo.master().get().equals(local)) {
return storeBatchInternal(operation);
}
@@ -344,7 +393,7 @@
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
+ local,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
@@ -367,7 +416,6 @@
final List<StoredFlowEntry> toAdd = new ArrayList<>();
DeviceId did = null;
-
for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
FlowRule flowRule = batchEntry.getTarget();
FlowRuleOperation op = batchEntry.getOperator();
@@ -390,7 +438,7 @@
}
}
if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
// create remote backup copies
@@ -434,7 +482,8 @@
@Override
public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ final NodeId localId = clusterService.getLocalNode().id();
+ if (localId.equals(replicaInfo.master().orNull())) {
return addOrUpdateFlowRuleInternal(rule);
}
@@ -471,7 +520,9 @@
@Override
public FlowRuleEvent removeFlowRule(FlowEntry rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+
+ final NodeId localId = clusterService.getLocalNode().id();
+ if (localId.equals(replicaInfo.master().orNull())) {
// bypass and handle it locally
return removeFlowRuleInternal(rule);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index bd28e47..a7a3a02 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -28,6 +28,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
@@ -191,7 +192,14 @@
@Override
public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+ final DeviceId deviceId = connectPoint.deviceId();
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit if this should be returning empty collection.
+ // FIXME: throw a StatsStoreException
+ throw new RuntimeException("No master for " + deviceId);
+ }
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
@@ -219,7 +227,14 @@
@Override
public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+ final DeviceId deviceId = connectPoint.deviceId();
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit if this should be returning empty collection.
+ // FIXME: throw a StatsStoreException
+ throw new RuntimeException("No master for " + deviceId);
+ }
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {