Cherry-pick from https://gerrit.onos.onlab.us/#/c/334

Changes for fixing reactiveFlow bugs
*Because of the limits of my test environment, I haven't tested this in an environment with multiple ONOS instances.
*The fix for the IDeviceStorage transaction close issue will be committed later.

-Implemented a feature that confirms flow-add events, and changed the Forwarding class to work with it.
-Made the Forwarding class check its state more strictly.
-Added 2 extra seconds of idle timeout to all flow entries except the head flow entry, to avoid leaving an intermittent path.

Change-Id: I737c29da6241686c73079566c1b22947801c6e48
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 882d5fa..479f170 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -6,6 +6,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -57,9 +60,13 @@
 public class Forwarding implements IOFMessageListener, IFloodlightModule,
 									IForwardingService {
 	private final static Logger log = LoggerFactory.getLogger(Forwarding.class);
-
+   
 	private final int IDLE_TIMEOUT = 5; // seconds
 	private final int HARD_TIMEOUT = 0; // seconds
+	private final int SLEEP_TIME_FOR_DB_DEVICE_INSTALLED = 100; // milliseconds
+	private final static int NUMBER_OF_THREAD_FOR_EXECUTOR = 1;
+	
+	private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(NUMBER_OF_THREAD_FOR_EXECUTOR);
 	
 	private final CallerId callerId = new CallerId("Forwarding");
 	
@@ -91,6 +98,7 @@
 	private class PushedFlow {
 		public final long flowId;
 		public boolean installed = false;
+		public short firstOutPort;
 		
 		public PushedFlow(long flowId) {
 			this.flowId = flowId;
@@ -239,28 +247,69 @@
 				 eth.serialize(), sw.getId(), pi.getInPort()));
 	}
 	
-	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
+	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth){
+		log.debug("Start handlePacketIn swId {}, portId {}", sw.getId(), pi.getInPort());
+
 		String destinationMac = 
 				HexString.toHexString(eth.getDestinationMACAddress()); 
 		
+		//FIXME The TitanTransaction opened here probably needs a commit() or rollback() before exiting run(), to avoid a transaction leak.
+		//However, IDeviceStorage does not seem to provide a way to close the transaction properly.
+		//Also, getDeviceByMac() is a blocking call, so there may be a better way to handle the lookup that avoids this condition.
 		IDeviceObject deviceObject = deviceStorage.getDeviceByMac(
 				destinationMac);
 		
 		if (deviceObject == null) {
-			log.debug("No device entry found for {} - broadcasting packet", 
+			log.debug("No device entry found for {}",
 					destinationMac);
-			handleBroadcast(sw, pi, eth);
+			
+			//Device is not in the DB yet, so wait for it to be added and then look it up again.
+			executor.schedule(new WaitDeviceArp(sw, pi, eth), SLEEP_TIME_FOR_DB_DEVICE_INSTALLED, TimeUnit.MILLISECONDS);
 			return;
 		}
 		
+		continueHandlePacketIn(sw, pi, eth, deviceObject);
+	}
+	
+	private class WaitDeviceArp implements Runnable {
+		IOFSwitch sw;
+		OFPacketIn pi;
+		Ethernet eth;
+
+		public WaitDeviceArp(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
+			super();
+			this.sw = sw;
+			this.pi = pi;
+			this.eth = eth;
+		}
+
+		@Override
+		public void run() {
+				IDeviceObject deviceObject = deviceStorage.getDeviceByMac(HexString.toHexString(eth.getDestinationMACAddress()));
+				if(deviceObject == null){
+					log.debug("wait {}ms and device was not found. Send broadcast packet and the thread finish.", SLEEP_TIME_FOR_DB_DEVICE_INSTALLED);
+					handleBroadcast(sw, pi, eth);
+					return;
+				}
+				log.debug("wait {}ms and device {} was found, continue",SLEEP_TIME_FOR_DB_DEVICE_INSTALLED, deviceObject.getMACAddress());
+				continueHandlePacketIn(sw, pi, eth, deviceObject);
+		}
+	}
+
+	private void continueHandlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth, IDeviceObject deviceObject) {
+		log.debug("Start continuehandlePacketIn");
+
 		Iterator<IPortObject> ports = deviceObject.getAttachedPorts().iterator();	
 		if (!ports.hasNext()) {
 			log.debug("No attachment point found for device {} - broadcasting packet", 
-					destinationMac);
+					deviceObject.getMACAddress());
 			handleBroadcast(sw, pi, eth);
-			return;
+			return;	
 		}
+
+		//This code assumes the device has only one attached port, which is likely to be a problem.
 		IPortObject portObject = ports.next();
+
 		short destinationPort = portObject.getNumber();
 		ISwitchObject switchObject = portObject.getSwitch();
 		long destinationDpid = HexString.toLong(switchObject.getDPID());
@@ -276,16 +325,15 @@
 		
 		FlowPath flowPath, reverseFlowPath;
 		
-		Path pathspec = new Path(srcMacAddress, dstMacAddress);
-		// TODO check concurrency
 		synchronized (lock) {
+			//TODO check concurrency
+			Path pathspec = new Path(srcMacAddress, dstMacAddress);
 			PushedFlow existingFlow = pendingFlows.get(pathspec);
 
 			if (existingFlow != null) {
 				// We've already installed a flow for this pair of MAC addresses
-				log.debug("Found existing flow {}", 
-						HexString.toHexString(existingFlow.flowId));
-				
+				log.debug("Found existing same pathspec {}, Flow ID is {}",
+						pathspec, HexString.toHexString(existingFlow.flowId));
 				OFPacketOut po = constructPacketOut(pi, sw);
 				
 				// Find the correct port here. We just assume the PI is from 
@@ -316,14 +364,16 @@
 								srcMacAddress, dstMacAddress, flow});
 					}
 					else {
+						log.debug("Sending packet out from sw {}, outport{}", sw, flowEntryForThisSwitch.outPort().value());
 						sendPacketOut(sw, po, flowEntryForThisSwitch.outPort().value());
 					}
 				}
 				else {
-					// Flow has not yet been sent to switches so save the
+					//log.debug("Existing Flow ID {} is not installed. Continue to overwrite.",Long.toHexString(existingFlow.flowId) );
+					// Flow has not yet been installed to switches so save the
 					// packet out for later
-					waitingPackets.put(existingFlow.flowId, 
-							new PacketToPush(po, sw.getId()));
+					log.debug("Put a packet into the waitng list. flowId {}", Long.toHexString(existingFlow.flowId));
+					waitingPackets.put(existingFlow.flowId, new PacketToPush(po, sw.getId()));
 				}
 				return;
 			}
@@ -367,7 +417,6 @@
 			reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
 			reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
 			reverseFlowPath.setDataPath(reverseDataPath);
-			reverseFlowPath.dataPath().srcPort().dpid().toString();
 
 			FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
 			FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
@@ -380,18 +429,22 @@
 			
 			// Add to waiting lists
 			pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
+			log.debug("Put a Path {} in the pending flow, Flow ID {}", pathspec, flowId);
 			pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
-			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
-		
+			log.debug("Put a Path {} in the pending flow, Flow ID {}", reversePathSpec, reverseFlowId);
+			PacketToPush pp = new PacketToPush(po, sw.getId());
+			waitingPackets.put(flowId.value(), pp);
+
+			log.debug("Put a Packet in the wating list. relatedflowId {}, realatedReversedFlowId {}", 
+					flowId, reverseFlowId);
 		}
 		
-		log.debug("Adding reverse {} to {} flowid {}", new Object[] {
+		log.debug("Adding reverse {} to {}. Flow ID {}", new Object[] {
 				dstMacAddress, srcMacAddress, reverseFlowPath.flowId()});
 		flowService.addFlow(reverseFlowPath);
-		log.debug("Adding forward {} to {} flowid {}", new Object[] {
+		log.debug("Adding forward {} to {}. Flow ID {}", new Object[] {
 				srcMacAddress, dstMacAddress, flowPath.flowId()});
 		flowService.addFlow(flowPath);
-		
 	}
 
 	private OFPacketOut constructPacketOut(OFPacketIn pi, IOFSwitch sw) {	
@@ -422,67 +475,111 @@
 	
 	@Override
 	public void flowRemoved(FlowPath removedFlowPath) {
+		if(log.isDebugEnabled()){
+			log.debug("Flow {} was removed, having {} queued packets",
+					removedFlowPath.flowId(), waitingPackets.get(removedFlowPath.flowId().value()).size());
+		}
+
+
 		if (!removedFlowPath.installerId().equals(callerId)) {
 			// Not our flow path, ignore
 			return;
 		}
-		
+
 		MACAddress srcMacAddress = removedFlowPath.flowEntryMatch().srcMac();
 		MACAddress dstMacAddress = removedFlowPath.flowEntryMatch().dstMac();
 		
 		Path removedPath = new Path(srcMacAddress, dstMacAddress);
 		
 		synchronized (lock) {
-			pendingFlows.remove(removedPath);
-			
 			// There *shouldn't* be any packets queued if the flow has 
 			// just been removed. 
 			List<PacketToPush> packets = 
 					waitingPackets.removeAll(removedFlowPath.flowId().value());
 			if (!packets.isEmpty()) {
-				log.warn("Removed flow {} has packets queued", 
-						removedFlowPath.flowId());
+				log.warn("Removed flow {} has packets queued.",  removedFlowPath.flowId());
 			}
+			pendingFlows.remove(removedPath);
+			log.debug("Removed from the pendingFlow: Path {}, Flow ID {}", removedPath, removedFlowPath.flowId());
 		}
 	}
 
-	private void flowInstalled(FlowPath installedFlowPath) {
-		long flowId = installedFlowPath.flowId().value();
+	private void flowInstalled(FlowPath installedFlowPath) {	
+		log.debug("Flow {} was installed", installedFlowPath.flowId());
 		
 		if (!installedFlowPath.installerId().equals(callerId)) {
 			// Not our flow path, ignore
 			return;
+		}	
+
+		if(installedFlowPath.flowEntries().isEmpty()){
+			//If there is no flowEntry, ignore
+			log.warn("There is no flowEntry in the installedFlowPath id {}.return.", installedFlowPath.flowId());
+			return;
 		}
 		
+		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
+		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+		Path installedPath = new Path(srcMacAddress, dstMacAddress);
+		Path reversedInstalledPath = new Path(dstMacAddress, srcMacAddress);
+		
 		// TODO waiting packets should time out. We could request a path that
 		// can't be installed right now because of a network partition. The path
 		// may eventually be installed, but we may have received thousands of 
 		// packets in the meantime and probably don't want to send very old packets.
-		short outPort = 
-				installedFlowPath.flowEntries().get(0).outPort().value();
 		
-		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
-		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+		List<PacketToPush> packets;
+		List<PacketToPush> reversedPackets;
+		Short outPort = installedFlowPath.flowEntries().get(0).outPort().value();
+
+		PushedFlow existingFlow;
+		PushedFlow reversedExistingFlow;
 		
-		Collection<PacketToPush> packets;
 		synchronized (lock) {
-			packets = waitingPackets.removeAll(flowId);
-			
-			log.debug("Flow {} has been installed, sending {} queued packets",
-					installedFlowPath.flowId(), packets.size());
-			
-			// remove pending flows entry
-			Path installedPath = new Path(srcMacAddress, dstMacAddress);
-			PushedFlow existingFlow = pendingFlows.get(installedPath);
+			existingFlow = pendingFlows.get(installedPath);
+			reversedExistingFlow = pendingFlows.get(reversedInstalledPath);
+
 			if (existingFlow != null) {
 			    existingFlow.installed = true;
+			    existingFlow.firstOutPort = outPort;
+			} else {
+				log.debug("ExistingFlow {} is null", installedPath);
+				return;
+			}
+
+			if(reversedExistingFlow == null) {
+				log.debug("ReversedExistingFlow {} is null", reversedInstalledPath);
+				return;
+			}
+
+			//Check both existing flow and reversedExisting flow are installed status.
+			if(reversedExistingFlow.installed){
+				packets = waitingPackets.removeAll(existingFlow.flowId);
+				if(log.isDebugEnabled()){
+					log.debug("removed my packets {} to push from waitingPackets. outPort {} size {}",
+							Long.toHexString(existingFlow.flowId), existingFlow.firstOutPort, packets.size());
+				}
+				reversedPackets = waitingPackets.removeAll(reversedExistingFlow.flowId);
+				if(log.isDebugEnabled()){
+					log.debug("removed my reversed packets {} to push from waitingPackets. outPort {} size {}",
+							Long.toHexString(reversedExistingFlow.flowId), reversedExistingFlow.firstOutPort, reversedPackets.size());
+				}
+			}else{
+				log.debug("Forward or reverse flows hasn't been pushed yet. return");	
+				return;
 			}
 		}
-		
+
 		for (PacketToPush packet : packets) {
+			log.debug("Start packetToPush to sw {}, outPort {}", packet.dpid, existingFlow.firstOutPort);
 			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
-			
-			sendPacketOut(sw, packet.packet, outPort);
+			sendPacketOut(sw, packet.packet, existingFlow.firstOutPort);
+		}
+
+		for (PacketToPush packet : reversedPackets) {
+			log.debug("Start packetToPush to sw {}, outPort {}", packet.dpid, reversedExistingFlow.firstOutPort);
+			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
+			sendPacketOut(sw, packet.packet, reversedExistingFlow.firstOutPort);
 		}
 	}