For ONOS-356
- Add current InstallationFuture information on TimeoutException
- Set timeout values proportional to batch size
- Fix for ConcurrentModificationException
- Check if src/dst is part of the graph before path computation
Change-Id: Iabeac7939c52502b83bf9ebcbe2023539de3ae99
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
index ef9b5b5..0decb08 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
@@ -132,7 +132,7 @@
start = System.currentTimeMillis();
service.execute(ops);
try {
- if (latch.await(30, TimeUnit.SECONDS)) {
+ if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) {
printResults(count);
} else {
print("Failure: %d intents not installed", latch.getCount());
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index a7bffe7..597c9cd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -17,9 +17,9 @@
import java.util.Collections;
-
import java.util.Set;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
/**
@@ -73,4 +73,12 @@
return failedIds;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("success?", success)
+ .add("failedItems", failures)
+ .add("failedIds", failedIds)
+ .toString();
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 95bf8c5..3f37c29 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -67,7 +67,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -232,7 +231,7 @@
lastSeen.remove(flowEntry);
FlowEntry stored = store.getFlowEntry(flowEntry);
if (stored == null) {
- log.info("Rule already evicted from store: {}", flowEntry);
+ log.debug("Rule already evicted from store: {}", flowEntry);
return;
}
Device device = deviceService.getDevice(flowEntry.deviceId());
@@ -378,7 +377,8 @@
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
- private static final int TIMEOUT = 5000; // ms
+ // FIXME set appropriate default and make it configurable
+ private static final int TIMEOUT_PER_OP = 500; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@@ -407,7 +407,7 @@
public void run() {
CompletedBatchOperation res;
try {
- res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
+ res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Something went wrong with the batch operation {}",
@@ -457,6 +457,10 @@
if (state.get() == BatchState.FINISHED) {
return false;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Cancelling FlowRuleBatchFuture",
+ new RuntimeException("Just printing backtrace"));
+ }
if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
return false;
}
@@ -526,6 +530,7 @@
throw new CancellationException();
}
if (!completed.isSuccess()) {
+ log.warn("FlowRuleBatch failed: {}", completed);
failed.addAll(completed.failedItems());
failedIds.addAll(completed.failedIds());
cleanUpBatch();
@@ -557,9 +562,11 @@
}
private void cleanUpBatch() {
+ log.debug("cleaning up batch");
+ // TODO convert these into a batch?
for (FlowRuleBatchEntry fbe : batches.values()) {
if (fbe.getOperator() == FlowRuleOperation.ADD ||
- fbe.getOperator() == FlowRuleOperation.MODIFY) {
+ fbe.getOperator() == FlowRuleOperation.MODIFY) {
store.deleteFlowRule(fbe.getTarget());
} else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 1cb3210..d93432d 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -734,7 +734,8 @@
private class IntentInstallMonitor implements Runnable {
- private static final long TIMEOUT = 5000; // ms
+ // TODO make this configurable
+ private static final int TIMEOUT_PER_OP = 500; // ms
private static final int MAX_ATTEMPTS = 3;
private final IntentOperations ops;
@@ -742,11 +743,18 @@
private Future<CompletedBatchOperation> future;
private long startTime = System.currentTimeMillis();
- private long endTime = startTime + TIMEOUT;
+ private long endTime;
private int installAttempt;
public IntentInstallMonitor(IntentOperations ops) {
this.ops = ops;
+ resetTimeoutLimit();
+ }
+
+ private void resetTimeoutLimit() {
+ // FIXME compute reasonable timeouts
+ this.endTime = System.currentTimeMillis()
+ + ops.operations().size() * TIMEOUT_PER_OP;
}
private void buildIntentUpdates() {
@@ -805,6 +813,7 @@
*/
private void processFutures() {
if (future == null) {
+ log.warn("I have no Future.");
return; //FIXME look at this
}
try {
@@ -818,9 +827,10 @@
}
private void retry() {
+ log.debug("Execution timed out, retrying.");
if (future.cancel(true)) { // cancel success; batch is reverted
// reset the timer
- endTime = System.currentTimeMillis() + TIMEOUT;
+ resetTimeoutLimit();
if (installAttempt++ >= MAX_ATTEMPTS) {
log.warn("Install request timed out: {}", ops);
for (IntentUpdate update : intentUpdates) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
index 7cc5d47..22673b6 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
@@ -169,8 +169,10 @@
linkEvent.subject().isDurable())) {
final LinkKey linkKey = linkKey(linkEvent.subject());
Set<IntentId> intentIds = intentsByLink.get(linkKey);
- log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
- toBeRecompiled.addAll(intentIds);
+ synchronized (intentsByLink) {
+ log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
+ toBeRecompiled.addAll(intentIds);
+ }
}
recompileOnly = recompileOnly &&
(linkEvent.type() == LINK_REMOVED ||
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
index 939e558..3ee7ae2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
@@ -262,9 +262,16 @@
* @return set of shortest paths
*/
Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
+ final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
+ final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
+ Set<TopologyVertex> vertices = graph.getVertexes();
+ if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
+ // src or dst not part of the current graph
+ return ImmutableSet.of();
+ }
+
GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
- DIJKSTRA.search(graph, new DefaultTopologyVertex(src),
- new DefaultTopologyVertex(dst), weight);
+ DIJKSTRA.search(graph, srcV, dstV, weight);
ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
builder.add(networkPath(path));
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
index 066089a..a938039 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
@@ -262,9 +262,16 @@
* @return set of shortest paths
*/
Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
+ final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
+ final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
+ Set<TopologyVertex> vertices = graph.getVertexes();
+ if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
+ // src or dst not part of the current graph
+ return ImmutableSet.of();
+ }
+
GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
- DIJKSTRA.search(graph, new DefaultTopologyVertex(src),
- new DefaultTopologyVertex(dst), weight);
+ DIJKSTRA.search(graph, srcV, dstV, weight);
ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
builder.add(networkPath(path));
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 9f26230..6a6249e 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,10 +15,12 @@
*/
package org.onlab.onos.provider.of.flow.impl;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -85,7 +87,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -191,9 +195,8 @@
@Override
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
- final Set<Dpid> sws =
- Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
- final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
+ final Set<Dpid> sws = Sets.newConcurrentHashSet();
+ final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
/*
* Use identity hash map for reference equality as we could have equal
* flow mods for different switches.
@@ -201,7 +204,8 @@
Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
if (sw == null) {
/*
* if a switch we are supposed to install to is gone then
@@ -212,8 +216,8 @@
failed.cancel(true);
return failed;
}
- sws.add(new Dpid(sw.getId()));
- Long flowModXid = xidCounter.getAndIncrement();
+ sws.add(dpid);
+ final Long flowModXid = xidCounter.getAndIncrement();
FlowModBuilder builder =
FlowModBuilder.builder(flowRule, sw.factory(),
Optional.of(flowModXid));
@@ -302,12 +306,17 @@
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
+ } else {
+ log.warn("Received unknown Barrier Reply: {}", msg.getXid());
}
break;
case ERROR:
+ log.warn("received Error message {} from {}", msg, dpid);
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
+ } else {
+ log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
}
break;
default:
@@ -369,13 +378,17 @@
private class InstallationFuture implements Future<CompletedBatchOperation> {
+ // barrier xid
private final Long xid;
+ // waiting for barrier reply from...
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
+ // FlowMod xid ->
private final Map<Long, FlowRuleBatchEntry> fms;
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
+ // Failed batch operation id
private Long failedId;
private final CountDownLatch countDownLatch;
@@ -456,6 +469,7 @@
public void verify() {
+ checkState(!sws.isEmpty());
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
@@ -520,7 +534,7 @@
cleanUp();
return result;
}
- throw new TimeoutException();
+ throw new TimeoutException(this.toString());
}
private void cleanUp() {
@@ -538,6 +552,22 @@
//FIXME don't do cleanup here (moved by BOC)
//cleanUp();
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("xid", xid)
+ .add("pending devices", sws)
+ .add("devices in batch",
+ fms.values().stream()
+ .map((fbe) -> fbe.getTarget().deviceId())
+ .distinct().collect(Collectors.toList()))
+ .add("failedId", failedId)
+ .add("latchCount", countDownLatch.getCount())
+ .add("state", state)
+ .add("no error?", ok.get())
+ .toString();
+ }
}
}