For ONOS-356

- Add current InstallationFuture information on TimeoutException
- Set timeout values proportional to batch size
- Fix for ConcurrentModificationException
- Check if src/dst is part of the graph before path computation

Change-Id: Iabeac7939c52502b83bf9ebcbe2023539de3ae99
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
index ef9b5b5..0decb08 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
@@ -132,7 +132,7 @@
         start = System.currentTimeMillis();
         service.execute(ops);
         try {
-            if (latch.await(30, TimeUnit.SECONDS)) {
+            if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) {
                 printResults(count);
             } else {
                 print("Failure: %d intents not installed", latch.getCount());
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index a7bffe7..597c9cd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -17,9 +17,9 @@
 
 
 import java.util.Collections;
-
 import java.util.Set;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
 
 /**
@@ -73,4 +73,12 @@
         return failedIds;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("success?", success)
+                .add("failedItems", failures)
+                .add("failedIds", failedIds)
+                .toString();
+    }
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 95bf8c5..3f37c29 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -67,7 +67,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -232,7 +231,7 @@
             lastSeen.remove(flowEntry);
             FlowEntry stored = store.getFlowEntry(flowEntry);
             if (stored == null) {
-                log.info("Rule already evicted from store: {}", flowEntry);
+                log.debug("Rule already evicted from store: {}", flowEntry);
                 return;
             }
             Device device = deviceService.getDevice(flowEntry.deviceId());
@@ -378,7 +377,8 @@
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
 
-        private static final int TIMEOUT = 5000; // ms
+        // FIXME set appropriate default and make it configurable
+        private static final int TIMEOUT_PER_OP = 500; // ms
 
         // TODO: Right now we only dispatch events at individual flowEntry level.
         // It may be more efficient for also dispatch events as a batch.
@@ -407,7 +407,7 @@
                     public void run() {
                         CompletedBatchOperation res;
                         try {
-                            res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
+                            res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
                             store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
                         } catch (TimeoutException | InterruptedException | ExecutionException e) {
                             log.warn("Something went wrong with the batch operation {}",
@@ -457,6 +457,10 @@
             if (state.get() == BatchState.FINISHED) {
                 return false;
             }
+            if (log.isDebugEnabled()) {
+                log.debug("Cancelling FlowRuleBatchFuture",
+                          new RuntimeException("Just printing backtrace"));
+            }
             if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
                 return false;
             }
@@ -526,6 +530,7 @@
                 throw new CancellationException();
             }
             if (!completed.isSuccess()) {
+                log.warn("FlowRuleBatch failed: {}", completed);
                 failed.addAll(completed.failedItems());
                 failedIds.addAll(completed.failedIds());
                 cleanUpBatch();
@@ -557,9 +562,11 @@
         }
 
         private void cleanUpBatch() {
+            log.debug("cleaning up batch");
+            // TODO convert these into a batch?
             for (FlowRuleBatchEntry fbe : batches.values()) {
                 if (fbe.getOperator() == FlowRuleOperation.ADD ||
-                        fbe.getOperator() == FlowRuleOperation.MODIFY) {
+                    fbe.getOperator() == FlowRuleOperation.MODIFY) {
                     store.deleteFlowRule(fbe.getTarget());
                 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
                     store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 1cb3210..d93432d 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -734,7 +734,8 @@
 
     private class IntentInstallMonitor implements Runnable {
 
-        private static final long TIMEOUT = 5000; // ms
+        // TODO make this configurable
+        private static final int TIMEOUT_PER_OP = 500; // ms
         private static final int MAX_ATTEMPTS = 3;
 
         private final IntentOperations ops;
@@ -742,11 +743,18 @@
 
         private Future<CompletedBatchOperation> future;
         private long startTime = System.currentTimeMillis();
-        private long endTime = startTime + TIMEOUT;
+        private long endTime;
         private int installAttempt;
 
         public IntentInstallMonitor(IntentOperations ops) {
             this.ops = ops;
+            resetTimeoutLimit();
+        }
+
+        private void resetTimeoutLimit() {
+            // FIXME compute reasonable timeouts
+            this.endTime = System.currentTimeMillis()
+                           + ops.operations().size() * TIMEOUT_PER_OP;
         }
 
         private void buildIntentUpdates() {
@@ -805,6 +813,7 @@
          */
         private void processFutures() {
             if (future == null) {
+                log.warn("I have no Future.");
                 return; //FIXME look at this
             }
             try {
@@ -818,9 +827,10 @@
         }
 
         private void retry() {
+            log.debug("Execution timed out, retrying.");
             if (future.cancel(true)) { // cancel success; batch is reverted
                 // reset the timer
-                endTime = System.currentTimeMillis() + TIMEOUT;
+                resetTimeoutLimit();
                 if (installAttempt++ >= MAX_ATTEMPTS) {
                     log.warn("Install request timed out: {}", ops);
                     for (IntentUpdate update : intentUpdates) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
index 7cc5d47..22673b6 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
@@ -169,8 +169,10 @@
                                         linkEvent.subject().isDurable())) {
                             final LinkKey linkKey = linkKey(linkEvent.subject());
                             Set<IntentId> intentIds = intentsByLink.get(linkKey);
-                            log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
-                            toBeRecompiled.addAll(intentIds);
+                            synchronized (intentsByLink) {
+                                log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
+                                toBeRecompiled.addAll(intentIds);
+                            }
                         }
                         recompileOnly = recompileOnly &&
                                 (linkEvent.type() == LINK_REMOVED ||
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
index 939e558..3ee7ae2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DefaultTopology.java
@@ -262,9 +262,16 @@
      * @return set of shortest paths
      */
     Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
+        final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
+        final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
+        Set<TopologyVertex> vertices = graph.getVertexes();
+        if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
+            // src or dst not part of the current graph
+            return ImmutableSet.of();
+        }
+
         GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
-                DIJKSTRA.search(graph, new DefaultTopologyVertex(src),
-                                new DefaultTopologyVertex(dst), weight);
+                DIJKSTRA.search(graph, srcV, dstV, weight);
         ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
         for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
             builder.add(networkPath(path));
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
index 066089a..a938039 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/DefaultTopology.java
@@ -262,9 +262,16 @@
      * @return set of shortest paths
      */
     Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
+        final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
+        final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
+        Set<TopologyVertex> vertices = graph.getVertexes();
+        if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
+            // src or dst not part of the current graph
+            return ImmutableSet.of();
+        }
+
         GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
-                DIJKSTRA.search(graph, new DefaultTopologyVertex(src),
-                                new DefaultTopologyVertex(dst), weight);
+                DIJKSTRA.search(graph, srcV, dstV, weight);
         ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
         for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
             builder.add(networkPath(path));
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 9f26230..6a6249e 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,10 +15,12 @@
  */
 package org.onlab.onos.provider.of.flow.impl;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -85,7 +87,9 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -191,9 +195,8 @@
 
     @Override
     public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
-        final Set<Dpid> sws =
-                Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
-        final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
+        final Set<Dpid> sws = Sets.newConcurrentHashSet();
+        final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
         /*
          * Use identity hash map for reference equality as we could have equal
          * flow mods for different switches.
@@ -201,7 +204,8 @@
         Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             FlowRule flowRule = fbe.getTarget();
-            OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+            final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+            OpenFlowSwitch sw = controller.getSwitch(dpid);
             if (sw == null) {
                 /*
                  * if a switch we are supposed to install to is gone then
@@ -212,8 +216,8 @@
                 failed.cancel(true);
                 return failed;
             }
-            sws.add(new Dpid(sw.getId()));
-            Long flowModXid = xidCounter.getAndIncrement();
+            sws.add(dpid);
+            final Long flowModXid = xidCounter.getAndIncrement();
             FlowModBuilder builder =
                     FlowModBuilder.builder(flowRule, sw.factory(),
                                            Optional.of(flowModXid));
@@ -302,12 +306,17 @@
                     future = pendingFutures.get(msg.getXid());
                     if (future != null) {
                         future.satisfyRequirement(dpid);
+                    } else {
+                        log.warn("Received unknown Barrier Reply: {}", msg.getXid());
                     }
                     break;
                 case ERROR:
+                    log.warn("received Error message {} from {}", msg, dpid);
                     future = pendingFMs.get(msg.getXid());
                     if (future != null) {
                         future.fail((OFErrorMsg) msg, dpid);
+                    } else {
+                        log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
                     }
                     break;
                 default:
@@ -369,13 +378,17 @@
 
     private class InstallationFuture implements Future<CompletedBatchOperation> {
 
+        // barrier xid
         private final Long xid;
+        // waiting for barrier reply from...
         private final Set<Dpid> sws;
         private final AtomicBoolean ok = new AtomicBoolean(true);
+        // FlowMod xid ->
         private final Map<Long, FlowRuleBatchEntry> fms;
 
 
         private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
+        // Failed batch operation id
         private Long failedId;
 
         private final CountDownLatch countDownLatch;
@@ -456,6 +469,7 @@
 
 
         public void verify() {
+            checkState(!sws.isEmpty());
             for (Dpid dpid : sws) {
                 OpenFlowSwitch sw = controller.getSwitch(dpid);
                 OFBarrierRequest.Builder builder = sw.factory()
@@ -520,7 +534,7 @@
                 cleanUp();
                 return result;
             }
-            throw new TimeoutException();
+            throw new TimeoutException(this.toString());
         }
 
         private void cleanUp() {
@@ -538,6 +552,22 @@
             //FIXME don't do cleanup here (moved by BOC)
             //cleanUp();
         }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("xid", xid)
+                    .add("pending devices", sws)
+                    .add("devices in batch",
+                         fms.values().stream()
+                             .map((fbe) -> fbe.getTarget().deviceId())
+                             .distinct().collect(Collectors.toList()))
+                    .add("failedId", failedId)
+                    .add("latchCount", countDownLatch.getCount())
+                    .add("state", state)
+                    .add("no error?", ok.get())
+                    .toString();
+        }
     }
 
 }