Fixing flow rule batches

Problem should now be fixed. Hashing on enums last is a bad
idea because the enum value could be 0.

Change-Id: Ib29e90b393b5285be2807729b52e69b121340f09
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 018d6f3..214bde3 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,21 +15,11 @@
  */
 package org.onlab.onos.provider.of.flow.impl;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ExecutionList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -80,15 +70,23 @@
 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
 import org.projectfloodlight.openflow.types.OFPort;
-import org.projectfloodlight.openflow.types.U32;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provider which uses an OpenFlow controller to detect network
@@ -124,6 +122,8 @@
 
     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
 
+    private final AtomicLong xidCounter = new AtomicLong(0);
+
     /**
      * Creates an OpenFlow host provider.
      */
@@ -154,6 +154,7 @@
 
         log.info("Stopped");
     }
+
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
@@ -167,7 +168,6 @@
     }
 
 
-
     @Override
     public void removeFlowRule(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
@@ -188,11 +188,15 @@
     }
 
     @Override
-    public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+    public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
         final Set<Dpid> sws =
                 Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
         final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
-        OFFlowMod mod = null;
+        /*
+         * Use identity hash map for reference equality as we could have equal
+         * flow mods for different switches.
+         */
+        Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             FlowRule flowRule = fbe.getTarget();
             OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
@@ -208,6 +212,7 @@
             }
             sws.add(new Dpid(sw.getId()));
             FlowModBuilder builder = FlowModBuilder.builder(flowRule, sw.factory());
+            OFFlowMod mod = null;
             switch (fbe.getOperator()) {
                 case ADD:
                     mod = builder.buildFlowAdd();
@@ -222,25 +227,29 @@
                     log.error("Unsupported batch operation {}", fbe.getOperator());
             }
             if (mod != null) {
-                sw.sendMsg(mod);
-                fmXids.put(mod.getXid(), fbe);
+                mods.put(mod, sw);
+                fmXids.put(xidCounter.getAndIncrement(), fbe);
             } else {
                 log.error("Conversion of flowrule {} failed.", flowRule);
             }
-
         }
         InstallationFuture installation = new InstallationFuture(sws, fmXids);
         for (Long xid : fmXids.keySet()) {
             pendingFMs.put(xid, installation);
         }
-        pendingFutures.put(U32.f(batch.hashCode()), installation);
-        installation.verify(U32.f(batch.hashCode()));
+        pendingFutures.put(installation.xid(), installation);
+        for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
+            OpenFlowSwitch sw = entry.getValue();
+            OFFlowMod mod = entry.getKey();
+            sw.sendMsg(mod);
+        }
+        installation.verify();
         return installation;
     }
 
 
     private class InternalFlowProvider
-    implements OpenFlowSwitchListener, OpenFlowEventListener {
+            implements OpenFlowSwitchListener, OpenFlowEventListener {
 
 
         private final Multimap<DeviceId, FlowEntry> completeEntries =
@@ -274,36 +283,36 @@
         public void handleMessage(Dpid dpid, OFMessage msg) {
             InstallationFuture future = null;
             switch (msg.getType()) {
-            case FLOW_REMOVED:
-                OFFlowRemoved removed = (OFFlowRemoved) msg;
+                case FLOW_REMOVED:
+                    OFFlowRemoved removed = (OFFlowRemoved) msg;
 
-                FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
-                providerService.flowRemoved(fr);
-                break;
-            case STATS_REPLY:
-                pushFlowMetrics(dpid, (OFStatsReply) msg);
-                break;
-            case BARRIER_REPLY:
-                future = pendingFutures.get(msg.getXid());
-                if (future != null) {
-                    future.satisfyRequirement(dpid);
-                }
-                break;
-            case ERROR:
-                future = pendingFMs.get(msg.getXid());
-                if (future != null) {
-                    future.fail((OFErrorMsg) msg, dpid);
-                }
-                break;
-            default:
-                log.debug("Unhandled message type: {}", msg.getType());
+                    FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+                    providerService.flowRemoved(fr);
+                    break;
+                case STATS_REPLY:
+                    pushFlowMetrics(dpid, (OFStatsReply) msg);
+                    break;
+                case BARRIER_REPLY:
+                    future = pendingFutures.get(msg.getXid());
+                    if (future != null) {
+                        future.satisfyRequirement(dpid);
+                    }
+                    break;
+                case ERROR:
+                    future = pendingFMs.get(msg.getXid());
+                    if (future != null) {
+                        future.fail((OFErrorMsg) msg, dpid);
+                    }
+                    break;
+                default:
+                    log.debug("Unhandled message type: {}", msg.getType());
             }
 
         }
 
         @Override
         public void receivedRoleReply(Dpid dpid, RoleState requested,
-                RoleState response) {
+                                      RoleState response) {
             // Do nothing here for now.
         }
 
@@ -352,8 +361,9 @@
 
     }
 
-    private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
+    private class InstallationFuture implements Future<CompletedBatchOperation> {
 
+        private final Long xid;
         private final Set<Dpid> sws;
         private final AtomicBoolean ok = new AtomicBoolean(true);
         private final Map<Long, FlowRuleBatchEntry> fms;
@@ -361,18 +371,22 @@
         private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
 
         private final CountDownLatch countDownLatch;
-        private Long pendingXid;
         private BatchState state;
 
         private final ExecutionList executionList = new ExecutionList();
 
         public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
+            this.xid = xidCounter.getAndIncrement();
             this.state = BatchState.STARTED;
             this.sws = sws;
             this.fms = fmXids;
             countDownLatch = new CountDownLatch(sws.size());
         }
 
+        public Long xid() {
+            return xid;
+        }
+
         public void fail(OFErrorMsg msg, Dpid dpid) {
 
             ok.set(false);
@@ -385,27 +399,27 @@
                 case BAD_ACTION:
                     OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
                     fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
-                            bad.getCode().ordinal());
+                                              bad.getCode().ordinal());
                     break;
                 case BAD_INSTRUCTION:
                     OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
                     fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
-                            badins.getCode().ordinal());
+                                              badins.getCode().ordinal());
                     break;
                 case BAD_MATCH:
                     OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
                     fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
-                            badMatch.getCode().ordinal());
+                                              badMatch.getCode().ordinal());
                     break;
                 case BAD_REQUEST:
                     OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
                     fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
-                            badReq.getCode().ordinal());
+                                              badReq.getCode().ordinal());
                     break;
                 case FLOW_MOD_FAILED:
                     OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
                     fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
-                            fmFail.getCode().ordinal());
+                                              fmFail.getCode().ordinal());
                     break;
                 case EXPERIMENTER:
                 case GROUP_MOD_FAILED:
@@ -434,13 +448,12 @@
         }
 
 
-        public void verify(Long id) {
-            pendingXid = id;
+        public void verify() {
             for (Dpid dpid : sws) {
                 OpenFlowSwitch sw = controller.getSwitch(dpid);
                 OFBarrierRequest.Builder builder = sw.factory()
                         .buildBarrierRequest()
-                        .setXid(id);
+                        .setXid(xid);
                 sw.sendMsg(builder.build());
             }
         }
@@ -462,7 +475,6 @@
                 }
 
             }
-            invokeCallbacks();
             return true;
         }
 
@@ -481,6 +493,7 @@
             countDownLatch.await();
             this.state = BatchState.FINISHED;
             CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+            //FIXME do cleanup here
             return result;
         }
 
@@ -491,6 +504,7 @@
             if (countDownLatch.await(timeout, unit)) {
                 this.state = BatchState.FINISHED;
                 CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+                // FIXME do cleanup here
                 return result;
             }
             throw new TimeoutException();
@@ -498,9 +512,7 @@
 
         private void cleanUp() {
             if (isDone() || isCancelled()) {
-                if (pendingXid != null) {
-                    pendingFutures.remove(pendingXid);
-                }
+                pendingFutures.remove(xid);
                 for (Long xid : fms.keySet()) {
                     pendingFMs.remove(xid);
                 }
@@ -509,21 +521,10 @@
 
         private void removeRequirement(Dpid dpid) {
             countDownLatch.countDown();
-            if (countDownLatch.getCount() == 0) {
-                invokeCallbacks();
-            }
             sws.remove(dpid);
+            //FIXME don't do cleanup here
             cleanUp();
         }
-
-        @Override
-        public void addListener(Runnable runnable, Executor executor) {
-            executionList.add(runnable, executor);
-        }
-
-        private void invokeCallbacks() {
-            executionList.execute();
-        }
     }
 
 }