Merge pull request #507 from pingping-lin/master

Fixed the wrong device attachment points
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 02972e2..133f29c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -7,7 +7,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -62,6 +61,8 @@
 
 	private final int IDLE_TIMEOUT = 5; // seconds
 	private final int HARD_TIMEOUT = 0; // seconds
+
+	private final int PATH_PUSHED_TIMEOUT = 3000; // milliseconds
 	
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
@@ -71,12 +72,15 @@
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
 	
-	private Map<Path, Long> pendingFlows;
+	//private Map<Path, Long> pendingFlows;
+	// TODO it seems there is a Guava collection that will time out entries.
+	// We should see if this will work here.
+	private Map<Path, PushedFlow> pendingFlows;
 	private ListMultimap<Long, PacketToPush> waitingPackets;
 	
 	private final Object lock = new Object();
 	
-	public class PacketToPush {
+	private class PacketToPush {
 		public final OFPacketOut packet;
 		public final long dpid;
 		
@@ -86,15 +90,35 @@
 		}
 	}
 	
-	public final class Path {
+	private class PushedFlow { // a flow this module has pushed, stamped with its push time
+		public final long flowId;
+		private final long pushedTime; // System.currentTimeMillis() taken at construction
+		public short firstHopOutPort = OFPort.OFPP_NONE.getValue(); // OFPP_NONE until flowInstalled() sets it
+		
+		public PushedFlow(long flowId) {
+			this.flowId = flowId;
+			pushedTime = System.currentTimeMillis();
+		}
+		
+		public boolean isExpired() { // true once more than PATH_PUSHED_TIMEOUT ms have passed
+			return (System.currentTimeMillis() - pushedTime) > PATH_PUSHED_TIMEOUT;
+		}
+	}
+	
+	private final class Path {
 		public final SwitchPort srcPort;
 		public final SwitchPort dstPort;
+		public final MACAddress srcMac;
+		public final MACAddress dstMac;
 		
-		public Path(SwitchPort src, SwitchPort dst) {
+		public Path(SwitchPort src, SwitchPort dst, 
+				MACAddress srcMac, MACAddress dstMac) {
 			srcPort = new SwitchPort(new Dpid(src.dpid().value()), 
 					new Port(src.port().value()));
 			dstPort = new SwitchPort(new Dpid(dst.dpid().value()), 
 					new Port(dst.port().value()));
+			this.srcMac = srcMac;
+			this.dstMac = dstMac;
 		}
 		
 		@Override
@@ -105,7 +129,9 @@
 			
 			Path otherPath = (Path) other;
 			return srcPort.equals(otherPath.srcPort) && 
-					dstPort.equals(otherPath.dstPort);
+					dstPort.equals(otherPath.dstPort) &&
+					srcMac.equals(otherPath.srcMac) &&
+					dstMac.equals(otherPath.dstMac);
 		}
 		
 		@Override
@@ -113,8 +139,16 @@
 			int hash = 17;
 			hash = 31 * hash + srcPort.hashCode();
 			hash = 31 * hash + dstPort.hashCode();
+			hash = 31 * hash + srcMac.hashCode();
+			hash = 31 * hash + dstMac.hashCode();
 			return hash;
 		}
+		
+		@Override
+		public String toString() { // renders "(srcMac at srcPort) => (dstMac at dstPort)"
+			return "(" + srcMac + " at " + srcPort + ") => ("
+					+ dstMac + " at " + dstPort + ")";
+		}
 	}
 	
 	@Override
@@ -154,7 +188,8 @@
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
-		pendingFlows = new ConcurrentHashMap<Path, Long>();
+		//pendingFlows = new ConcurrentHashMap<Path, Long>();
+		pendingFlows = new HashMap<Path, PushedFlow>();
 		//waitingPackets = Multimaps.synchronizedSetMultimap(
 				//HashMultimap.<Long, PacketToPush>create());
 		//waitingPackets = HashMultimap.create();
@@ -280,20 +315,35 @@
 		
 		FlowPath flowPath, reverseFlowPath;
 		
-		Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
+		Path pathspec = new Path(srcSwitchPort, dstSwitchPort, 
+				srcMacAddress, dstMacAddress);
 		// TODO check concurrency
 		synchronized (lock) {
-			Long existingFlowId = pendingFlows.get(pathspec);
+			PushedFlow existingFlow = pendingFlows.get(pathspec);
+			//Long existingFlowId = pendingFlows.get(pathspec);
 			
-			if (existingFlowId != null) {
+			if (existingFlow != null && !existingFlow.isExpired()) {
 				log.debug("Found existing flow {}", 
-						HexString.toHexString(existingFlowId));
+						HexString.toHexString(existingFlow.flowId));
 				
 				OFPacketOut po = constructPacketOut(pi, sw);
-				waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
+				
+				if (existingFlow.firstHopOutPort != OFPort.OFPP_NONE.getValue()) {
+					// Flow has been sent to the switches so it is safe to
+					// send a packet out now
+					sendPacketOut(sw, po, existingFlow.firstHopOutPort);
+				}
+				else {
+					// Flow has not yet been sent to switches so save the
+					// packet out for later
+					waitingPackets.put(existingFlow.flowId, 
+							new PacketToPush(po, sw.getId()));
+				}
 				return;
 			}
 			
+			//log.debug("Couldn't match {} in {}", pathspec, pendingFlows);
+			
 			log.debug("Adding new flow between {} at {} and {} at {}",
 					new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
 			
@@ -347,11 +397,14 @@
 			reverseFlowPath.setFlowId(reverseFlowId);
 			
 			OFPacketOut po = constructPacketOut(pi, sw);
-			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort, 
+					dstMacAddress, srcMacAddress);
 			
 			// Add to waiting lists
-			pendingFlows.put(pathspec, flowId.value());
-			pendingFlows.put(reversePathSpec, reverseFlowId.value());
+			//pendingFlows.put(pathspec, flowId.value());
+			//pendingFlows.put(reversePathSpec, reverseFlowId.value());
+			pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
+			pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
 			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
 		
 		}
@@ -416,33 +469,42 @@
 	}
 
 	private void flowInstalled(FlowPath installedFlowPath) {
-		// TODO check concurrency
-		// will need to sync and access both collections at once.
 		long flowId = installedFlowPath.flowId().value();
 		
+		short outPort = 
+				installedFlowPath.flowEntries().get(0).outPort().value();
+		// The MACs below, together with the data path ports, form the pendingFlows key
+		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
+		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
+			log.debug("Flow {} has been installed, sending queued packets",
+					installedFlowPath.flowId());
+			
 			packets = waitingPackets.removeAll(flowId);
 			
-			//remove pending flows entry
-			Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
-					installedFlowPath.dataPath().dstPort());
-			pendingFlows.remove(pathToRemove);
-			
+			// Keep the entry (it expires by time); record the port if we track this path
+			Path installedPath = new Path(installedFlowPath.dataPath().srcPort(),
+					installedFlowPath.dataPath().dstPort(),
+					srcMacAddress, dstMacAddress);
+			PushedFlow pushedFlow = pendingFlows.get(installedPath);
+			if (pushedFlow != null) { pushedFlow.firstHopOutPort = outPort; }
 		}
 		
 		for (PacketToPush packet : packets) {
 			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
 			
-			OFPacketOut po = packet.packet;
-			short outPort = 
-					installedFlowPath.flowEntries().get(0).outPort().value();
-			po.getActions().add(new OFActionOutput(outPort));
-			po.setActionsLength((short)
-					(po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
-			po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
-			
-			flowPusher.add(sw, po);
+			sendPacketOut(sw, packet.packet, outPort);
 		}
 	}
+	
+	private void sendPacketOut(IOFSwitch sw, OFPacketOut po, short outPort) {
+		// Append the output action, grow both length fields to match, then queue it
+		final int grow = OFActionOutput.MINIMUM_LENGTH;
+		po.getActions().add(new OFActionOutput(outPort));
+		po.setActionsLength((short) (po.getActionsLength() + grow));
+		po.setLengthU(po.getLengthU() + grow);
+		flowPusher.add(sw, po);
+	}
 }
diff --git a/start-onos.sh b/start-onos.sh
index 29a108c..a9d4d64 100755
--- a/start-onos.sh
+++ b/start-onos.sh
@@ -1,21 +1,21 @@
 #!/bin/bash
 
 # Set paths
-if [ -z "${ONOS_HOME}" ]; then
-        ONOS_HOME=`dirname $0`
-fi
+ONOS_HOME="${ONOS_HOME:-$(dirname "$0")}"  # "$0" is quoted so a script path with spaces survives
 
 ## Because the script change dir to $ONOS_HOME, we can set ONOS_LOGBACK and LOGDIR relative to $ONOS_HOME
-#ONOS_LOGBACK="${ONOS_HOME}/logback.`hostname`.xml"
-#LOGDIR=${ONOS_HOME}/onos-logs
-ONOS_LOGBACK="./logback.`hostname`.xml"
-LOGDIR=./onos-logs
+ONOS_LOGBACK=${ONOS_LOGBACK:-${ONOS_HOME}/logback.`hostname`.xml}
+LOGDIR=${ONOS_LOGDIR:-${ONOS_HOME}/onos-logs}
+
 ONOS_LOG="${LOGDIR}/onos.`hostname`.log"
 PCAP_LOG="${LOGDIR}/onos.`hostname`.pcap"
 LOGS="$ONOS_LOG $PCAP_LOG"
 
+ONOS_PROPS=${ONOS_PROPS:-${ONOS_HOME}/conf/onos.properties}
+JMX_PORT=${JMX_PORT:-7189}
+
 # Set JVM options
-JVM_OPTS=""
+JVM_OPTS="${JVM_OPTS:-}"
 ## If you want JaCoCo Code Coverage reports... uncomment line below
 #JVM_OPTS="$JVM_OPTS -javaagent:${ONOS_HOME}/lib/jacocoagent.jar=dumponexit=true,output=file,destfile=${LOGDIR}/jacoco.exec"
 JVM_OPTS="$JVM_OPTS -server -d64"
@@ -31,7 +31,7 @@
 		-XX:+UseThreadPriorities \
 		-XX:ThreadPriorityPolicy=42 \
 		-XX:+UseCompressedOops \
-		-Dcom.sun.management.jmxremote.port=7189 \
+		-Dcom.sun.management.jmxremote.port=$JMX_PORT \
 		-Dcom.sun.management.jmxremote.ssl=false \
 		-Dcom.sun.management.jmxremote.authenticate=false"
 JVM_OPTS="$JVM_OPTS -Dhazelcast.logging.type=slf4j"
@@ -39,9 +39,7 @@
 # Set ONOS core main class
 MAIN_CLASS="net.onrc.onos.ofcontroller.core.Main"
 
-if [ -z "${MVN}" ]; then
-    MVN="mvn -o"
-fi
+MVN=${MVN:-mvn -o}
 
 #<logger name="net.floodlightcontroller.linkdiscovery.internal" level="TRACE"/>
 #<appender-ref ref="STDOUT" />
@@ -101,15 +99,15 @@
 
   # Run ONOS
   echo "Starting ONOS controller ..."
-  echo 
+  echo
 
-  # XXX : MVN has to run at the project top dir 
+  # XXX : MVN has to run at the project top dir
   echo $ONOS_HOME
   cd ${ONOS_HOME}
-  pwd 
-  echo "${MVN} exec:exec -Dexec.executable=\"java\" -Dexec.args=\"${JVM_OPTS} -Dlogback.configurationFile=${ONOS_LOGBACK} -cp %classpath ${MAIN_CLASS} -cf ./conf/onos.properties\""
+  pwd
+  echo "${MVN} exec:exec -Dexec.executable=\"java\" -Dexec.args=\"${JVM_OPTS} -Dlogback.configurationFile=${ONOS_LOGBACK} -cp %classpath ${MAIN_CLASS} -cf ${ONOS_PROPS}\""
 
-  ${MVN} exec:exec -Dexec.executable="java" -Dexec.args="${JVM_OPTS} -Dlogback.configurationFile=${ONOS_LOGBACK} -cp %classpath ${MAIN_CLASS} -cf ./conf/onos.properties" > ${LOGDIR}/onos.`hostname`.stdout 2>${LOGDIR}/onos.`hostname`.stderr &
+  ${MVN} exec:exec -Dexec.executable="java" -Dexec.args="${JVM_OPTS} -Dlogback.configurationFile=${ONOS_LOGBACK} -cp %classpath ${MAIN_CLASS} -cf ${ONOS_PROPS}" > ${LOGDIR}/onos.`hostname`.stdout 2>${LOGDIR}/onos.`hostname`.stderr &
 
   echo "Waiting for ONOS to start..."
   COUNT=0
@@ -161,13 +159,17 @@
   start)
     stop
     check_db
-    start 
+    start
+    ;;
+  startnokill)
+    check_db
+    start
     ;;
   startifdown)
     n=`jps -l |grep "${MAIN_CLASS}" | wc -l`
     if [ $n == 0 ]; then
       start
-    else 
+    else
       echo "$n instance of onos running"
     fi
     ;;
diff --git a/vm-utils/onos.py b/vm-utils/onos.py
new file mode 100755
index 0000000..c6b96dd
--- /dev/null
+++ b/vm-utils/onos.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+
+"""
+onos.py: A simple ONOS Controller() subclass for Mininet
+
+We implement the following classes:
+
+ONOS: a custom Controller() subclass to start ONOS
+OVSSwitchONOS: a custom OVSSwitch() switch that connects to multiple controllers.
+
+We use single Zookeeper and Cassandra instances for now.
+
+As a custom file, exports:
+
+--controller onos
+--switch ovso
+
+Usage:
+
+$ sudo ./onos.py
+
+This will start up a simple 2-host, 2 ONOS network
+
+$ sudo mn --custom onos.py --controller onos,2 --switch ovso
+"""
+
+from mininet.node import Controller, OVSSwitch
+from mininet.net import Mininet
+from mininet.cli import CLI
+from mininet.topo import SingleSwitchTopo
+from mininet.log import setLogLevel, info
+from mininet.util import quietRun
+
+from shutil import copyfile
+from os import environ
+from functools import partial
+
+
+class ONOS( Controller ):
+    "Custom controller class for ONOS"
+
+    # Directories and configuration templates
+    home = environ[ 'HOME' ]
+    onosDir = home + "/ONOS"
+    zookeeperDir = home + "/zookeeper-3.4.5"
+    dirBase = '/tmp'
+    logDir = dirBase + '/onos-%s.logs'
+    cassDir = dirBase + '/onos-%s.cassandra'
+    configFile = dirBase + '/onos-%s.properties'
+    logbackFile = dirBase + '/onos-%s.logback'
+    jmxbase = 7189
+    restbase = 8080
+    ofbase = 6633
+
+    # Per-instance property template
+    fc = 'net.floodlightcontroller.'
+    proctag = 'mn-onos-id'
+    jvmopts = (
+        # We match on this to shut down our instances
+        ( proctag, 0 ),
+        ( fc + 'restserver.RestApiServer.port', restbase ),
+        ( fc + 'core.FloodlightProvider.openflowport', ofbase ),
+        ( fc + 'core.FloodlightProvider.controllerid', 0 ) )
+
+
+    # For maven debugging
+    # mvn = 'mvn -o -e -X'
+
+    def __init__( self, name, n=1, drop=True, **params):
+        """n: number of ONOS instances to run (1)
+           drop: drop root privileges (True)"""
+        self.check()  # raises Exception if java or mvn cannot be found
+        self.drop = drop
+        self.count = n
+        self.ids = range( 0, self.count )  # instance ids 0 .. n-1
+        Controller.__init__( self, name, **params )
+        # We don't need to run as root, and it can interfere
+        # with starting Zookeeper manually
+        if self.drop:
+            self.user = quietRun( 'who am i' ).split()[ 0 ]  # NOTE(review): IndexError if output is empty
+            self.sendCmd( 'su', self.user )
+            self.waiting = False  # presumably lets later cmd() calls proceed - TODO confirm
+        # Need to run commands from ONOS dir
+        self.cmd( 'cd', self.onosDir )
+        self.cmd( 'export PATH=$PATH:%s' % self.onosDir )
+        if hasattr( self, 'mvn' ):  # e.g. when the commented-out 'mvn' class attribute is enabled
+            self.cmd( 'export MVN="%s"' % self.mvn )
+
+    def check( self ):
+        "Raise Exception unless 'which' finds both java and mvn"
+        prereqs = ( ( 'java', 'java' ), ( 'mvn', 'Maven (mvn)' ) )
+        for exe, label in prereqs:
+            if quietRun( 'which ' + exe ):
+                continue
+            raise Exception( label + ' not found -'
+                             ' make sure it is installed and in $PATH' )
+
+    def startCassandra( self ):
+        "Start Cassandra; raise Exception if the status output contains 'Error'"
+        self.cmd( 'start-cassandra.sh start' )
+        status = self.cmd( 'start-cassandra.sh status' )
+        if 'Error' in status:
+            raise Exception( 'Cassandra startup failed: ' + status )
+
+    def stopCassandra( self ):
+        "Stop Cassandra via 'start-cassandra.sh stop'"
+        self.cmd( 'start-cassandra.sh stop' )
+
+    def startZookeeper( self, initcfg=True ):
+        "Restart Zookeeper; initcfg: overwrite conf/zoo.cfg with zoo_sample.cfg first"
+        # Reinitialize configuration file
+        if initcfg:
+            cfg = self.zookeeperDir + '/conf/zoo.cfg'
+            template = self.zookeeperDir + '/conf/zoo_sample.cfg'
+            copyfile( template, cfg )
+        self.cmd( 'start-zk.sh restart' )  # 'restart' here, unlike Cassandra's plain 'start'
+        status = self.cmd( 'start-zk.sh status' )
+        if 'Error' in status:
+            raise Exception( 'Zookeeper startup failed: ' + status )
+
+    def stopZookeeper( self ):
+        "Stop Zookeeper via 'start-zk.sh stop'"
+        self.cmd( 'start-zk.sh stop' )
+
+    def setVars( self, id ):
+        "Export the environment vars for instance id (nothing is returned)"
+        # ONOS directories and files
+        logdir = self.logDir % id
+        cassdir = self.cassDir % id
+        logback = self.logbackFile % id
+        jmxport = self.jmxbase + id  # one JMX port per instance
+        self.cmd( 'mkdir -p', logdir, cassdir )
+        self.cmd( 'export ONOS_LOGDIR="%s"' % logdir )
+        self.cmd( 'export ZOO_LOG_DIR="%s"' % logdir )  # presumably read by start-zk.sh - TODO confirm
+        self.cmd( 'export CASS_DIR="%s"' % cassdir )  # presumably read by start-cassandra.sh - TODO confirm
+        self.cmd( 'export ONOS_LOGBACK="%s"' % logback )
+        self.cmd( 'export JMX_PORT=%s' % jmxport )
+        jvmopts = ('-agentlib:jdwp=transport=dt_socket,address=%s,server=y,suspend=n '
+            % ( 8000 + id ) )  # JDWP debug socket, one port per instance
+        jvmopts += ' '.join( '-D%s=%s '% ( opt, val + id )  # every base value is offset by id
+            for opt, val in self.jvmopts )
+        self.cmd( 'export JVM_OPTS="%s"' % jvmopts )
+
+    def startONOS( self, id ):
+        """Export the vars for instance id, then launch it via start-onos.sh
+           id: identifier for new instance"""
+        # self.stopONOS( id )
+        self.setVars( id )
+        self.cmdPrint( 'start-onos.sh startnokill' )  # startnokill: check_db + start, no prior stop
+
+    def stopONOS( self, id ):
+        """Shut down ONOS by killing the JVM tagged with this id
+           id: identifier for instance (matched exactly, so 1 never hits 10)"""
+        pid = self.cmd( "jps -v | grep -E '%s=%s( |$)' | awk '{print $1}'" %
+            ( self.proctag, id ) ).strip()
+        if pid:
+            self.cmdPrint( 'kill', pid )
+
+    def start( self, *args ):
+        "Start Cassandra, then Zookeeper, then each ONOS instance in id order"
+        info( '* Starting Cassandra\n' )
+        self.startCassandra()
+        info( '* Starting Zookeeper\n' )
+        self.startZookeeper()
+        for id in self.ids:
+            info( '* Starting ONOS %s\n' % id )
+            self.startONOS( id )
+
+    def stop( self, *args ):
+        "Stop each ONOS instance, then Zookeeper, then Cassandra"
+        for id in self.ids:
+            info( '* Stopping ONOS %s\n' % id )
+            self.stopONOS( id )
+        info( '* Stopping zookeeper\n' )
+        self.stopZookeeper()
+        info( '* Stopping Cassandra\n' )
+        self.stopCassandra()
+
+    def clist( self ):
+        "Return one 'tcp:127.0.0.1:<port>' controller specifier per ONOS instance"
+        ports = ( self.ofbase + n for n in range( self.count ) )
+        return [ 'tcp:127.0.0.1:%s' % port for port in ports ]
+
+
+class OVSSwitchONOS( OVSSwitch ):
+    "OVS switch which connects to multiple controllers"
+    def start( self, controllers ):
+        "Start the switch, then point it at every instance of the one ONOS node"
+        OVSSwitch.start( self, controllers )
+        assert len( controllers ) == 1
+        c0 = controllers[ 0 ]
+        # isinstance rather than an exact type match, so ONOS subclasses work
+        assert isinstance( c0, ONOS )
+        clist = ','.join( c0.clist() )
+        self.cmd( 'ovs-vsctl set-controller', self, clist)
+        # Reconnect quickly to controllers (1s vs. 15s max_backoff)
+        for uuid in self.controllerUUIDs():
+            if uuid.count( '-' ) != 4:
+                continue  # doesn't look like a UUID
+            self.cmd( 'ovs-vsctl set Controller', uuid.strip(),
+                      'max_backoff=1000' )
+
+
+controllers = { 'onos': ONOS }
+switches = { 'ovso': OVSSwitchONOS }
+
+
+if __name__ == '__main__':
+    # Smoke test: single switch, two hosts, two ONOS instances
+    setLogLevel( 'info' )
+    topo = SingleSwitchTopo( 2 )
+    net = Mininet( topo=topo, switch=OVSSwitchONOS,
+                   controller=partial( ONOS, n=2 ) )
+    net.start()
+    CLI( net )
+    net.stop()