added notifications to add/delete flow and added getFlowsForSwitches to ParallelFlowDatabaseOperation.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
index 3264c0a..f24d8b6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -2,18 +2,23 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBOperation;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
@@ -60,6 +65,58 @@
 	return flowPaths;	
     }
     
+    /**
+     * Gets all flows whose source switch is in switches. Naive and inefficient:
+     * every flow path is read from the database and then filtered by source.
+     * @param dbHandler the graph DB handler used to read the flow paths.
+     * @param switches the dpids of the source switches to match.
+     * @return the matching flows; empty if none match or the read fails.
+     */
+    static ArrayList<FlowPath> getFlowsForSwitches(GraphDBOperation dbHandler, Collection<Dpid> switches) {
+	Iterable<IFlowPath> flowPathsObj = null;
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	try {
+	    flowPathsObj = dbHandler.getAllFlowPaths();
+	} catch (Exception e) {
+	    // TODO: handle exceptions; for now roll back and return an empty list
+	    dbHandler.rollback();
+	    log.error(":getAllFlowPaths failed", e);
+	    return flowPaths;
+	}
+	if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+	    dbHandler.commit();
+	    return flowPaths;	// No Flows found
+	}
+	
+	// collect the switch dpids as strings for the source-switch lookup below
+	Set<String> switchSet = new HashSet<>();
+	for(Dpid dpid : switches) {
+	    switchSet.add(dpid.toString());
+	}
+	// submit one extraction task per flow whose source switch is in the set
+	CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+	int numTasks = 0;
+	for(IFlowPath flowObj : flowPathsObj) {
+	    if(switchSet.contains(flowObj.getSrcSwitch())) {
+		tasks.submit(new ExtractFlowTask(flowObj));
+		numTasks++;
+	    }
+	}
+	for(int i = 0; i < numTasks; i++) {	// one take() per submitted task
+	    try {
+		FlowPath flowPath = tasks.take().get();
+		if(flowPath != null) {
+		    flowPaths.add(flowPath);
+		}
+	    } catch (InterruptedException | ExecutionException e) {
+		log.error("Error reading FlowPath from IFlowPath object", e);
+	    }
+	}
+	dbHandler.commit();
+	return flowPaths;
+    }
+    
     private final static class ExtractFlowTask implements Callable<FlowPath> {
 	private final IFlowPath flowObj;
 	
@@ -73,7 +130,7 @@
     }
 
     static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
-		  				  Collection<FlowId> flowIds) {
+		  			Collection<FlowId> flowIds) {
 	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
 
 	CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
@@ -112,29 +169,65 @@
     }
     
     static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
-	executor.submit(new AddFlowTask(dbHandler, flowPath));
+	Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
+	try {
+	    return result.get();	// NOTE: This function is blocking
+	} catch (InterruptedException | ExecutionException e) {
+	    log.error("Error waiting for the add-flow task to complete", e);
+	    return false;
+	}
+    }
+    
+    static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
+	executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));	// not awaited: true below only means "submitted"
 	// TODO: If we need the results, submit returns a Future that contains
 	// the result. 
 	return true;
+
     }
     
     private final static class AddFlowTask implements Callable<Boolean> {
 	private final GraphDBOperation dbHandler;
 	private final FlowPath flowPath;
+	private final IDatagridService datagridService;
 	
-	AddFlowTask(GraphDBOperation dbHandler, FlowPath flowPath) {
+	AddFlowTask(GraphDBOperation dbHandler, FlowPath flowPath,
+		    IDatagridService datagridService) {
+	    // datagridService may be null, in which case call() sends no notification
 	    this.dbHandler = dbHandler;
 	    this.flowPath = flowPath;
+	    this.datagridService = datagridService;
 	}
 	
 	@Override
 	public Boolean call() throws Exception {
-	    return FlowDatabaseOperation.addFlow(dbHandler, flowPath);
+	    // Write to the database first; only a successful write is announced.
+	    if (!FlowDatabaseOperation.addFlow(dbHandler, flowPath)) {
+		log.error("Error adding flow path {} to database", flowPath);
+		return false;
+	    }
+	    // Without a datagrid service there is nobody to notify.
+	    if (datagridService != null) {
+		// Announce the new flow id together with the dpid of its source port.
+		datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+			flowPath.dataPath().srcPort().dpid());
+	    }
+	    return true;
 	}
     }
 
     static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
-	executor.submit(new DeleteFlowTask(dbHandler, flowId));
+	Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
+	try {
+	    return result.get();	// NOTE: This function is blocking
+	} catch (InterruptedException | ExecutionException e) {
+	    log.error("Error waiting for the delete-flow task to complete", e);
+	    return false;
+	}
+    }
+    
+    static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+	executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
 	// TODO: If we need the results, submit returns a Future that contains
 	// the result. 
 	return true;
@@ -143,14 +236,25 @@
     private final static class DeleteFlowTask implements Callable<Boolean> {
 	private final GraphDBOperation dbHandler;
 	private final FlowId flowId;
+	private final IDatagridService datagridService;	// may be null: no notification is sent
 	
-	DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
+	DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
 	    this.dbHandler = dbHandler;
 	    this.flowId = flowId;
+	    this.datagridService = datagridService;
 	}
 	@Override
 	public Boolean call() throws Exception{
-	    return FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+	    // Remove from the database first; only a successful removal is announced.
+	    if (!FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+		log.error("Error removing flow path {} from database", flowId);
+		return false;
+	    }
+	    // Without a datagrid service there is nobody to notify.
+	    if (datagridService != null) {
+		datagridService.notificationSendFlowIdRemoved(flowId);
+	    }
+	    return true;
 	}
     }
 }