Initial working implementation of batch operations

Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index eac3c18..0aca754 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,8 +2,17 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -14,9 +23,11 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
 import org.onlab.onos.net.flow.FlowRuleProvider;
 import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
 import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.intent.BatchOperation;
 import org.onlab.onos.net.provider.AbstractProvider;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.net.topology.TopologyService;
@@ -27,6 +38,8 @@
 import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
 import org.onlab.onos.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFActionType;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -42,9 +55,11 @@
 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
 import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.U32;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 
@@ -70,6 +85,9 @@
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
 
+    private final Map<Long, InstallationFuture> pendingFutures =
+            new ConcurrentHashMap<Long, InstallationFuture>(); // in-flight batches, keyed by barrier xid
+
     /**
      * Creates an OpenFlow host provider.
      */
@@ -101,7 +119,7 @@
 
     private void applyRule(FlowRule flowRule) {
         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
-        sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+        sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd()); // sent as a flow add
     }
 
 
@@ -154,6 +172,7 @@
 
         @Override
         public void handleMessage(Dpid dpid, OFMessage msg) {
+            InstallationFuture future = null;
             switch (msg.getType()) {
             case FLOW_REMOVED:
                 //TODO: make this better
@@ -166,7 +185,17 @@
                 pushFlowMetrics(dpid, (OFStatsReply) msg);
                 break;
             case BARRIER_REPLY:
+                future = pendingFutures.get(msg.getXid());
+                if (future != null) {
+                    future.satisfyRequirement(dpid);
+                }
+                break;
             case ERROR:
+                future = pendingFutures.get(msg.getXid());
+                if (future != null) {
+                    future.fail((OFErrorMsg) msg, dpid);
+                }
+                break;
             default:
                 log.debug("Unhandled message type: {}", msg.getType());
             }
@@ -226,6 +255,144 @@
     }
 
 
+    /** Pushes the batch to the switches and returns a future tracking the barrier replies. */
+    @Override
+    public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+        final Set<Dpid> touched = new HashSet<Dpid>();
 
+        //TODO: Track XID for each flowmod
+        for (FlowRuleBatchEntry entry : batch.getOperations()) {
+            FlowRule rule = entry.getTarget();
+            OpenFlowSwitch target = controller.getSwitch(Dpid.dpid(rule.deviceId().uri()));
+            touched.add(new Dpid(target.getId()));
+            switch (entry.getOperator()) {
+                case ADD:
+                    target.sendMsg(new FlowModBuilder(rule, target.factory()).buildFlowAdd());
+                    break;
+                case REMOVE:
+                    target.sendMsg(new FlowModBuilder(rule, target.factory()).buildFlowDel());
+                    break;
+                case MODIFY:
+                    target.sendMsg(new FlowModBuilder(rule, target.factory()).buildFlowMod());
+                    break;
+                default:
+                    log.error("Unsupported batch operation {}", entry.getOperator());
+            }
+        }
+        final int batchId = batch.hashCode();
+        InstallationFuture pending = new InstallationFuture(touched);
+        pendingFutures.put(U32.f(batchId), pending);
+        pending.verify(batchId);
+        return pending;
+    }
+
+    /**
+     * Future tracking a batch installation across a set of switches.
+     * <p>
+     * One barrier request is sent to every switch involved in the batch and
+     * the future completes once each of them has answered with a barrier
+     * reply. Errors are only recorded for now; they are not yet surfaced
+     * through {@link #get()}.
+     */
+    private class InstallationFuture implements Future<Void> {
+
+        // Switches taking part in the batch; this class never modifies the set.
+        private final Set<Dpid> sws;
+        // Cleared as soon as a switch reports an error for this batch.
+        private final AtomicBoolean ok = new AtomicBoolean(true);
+        // Placeholder for the flow entries rejected by a switch (see fail()).
+        private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+
+        // Counts the barrier replies that are still outstanding.
+        private final CountDownLatch countDownLatch;
+
+        /**
+         * Creates a future waiting on one barrier reply per given switch.
+         *
+         * @param sws the switches the batch was sent to
+         */
+        public InstallationFuture(Set<Dpid> sws) {
+            this.sws = sws;
+            countDownLatch = new CountDownLatch(sws.size());
+        }
+
+        /**
+         * Marks the installation as failed and logs the reported error type.
+         *
+         * @param msg the error reported by the switch
+         * @param dpid the switch that reported the error
+         */
+        public void fail(OFErrorMsg msg, Dpid dpid) {
+            ok.set(false);
+            log.warn("Batch installation error from switch {}: {}", dpid, msg.getErrType());
+            //TODO add reason to flowentry
+            //TODO handle specific error msgs
+        }
+
+        /**
+         * Records the barrier reply of a switch.
+         * <p>
+         * Completion is tracked by the latch alone, so the switch set is not
+         * touched here; it may still be iterated by {@link #verify(Integer)}.
+         *
+         * @param dpid the switch that answered the barrier request
+         */
+        public void satisfyRequirement(Dpid dpid) {
+            log.debug("Satisfaction from switch {}", dpid);
+            countDownLatch.countDown();
+            if (isDone()) {
+                // Last reply received: drop the registration so that
+                // completed futures do not pile up in pendingFutures.
+                pendingFutures.values().remove(this);
+            }
+        }
+
+        /**
+         * Sends a barrier request carrying the given xid to every switch.
+         *
+         * @param id the transaction id to use for the barrier requests
+         */
+        public void verify(Integer id) {
+            for (Dpid dpid : sws) {
+                OpenFlowSwitch sw = controller.getSwitch(dpid);
+                OFBarrierRequest.Builder builder = sw.factory()
+                        .buildBarrierRequest()
+                        .setXid(id);
+                sw.sendMsg(builder.build());
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // Cancellation is not supported.
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            // The latch can be queried safely from any thread.
+            return countDownLatch.getCount() == 0;
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            countDownLatch.await();
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            if (!countDownLatch.await(timeout, unit)) {
+                throw new TimeoutException("Timed out waiting for barrier replies");
+            }
+            return null;
+        }
+    }
 
 }