Introduce method to write msg and get response through OpenFlow.

Change-Id: Ibc92985a98a3b9a178e474f55c117e08b05a8d44
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 1b53bbd..cde105f 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -77,6 +77,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -146,6 +147,10 @@
     protected ConcurrentMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
             new ConcurrentHashMap<>();
 
+    // Key: dpid, value: map with key: long (XID), value: completable future
+    protected ConcurrentMap<Dpid, ConcurrentMap<Long, CompletableFuture<OFMessage>>> responses =
+            new ConcurrentHashMap<>();
+
     protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
     protected Set<OpenFlowSwitchListener> ofSwitchListener = new CopyOnWriteArraySet<>();
 
@@ -285,6 +290,19 @@
     }
 
     @Override
+    public CompletableFuture<OFMessage> writeResponse(Dpid dpid, OFMessage msg) {
+        write(dpid, msg);
+
+        ConcurrentMap<Long, CompletableFuture<OFMessage>> xids =
+                responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap());
+
+        CompletableFuture<OFMessage> future = new CompletableFuture();
+        xids.put(msg.getXid(), future);
+
+        return future;
+    }
+
+    @Override
     public void processPacket(Dpid dpid, OFMessage msg) {
         Collection<OFFlowStatsEntry> flowStats;
         Collection<OFTableStatsEntry> tableStats;
@@ -294,6 +312,12 @@
 
         OpenFlowSwitch sw = this.getSwitch(dpid);
 
+        // Check if someone is waiting for this message
+        ConcurrentMap<Long, CompletableFuture<OFMessage>> xids = responses.get(dpid);
+        if (xids != null && xids.containsKey(msg.getXid())) {
+            xids.remove(msg.getXid()).complete(msg);
+        }
+
         switch (msg.getType()) {
         case PORT_STATUS:
             for (OpenFlowSwitchListener l : ofSwitchListener) {