Merge master branch into syncdev17

Conflicts:
	src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
diff --git a/cluster-mgmt/template/onsdemo_edge_template.py b/cluster-mgmt/template/onsdemo_edge_template.py
index e340f38..c3d0287 100755
--- a/cluster-mgmt/template/onsdemo_edge_template.py
+++ b/cluster-mgmt/template/onsdemo_edge_template.py
@@ -65,7 +65,7 @@
             switch.append(sw)
 
         for i in range (NR_NODES):
-            host.append(self.addHost( 'host%d' % (int(i)+1) ))
+            host.append(self.addHost( 'host%d.%d' % (NWID, int(i)+1) ))
 
         for i in range (NR_NODES):
             self.addLink(host[i], switch[i])
@@ -117,7 +117,7 @@
 
     host = []
     for i in range (NR_NODES):
-      host.append(net.get( 'host%d' % (int(i)+1) ))
+      host.append(net.get( 'host%d.%d' % (NWID, (int(i)+1)) ))
 
     net.start()
 
diff --git a/kryo2/pom.xml b/kryo2/pom.xml
index 1beb87d..788f952 100644
--- a/kryo2/pom.xml
+++ b/kryo2/pom.xml
@@ -7,7 +7,7 @@
   <version>2.22</version>
   <packaging>jar</packaging>
 
-  <name>kyro2</name>
+  <name>kryo2</name>
   <url>http://maven.apache.org</url>
 
   <properties>
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 180cbe9..775f952 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -607,6 +607,29 @@
     }
 
     /**
+     * Get a Flow for a given Flow ID.
+     *
+     * @param flowId the Flow ID of the Flow to get.
+     * @return the Flow if found, otherwise null.
+     */
+    @Override
+    public FlowPath getFlow(FlowId flowId) {
+	byte[] valueBytes = mapFlow.get(flowId.value());
+	if (valueBytes == null)
+	    return null;
+
+	// Decode the value. deleteKryo() runs in finally so the Kryo instance
+	// is handed back to the factory even if deserialization throws.
+	Kryo kryo = kryoFactory.newKryo();
+	try {
+	    Input input = new Input(valueBytes);
+	    return kryo.readObject(input, FlowPath.class);
+	} finally {
+	    kryoFactory.deleteKryo(kryo);
+	}
+    }
+
+    /**
      * Send a notification that a Flow is added.
      *
      * @param flowPath the Flow that is added.
@@ -702,6 +725,29 @@
     }
 
     /**
+     * Get a Flow Entry for a given Flow Entry ID.
+     *
+     * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+     * @return the Flow Entry if found, otherwise null.
+     */
+    @Override
+    public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+	byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
+	if (valueBytes == null)
+	    return null;
+
+	// Decode the value. deleteKryo() runs in finally so the Kryo instance
+	// is handed back to the factory even if deserialization throws.
+	Kryo kryo = kryoFactory.newKryo();
+	try {
+	    Input input = new Input(valueBytes);
+	    return kryo.readObject(input, FlowEntry.class);
+	} finally {
+	    kryoFactory.deleteKryo(kryo);
+	}
+    }
+
+    /**
      * Send a notification that a FlowEntry is added.
      *
      * @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 9361341..034fe25 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -50,7 +50,7 @@
      * @param arpEventHandler The ARP event handler to de-register.
      */
     public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
-    
+
     /**
      * Get all Flows that are currently in the datagrid.
      *
@@ -59,6 +59,14 @@
     Collection<FlowPath> getAllFlows();
 
     /**
+     * Get a Flow for a given Flow ID.
+     *
+     * @param flowId the Flow ID of the Flow to get.
+     * @return the Flow if found, otherwise null.
+     */
+    FlowPath getFlow(FlowId flowId);
+
+    /**
      * Send a notification that a Flow is added.
      *
      * @param flowPath the Flow that is added.
@@ -92,6 +100,14 @@
     Collection<FlowEntry> getAllFlowEntries();
 
     /**
+     * Get a Flow Entry for a given Flow Entry ID.
+     *
+     * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+     * @return the Flow Entry if found, otherwise null.
+     */
+    FlowEntry getFlowEntry(FlowEntryId flowEntryId);
+
+    /**
      * Send a notification that a FlowEntry is added.
      *
      * @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 04cde1d..8f72261 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -449,6 +449,16 @@
     }
 
     /**
+     * Inform the Flow Manager that a Flow Entry on switch expired.
+     *
+     * @param sw the switch the Flow Entry expired on.
+     * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
+     */
+    public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId) {
+	// TODO: Not implemented yet; the expiry notification is currently ignored.
+    }
+
+    /**
      * Push modified Flow Entries to switches.
      *
      * NOTE: Only the Flow Entries to switches controlled by this instance
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index ba3a6e7..8d2b797 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -2,10 +2,12 @@
 
 import java.util.ArrayList;
 
+import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
@@ -112,4 +114,12 @@
      * @return unique flow ID
      */
     public long getNextFlowEntryId();
+
+    /**
+     * Inform the Flow Manager that a Flow Entry on switch expired.
+     *
+     * @param sw the switch the Flow Entry expired on.
+     * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
+     */
+    public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 69afc3d..461d231 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -22,6 +22,8 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.ofcontroller.flowprogrammer.web.FlowProgrammerWebRoutable;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 
 /**
@@ -47,6 +49,7 @@
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile IControllerRegistryService registryService;
     protected volatile IRestApiService restApi;
+    protected volatile IFlowService flowManager;
 
     protected FlowPusher pusher;
     private static final int NUM_PUSHER_THREAD = 1;
@@ -66,7 +69,7 @@
 	floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
 	registryService = context.getServiceImpl(IControllerRegistryService.class);
 	restApi = context.getServiceImpl(IRestApiService.class);
-
+	flowManager = context.getServiceImpl(IFlowService.class);
 	pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
 	if (enableFlowSync) {
 	    synchronizer.init(pusher);
@@ -138,6 +141,8 @@
 	case FLOW_REMOVED:
 	    OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
 	    log.debug("Got flow removed from "+ sw.getId() +": "+ flowMsg.getCookie());
+	    FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
+	    flowManager.flowEntryOnSwitchExpired(sw, id);
 	    break;
 	default:
 	    break;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 526c727..e4aa319 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -5,6 +5,7 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -146,6 +147,7 @@
 		private Map<IOFSwitch,SwitchQueue> queues
 			= new HashMap<IOFSwitch,SwitchQueue>();
 		
+		// reusable latch used for waiting for arrival of message
 		private Semaphore mutex = new Semaphore(0);
 		
 		@Override
@@ -160,14 +162,16 @@
 					return;
 				}
 				
-				Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+				// for safety of concurrent access, copy all key objects
+				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
 				synchronized (queues) {
-					entries = queues.entrySet();
+					for (IOFSwitch sw : queues.keySet()) {
+						keys.add(sw);
+					}
 				}
 				
-				for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
-					IOFSwitch sw = entry.getKey();
-					SwitchQueue queue = entry.getValue();
+				for (IOFSwitch sw : keys) {
+					SwitchQueue queue = queues.get(sw);
 
 					// Skip if queue is suspended
 					if (sw == null || queue == null ||
@@ -175,48 +179,62 @@
 						continue;
 					}
 					
-					// check sending rate and determine it to be sent or not
-					long current_time = System.currentTimeMillis();
-					long size = 0;
-					
 					synchronized (queue) {
-						if (queue.isSendable(current_time)) {
-							int i = 0;
-							while (! queue.isEmpty()) {
-								// Number of messages excess the limit
-								if (i >= MAX_MESSAGE_SEND) {
-									// Messages remains in queue
-									mutex.release();
-									break;
-								}
-								++i;
-								
-								OFMessage msg = queue.poll();
-								try {
-									messageDamper.write(sw, msg, context);
-									log.debug("Pusher sends message : {}", msg);
-									size += msg.getLength();
-								} catch (IOException e) {
-									e.printStackTrace();
-									log.error("Exception in sending message ({}) : {}", msg, e);
+						processQueue(sw, queue, MAX_MESSAGE_SEND);
+						if (queue.isEmpty()) {
+							// remove queue if flagged to be.
+							if (queue.toBeDeleted) {
+								synchronized (queues) {
+									queues.remove(sw);
 								}
 							}
-							sw.flush();
-							queue.logSentData(current_time, size);
-							
-							if (queue.isEmpty()) {
-								// remove queue if flagged to be.
-								if (queue.toBeDeleted) {
-									synchronized (queues) {
-										queues.remove(sw);
-									}
-								}
+						} else {
+							// if some messages remain in the queue, release the latch
+							if (mutex.availablePermits() == 0) {
+								mutex.release();
 							}
 						}
 					}
 				}
 			}
 		}
+		
+		/**
+		 * Drain pending messages from a switch queue and write them to the switch.
+		 * Nothing is sent unless queue.isSendable() accepts the current time.
+		 * @param sw Switch to which messages will be sent.
+		 * @param queue Queue of messages.
+		 * @param max_msg Maximum number of messages to send in this call. If 0 or
+		 *                less, messages are sent until the queue is empty.
+		 */
+		private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+			final long now = System.currentTimeMillis();
+
+			// The queue decides from the current time whether sending is allowed.
+			if (!queue.isSendable(now)) {
+				return;
+			}
+
+			final boolean limited = max_msg > 0;
+			long sentBytes = 0;
+			for (int count = 0; !queue.isEmpty(); ++count) {
+				if (limited && count >= max_msg) {
+					break;	// per-call message limit reached
+				}
+
+				OFMessage msg = queue.poll();
+				try {
+					messageDamper.write(sw, msg, context);
+					log.debug("Pusher sends message : {}", msg);
+					sentBytes += msg.getLength();
+				} catch (IOException e) {
+					e.printStackTrace();
+					log.error("Exception in sending message ({}) : {}", msg, e);
+				}
+			}
+			sw.flush();
+			queue.logSentData(now, sentBytes);
+		}
 	}
 	
 	/**
@@ -363,7 +381,7 @@
 		FlowPusherThread proc = getProcess(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
-		synchronized (proc) {
+		synchronized (proc.queues) {
 			proc.queues.put(sw, queue);
 		}
 		
diff --git a/web/ons-demo/js/app.js b/web/ons-demo/js/app.js
index 94c41e2..d869de7 100644
--- a/web/ons-demo/js/app.js
+++ b/web/ons-demo/js/app.js
@@ -1,6 +1,17 @@
 /*global d3, document∆*/
 
 
+// Replace each flow's {value: ...} wrappers with plain values, in place.
+function updateFlow(model) {
+    (model.flows || []).forEach(function (flow) {	// no-op without a flows list
+	flow.flowId = flow.flowId.value;
+	flow.installerId = flow.installerId.value;
+	var src = flow.dataPath.srcPort, dst = flow.dataPath.dstPort;
+	flow.dstDpid = dst.dpid.value; flow.srcDpid = src.dpid.value;
+	flow.dstPort = dst.port.value; flow.srcPort = src.port.value;
+    });
+}
+
 function sync() {
 	var d = Date.now();
 
@@ -8,6 +19,7 @@
 //		console.log('Update time: ' + (Date.now() - d)/1000 + 's');
 
 		if (newModel) {
+			updateFlow(newModel);
 			var modelChanged = false;
 			var newModelString = JSON.stringify(newModel);
 			if (!modelString || newModelString != modelString) {
diff --git a/web/topology_rest.py b/web/topology_rest.py
index bac3113..b3a415e 100755
--- a/web/topology_rest.py
+++ b/web/topology_rest.py
@@ -720,8 +720,8 @@
 #    print "Debug: Controller command %s called %s" % (cmd, controller_name)
   else:
     # No longer use -i to specify keys (use .ssh/config to specify it)
-    start_onos="ssh %s ONOS/start-onos.sh start" % (controller_name)
-    stop_onos="ssh %s ONOS/start-onos.sh stop" % (controller_name)
+    start_onos="ssh %s \"cd ONOS; ./start-onos.sh start\"" % (controller_name)
+    stop_onos="ssh %s \"cd ONOS; ./start-onos.sh stop\"" % (controller_name)
 #    start_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh start" % (controller_name)
 #    stop_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh stop" % (controller_name)
 
@@ -960,7 +960,7 @@
   parsedResult = json.loads(result)
   if len(parsedResult) > 0:
     if parsedResult[-1].has_key('flowId'):
-      flow_nr = int(parsedResult[-1]['flowId'], 16)
+      flow_nr = int(parsedResult[-1]['flowId']['value'], 16)
   else:
     flow_nr = -1  # first flow
     print "first flow"