Adding support for MatchAction to FlowPusher

Also, refacted MatchActionComponent to use the new API

Change-Id: I227ed178ab56e370d5c970d7d88df9408e261ff7
diff --git a/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java b/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
index 362038f..a415316 100644
--- a/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
+++ b/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
@@ -1,28 +1,5 @@
 package net.onrc.onos.core.matchaction;
 
-import net.floodlightcontroller.core.IFloodlightProviderService;
-import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.core.internal.OFMessageFuture;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.util.MACAddress;
-import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
-import net.onrc.onos.core.datagrid.IDatagridService;
-import net.onrc.onos.core.datagrid.IEventChannel;
-import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.core.intent.FlowEntry;
-import net.onrc.onos.core.matchaction.action.Action;
-import net.onrc.onos.core.matchaction.action.OutputAction;
-import net.onrc.onos.core.matchaction.match.Match;
-import net.onrc.onos.core.matchaction.match.PacketMatch;
-import net.onrc.onos.core.util.Dpid;
-import net.onrc.onos.core.util.IdGenerator;
-import net.onrc.onos.core.util.SwitchPort;
-import org.apache.commons.lang3.tuple.Pair;
-import org.projectfloodlight.openflow.protocol.OFBarrierReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EventListener;
@@ -37,7 +14,22 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.core.datagrid.IEventChannelListener;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.IdGenerator;
+import net.onrc.onos.core.util.SwitchPort;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -61,12 +53,6 @@
     private IEventChannel<String, MatchActionOperations> installSetChannel;
     private IEventChannel<String, SwitchResultList> installSetReplyChannel;
 
-    //  Convenience declarations to hide the name space collision on the Operator type
-    private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_ADD_OP =
-            net.onrc.onos.core.intent.IntentOperation.Operator.ADD;
-    private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_REMOVE_OP =
-            net.onrc.onos.core.intent.IntentOperation.Operator.REMOVE;
-
     // TODO Single instance for now, should be a work queue of some sort eventually
     private Thread coordinator;
     private Thread installer;
@@ -126,29 +112,6 @@
         return resolved;
     }
 
-    // TODO need operation too...
-    protected List<MatchAction> getMatchActions(final MatchActionOperations matchSet) {
-
-        final List<MatchAction> result = new ArrayList<>();
-        for (MatchActionOperationEntry op : matchSet.getOperations()) {
-            final MatchAction match = op.getTarget();
-
-            switch(op.getOperator()) {
-                case ADD:
-                    matchActionMap.put(match.getId(), match);
-                    break;
-
-                case REMOVE:
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported MatchAction operation" +
-                                    op.getOperator().toString());
-            }
-            result.add(match);
-        }
-        return result;
-    }
-
     class Coordinator extends Thread
             implements IEventChannelListener<String, SwitchResultList> {
 
@@ -181,9 +144,21 @@
 
             // build pending switches set for coordinator tracking
             Map<Dpid, SwitchResult> switches = new HashMap<>();
-            for (MatchAction match : getMatchActions(matchSet)) {
-                SwitchPort sw = match.getSwitchPort();
+            for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
+                MatchAction matchAction = matchActionOp.getTarget();
+                SwitchPort sw = matchAction.getSwitchPort();
                 switches.put(sw.getDpid(), new SwitchResult(setId, sw.getDpid()));
+                switch(matchActionOp.getOperator()) {
+                case ADD:
+                    matchActionMap.put(matchAction.getId(), matchAction);
+                    break;
+                case REMOVE:
+                    //TODO
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported MatchAction operation" +
+                                    matchActionOp.getOperator().toString());
+                }
             }
             pendingMatchActionOperationss.put(setId, switches);
 
@@ -268,47 +243,43 @@
         // Note: we should consider using an alternative representation for
         // apply sets
         protected void install(MatchActionOperations matchSet) {
-            Map<Long, IOFSwitch> switches = provider.getSwitches();
+            Set<Long> masterDpids = provider.getAllMasterSwitchDpids();
 
-            Set<Pair<Dpid, FlowEntry>> entries = new HashSet<>();
-            Set<IOFSwitch> modifiedSwitches = new HashSet<>();
+            Set<MatchActionOperationEntry> installSet = new HashSet<>();
+            Set<Dpid> modifiedSwitches = new HashSet<>();
 
-            // convert flow entries and create pairs
-            for (MatchAction entry : getMatchActions(matchSet)) {
-                Dpid swDpid = entry.getSwitchPort().getDpid();
-                IOFSwitch sw = switches.get(swDpid.value());
-                if (sw == null) {
-                    // no active switch, skip this flow entry
-                    log.debug("Skipping flow entry: {}", entry);
-                    continue;
+            for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
+                MatchAction matchAction = matchActionOp.getTarget();
+                Dpid dpid = matchAction.getSwitchPort().getDpid();
+                if (masterDpids.contains(dpid.value())) {
+                    // only install if we are the master
+                    // TODO this optimization will introduce some nice race
+                    // conditions on failure requiring mastership change
+                    installSet.add(matchActionOp);
+                    modifiedSwitches.add(dpid);
                 }
-                final List<FlowEntry> flowEntries = getFlowEntry(entry);
-                for (final FlowEntry flowEntry : flowEntries) {
-                    entries.add(Pair.of(swDpid, flowEntry));
-                }
-                modifiedSwitches.add(sw);
             }
 
             // push flow entries to switches
-            pusher.pushFlowEntries(entries);
+            pusher.pushMatchActions(installSet);
 
             // insert a barrier after each phase on each modifiedSwitch
             // wait for confirmation messages before proceeding
-            List<Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
-            for (IOFSwitch sw : modifiedSwitches) {
-                barriers.add(Pair.of(sw, pusher.barrierAsync(new Dpid(sw.getId()))));
+            List<Pair<Dpid, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
+            for (Dpid dpid : modifiedSwitches) {
+                barriers.add(Pair.of(dpid, pusher.barrierAsync(dpid)));
             }
             List<SwitchResult> switchResults = new ArrayList<>();
-            for (Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>> pair : barriers) {
-                IOFSwitch sw = pair.getLeft();
+            for (Pair<Dpid, OFMessageFuture<OFBarrierReply>> pair : barriers) {
+                Dpid dpid = pair.getLeft();
                 OFMessageFuture<OFBarrierReply> future = pair.getRight();
-                SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(), new Dpid(
-                        sw.getId()));
+                SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(),
+                        dpid);
                 try {
                     future.get();
                     switchResult.setStatus(SwitchResult.Status.SUCCESS);
                 } catch (InterruptedException | ExecutionException e) {
-                    log.error("Barrier message not received for sw: {}", sw);
+                    log.error("Barrier message not received for sw: {}", dpid);
                     switchResult.setStatus(SwitchResult.Status.FAILURE);
                 }
                 switchResults.add(switchResult);
@@ -323,48 +294,6 @@
                     switchResultList);
         }
 
-        // TODO this should be removed when FlowPusher supports MatchAction
-        private List<FlowEntry> getFlowEntry(MatchAction matchAction) {
-            final Match match = matchAction.getMatch();
-            //  Currently we only support Packet based matching
-            checkArgument(match instanceof PacketMatch);
-
-            final PacketMatch packetMatch = (PacketMatch) match;
-            final SwitchPort srcPort = matchAction.getSwitchPort();
-
-            final long switchId = srcPort.getDpid().value();
-            final long srcPortNumber = srcPort.getPortNumber().value();
-            final int srcIp = packetMatch.getSrcIpAddress().address().value();
-            final MACAddress srcMacAddress = packetMatch.getSrcMacAddress();
-            final int dstIp = packetMatch.getDstIpAddress().address().value();
-            final MACAddress dstMacAddress = packetMatch.getDstMacAddress();
-
-            final List<FlowEntry> result = new ArrayList<>();
-
-            for (final Action action : matchAction.getActions()) {
-                if (action instanceof OutputAction) {
-                    final OutputAction outputAction = (OutputAction) action;
-                    final long dstPortNumber =
-                            outputAction.getPortNumber().value();
-
-
-                    final FlowEntry entry = new FlowEntry(
-                            switchId,
-                            srcPortNumber,
-                            dstPortNumber,
-                            srcMacAddress,
-                            dstMacAddress,
-                            srcIp,
-                            dstIp,
-                            INTENT_ADD_OP
-                    );
-                    result.add(entry);
-                }
-            }
-
-            return result;
-        }
-
         @Override
         public void run() {
             while (true) {