* Added initial support for adding/getting/deleting Flow state in the Network MAP via the REST API
  NOTE: The "add" REST API can't be used as-is.
* Added initial support for reading the Flow state from the Network MAP by the Controller
  and sending it to the switches.
  Currently, the Controller periodically reads the Flow entries (every 3 seconds).
  NOTE: The writing of the OpenFlow state to the switches is not tested.

The Python scripts to add/delete/get flows are intentionally omitted until
the "add" REST API issue is resolved.

NOTE: Two new keys have been added to the database: "flow_id" and "flow_entry_id".
This requires that the older database be deleted, because Cassandra
doesn't allow adding new keys to an existing database.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index 12ed505..11519b7 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -1,10 +1,22 @@
 package net.floodlightcontroller.flowcache;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.INetMapStorage;
+import net.floodlightcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.floodlightcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
@@ -13,20 +25,178 @@
 import net.floodlightcontroller.flowcache.web.FlowWebRoutable;
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.util.CallerId;
+import net.floodlightcontroller.util.DataPath;
+import net.floodlightcontroller.util.Dpid;
 import net.floodlightcontroller.util.DataPathEndpoints;
+import net.floodlightcontroller.util.FlowEntry;
+import net.floodlightcontroller.util.FlowEntryId;
+import net.floodlightcontroller.util.FlowEntrySwitchState;
+import net.floodlightcontroller.util.FlowEntryUserState;
 import net.floodlightcontroller.util.FlowId;
 import net.floodlightcontroller.util.FlowPath;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.floodlightcontroller.util.Port;
+import net.onrc.onos.util.GraphDBConnection;
+import net.onrc.onos.util.GraphDBConnection.Transaction;
+
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FlowManager implements IFloodlightModule, IFlowService {
+public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+
+    public GraphDBConnection conn;
 
     protected IRestApiService restApi;
+    protected IFloodlightProviderService floodlightProvider;
+
+    protected OFMessageDamper messageDamper;
+
+    protected static int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+    protected static int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
+    public static short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0;	// infinity
+    public static short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0;	// infinite
 
     /** The logger. */
-    private static Logger logger =
-	LoggerFactory.getLogger(FlowManager.class);
+    private static Logger log = LoggerFactory.getLogger(FlowManager.class);
+
+    // The periodic task(s)
+    private final ScheduledExecutorService scheduler =
+	Executors.newScheduledThreadPool(1);
+    final Runnable reader = new Runnable() {
+	    public void run() {
+		// log.debug("Reading Flow Entries from the Network Map...");
+		if (floodlightProvider == null) {
+		    log.debug("FloodlightProvider service not found!");
+		    return;
+		}
+
+		Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
+
+		// Fetch all Flow Entries
+		Iterable<IFlowEntry> flowEntries = conn.utils().getAllFlowEntries(conn);
+		for (IFlowEntry flowEntryObj : flowEntries) {
+		    FlowEntryId flowEntryId =
+			new FlowEntryId(flowEntryObj.getFlowEntryId());
+		    String userState = flowEntryObj.getUserState();
+		    String switchState = flowEntryObj.getSwitchState();
+
+		    log.debug("Found Flow Entry {}: ", flowEntryId.toString());
+		    log.debug("User State {}:", userState);
+		    log.debug("Switch State {}:", switchState);
+
+		    if (! switchState.equals("FE_SWITCH_NOT_UPDATED")) {
+			// Ignore the entry: nothing to do
+			continue;
+		    }
+
+		    Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
+		    IOFSwitch mySwitch = mySwitches.get(dpid.value());
+		    if (mySwitch == null) {
+			log.debug("Flow Entry ignored: not my switch");
+			continue;
+		    }
+
+		    //
+		    // Create the Open Flow Flow Modification Entry to push
+		    //
+		    OFFlowMod fm =
+			(OFFlowMod) floodlightProvider.getOFMessageFactory()
+			.getMessage(OFType.FLOW_MOD);
+		    long cookie = flowEntryId.value();
+
+		    short flowModCommand = OFFlowMod.OFPFC_ADD;
+		    if (userState.equals("FE_USER_ADD")) {
+			flowModCommand = OFFlowMod.OFPFC_ADD;
+		    } else if (userState.equals("FE_USER_MODIFY")) {
+			flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+		    } else if (userState.equals("FE_USER_DELETE")) {
+			flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+		    } else {
+			// Unknown user state. Ignore the entry
+			continue;
+		    }
+
+		    OFMatch match = new OFMatch();
+		    match.setInputPort(flowEntryObj.getInPort());
+
+		    OFActionOutput action = new OFActionOutput();
+		    action.setMaxLength((short)0xffff);
+		    action.setPort(flowEntryObj.getOutPort());
+		    List<OFAction> actions = new ArrayList<OFAction>();
+		    actions.add(action);
+
+		    fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+			.setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+			.setBufferId(OFPacketOut.BUFFER_ID_NONE)
+			.setCookie(cookie)
+			.setCommand(flowModCommand)
+			.setMatch(match)
+			.setActions(actions)
+			.setLengthU(OFFlowMod.MINIMUM_LENGTH+OFActionOutput.MINIMUM_LENGTH);
+		    //
+		    // TODO: Set the following flag
+		    // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+		    // See method ForwardingBase::pushRoute()
+		    //
+		    try {
+			messageDamper.write(mySwitch, fm, null);
+			mySwitch.flush();
+			flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+			if (userState.equals("FE_USER_DELETE")) {
+			    // Delete the entry
+			    IFlowPath flowObj = null;
+			    flowObj = conn.utils().getFlowPathByFlowEntry(conn,
+					flowEntryObj);
+			    if (flowObj != null)
+				log.debug("Found FlowPath to be deleted");
+			    else
+				log.debug("Did not find FlowPath to be deleted");
+			    flowObj.removeFlowEntry(flowEntryObj);
+			    conn.utils().removeFlowEntry(conn, flowEntryObj);
+
+			    // Test whether the last flow entry
+			    Iterable<IFlowEntry> tmpflowEntries =
+				flowObj.getFlowEntries();
+			    boolean found = false;
+			    for (IFlowEntry tmpflowEntryObj : tmpflowEntries) {
+				found = true;
+				break;
+			    }
+			    if (! found) {
+				// Remove the Flow Path as well
+				conn.utils().removeFlowPath(conn, flowObj);
+			    }
+			}
+		    } catch (IOException e) {
+			log.error("Failure writing flow mod from network map", e);
+		    }
+		}
+		conn.endTx(Transaction.COMMIT);
+	    }
+	};
+    final ScheduledFuture<?> readerHandle =
+	scheduler.scheduleAtFixedRate(reader, 3, 3, TimeUnit.SECONDS);
+
+    @Override
+    public void init(String conf) {
+	conn = GraphDBConnection.getInstance(conf);
+    }
+
+    public void finalize() {
+	close();
+    }
+
+    @Override
+    public void close() {
+	conn.close();
+    }
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
@@ -52,6 +222,7 @@
                                                     getModuleDependencies() {
 	Collection<Class<? extends IFloodlightService>> l =
 	    new ArrayList<Class<? extends IFloodlightService>>();
+	l.add(IFloodlightProviderService.class);
 	l.add(IRestApiService.class);
         return l;
     }
@@ -59,7 +230,14 @@
     @Override
     public void init(FloodlightModuleContext context)
 	throws FloodlightModuleException {
+	floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
 	restApi = context.getServiceImpl(IRestApiService.class);
+	messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+					    EnumSet.of(OFType.FLOW_MOD),
+					    OFMESSAGE_DAMPER_TIMEOUT);
+	// TODO: An ugly hack!
+	String conf = "/tmp/cassandra.titan";
+	this.init(conf);
     }
 
     @Override
@@ -79,7 +257,135 @@
      */
     @Override
     public boolean addFlow(FlowPath flowPath, FlowId flowId) {
-	// TODO
+
+	//
+	// Assign the FlowEntry IDs
+	// TODO: This is an ugly hack!
+	// The Flow Entry IDs are set to 1000*FlowId + Index
+	//
+	int i = 1;
+	for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
+	    long id = flowPath.flowId().value() * 1000 + i;
+	    ++i;
+	    flowEntry.setFlowEntryId(new FlowEntryId(id));
+	}
+
+	IFlowPath flowObj = null;
+	try {
+	    if ((flowObj = conn.utils().searchFlowPath(conn, flowPath.flowId()))
+		!= null) {
+		log.debug("Adding FlowPath with FlowId {}: found existing FlowPath",
+			  flowPath.flowId().toString());
+	    } else {
+		flowObj = conn.utils().newFlowPath(conn);
+		log.debug("Adding FlowPath with FlowId {}: creating new FlowPath",
+			  flowPath.flowId().toString());
+	    }
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    conn.endTx(Transaction.ROLLBACK);
+	    log.error(":addFlow FlowId:{} failed",
+		      flowPath.flowId().toString());
+	}
+	if (flowObj == null)
+	    return false;
+
+	//
+	// Set the Flow key:
+	// - flowId
+	//
+	flowObj.setFlowId(flowPath.flowId().toString());
+	flowObj.setType("flow");
+
+	//
+	// Set the Flow attributes:
+	// - flowPath.installerId()
+	// - flowPath.dataPath().srcPort()
+	// - flowPath.dataPath().dstPort()
+	//
+	flowObj.setInstallerId(flowPath.installerId().toString());
+	flowObj.setSrcSwitch(flowPath.dataPath().srcPort().dpid().toString());
+	flowObj.setSrcPort(flowPath.dataPath().srcPort().port().value());
+	flowObj.setDstSwitch(flowPath.dataPath().dstPort().dpid().toString());
+	flowObj.setDstPort(flowPath.dataPath().dstPort().port().value());
+
+	// Flow edges:
+	//   HeadFE
+
+
+	//
+	// Flow Entries:
+	// flowPath.dataPath().flowEntries()
+	//
+	for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
+	    IFlowEntry flowEntryObj = null;
+	    boolean found = false;
+	    try {
+		if ((flowEntryObj = conn.utils().searchFlowEntry(conn, flowEntry.flowEntryId())) != null) {
+		    log.debug("Adding FlowEntry with FlowEntryId {}: found existing FlowEntry",
+			      flowEntry.flowEntryId().toString());
+		    found = true;
+		} else {
+		    flowEntryObj = conn.utils().newFlowEntry(conn);
+		    log.debug("Adding FlowEntry with FlowEntryId {}: creating new FlowEntry",
+			      flowEntry.flowEntryId().toString());
+		}
+	    } catch (Exception e) {
+		// TODO: handle exceptions
+		conn.endTx(Transaction.ROLLBACK);
+		log.error(":addFlow FlowEntryId:{} failed",
+			  flowEntry.flowEntryId().toString());
+	    }
+	    if (flowEntryObj == null)
+		return false;
+
+	    //
+	    // Set the Flow Entry key:
+	    // - flowEntry.flowEntryId()
+	    //
+	    flowEntryObj.setFlowEntryId(flowEntry.flowEntryId().toString());
+	    flowEntryObj.setType("flow_entry");
+
+	    // 
+	    // Set the Flow Entry attributes:
+	    // - flowEntry.flowEntryMatch()
+	    // - flowEntry.flowEntryActions()
+	    // - flowEntry.dpid()
+	    // - flowEntry.inPort()
+	    // - flowEntry.outPort()
+	    // - flowEntry.flowEntryUserState()
+	    // - flowEntry.flowEntrySwitchState()
+	    // - flowEntry.flowEntryErrorState()
+	    //
+	    flowEntryObj.setSwitchDpid(flowEntry.dpid().toString());
+	    flowEntryObj.setInPort(flowEntry.inPort().value());
+	    flowEntryObj.setOutPort(flowEntry.outPort().value());
+	    // TODO: Hacks with hard-coded state names!
+	    if (found)
+		flowEntryObj.setUserState("FE_USER_MODIFY");
+	    else
+		flowEntryObj.setUserState("FE_USER_ADD");
+	    flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
+	    //
+	    // TODO: Take care of the FlowEntryMatch, FlowEntryActions,
+	    // and FlowEntryErrorState.
+	    //
+
+	    // Flow Entries edges:
+	    //   Flow
+	    //   NextFE
+	    //   InPort
+	    //   OutPort
+	    //   Switch
+	    if (! found)
+		flowObj.addFlowEntry(flowEntryObj);
+	}
+	conn.endTx(Transaction.COMMIT);
+
+	//
+	// TODO: We need a proper Flow ID allocation mechanism.
+	//
+	flowId.setValue(flowPath.flowId().value());
 	return true;
     }
 
@@ -91,7 +397,46 @@
      */
     @Override
     public boolean deleteFlow(FlowId flowId) {
-	// TODO
+	IFlowPath flowObj = null;
+	//
+	// We just mark the entries for deletion here. The periodic reader
+	// task removes each individual entry from the database after
+	// the delete has been written to the corresponding switch.
+	//
+	try {
+	    if ((flowObj = conn.utils().searchFlowPath(conn, flowId))
+		!= null) {
+		log.debug("Deleting FlowPath with FlowId {}: found existing FlowPath",
+			  flowId.toString());
+	    } else {
+		log.debug("Deleting FlowPath with FlowId {}:  FlowPath not found",
+			  flowId.toString());
+	    }
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    conn.endTx(Transaction.ROLLBACK);
+	    log.error(":deleteFlow FlowId:{} failed", flowId.toString());
+	}
+	if (flowObj == null)
+	    return true;		// OK: No such flow
+
+	//
+	// Find and mark for deletion all Flow Entries
+	//
+	Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+	boolean empty = true;	// TODO: an ugly hack
+	for (IFlowEntry flowEntryObj : flowEntries) {
+	    empty = false;
+	    // flowObj.removeFlowEntry(flowEntryObj);
+	    // conn.utils().removeFlowEntry(conn, flowEntryObj);
+	    flowEntryObj.setUserState("FE_USER_DELETE");
+	    flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
+	}
+	// Remove from the database empty flows
+	if (empty)
+	    conn.utils().removeFlowPath(conn, flowObj);
+	conn.endTx(Transaction.COMMIT);
+
 	return true;
     }
 
@@ -99,13 +444,62 @@
      * Get a previously added flow.
      *
      * @param flowId the Flow ID of the flow to get.
-     * @param flowPath the return-by-reference flow path.
-     * @return true on success, otherwise false.
+     * @return the Flow Path if found, otherwise null.
      */
     @Override
-    public boolean getFlow(FlowId flowId, FlowPath flowPath) {
-	// TODO
-	return true;
+    public FlowPath getFlow(FlowId flowId) {
+	IFlowPath flowObj = null;
+	try {
+	    if ((flowObj = conn.utils().searchFlowPath(conn, flowId))
+		!= null) {
+		log.debug("Get FlowPath with FlowId {}: found existing FlowPath",
+			  flowId.toString());
+	    } else {
+		log.debug("Get FlowPath with FlowId {}:  FlowPath not found",
+			  flowId.toString());
+	    }
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    conn.endTx(Transaction.ROLLBACK);
+	    log.error(":getFlow FlowId:{} failed", flowId.toString());
+	}
+	if (flowObj == null)
+	    return null;		// Flow not found
+
+	//
+	// Extract the Flow state
+	//
+	FlowPath flowPath = new FlowPath();
+	flowPath.setFlowId(new FlowId(flowObj.getFlowId()));
+	flowPath.setInstallerId(new CallerId(flowObj.getInstallerId()));
+	flowPath.dataPath().srcPort().setDpid(new Dpid(flowObj.getSrcSwitch()));
+	flowPath.dataPath().srcPort().setPort(new Port(flowObj.getSrcPort()));
+	flowPath.dataPath().dstPort().setDpid(new Dpid(flowObj.getDstSwitch()));
+	flowPath.dataPath().dstPort().setPort(new Port(flowObj.getDstPort()));
+
+	//
+	// Extract all Flow Entries
+	//
+	Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+	for (IFlowEntry flowEntryObj : flowEntries) {
+	    FlowEntry flowEntry = new FlowEntry();
+	    flowEntry.setFlowEntryId(new FlowEntryId(flowEntryObj.getFlowEntryId()));
+	    flowEntry.setDpid(new Dpid(flowEntryObj.getSwitchDpid()));
+	    flowEntry.setInPort(new Port(flowEntryObj.getInPort()));
+	    flowEntry.setOutPort(new Port(flowEntryObj.getOutPort()));
+	    String userState = flowEntryObj.getUserState();
+	    flowEntry.setFlowEntryUserState(FlowEntryUserState.valueOf(userState));
+	    String switchState = flowEntryObj.getSwitchState();
+	    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.valueOf(switchState));
+	    //
+	    // TODO: Take care of the FlowEntryMatch, FlowEntryActions,
+	    // and FlowEntryErrorState.
+	    //
+	    flowPath.dataPath().flowEntries().add(flowEntry);
+	}
+	conn.endTx(Transaction.COMMIT);
+
+	return flowPath;
     }
 
     /**
@@ -114,36 +508,35 @@
      *
      * @param installerId the Caller ID of the installer of the flow to get.
      * @param dataPathEndpoints the data path endpoints of the flow to get.
-     * @param flowPath the return-by-reference flow path.
-     * @return true on success, otherwise false.
+     * @return the Flow Path if found, otherwise null.
      */
     @Override
-    public boolean getFlow(CallerId installerId,
-			   DataPathEndpoints dataPathEndpoints,
-			   FlowPath flowPath) {
+    public FlowPath getFlow(CallerId installerId,
+			    DataPathEndpoints dataPathEndpoints) {
 	// TODO
-	return true;
+	return null;
     }
 
     /**
      * Get all installed flows by all installers for given data path endpoints.
      *
      * @param dataPathEndpoints the data path endpoints of the flows to get.
-     * @param flowPaths the return-by-reference list of flows.
+     * @return the Flow Paths if found, otherwise null.
      */
     @Override
-    public void getAllFlows(DataPathEndpoints dataPathEndpoints,
-			    ArrayList<FlowPath> flowPaths) {
+    public ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints) {
 	// TODO
+	return null;
     }
 
     /**
      * Get all installed flows by all installers.
      *
-     * @param flowPaths the return-by-reference list of flows.
+     * @return the Flow Paths if found, otherwise null.
      */
     @Override
-    public void getAllFlows(ArrayList<FlowPath> flowPaths) {
+    public ArrayList<FlowPath> getAllFlows() {
 	// TODO
+	return null;
     }
 }