Partial cherry-pick from https://gerrit.onos.onlab.us/#/c/92/4.

NOTE: The implementation of the automatic-barrier function has been abandoned.

The FlowEntryAdded event now occurs after a BARRIER_REPLY is received (ONOS-915).
Fixed a bug where the same FlowEntry was installed on a switch multiple times.

Change-Id: Ibb5cdaf9a478e2697232203d9190dedb4792f108

Fixed a bug where FlowEntries were shallow-copied in FlowPusher.

Change-Id: I704d245d3660a91f5e60fd6cdf05f89b1fba6c25

Fixed a problem where FlowPusherTest often failed (ONOS-984).

Change-Id: I2a290d5dc756e43a8058c6f35372f2e2932a8d73
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 21be62c..d2af973 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -11,6 +11,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
@@ -29,7 +30,6 @@
 import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.floodlightcontroller.util.MACAddress;
 import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -54,7 +54,6 @@
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
-    protected volatile IFlowService flowManager;
 
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
@@ -70,6 +69,18 @@
 		READY,
 		SUSPENDED,
 	}
+
+	private static class SwitchQueueEntry {
+		OFMessage msg;
+		
+		public SwitchQueueEntry(OFMessage msg) {
+			this.msg = msg;
+		}
+		
+		public OFMessage getOFMessage() {
+			return msg;
+		}
+	}
 	
 	/**
 	 * SwitchQueue represents message queue attached to a switch.
@@ -77,7 +88,7 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class SwitchQueue extends ArrayDeque<OFMessage> {
+	private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
 		private static final long serialVersionUID = 1L;
 
 		QueueState state;
@@ -119,6 +130,52 @@
 			last_sent_time = current;
 			last_sent_size = size;
 		}
+	}
+	
+	/**
+	 * BarrierInfo holds the information (DPID and transaction ID) that identifies a barrier message sent to a switch.
+	 * @author Naoki
+	 */
+	private static class BarrierInfo {
+		final long dpid;
+		final int xid;
+		
+		static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+			return new BarrierInfo(sw.getId(), req.getXid());
+		}
+		
+		static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+			return new BarrierInfo(sw.getId(), rpy.getXid());
+		}
+		
+		private BarrierInfo(long dpid, int xid) {
+			this.dpid = dpid;
+			this.xid = xid;
+		}
+
+		// Auto generated code by Eclipse
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + (int) (dpid ^ (dpid >>> 32));
+			result = prime * result + xid;
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			
+			BarrierInfo other = (BarrierInfo) obj;
+			return (this.dpid == other.dpid) && (this.xid == other.xid);
+		}
+		
 		
 	}
 	
@@ -130,9 +187,9 @@
 	
 	// Map of threads versus dpid
 	private Map<Long, FlowPusherThread> threadMap = null;
-	// Map of Future objects versus dpid and transaction ID.
-	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
-		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
+	// Map from (DPID and transaction ID) to Future objects.
+	private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+		= new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
 	
 	private int number_thread = 1;
 	
@@ -143,7 +200,7 @@
 	 */
 	private class FlowPusherThread extends Thread {
 		private Map<IOFSwitch,SwitchQueue> queues
-			= new HashMap<IOFSwitch,SwitchQueue>();
+			= new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
 		
 		// reusable latch used for waiting for arrival of message
 		private Semaphore mutex = new Semaphore(0);
@@ -163,10 +220,8 @@
 				
 				// for safety of concurrent access, copy all key objects
 				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
-				synchronized (queues) {
-					for (IOFSwitch sw : queues.keySet()) {
-						keys.add(sw);
-					}
+				for (IOFSwitch sw : queues.keySet()) {
+					keys.add(sw);
 				}
 				
 				for (IOFSwitch sw : keys) {
@@ -183,9 +238,7 @@
 						if (queue.isEmpty()) {
 							// remove queue if flagged to be.
 							if (queue.toBeDeleted) {
-								synchronized (queues) {
-									queues.remove(sw);
-								}
+								queues.remove(sw);
 							}
 						} else {
 							// if some messages remains in queue, latch down
@@ -206,7 +259,7 @@
 		 * @param max_msg Limitation of number of messages to be sent. If set to 0,
 		 *                all messages in queue will be sent.
 		 */
-		private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+		private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
 			// check sending rate and determine it to be sent or not
 			long current_time = System.currentTimeMillis();
 			long size = 0;
@@ -220,16 +273,24 @@
 					}
 					++i;
 					
-					OFMessage msg = queue.poll();
+					SwitchQueueEntry queueEntry;
+					synchronized (queue) {
+						queueEntry = queue.poll();
+					}
+					
+					OFMessage msg = queueEntry.getOFMessage();
 					try {
 						messageDamper.write(sw, msg, context);
-//						log.debug("Pusher sends message : {}", msg);
+						if (log.isTraceEnabled()) {
+							log.trace("Pusher sends message : {}", msg);
+						}
 						size += msg.getLength();
 					} catch (IOException e) {
 						e.printStackTrace();
 						log.error("Exception in sending message ({}) : {}", msg, e);
 					}
 				}
+				
 				sw.flush();
 				queue.logSentData(current_time, size);
 			}
@@ -269,7 +330,6 @@
 		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
 		IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
 		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
-		flowManager = modContext.getServiceImpl(IFlowService.class);
 		
 		if (damper != null) {
 			messageDamper = damper;
@@ -374,7 +434,9 @@
 		
 		if (rate > 0) {
 			log.debug("rate for {} is set to {}", sw.getId(), rate);
-			queue.max_rate = rate;
+			synchronized (queue) {
+				queue.max_rate = rate;
+			}
 		}
 	}
 
@@ -388,9 +450,7 @@
 		FlowPusherThread proc = getProcess(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
-		synchronized (proc.queues) {
-			proc.queues.put(sw, queue);
-		}
+		proc.queues.put(sw, queue);
 		
 		return true;
 	}
@@ -405,11 +465,9 @@
 		FlowPusherThread proc = getProcess(sw);
 		
 		if (forceStop) {
-			synchronized (proc.queues) {
-				SwitchQueue queue = proc.queues.remove(sw);
-				if (queue == null) {
-					return false;
-				}
+			SwitchQueue queue = proc.queues.remove(sw);
+			if (queue == null) {
+				return false;
 			}
 			return true;
 		} else {
@@ -426,48 +484,16 @@
 	
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		FlowPusherThread proc = getProcess(sw);
-		SwitchQueue queue = proc.queues.get(sw);
-
-		// create queue at first addition of message
-		if (queue == null) {
-			createQueue(sw);
-			queue = getQueue(sw);
-		}
-		
-		synchronized (queue) {
-			queue.add(msg);
-//			log.debug("Message is pushed : {}", msg);
-		}
-		
-		if (proc.mutex.availablePermits() == 0) {
-			proc.mutex.release();
-		}
-
-		return true;
+		return addMessageImpl(sw, msg);
 	}
-
+	
 	@Override
 	public void pushFlowEntries(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
 
-		List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
-			new LinkedList<Pair<IOFSwitch, FlowEntry>>();
-
 		for (Pair<IOFSwitch, FlowEntry> entry : entries) {
-			if (add(entry.first, entry.second)) {
-				pushedEntries.add(entry);
-			}
+			add(entry.first, entry.second);
 		}
-
-		//
-		// TODO: We should use the OpenFlow Barrier mechanism
-		// to check for errors, and update the SwitchState
-		// for a flow entry after the Barrier message is
-		// is received.
-		// Only after inform the Flow Manager that the entry is pushed.
-		//
-		flowManager.flowEntriesPushedToSwitch(pushedEntries);
 	}
 
 	@Override
@@ -755,7 +781,39 @@
 			, actionOutputPort
 			);
 
-		return add(sw, fm);
+		return addMessageImpl(sw, fm);
+	}
+	
+	/**
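+	 * Add a message to the queue of the given switch, creating the queue
+	 * on first use, and release the pusher thread's latch if it has no permit.
+	 * @param sw Switch whose queue receives the message
+	 * @param msg Message to be queued
+	 * @return always true (no failure path is visible in this method)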
+	 */
+	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
+		FlowPusherThread proc = getProcess(sw);
+		SwitchQueue queue = proc.queues.get(sw);
+
+		// create queue at first addition of message
+		if (queue == null) {
+			createQueue(sw);
+			queue = getQueue(sw);
+		}
+		
+		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+		synchronized (queue) {
+			queue.add(entry);
+			if (log.isTraceEnabled()) {
+				log.trace("Message is pushed : {}", entry.getOFMessage());
+			}
+		}
+		
+		if (proc.mutex.availablePermits() == 0) {
+			proc.mutex.release();
+		}
+
+		return true;
 	}
 	
 	@Override
@@ -786,24 +844,23 @@
 			return null;
 		}
 		
-		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
-		msg.setXid(sw.getNextTransactionId());
-
-		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-		synchronized (barrierFutures) {
-			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
-			if (map == null) {
-				map = new HashMap<Integer,OFBarrierReplyFuture>();
-				barrierFutures.put(sw.getId(), map);
-			}
-			map.put(msg.getXid(), future);
-		}
+		OFBarrierRequest msg = createBarrierRequest(sw);
 		
-		add(sw, msg);
+		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+		barrierFutures.put(BarrierInfo.create(sw,msg), future);
+		
+		addMessageImpl(sw, msg);
 		
 		return future;
 	}
 
+	protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+		msg.setXid(sw.getNextTransactionId());
+		
+		return msg;
+	}
+	
 	/**
 	 * Get a queue attached to a switch.
 	 * @param sw Switch object
@@ -838,7 +895,7 @@
 		
 		return threadMap.get(hash);
 	}
-
+	
 	@Override
 	public String getName() {
 		return "flowpusher";
@@ -856,22 +913,25 @@
 
 	@Override
 	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
-		Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
-		if (map == null) {
-			log.debug("null map for {} : {}", sw.getId(), barrierFutures);
+		if (log.isTraceEnabled()) {
+			log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+		}
+
+		if (msg.getType() != OFType.BARRIER_REPLY) {
+			log.error("Unexpected reply message : {}", msg.getType());
 			return Command.CONTINUE;
 		}
 		
-		OFBarrierReplyFuture future = map.get(msg.getXid());
-		if (future == null) {
-			log.debug("null future for {} : {}", msg.getXid(), map);
-			return Command.CONTINUE;
-		}
+		OFBarrierReply reply = (OFBarrierReply) msg;
+		BarrierInfo info = BarrierInfo.create(sw,reply);
 		
-		log.debug("Received BARRIER_REPLY : {}", msg);
-		future.deliverFuture(sw, msg);
+		// Deliver future if exists
+		OFBarrierReplyFuture future = barrierFutures.get(info);
+		if (future != null) {
+			future.deliverFuture(sw, msg);
+			barrierFutures.remove(info);
+		}
 		
 		return Command.CONTINUE;
 	}
-
 }