DistributedFlowRuleStore related fixes
- Handle the case where a Device has no master
- Change the failed item type of CompletedBatchOperation from FlowEntry to FlowRule
Change-Id: If6c85751759cf6ba9ab0ed0384cbe1bf08a5d572
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 18d5f1d..7a917a7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -172,12 +173,36 @@
.softValues()
.build(new SMapLoader());
+ final NodeId local = clusterService.getLocalNode().id();
+
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
public void handle(final ClusterMessage message) {
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.info("received batch request {}", operation);
+
+ final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (!local.equals(replicaInfo.master().orNull())) {
+
+ Set<FlowRule> failures = new HashSet<>(operation.size());
+ for (FlowRuleBatchEntry op : operation.getOperations()) {
+ failures.add(op.getTarget());
+ }
+ CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+ // This node is no longer the master, respond as all failed.
+ // TODO: we might want to wrap the response in an envelope
+ // to distinguish a sw programming failure from a mastership hand over;
+ // in the latter case it makes sense to retry immediately.
+ try {
+ message.respond(SERIALIZER.encode(allFailed));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ return;
+ }
+
final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
f.addListener(new Runnable() {
@@ -256,6 +281,14 @@
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", rule);
+ // TODO: revisit if this should be returning null.
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException("No master for " + rule);
+ }
+
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntryInternal(rule);
}
@@ -290,6 +323,14 @@
public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit if this should be returning empty collection.
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException("No master for " + deviceId);
+ }
+
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntriesInternal(deviceId);
}
@@ -329,14 +370,22 @@
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ if (!replicaInfo.master().isPresent()) {
+ log.warn("No master for {}", deviceId);
+ // TODO: revisit if this should be "success" from the Future's point of view,
+ // with every FlowRule reported as failed
+ return Futures.immediateFailedFuture(new IOException("No master to forward to"));
+ }
+
+ final NodeId local = clusterService.getLocalNode().id();
+ if (replicaInfo.master().get().equals(local)) {
return storeBatchInternal(operation);
}
@@ -344,7 +393,7 @@
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
+ local,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
@@ -367,7 +416,6 @@
final List<StoredFlowEntry> toAdd = new ArrayList<>();
DeviceId did = null;
-
for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
FlowRule flowRule = batchEntry.getTarget();
FlowRuleOperation op = batchEntry.getOperator();
@@ -390,7 +438,7 @@
}
}
if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
// create remote backup copies
@@ -434,7 +482,8 @@
@Override
public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ final NodeId localId = clusterService.getLocalNode().id();
+ if (localId.equals(replicaInfo.master().orNull())) {
return addOrUpdateFlowRuleInternal(rule);
}
@@ -471,7 +520,9 @@
@Override
public FlowRuleEvent removeFlowRule(FlowEntry rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+
+ final NodeId localId = clusterService.getLocalNode().id();
+ if (localId.equals(replicaInfo.master().orNull())) {
// bypass and handle it locally
return removeFlowRuleInternal(rule);
}