Renamed devicemanager, flowprogrammer, linkdiscovery and util packages

net.onrc.onos.ofcontroller.devicemanager.* => net.onrc.onos.core.devicemanager.*
net.onrc.onos.ofcontroller.flowprogrammer.* => net.onrc.onos.core.flowprogrammer.*
net.onrc.onos.ofcontroller.linkdiscovery.* => net.onrc.onos.core.linkdiscovery.*
net.onrc.onos.ofcontroller.util.* => net.onrc.onos.core.util.*

Change-Id: Iaa865af552e8fb3a589e73d006569ac79f5a0f08
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
new file mode 100644
index 0000000..bdb1f43
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -0,0 +1,1068 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.openflow.protocol.*;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.util.FlowEntryAction;
+import net.onrc.onos.core.util.FlowEntryActions;
+import net.onrc.onos.core.util.FlowEntryMatch;
+import net.onrc.onos.core.util.FlowEntryUserState;
+import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.util.Pair;
+import net.onrc.onos.core.util.Port;
+import net.onrc.onos.core.util.FlowEntryAction.*;
+
+/**
+ * FlowPusher is an implementation of IFlowPusherService.
+ * FlowPusher assigns one message queue instance to each switch.
+ * Number of message processing threads is configurable by constructor, and
+ * one thread can handle multiple message queues. Each queue will be assigned to
+ * a thread according to hash function defined by getHash().
+ * Each processing thread reads messages from its queues and sends them to the
+ * switches in round-robin. The processing thread also calculates the sending
+ * rate to suppress excessive message sending.
+ *
+ * @author Naoki Shiota
+ */
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
+    private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+    protected static final int DEFAULT_NUMBER_THREAD = 1;  // used when no valid thread count is given
+
+    // TODO: Values copied from elsewhere (class LearningSwitch).
+    // The local copy should go away!
+    // They configure the default OFMessageDamper that init() creates when none is supplied.
+    protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+    protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;    // ms
+
+    // Upper bound on messages sent to one switch per processQueue() pass
+    protected static final int MAX_MESSAGE_SEND = 100;
+
+    /** Wrapper for a single OpenFlow message waiting in a SwitchQueue. */
+    private static class SwitchQueueEntry {
+        OFMessage msg;  // the wrapped message
+        public SwitchQueueEntry(OFMessage message) {
+            msg = message;
+        }
+        /** @return the wrapped message */
+        public OFMessage getOFMessage() {
+            return this.msg;
+        }
+    }
+
+    /**
+     * SwitchQueue represents message queue attached to a switch.
+     * It holds one raw queue per MsgPriority plus the variables used for limiting sending rate.
+     *
+     * @author Naoki Shiota
+     */
+    private class SwitchQueue {
+        List<Queue<SwitchQueueEntry>> raw_queues;  // indexed by MsgPriority.ordinal()
+        QueueState state;
+
+        // Max rate of sending message (bytes/ms). 0 implies no limitation.
+        long max_rate = 0;    // 0 indicates no limitation
+        long last_sent_time = 0;
+        long last_sent_size = 0;
+
+        // "To be deleted" flag
+        boolean toBeDeleted = false;
+
+        SwitchQueue() {
+            raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+                    MsgPriority.values().length);
+            for (int i = 0; i < MsgPriority.values().length; ++i) {
+                raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+            }
+
+            state = QueueState.READY;
+        }
+
+        /**
+         * Check if the rate implied by the last send is below max_rate.
+         *
+         * @param current Current time (same clock as passed to logSentData)
+         * @return true if no limit is set or the computed rate is below the limit
+         */
+        boolean isSendable(long current) {
+            if (max_rate == 0) {
+                // no limitation
+                return true;
+            }
+
+            if (current == last_sent_time) {  // same tick: also avoids dividing by zero below
+                return false;
+            }
+
+            // Check if sufficient time (from aspect of rate) elapsed or not.
+            long rate = last_sent_size / (current - last_sent_time);
+            return (rate < max_rate);
+        }
+
+        /**
+         * Log time and size of last sent data.
+         *
+         * @param current Time to be sent.
+         * @param size    Size of sent data (in bytes).
+         */
+        void logSentData(long current, long size) {
+            last_sent_time = current;
+            last_sent_size = size;
+        }
+
+        /** Append an entry to the raw queue of the given priority; false if no such queue. */
+        boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+            Queue<SwitchQueueEntry> queue = getQueue(priority);
+            if (queue == null) {
+                log.error("Unexpected priority : {}", priority);
+                return false;
+            }
+            return queue.add(entry);
+        }
+
+        /**
+         * Poll single appropriate entry object according to QueueState.
+         *
+         * @return Entry object, or null if nothing can be polled in the current state.
+         */
+        SwitchQueueEntry poll() {
+            switch (state) {
+                case READY: {
+                    for (int i = 0; i < raw_queues.size(); ++i) {
+                        SwitchQueueEntry entry = raw_queues.get(i).poll();
+                        if (entry != null) {
+                            return entry;
+                        }
+                    }
+                    return null;
+                }
+                case SUSPENDED: {
+                    // Only polling from high priority queue
+                    return getQueue(MsgPriority.HIGH).poll();
+                }
+                default:
+                    log.error("Unexpected QueueState : {}", state);
+                    return null;
+            }
+        }
+
+        /**
+         * Check if this object has any messages in the queues to be sent
+         *
+         * @return True if there are some messages to be sent.
+         */
+        boolean hasMessageToSend() {
+            switch (state) {
+                case READY:
+                    for (Queue<SwitchQueueEntry> queue : raw_queues) {
+                        if (!queue.isEmpty()) {
+                            return true;
+                        }
+                    }
+                    break;
+                case SUSPENDED:
+                    // Only checking high priority queue
+                    return (!getQueue(MsgPriority.HIGH).isEmpty());
+                default:
+                    log.error("Unexpected QueueState : {}", state);
+                    return false;
+            }
+
+            return false;
+        }
+
+        /** Look up the raw queue stored at the priority's ordinal position. */
+        Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+            return raw_queues.get(priority.ordinal());
+        }
+    }
+
+    /**
+     * Key identifying one barrier exchange: the ID of the switch together
+     * with the transaction ID of the barrier message.
+     *
+     * @author Naoki
+     */
+    private static class BarrierInfo {
+        final long dpid;
+        final int xid;
+
+        private BarrierInfo(long dpid, int xid) {
+            this.dpid = dpid;
+            this.xid = xid;
+        }
+
+        /** Builds the key from the switch's ID and the request's transaction ID. */
+        static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+            final long switchId = sw.getId();
+            return new BarrierInfo(switchId, req.getXid());
+        }
+
+        /** Builds the key from the switch's ID and the reply's transaction ID. */
+        static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+            final long switchId = sw.getId();
+            return new BarrierInfo(switchId, rpy.getXid());
+        }
+
+        /** Combines the folded 64-bit DPID and the xid using multiplier 31. */
+        @Override
+        public int hashCode() {
+            int dpidHash = (int) (dpid ^ (dpid >>> 32));
+            return 31 * (31 + dpidHash) + xid;
+        }
+
+        /** Two keys are equal when both DPID and xid match. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BarrierInfo that = (BarrierInfo) obj;
+            return dpid == that.dpid && xid == that.xid;
+        }
+    }
+
+    private OFMessageDamper messageDamper = null;  // assigned in init(); used to write messages
+    private IThreadPoolService threadPool = null;  // looked up from the module context in init()
+
+    private FloodlightContext context = null;      // passed to messageDamper.write()
+    private BasicFactory factory = null;           // start() refuses to run while this is null
+
+    // Processing threads keyed by their creation index (0 .. number_thread - 1); filled by start()
+    private Map<Long, FlowPusherThread> threadMap = null;
+    // Map from (DPID and transaction ID) to Future objects.
+    private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+            = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+
+    private int number_thread;                     // number of FlowPusherThreads start() creates
+
+    /**
+     * Main thread that reads messages from queues and sends them to switches.
+     *
+     * @author Naoki Shiota
+     */
+    private class FlowPusherThread extends Thread {
+        private Map<IOFSwitch, SwitchQueue> assignedQueues
+                = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
+
+        final Lock queuingLock = new ReentrantLock();
+        final Condition messagePushed = queuingLock.newCondition();
+
+        @Override
+        public void run() {
+            this.setName("FlowPusherThread " + this.getId());
+            while (true) {
+                // The emptiness check and await() share one critical section:
+                // notifyMessagePushed() signals under the same lock, so a signal
+                // cannot fire between the check and the wait and get lost.
+                queuingLock.lock();
+                try {
+                    while (!queuesHasMessageToSend()) {
+                        messagePushed.await();
+                    }
+                } catch (InterruptedException e) {
+                    // Interrupted to be shut down (not an error)
+                    log.debug("FlowPusherThread is interrupted");
+                    return;
+                } finally {
+                    queuingLock.unlock();
+                }
+
+                // for safety of concurrent access, copy set of key objects
+                Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.keySet());
+
+                for (IOFSwitch sw : keys) {
+                    SwitchQueue queue = assignedQueues.get(sw);
+                    if (sw == null || queue == null) {
+                        continue;
+                    }
+                    synchronized (queue) {
+                        processQueue(sw, queue, MAX_MESSAGE_SEND);
+                        if (queue.toBeDeleted && !queue.hasMessageToSend()) {
+                            // remove queue if flagged to be.
+                            assignedQueues.remove(sw);
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Read messages from queue and send them to the switch.
+         * If number of messages excess the limit, stop sending messages.
+         *
+         * @param sw      Switch to which messages will be sent.
+         * @param queue   Queue of messages.
+         * @param max_msg Limitation of number of messages to be sent. If set to 0,
+         *                all messages in queue will be sent.
+         */
+        private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
+            // check sending rate and determine it to be sent or not
+            long current_time = System.currentTimeMillis();
+            long size = 0;
+
+            if (queue.isSendable(current_time)) {
+                int i = 0;
+                while (queue.hasMessageToSend()) {
+                    // Number of messages excess the limit
+                    if (0 < max_msg && max_msg <= i) {
+                        break;
+                    }
+                    ++i;
+                    SwitchQueueEntry queueEntry;
+                    synchronized (queue) {
+                        queueEntry = queue.poll();
+                    }
+                    if (queueEntry == null) {
+                        // poll() yields null if the queue changed after the check above
+                        break;
+                    }
+
+                    OFMessage msg = queueEntry.getOFMessage();
+                    try {
+                        messageDamper.write(sw, msg, context);
+                        if (log.isTraceEnabled()) {
+                            log.trace("Pusher sends message : {}", msg);
+                        }
+                        size += msg.getLength();
+                    } catch (IOException e) {
+                        // report through the logger (with stack trace) instead of stderr
+                        log.error("Exception in sending message ({})", msg, e);
+                    }
+                }
+
+                sw.flush();
+                queue.logSentData(current_time, size);
+            }
+        }
+
+        /** True if any assigned queue reports a message that may be sent now. */
+        private boolean queuesHasMessageToSend() {
+            for (SwitchQueue queue : assignedQueues.values()) {
+                if (queue.hasMessageToSend()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /** Signal the messagePushed condition so a waiting run() re-checks its queues. */
+        private void notifyMessagePushed() {
+            queuingLock.lock();
+            try {
+                messagePushed.signal();
+            } finally {
+                queuingLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Creates a pusher that uses DEFAULT_NUMBER_THREAD processing threads.
+     */
+    public FlowPusher() {
+        this(DEFAULT_NUMBER_THREAD);
+    }
+
+    /**
+     * Creates a pusher with the requested number of processing threads.
+     * Non-positive values fall back to DEFAULT_NUMBER_THREAD.
+     *
+     * @param number_thread Number of threads to handle messages.
+     */
+    public FlowPusher(int number_thread) {
+        // zero and negative counts are replaced by the default
+        this.number_thread = (number_thread > 0)
+                ? number_thread
+                : DEFAULT_NUMBER_THREAD;
+    }
+
+    /**
+     * Stores the collaborators needed for sending messages and registers this
+     * object as listener for barrier replies.
+     *
+     * @param context    FloodlightContext handed to the damper on every write.
+     * @param modContext FloodlightModuleContext used for acquiring
+     *                   ThreadPoolService and registering MessageListener.
+     * @param factory    Factory object to create OFMessage objects.
+     * @param damper     Message damper used for sending messages.
+     *                   If null, FlowPusher creates its own damper object.
+     */
+    public void init(FloodlightContext context,
+                     FloodlightModuleContext modContext,
+                     BasicFactory factory,
+                     OFMessageDamper damper) {
+        this.context = context;
+        this.factory = factory;
+
+        this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+
+        // barrier replies are delivered to this object
+        modContext.getServiceImpl(IFloodlightProviderService.class)
+                .addOFMessageListener(OFType.BARRIER_REPLY, this);
+
+        // without a caller-supplied damper, build one from the default constants
+        this.messageDamper = (damper != null)
+                ? damper
+                : new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+                        EnumSet.of(OFType.FLOW_MOD),
+                        OFMESSAGE_DAMPER_TIMEOUT);
+    }
+
+    /**
+     * Spawn the processing threads; logs an error and returns if no factory is set.
+     */
+    public void start() {
+        if (factory == null) {
+            log.error("FlowPusher not yet initialized.");
+            return;
+        }
+
+        threadMap = new HashMap<Long, FlowPusherThread>();
+        long index = 0;
+        while (index < number_thread) {
+            FlowPusherThread worker = new FlowPusherThread();
+            threadMap.put(index++, worker);
+            worker.start();
+        }
+    }
+
+    /** Moves the switch's queue from READY to SUSPENDED; returns false if it was not READY. */
+    @Override
+    public boolean suspend(IOFSwitch sw) {
+        SwitchQueue target = getQueue(sw);
+        if (target == null) {
+            // suspend may arrive before the first message, so make the queue now
+            target = createQueueImpl(sw);
+        }
+
+        synchronized (target) {
+            if (target.state != QueueState.READY) {
+                return false;
+            }
+            target.state = QueueState.SUSPENDED;
+            return true;
+        }
+    }
+
+    /** Returns a SUSPENDED queue to READY and wakes its processing thread if messages wait. */
+    @Override
+    public boolean resume(IOFSwitch sw) {
+        final SwitchQueue target = getQueue(sw);
+        if (target == null) {
+            log.error("No queue is attached to DPID : {}", sw.getId());
+            return false;
+        }
+
+        synchronized (target) {
+            if (target.state != QueueState.SUSPENDED) {
+                return false;
+            }
+            target.state = QueueState.READY;
+
+            // Free the latch if queue has any messages
+            FlowPusherThread worker = getProcessingThread(sw);
+            if (target.hasMessageToSend()) {
+                worker.notifyMessagePushed();
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Reports the state of the queue attached to the switch.
+     *
+     * @return the queue's state, or QueueState.UNKNOWN when the switch has no queue
+     */
+    @Override
+    public QueueState getState(IOFSwitch sw) {
+        final SwitchQueue target = getQueue(sw);
+        return (target == null) ? QueueState.UNKNOWN : target.state;
+    }
+
+    /**
+     * Interrupts every processing thread created by start().
+     * Calling it before start() is a no-op.
+     */
+    public void stop() {
+        if (threadMap != null) {
+            for (FlowPusherThread worker : threadMap.values()) {
+                // a thread waiting for messages returns from run() when interrupted
+                worker.interrupt();
+            }
+        }
+    }
+
+    /** Sets the queue's maximum sending rate; non-positive rates are ignored. */
+    @Override
+    public void setRate(IOFSwitch sw, long rate) {
+        // the queue is created on demand even when the rate ends up ignored
+        final SwitchQueue found = getQueue(sw);
+        final SwitchQueue target = (found != null) ? found : createQueueImpl(sw);
+        if (rate <= 0) {
+            return;
+        }
+        log.debug("rate for {} is set to {}", sw.getId(), rate);
+        synchronized (target) {
+            target.max_rate = rate;
+        }
+    }
+
+    /** Ensures a queue exists for the switch; true when createQueueImpl() returned one. */
+    @Override
+    public boolean createQueue(IOFSwitch sw) {
+        final SwitchQueue created = createQueueImpl(sw);
+        return created != null;
+    }
+
+    /** Returns the switch's queue, first creating and registering a READY one if missing. */
+    protected SwitchQueue createQueueImpl(IOFSwitch sw) {
+        final SwitchQueue existing = getQueue(sw);
+        if (existing != null) {
+            return existing;
+        }
+
+        final FlowPusherThread owner = getProcessingThread(sw);
+        final SwitchQueue fresh = new SwitchQueue();
+        fresh.state = QueueState.READY;
+        owner.assignedQueues.put(sw, fresh);
+        return fresh;
+    }
+
+    /** Non-forced delete: equivalent to {@code deleteQueue(sw, false)}. */
+    @Override
+    public boolean deleteQueue(IOFSwitch sw) {
+        return deleteQueue(sw, false);
+    }
+
+    /**
+     * Deletes the switch's queue: at once when forceStop is true, otherwise it is only
+     * flagged so the processing thread removes it after draining. False if no queue exists.
+     */
+    @Override
+    public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+        final FlowPusherThread owner = getProcessingThread(sw);
+        if (forceStop) {
+            return owner.assignedQueues.remove(sw) != null;
+        }
+
+        final SwitchQueue target = getQueue(sw);
+        if (target == null) {
+            return false;
+        }
+        synchronized (target) {
+            target.toBeDeleted = true;
+        }
+        return true;
+    }
+
+    @Override  // shorthand for NORMAL priority
+    public boolean add(IOFSwitch sw, OFMessage msg) {
+        return this.add(sw, msg, MsgPriority.NORMAL);
+    }
+
+    @Override  // plain OFMessage: handed to addMessageImpl() unchanged
+    public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+        return this.addMessageImpl(sw, msg, priority);
+    }
+
+    /** Pushes every (switch, flow entry) pair with NORMAL priority. */
+    @Override
+    public void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+        pushFlowEntries(entries, MsgPriority.NORMAL);
+    }
+
+    /** Converts each flow entry to a message and queues it for its paired switch. */
+    @Override
+    public void pushFlowEntries(
+            Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+        for (Pair<IOFSwitch, FlowEntry> pair : entries) {
+            add(pair.first, pair.second, priority);
+        }
+    }
+
+    /** Pushes a single flow entry with NORMAL priority. */
+    @Override
+    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+        pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+    }
+
+    /** Wraps the pair in a one-element collection and hands it to pushFlowEntries(). */
+    @Override
+    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+        final Collection<Pair<IOFSwitch, FlowEntry>> batch = new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+        batch.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
+        pushFlowEntries(batch, priority);
+    }
+
+    /**
+     * Create a message from FlowEntry and add it to the queue of the switch.
+     *
+     * @param sw        Switch to which message is pushed.
+     * @param flowEntry FlowEntry object used for creating message.
+     * @param priority  Priority passed on to addMessageImpl() with the created message.
+     * @return false if the user state is unknown; otherwise the result of addMessageImpl().
+     */
+    private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+        // Create the OpenFlow Flow Modification Entry to push;
+        // the FlowEntry ID value is carried as the flow-mod cookie.
+        OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+        long cookie = flowEntry.flowEntryId().value();
+
+        short flowModCommand = OFFlowMod.OFPFC_ADD;
+        if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+            flowModCommand = OFFlowMod.OFPFC_ADD;
+        } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+            flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+        } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+            flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+        } else {
+            // Unknown user state. Ignore the entry
+            log.debug(
+                    "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+                    flowEntry.flowEntryId(),
+                    flowEntry.flowEntryUserState());
+            return false;
+        }
+
+        //
+        // Fetch the match conditions.
+        //
+        // NOTE: The Flow matching conditions common for all Flow Entries are
+        // used ONLY if a Flow Entry does NOT have the corresponding matching
+        // condition set.
+        //
+        OFMatch match = new OFMatch();
+        match.setWildcards(OFMatch.OFPFW_ALL);
+        FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+        // Match the Incoming Port
+        Port matchInPort = flowEntryMatch.inPort();
+        if (matchInPort != null) {
+            match.setInputPort(matchInPort.value());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+        }
+
+        // Match the Source MAC address
+        MACAddress matchSrcMac = flowEntryMatch.srcMac();
+        if (matchSrcMac != null) {
+            match.setDataLayerSource(matchSrcMac.toString());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+        }
+
+        // Match the Destination MAC address
+        MACAddress matchDstMac = flowEntryMatch.dstMac();
+        if (matchDstMac != null) {
+            match.setDataLayerDestination(matchDstMac.toString());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+        }
+
+        // Match the Ethernet Frame Type
+        Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+        if (matchEthernetFrameType != null) {
+            match.setDataLayerType(matchEthernetFrameType);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+        }
+
+        // Match the VLAN ID
+        Short matchVlanId = flowEntryMatch.vlanId();
+        if (matchVlanId != null) {
+            match.setDataLayerVirtualLan(matchVlanId);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+        }
+
+        // Match the VLAN priority
+        Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+        if (matchVlanPriority != null) {
+            match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+            match.setWildcards(match.getWildcards()
+                    & ~OFMatch.OFPFW_DL_VLAN_PCP);
+        }
+
+        // Match the Source IPv4 Network prefix (wildcards presumably updated by setFromCIDR() - confirm)
+        IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+        if (matchSrcIPv4Net != null) {
+            match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+        }
+
+        // Match the Destination IPv4 Network prefix (same assumption about setFromCIDR())
+        IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+        if (matchDstIPv4Net != null) {
+            match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+        }
+
+        // Match the IP protocol
+        Byte matchIpProto = flowEntryMatch.ipProto();
+        if (matchIpProto != null) {
+            match.setNetworkProtocol(matchIpProto);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+        }
+
+        // Match the IP ToS (DSCP field, 6 bits)
+        Byte matchIpToS = flowEntryMatch.ipToS();
+        if (matchIpToS != null) {
+            match.setNetworkTypeOfService(matchIpToS);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+        }
+
+        // Match the Source TCP/UDP port
+        Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+        if (matchSrcTcpUdpPort != null) {
+            match.setTransportSource(matchSrcTcpUdpPort);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+        }
+
+        // Match the Destination TCP/UDP port
+        Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+        if (matchDstTcpUdpPort != null) {
+            match.setTransportDestination(matchDstTcpUdpPort);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+        }
+
+        //
+        // Fetch the actions
+        //
+        Short actionOutputPort = null;
+        List<OFAction> openFlowActions = new ArrayList<OFAction>();
+        int actionsLen = 0;
+        FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+        // Every sub-action that is set on a FlowEntryAction is converted, in the order below.
+        for (FlowEntryAction action : flowEntryActions.actions()) {
+            ActionOutput actionOutput = action.actionOutput();
+            ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+            ActionSetVlanPriority actionSetVlanPriority = action
+                    .actionSetVlanPriority();
+            ActionStripVlan actionStripVlan = action.actionStripVlan();
+            ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+                    .actionSetEthernetSrcAddr();
+            ActionSetEthernetAddr actionSetEthernetDstAddr = action
+                    .actionSetEthernetDstAddr();
+            ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+                    .actionSetIPv4SrcAddr();
+            ActionSetIPv4Addr actionSetIPv4DstAddr = action
+                    .actionSetIPv4DstAddr();
+            ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+            ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+                    .actionSetTcpUdpSrcPort();
+            ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+                    .actionSetTcpUdpDstPort();
+            ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+            if (actionOutput != null) {
+                actionOutputPort = actionOutput.port().value();
+                // XXX: The max length is hard-coded for now
+                OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+                        .value(), (short) 0xffff);
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetVlanId != null) {
+                OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+                        actionSetVlanId.vlanId());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetVlanPriority != null) {
+                OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+                        actionSetVlanPriority.vlanPriority());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionStripVlan != null) {
+                if (actionStripVlan.stripVlan() == true) {
+                    OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+                    openFlowActions.add(ofa);
+                    actionsLen += ofa.getLength();
+                }
+            }
+
+            if (actionSetEthernetSrcAddr != null) {
+                OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+                        actionSetEthernetSrcAddr.addr().toBytes());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetEthernetDstAddr != null) {
+                OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+                        actionSetEthernetDstAddr.addr().toBytes());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIPv4SrcAddr != null) {
+                OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+                        actionSetIPv4SrcAddr.addr().value());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIPv4DstAddr != null) {
+                OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+                        actionSetIPv4DstAddr.addr().value());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIpToS != null) {
+                OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+                        actionSetIpToS.ipToS());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetTcpUdpSrcPort != null) {
+                OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+                        actionSetTcpUdpSrcPort.port());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetTcpUdpDstPort != null) {
+                OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+                        actionSetTcpUdpDstPort.port());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionEnqueue != null) {
+                OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+                        .value(), actionEnqueue.queueId());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+        }
+
+        fm.setIdleTimeout((short) flowEntry.idleTimeout())
+                .setHardTimeout((short) flowEntry.hardTimeout())
+                .setPriority((short) flowEntry.priority())
+                .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+                .setCommand(flowModCommand).setMatch(match)
+                .setActions(openFlowActions)
+                .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+        fm.setOutPort(OFPort.OFPP_NONE.getValue());
+        if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+                || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+            if (actionOutputPort != null)
+                fm.setOutPort(actionOutputPort);
+        }
+
+        //
+        // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
+        // permanent.
+        //
+        if ((flowEntry.idleTimeout() != 0) ||
+                (flowEntry.hardTimeout() != 0)) {
+            fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+                    , flowEntry.flowEntryUserState()
+                    , sw.getStringId()
+                    , flowEntry.flowEntryId()
+                    , matchSrcMac
+                    , matchDstMac
+                    , matchInPort
+                    , actionOutputPort
+            );
+        }
+
+        return addMessageImpl(sw, fm, priority);
+    }
+
+    /**
+     * Adds an OpenFlow message to the queue of a switch and notifies the
+     * thread that processes that queue.
+     * <p>
+     * The queue for the switch is created the first time a message is added
+     * for it.
+     *
+     * @param sw       switch the message is destined for
+     * @param msg      OpenFlow message to enqueue
+     * @param priority priority with which the message is added to the queue
+     * @return true if the message was enqueued, false if no processing
+     *         thread is registered for the switch
+     */
+    protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+        FlowPusherThread thread = getProcessingThread(sw);
+        if (thread == null) {
+            // Fail before touching any queue: the notification at the end of
+            // this method would otherwise throw a NullPointerException.
+            log.error("No processing thread for switch DPID: {}", sw.getStringId());
+            return false;
+        }
+
+        SwitchQueue queue = getQueue(sw);
+
+        // create queue at first addition of message
+        if (queue == null) {
+            queue = createQueueImpl(sw);
+        }
+
+        SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
+        // The queue object itself is the monitor guarding additions.
+        synchronized (queue) {
+            queue.add(entry, priority);
+            if (log.isTraceEnabled()) {
+                log.trace("Message is pushed : {}", entry.getOFMessage());
+            }
+        }
+
+        thread.notifyMessagePushed();
+
+        return true;
+    }
+
+    @Override
+    public OFBarrierReply barrier(IOFSwitch sw) {
+        OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+        if (future == null) {
+            return null;
+        }
+
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            log.error("InterruptedException: {}", e);
+            return null;
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+            log.error("ExecutionException: {}", e);
+            return null;
+        }
+    }
+
+    /**
+     * Queues a barrier request for a switch without waiting for the reply.
+     * <p>
+     * The future is stored in {@code barrierFutures} before the request is
+     * queued, keyed by the switch and request, so that it can be looked up
+     * when the matching reply is received.
+     *
+     * @param sw switch to send the barrier request to
+     * @return future for the barrier reply, or null if {@code sw} is null
+     */
+    @Override
+    public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+        // TODO creation of message and future should be moved to OFSwitchImpl
+
+        if (sw != null) {
+            OFBarrierRequest request = createBarrierRequest(sw);
+            OFBarrierReplyFuture replyFuture =
+                    new OFBarrierReplyFuture(threadPool, sw, request.getXid());
+
+            // Register the future first, then hand the request to the queue.
+            barrierFutures.put(BarrierInfo.create(sw, request), replyFuture);
+            addMessageImpl(sw, request, MsgPriority.NORMAL);
+
+            return replyFuture;
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds a barrier request whose transaction ID is the next one handed
+     * out by the switch.
+     *
+     * @param sw switch that supplies the transaction ID
+     * @return newly created barrier request
+     */
+    protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+        OFBarrierRequest request =
+                (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+        int xid = sw.getNextTransactionId();
+        request.setXid(xid);
+        return request;
+    }
+
+    /**
+     * Looks up the queue assigned to a switch.
+     *
+     * @param sw Switch object
+     * @return Queue object, or null if {@code sw} is null, no processing
+     *         thread is found for it, or that thread has no queue for it
+     */
+    protected SwitchQueue getQueue(IOFSwitch sw) {
+        FlowPusherThread worker = (sw == null) ? null : getProcessingThread(sw);
+        return (worker == null) ? null : worker.assignedQueues.get(sw);
+    }
+
+    /**
+     * Get a hash value correspondent to a switch.
+     * <p>
+     * The value is the switch ID reduced modulo {@code number_thread} and
+     * normalized to be non-negative, so IDs that are negative as a
+     * {@code long} still map to a valid slot.
+     *
+     * @param sw Switch object
+     * @return Hash value in the range {@code [0, number_thread)}, assuming
+     *         {@code number_thread} is positive
+     */
+    protected long getHash(IOFSwitch sw) {
+        // This code assumes DPID is sequentially assigned.
+        // TODO consider equalization algorithm
+        long hash = sw.getId() % number_thread;
+        if (hash < 0) {
+            // Java's % keeps the sign of the dividend; shift a negative
+            // remainder back into range. Presumably threadMap is keyed by
+            // 0..number_thread-1 - TODO confirm.
+            hash += number_thread;
+        }
+        return hash;
+    }
+
+    /**
+     * Finds the thread responsible for the queue of a switch, using the
+     * switch's hash value as the key into {@code threadMap}.
+     *
+     * @param sw Switch object
+     * @return Thread object, or whatever {@code threadMap} returns when it
+     *         has no entry for the hash
+     */
+    protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
+        return threadMap.get(getHash(sw));
+    }
+
+    /**
+     * Returns the fixed name of this component.
+     *
+     * @return the constant string "flowpusher"
+     */
+    @Override
+    public String getName() {
+        return "flowpusher";
+    }
+
+    /**
+     * Always returns false, regardless of the message type or name given.
+     * NOTE(review): presumably this means no other listener has to run
+     * before this one - confirm against the overridden interface contract.
+     *
+     * @param type OpenFlow message type (unused)
+     * @param name name being queried (unused)
+     * @return always false
+     */
+    @Override
+    public boolean isCallbackOrderingPrereq(OFType type, String name) {
+        return false;
+    }
+
+    /**
+     * Always returns false, regardless of the message type or name given.
+     * NOTE(review): presumably this means no other listener has to run
+     * after this one - confirm against the overridden interface contract.
+     *
+     * @param type OpenFlow message type (unused)
+     * @param name name being queried (unused)
+     * @return always false
+     */
+    @Override
+    public boolean isCallbackOrderingPostreq(OFType type, String name) {
+        return false;
+    }
+
+    /**
+     * Handles a message received from a switch. Only BARRIER_REPLY messages
+     * are processed: the future registered for the matching barrier request,
+     * if any, is removed from {@code barrierFutures} and the reply is
+     * delivered to it. Any other message type is logged as an error.
+     *
+     * @param sw   switch the message came from
+     * @param msg  received OpenFlow message
+     * @param cntx Floodlight context (unused)
+     * @return always {@code Command.CONTINUE}
+     */
+    @Override
+    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+        if (msg.getType() != OFType.BARRIER_REPLY) {
+            log.error("Unexpected reply message : {}", msg.getType());
+            return Command.CONTINUE;
+        }
+
+        // Trace only after the type check so the log never claims a
+        // BARRIER_REPLY for a message of another type.
+        if (log.isTraceEnabled()) {
+            log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+        }
+
+        OFBarrierReply reply = (OFBarrierReply) msg;
+        BarrierInfo info = BarrierInfo.create(sw, reply);
+
+        // Deliver future if exists. A single remove() replaces the separate
+        // get-then-remove, so the entry is gone even if delivery throws and
+        // no second caller can pick up the same future in between.
+        OFBarrierReplyFuture future = barrierFutures.remove(info);
+        if (future != null) {
+            future.deliverFuture(sw, msg);
+        }
+
+        return Command.CONTINUE;
+    }
+}