Adding support for MatchAction to FlowPusher

Also, refacted MatchActionComponent to use the new API

Change-Id: I227ed178ab56e370d5c970d7d88df9408e261ff7
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
index aa1c057..824ed4f 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -24,16 +24,42 @@
 import net.floodlightcontroller.core.internal.OFMessageFuture;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.matchaction.MatchAction;
+import net.onrc.onos.core.matchaction.MatchActionOperationEntry;
+import net.onrc.onos.core.matchaction.MatchActionOperations.Operator;
+import net.onrc.onos.core.matchaction.action.Action;
+import net.onrc.onos.core.matchaction.action.ModifyDstMacAction;
+import net.onrc.onos.core.matchaction.action.ModifySrcMacAction;
+import net.onrc.onos.core.matchaction.action.OutputAction;
+import net.onrc.onos.core.matchaction.match.Match;
+import net.onrc.onos.core.matchaction.match.PacketMatch;
 import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.util.SwitchPort;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.projectfloodlight.openflow.protocol.OFActionType;
 import org.projectfloodlight.openflow.protocol.OFBarrierReply;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFFactory;
 import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.action.OFAction;
+import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
+import org.projectfloodlight.openflow.protocol.action.OFActions;
+import org.projectfloodlight.openflow.protocol.match.Match.Builder;
+import org.projectfloodlight.openflow.protocol.match.MatchField;
+import org.projectfloodlight.openflow.types.EthType;
+import org.projectfloodlight.openflow.types.IPv4Address;
+import org.projectfloodlight.openflow.types.IpProtocol;
+import org.projectfloodlight.openflow.types.MacAddress;
+import org.projectfloodlight.openflow.types.OFBufferId;
+import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.TransportPort;
+import org.projectfloodlight.openflow.types.U64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -591,6 +617,144 @@
         pushFlowEntries(entries, priority);
     }
 
+    public static final int PRIORITY_DEFAULT = 32768; // Default Flow Priority
+
+    @Override
+    public void pushMatchAction(MatchActionOperationEntry matchActionOp) {
+        final MatchAction matchAction = matchActionOp.getTarget();
+
+        // Get the switch and its OFFactory
+        final SwitchPort srcPort = matchAction.getSwitchPort();
+        final Dpid dpid = srcPort.getDpid();
+        IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
+        if (sw == null) {
+            log.warn("Couldn't find switch {} when pushing message", dpid);
+            return;
+        }
+        OFFactory factory = sw.getFactory();
+
+        // Build Match
+        final Match match = matchAction.getMatch();
+        Builder matchBuilder = factory.buildMatch();
+        if (match instanceof PacketMatch) {
+            final PacketMatch packetMatch = (PacketMatch) match;
+            final MACAddress srcMac = packetMatch.getSrcMacAddress();
+            final MACAddress dstMac = packetMatch.getDstMacAddress();
+            final Short etherType = packetMatch.getEtherType();
+            final IPv4Net srcIp = packetMatch.getSrcIpAddress();
+            final IPv4Net dstIp = packetMatch.getDstIpAddress();
+            final Byte ipProto = packetMatch.getIpProtocolNumber();
+            final Short srcTcpPort = packetMatch.getSrcTcpPortNumber();
+            final Short dstTcpPort = packetMatch.getDstTcpPortNumber();
+
+            if (srcMac != null) {
+                matchBuilder.setExact(MatchField.ETH_SRC, MacAddress.of(srcMac.toLong()));
+            }
+            if (dstMac != null) {
+                matchBuilder.setExact(MatchField.ETH_DST, MacAddress.of(dstMac.toLong()));
+            }
+            if (etherType != null) {
+                matchBuilder.setExact(MatchField.ETH_TYPE, EthType.of(etherType));
+            }
+            if (srcIp != null) {
+                matchBuilder.setMasked(MatchField.IPV4_SRC,
+                        IPv4Address.of(srcIp.address().value())
+                                .withMaskOfLength(srcIp.prefixLen()));
+            }
+            if (dstIp != null) {
+                matchBuilder.setMasked(MatchField.IPV4_DST,
+                        IPv4Address.of(dstIp.address().value())
+                                .withMaskOfLength(dstIp.prefixLen()));
+            }
+            if (ipProto != null) {
+                matchBuilder.setExact(MatchField.IP_PROTO, IpProtocol.of(ipProto));
+            }
+            if (srcTcpPort != null) {
+                matchBuilder.setExact(MatchField.TCP_SRC, TransportPort.of(srcTcpPort));
+            }
+            if (dstTcpPort != null) {
+                matchBuilder.setExact(MatchField.TCP_DST, TransportPort.of(dstTcpPort));
+            }
+            matchBuilder.setExact(MatchField.IN_PORT,
+                    OFPort.of(srcPort.getPortNumber().shortValue()));
+        } else {
+            log.warn("Unsupported Match type: {}", match.getClass().getName());
+            return;
+        }
+
+        // Build Actions
+        List<OFAction> actionList = new ArrayList<>(matchAction.getActions().size());
+        OFActions ofActionTypes = factory.actions();
+        for (Action action : matchAction.getActions()) {
+            OFAction ofAction = null;
+            if (action instanceof OutputAction) {
+                OutputAction outputAction = (OutputAction) action;
+                // short or int?
+                OFPort port = OFPort.of((int) outputAction.getPortNumber().value());
+                ofAction = ofActionTypes.output(port, Short.MAX_VALUE);
+            } else if (action instanceof ModifyDstMacAction) {
+                ModifyDstMacAction dstMacAction = (ModifyDstMacAction) action;
+                ofActionTypes.setDlDst(MacAddress.of(dstMacAction.getDstMac().toLong()));
+            } else if (action instanceof ModifySrcMacAction) {
+                ModifySrcMacAction srcMacAction = (ModifySrcMacAction) action;
+                ofActionTypes.setDlSrc(MacAddress.of(srcMacAction.getSrcMac().toLong()));
+            } else {
+                log.warn("Unsupported Action type: {}", action.getClass().getName());
+                continue;
+            }
+            actionList.add(ofAction);
+        }
+
+        // Construct a FlowMod message builder
+        OFFlowMod.Builder fmBuilder = null;
+        switch (matchActionOp.getOperator()) {
+        case ADD:
+            fmBuilder = factory.buildFlowAdd();
+            break;
+        case REMOVE:
+            fmBuilder = factory.buildFlowDeleteStrict();
+            break;
+        // case MODIFY: // TODO
+        // fmBuilder = factory.buildFlowModifyStrict();
+        // break;
+        default:
+            log.warn("Unsupported MatchAction Operator: {}", matchActionOp.getOperator());
+            return;
+        }
+
+        // Add output port for OF1.0
+        OFPort outp = OFPort.of((short) 0xffff); // OF1.0 OFPP.NONE
+        if (matchActionOp.getOperator() == Operator.REMOVE) {
+            if (actionList.size() == 1) {
+                if (actionList.get(0).getType() == OFActionType.OUTPUT) {
+                    OFActionOutput oa = (OFActionOutput) actionList.get(0);
+                    outp = oa.getPort();
+                }
+            }
+        }
+
+
+        // Build OFFlowMod
+        fmBuilder.setMatch(matchBuilder.build())
+                .setActions(actionList)
+                .setIdleTimeout(0) // hardcoded to zero for now
+                .setHardTimeout(0) // hardcoded to zero for now
+                .setCookie(U64.of(matchAction.getId().value()))
+                .setBufferId(OFBufferId.NO_BUFFER)
+                .setPriority(PRIORITY_DEFAULT)
+                .setOutPort(outp);
+
+        // Build the message and add it to the queue
+        add(dpid, fmBuilder.build());
+    }
+
+    @Override
+    public void pushMatchActions(Collection<MatchActionOperationEntry> matchActionOps) {
+        for (MatchActionOperationEntry matchActionOp : matchActionOps) {
+            pushMatchAction(matchActionOp);
+        }
+    }
+
     /**
      * Create a message from FlowEntry and add it to the queue of the switch.
      * <p>
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
index 1026855..f11b0b1 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
@@ -5,6 +5,7 @@
 import net.floodlightcontroller.core.internal.OFMessageFuture;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.matchaction.MatchActionOperationEntry;
 import net.onrc.onos.core.util.Dpid;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -135,6 +136,22 @@
                        MsgPriority priority);
 
     /**
+     * Create a message from MatchAction and add it to the queue of the switch
+     * with normal priority.
+     *
+     * @param matchActionOp MatchAction to use
+     */
+    void pushMatchAction(MatchActionOperationEntry matchActionOp);
+
+    /**
+     * Create messages for a collection of MatchActions and add them to the
+     * appropriate queue.
+     *
+     * @param matchActionOps Collection of MatchAction to use
+     */
+    void pushMatchActions(Collection<MatchActionOperationEntry> matchActionOps);
+
+    /**
      * Set sending rate to a switch.
      * <p/>
      * TODO The rate limiter function currently does not work as we are unable
diff --git a/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java b/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
index 362038f..a415316 100644
--- a/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
+++ b/src/main/java/net/onrc/onos/core/matchaction/MatchActionComponent.java
@@ -1,28 +1,5 @@
 package net.onrc.onos.core.matchaction;
 
-import net.floodlightcontroller.core.IFloodlightProviderService;
-import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.core.internal.OFMessageFuture;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.util.MACAddress;
-import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
-import net.onrc.onos.core.datagrid.IDatagridService;
-import net.onrc.onos.core.datagrid.IEventChannel;
-import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.core.intent.FlowEntry;
-import net.onrc.onos.core.matchaction.action.Action;
-import net.onrc.onos.core.matchaction.action.OutputAction;
-import net.onrc.onos.core.matchaction.match.Match;
-import net.onrc.onos.core.matchaction.match.PacketMatch;
-import net.onrc.onos.core.util.Dpid;
-import net.onrc.onos.core.util.IdGenerator;
-import net.onrc.onos.core.util.SwitchPort;
-import org.apache.commons.lang3.tuple.Pair;
-import org.projectfloodlight.openflow.protocol.OFBarrierReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EventListener;
@@ -37,7 +14,22 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.core.datagrid.IEventChannelListener;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.IdGenerator;
+import net.onrc.onos.core.util.SwitchPort;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -61,12 +53,6 @@
     private IEventChannel<String, MatchActionOperations> installSetChannel;
     private IEventChannel<String, SwitchResultList> installSetReplyChannel;
 
-    //  Convenience declarations to hide the name space collision on the Operator type
-    private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_ADD_OP =
-            net.onrc.onos.core.intent.IntentOperation.Operator.ADD;
-    private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_REMOVE_OP =
-            net.onrc.onos.core.intent.IntentOperation.Operator.REMOVE;
-
     // TODO Single instance for now, should be a work queue of some sort eventually
     private Thread coordinator;
     private Thread installer;
@@ -126,29 +112,6 @@
         return resolved;
     }
 
-    // TODO need operation too...
-    protected List<MatchAction> getMatchActions(final MatchActionOperations matchSet) {
-
-        final List<MatchAction> result = new ArrayList<>();
-        for (MatchActionOperationEntry op : matchSet.getOperations()) {
-            final MatchAction match = op.getTarget();
-
-            switch(op.getOperator()) {
-                case ADD:
-                    matchActionMap.put(match.getId(), match);
-                    break;
-
-                case REMOVE:
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported MatchAction operation" +
-                                    op.getOperator().toString());
-            }
-            result.add(match);
-        }
-        return result;
-    }
-
     class Coordinator extends Thread
             implements IEventChannelListener<String, SwitchResultList> {
 
@@ -181,9 +144,21 @@
 
             // build pending switches set for coordinator tracking
             Map<Dpid, SwitchResult> switches = new HashMap<>();
-            for (MatchAction match : getMatchActions(matchSet)) {
-                SwitchPort sw = match.getSwitchPort();
+            for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
+                MatchAction matchAction = matchActionOp.getTarget();
+                SwitchPort sw = matchAction.getSwitchPort();
                 switches.put(sw.getDpid(), new SwitchResult(setId, sw.getDpid()));
+                switch(matchActionOp.getOperator()) {
+                case ADD:
+                    matchActionMap.put(matchAction.getId(), matchAction);
+                    break;
+                case REMOVE:
+                    //TODO
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported MatchAction operation" +
+                                    matchActionOp.getOperator().toString());
+                }
             }
             pendingMatchActionOperationss.put(setId, switches);
 
@@ -268,47 +243,43 @@
         // Note: we should consider using an alternative representation for
         // apply sets
         protected void install(MatchActionOperations matchSet) {
-            Map<Long, IOFSwitch> switches = provider.getSwitches();
+            Set<Long> masterDpids = provider.getAllMasterSwitchDpids();
 
-            Set<Pair<Dpid, FlowEntry>> entries = new HashSet<>();
-            Set<IOFSwitch> modifiedSwitches = new HashSet<>();
+            Set<MatchActionOperationEntry> installSet = new HashSet<>();
+            Set<Dpid> modifiedSwitches = new HashSet<>();
 
-            // convert flow entries and create pairs
-            for (MatchAction entry : getMatchActions(matchSet)) {
-                Dpid swDpid = entry.getSwitchPort().getDpid();
-                IOFSwitch sw = switches.get(swDpid.value());
-                if (sw == null) {
-                    // no active switch, skip this flow entry
-                    log.debug("Skipping flow entry: {}", entry);
-                    continue;
+            for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
+                MatchAction matchAction = matchActionOp.getTarget();
+                Dpid dpid = matchAction.getSwitchPort().getDpid();
+                if (masterDpids.contains(dpid.value())) {
+                    // only install if we are the master
+                    // TODO this optimization will introduce some nice race
+                    // conditions on failure requiring mastership change
+                    installSet.add(matchActionOp);
+                    modifiedSwitches.add(dpid);
                 }
-                final List<FlowEntry> flowEntries = getFlowEntry(entry);
-                for (final FlowEntry flowEntry : flowEntries) {
-                    entries.add(Pair.of(swDpid, flowEntry));
-                }
-                modifiedSwitches.add(sw);
             }
 
             // push flow entries to switches
-            pusher.pushFlowEntries(entries);
+            pusher.pushMatchActions(installSet);
 
             // insert a barrier after each phase on each modifiedSwitch
             // wait for confirmation messages before proceeding
-            List<Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
-            for (IOFSwitch sw : modifiedSwitches) {
-                barriers.add(Pair.of(sw, pusher.barrierAsync(new Dpid(sw.getId()))));
+            List<Pair<Dpid, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
+            for (Dpid dpid : modifiedSwitches) {
+                barriers.add(Pair.of(dpid, pusher.barrierAsync(dpid)));
             }
             List<SwitchResult> switchResults = new ArrayList<>();
-            for (Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>> pair : barriers) {
-                IOFSwitch sw = pair.getLeft();
+            for (Pair<Dpid, OFMessageFuture<OFBarrierReply>> pair : barriers) {
+                Dpid dpid = pair.getLeft();
                 OFMessageFuture<OFBarrierReply> future = pair.getRight();
-                SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(), new Dpid(
-                        sw.getId()));
+                SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(),
+                        dpid);
                 try {
                     future.get();
                     switchResult.setStatus(SwitchResult.Status.SUCCESS);
                 } catch (InterruptedException | ExecutionException e) {
-                    log.error("Barrier message not received for sw: {}", sw);
+                    log.error("Barrier message not received for sw: {}", dpid);
                     switchResult.setStatus(SwitchResult.Status.FAILURE);
                 }
                 switchResults.add(switchResult);
@@ -323,48 +294,6 @@
                     switchResultList);
         }
 
-        // TODO this should be removed when FlowPusher supports MatchAction
-        private List<FlowEntry> getFlowEntry(MatchAction matchAction) {
-            final Match match = matchAction.getMatch();
-            //  Currently we only support Packet based matching
-            checkArgument(match instanceof PacketMatch);
-
-            final PacketMatch packetMatch = (PacketMatch) match;
-            final SwitchPort srcPort = matchAction.getSwitchPort();
-
-            final long switchId = srcPort.getDpid().value();
-            final long srcPortNumber = srcPort.getPortNumber().value();
-            final int srcIp = packetMatch.getSrcIpAddress().address().value();
-            final MACAddress srcMacAddress = packetMatch.getSrcMacAddress();
-            final int dstIp = packetMatch.getDstIpAddress().address().value();
-            final MACAddress dstMacAddress = packetMatch.getDstMacAddress();
-
-            final List<FlowEntry> result = new ArrayList<>();
-
-            for (final Action action : matchAction.getActions()) {
-                if (action instanceof OutputAction) {
-                    final OutputAction outputAction = (OutputAction) action;
-                    final long dstPortNumber =
-                            outputAction.getPortNumber().value();
-
-
-                    final FlowEntry entry = new FlowEntry(
-                            switchId,
-                            srcPortNumber,
-                            dstPortNumber,
-                            srcMacAddress,
-                            dstMacAddress,
-                            srcIp,
-                            dstIp,
-                            INTENT_ADD_OP
-                    );
-                    result.add(entry);
-                }
-            }
-
-            return result;
-        }
-
         @Override
         public void run() {
             while (true) {
diff --git a/src/main/java/net/onrc/onos/core/matchaction/MatchActionId.java b/src/main/java/net/onrc/onos/core/matchaction/MatchActionId.java
index 3dcb64c..0bddffa 100644
--- a/src/main/java/net/onrc/onos/core/matchaction/MatchActionId.java
+++ b/src/main/java/net/onrc/onos/core/matchaction/MatchActionId.java
@@ -1,9 +1,9 @@
 package net.onrc.onos.core.matchaction;
 
-import net.onrc.onos.api.batchoperation.BatchOperationTarget;
-
 import java.util.Objects;
 
+import net.onrc.onos.api.batchoperation.BatchOperationTarget;
+
 /**
  * A unique identifier for a MatchAction.  Objects of this class are immutable.
  */
@@ -19,6 +19,15 @@
         value = id;
     }
 
+    /**
+     * Returns the MatchActionId as a long.
+     *
+     * @return MatchAction ID
+     */
+    public long value() {
+        return value;
+    }
+
     @Override
     public String toString() {
         return Long.toString(value);