Merge pull request #462 from n-shiota/syncdev17

Added FlowProgrammer CLI tool and FlowPusher is enabled by default
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 8f72261..d3b5375 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -40,7 +40,7 @@
  */
 public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
     // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
-    private final static boolean enableFlowPusher = false;
+    private final static boolean enableFlowPusher = true;
 
     protected GraphDBOperation dbHandlerApi;
     protected GraphDBOperation dbHandlerInner;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index c5db2ba..461d231 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -20,7 +20,8 @@
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.flow.IFlowManager;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.ofcontroller.flowprogrammer.web.FlowProgrammerWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
@@ -47,6 +48,7 @@
     protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile IControllerRegistryService registryService;
+    protected volatile IRestApiService restApi;
     protected volatile IFlowService flowManager;
 
     protected FlowPusher pusher;
@@ -66,6 +68,7 @@
 	    throws FloodlightModuleException {
 	floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
 	registryService = context.getServiceImpl(IControllerRegistryService.class);
+	restApi = context.getServiceImpl(IRestApiService.class);
 	flowManager = context.getServiceImpl(IFlowService.class);
 	pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
 	if (enableFlowSync) {
@@ -75,6 +78,7 @@
 
     @Override
     public void startUp(FloodlightModuleContext context) {
+    restApi.addRestletRoutable(new FlowProgrammerWebRoutable());
 	pusher.start();
 	floodlightProvider.addOFMessageListener(OFType.FLOW_REMOVED, this);
 	floodlightProvider.addOFSwitchListener(this);
@@ -109,6 +113,7 @@
 	Collection<Class<? extends IFloodlightService>> l =
 		new ArrayList<Class<? extends IFloodlightService>>();
 	l.add(IFloodlightProviderService.class);
+	l.add(IRestApiService.class);
 	return l;
     }
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 349f69d..e4aa319 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -366,6 +366,7 @@
 		}
 		
 		if (rate > 0) {
+			log.debug("rate for {} is set to {}", sw.getId(), rate);
 			queue.max_rate = rate;
 		}
 	}
@@ -1086,11 +1087,8 @@
 		
 		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
 		msg.setXid(sw.getNextTransactionId());
-		add(sw, msg);
 
-		// TODO create Future object of message
 		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-
 		synchronized (barrierFutures) {
 			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
 			if (map == null) {
@@ -1100,6 +1098,8 @@
 			map.put(msg.getXid(), future);
 		}
 		
+		add(sw, msg);
+		
 		return future;
 	}
 
@@ -1157,11 +1157,13 @@
 	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
 		Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
 		if (map == null) {
+			log.debug("null map for {} : {}", sw.getId(), barrierFutures);
 			return Command.CONTINUE;
 		}
 		
 		OFBarrierReplyFuture future = map.get(msg.getXid());
 		if (future == null) {
+			log.debug("null future for {} : {}", msg.getXid(), map);
 			return Command.CONTINUE;
 		}
 		
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
new file mode 100644
index 0000000..21e5bfb
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.routing.Router;
+
+import net.floodlightcontroller.restserver.RestletRoutable;
+
+public class FlowProgrammerWebRoutable implements RestletRoutable {
+
+	@Override
+	public Restlet getRestlet(Context context) {
+		Router router = new Router(context);
+		router.attach("/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
+		router.attach("/suspend/{dpid}/json", SuspendPusherResource.class);
+		router.attach("/resume/{dpid}/json", ResumePusherResource.class);
+		router.attach("/barrier/{dpid}/json", SendBarrierResource.class);
+		return router;
+	}
+
+	@Override
+	public String basePath() {
+		return "/wm/fprog";
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
new file mode 100644
index 0000000..4e1c0fc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
@@ -0,0 +1,33 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
+
+public class PusherResource extends ServerResource {
+    protected final static Logger log = LoggerFactory.getLogger(PusherResource.class);
+
+    protected IFloodlightProviderService provider;
+	protected IFlowPusherService pusher;
+	
+	protected boolean init() {
+    	provider = (IFloodlightProviderService)
+    			getContext().getAttributes().
+    			get(IFloodlightProviderService.class.getCanonicalName());
+    	if (provider == null) {
+		    log.debug("ONOS FloodlightProvider not found");
+		    return false;
+		}
+    	
+    	pusher = (IFlowPusherService)getContext().getAttributes().
+    			get(IFlowPusherService.class.getCanonicalName());
+    	if (pusher == null) {
+		    log.debug("ONOS FlowPusherService not found");
+		    return false;
+		}
+    	return true;
+	}
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
new file mode 100644
index 0000000..ca1ec00
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Resume sending message to switch.
+ *
+ *   GET /wm/fprog/resume/{dpid}/json"
+ */
+public class ResumePusherResource extends PusherResource {
+    /**
+     * Implement the API.
+     *
+     * @return true if succeeded, false if failed.
+     */
+    @Get("json")
+    public boolean retrieve() {
+    	if (! init()) {
+    		return false;
+    	}
+    	
+    	long dpid;
+    	try {
+    		dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+    	} catch (NumberFormatException e) {
+    		log.error("Invalid number format");
+    		return false;
+    	}
+
+    	IOFSwitch sw = provider.getSwitches().get(dpid);
+    	if (sw == null) {
+    		log.error("Invalid dpid");
+    		return false;
+    	}
+    	
+    	return pusher.resume(sw);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
new file mode 100644
index 0000000..9c348ff
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Send barrier message to switch.
+ *
+ *   GET /wm/fprog/barrier/{dpid}/json"
+ */
+public class SendBarrierResource extends PusherResource {
+    /**
+     * Implement the API.
+     *
+     * @return true if succeeded, false if failed.
+     */
+    @Get("json")
+    public OFBarrierReply retrieve() {
+    	if (! init()) {
+    		return null;
+    	}
+    	long dpid;
+    	try {
+    		dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+    	} catch (NumberFormatException e) {
+    		log.error("Invalid number format");
+    		return null;
+    	}
+
+    	IOFSwitch sw = provider.getSwitches().get(dpid);
+    	if (sw == null) {
+    		log.error("Invalid dpid");
+    		return null;
+    	}
+    	
+    	return pusher.barrier(sw);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
new file mode 100644
index 0000000..08a728e
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Set sending rate to the switch.
+ *
+ *   GET /wm/fprog/setrate/{dpid}/{rate}/json"
+ */
+public class SetPushRateResource extends PusherResource {
+
+    /**
+     * Implement the API.
+     *
+     * @return true if succeeded, false if failed.
+     */
+    @Get("json")
+    public boolean retrieve() {
+    	if (! init()) {
+    		return false;
+    	}
+    	
+    	long dpid;
+    	long rate;
+    	
+    	try {
+    		dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+        	rate = Long.valueOf((String)getRequestAttributes().get("rate"));
+    	} catch (NumberFormatException e) {
+    		log.error("Invalid number format");
+    		return false;
+    	}
+
+    	IOFSwitch sw = provider.getSwitches().get(dpid);
+    	if (sw == null) {
+    		log.error("Invalid dpid");
+    		return false;
+    	}
+    	
+    	pusher.setRate(sw, rate);
+    	
+    	return true;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
new file mode 100644
index 0000000..39d245b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
@@ -0,0 +1,46 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FlowProgrammer REST API implementation: Suspend sending message to switch.
+ *
+ *   GET /wm/fprog/suspend/{dpid}/json"
+ */
+public class SuspendPusherResource extends PusherResource {
+
+    protected final static Logger log = LoggerFactory.getLogger(SetPushRateResource.class);
+
+    /**
+     * Implement the API.
+     *
+     * @return true if succeeded, false if failed.
+     */
+    @Get("json")
+    public boolean retrieve() {
+    	if (! init()) {
+    		return false;
+    	}
+    	
+    	long dpid;
+    	try {
+    		dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+    	} catch (NumberFormatException e) {
+    		log.error("Invalid number format");
+    		return false;
+    	}
+
+    	IOFSwitch sw = provider.getSwitches().get(dpid);
+    	if (sw == null) {
+    		log.error("Invalid dpid");
+    		return false;
+    	}
+    	
+    	return pusher.suspend(sw);
+    }
+}
diff --git a/web/pusher.py b/web/pusher.py
new file mode 100755
index 0000000..f53e3ea
--- /dev/null
+++ b/web/pusher.py
@@ -0,0 +1,145 @@
+#! /usr/bin/env python
+
+
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+from flask import Flask, json, Response, render_template, make_response, request
+
+## Global Var ##
+ControllerIP="127.0.0.1"
+ControllerPort=8080
+
+DEBUG=0
+pp = pprint.PrettyPrinter(indent=4)
+
+app = Flask(__name__)
+
+## Worker Functions ##
+def log_error(txt):
+  print '%s' % (txt)
+
+def debug(txt):
+  if DEBUG:
+    print '%s' % (txt)
+
+# @app.route("/wm/fprog/setrate/<dpid>/<rate>/json")
+# Sample output:
+#  "true"
+
+
+def set_rate(dpid,rate):
+  try:
+    command = "curl -s \"http://%s:%s/wm/fprog/setrate/%s/%s/json\"" % (ControllerIP, ControllerPort, dpid, rate)
+    debug("set_rate %s" % command)
+     
+    result = os.popen(command).read()
+    debug("result %s" % result)
+    if result == "false":
+      print "Failed to set rate"
+      return;
+  except:
+    log_error("Controller IF has issue")
+    exit(1)
+  
+  print "Sending rate to %s is successfully set to %s" % (dpid, rate)
+
+def suspend(dpid):
+  try:
+    command = "curl -s \"http://%s:%s/wm/fprog/suspend/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+    debug("suspend %s" % command)
+     
+    result = os.popen(command).read()
+    debug("result %s" % result)
+    if result == "false":
+      print "Failed to suspend"
+      return;
+  except:
+    log_error("Controller IF has issue")
+    exit(1)
+  
+  print "DPID %s is successfully suspended" % dpid
+
+def resume(dpid):
+  try:
+    command = "curl -s \"http://%s:%s/wm/fprog/resume/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+    debug("resume %s" % command)
+     
+    result = os.popen(command).read()
+    debug("result %s" % result)
+    if result == "false":
+      print "Failed to resume"
+      return;
+  except:
+    log_error("Controller IF has issue")
+    exit(1)
+  
+  print "DPID %s is successfully resumed" % dpid
+
+def barrier(dpid):
+  try:
+    command = "curl -s \"http://%s:%s/wm/fprog/barrier/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+    debug("barrier %s" % command)
+     
+    result = os.popen(command).read()
+    debug("result %s" % result)
+    if result == "false":
+      print "Failed to send barrier"
+      return;
+  except:
+    log_error("Controller IF has issue")
+    exit(1)
+  
+  print "Barrier reply from %s : %s" % (dpid, result)
+
+
+if __name__ == "__main__":
+  usage_msg1 = "Usage:\n"
+  usage_msg2 = "%s rate <dpid> <rate> : Set sending rate[bytes/ms] to the switch\n" % (sys.argv[0])
+  usage_msg3 = "                   suspend <dpid>    : Suspend sending message to the switch\n"
+  usage_msg4 = "                   resume <dpid>     : Resume sending message to the switch\n"
+  usage_msg5 = "                   barrier <dpid>    : Send barrier message to the switch\n"
+  usage_msg = usage_msg1 + usage_msg2 + usage_msg3 + usage_msg4 + usage_msg5;
+
+  app.debug = True;
+
+  # Usage info
+  if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
+    print(usage_msg)
+    exit(0)
+
+  # Check arguments
+  if len(sys.argv) < 2:
+    log_error(usage_msg)
+    exit(1)
+
+  # Do the work
+  if sys.argv[1] == "rate":
+    if len(sys.argv) < 4:
+      log_error(usage_msg)
+      exit(1)
+    set_rate(sys.argv[2], sys.argv[3])
+  elif sys.argv[1] == "suspend":
+    if len(sys.argv) < 3:
+      log_error(usage_msg)
+      exit(1)
+    suspend(sys.argv[2])
+  elif sys.argv[1] == "resume":
+    if len(sys.argv) < 3:
+      log_error(usage_msg)
+      exit(1)
+    resume(sys.argv[2])
+  elif sys.argv[1] == "barrier":
+    if len(sys.argv) < 3:
+      log_error(usage_msg)
+      exit(1)
+    barrier(sys.argv[2])
+  else:
+    log_error(usage_msg)
+    exit(1)