Introduce method to write msg and get response through OpenFlow.
Change-Id: Ibc92985a98a3b9a178e474f55c117e08b05a8d44
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
index df85811..43dd80b 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
@@ -17,6 +17,8 @@
import org.projectfloodlight.openflow.protocol.OFMessage;
+import java.util.concurrent.CompletableFuture;
+
/**
* Abstraction of an OpenFlow controller. Serves as a one stop
* shop for obtaining OpenFlow devices and (un)register listeners
@@ -128,6 +130,15 @@
void write(Dpid dpid, OFMessage msg);
/**
+ * Send a message to a particular switch and return the future response.
+ *
+ * @param dpid the switch to send to
+ * @param msg the message to send
+ * @return future for response message
+ */
+ CompletableFuture<OFMessage> writeResponse(Dpid dpid, OFMessage msg);
+
+ /**
* Process a message and notify the appropriate listeners.
*
* @param dpid the dpid the message arrived on
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
index 20e2aed..26122e3 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
@@ -17,6 +17,8 @@
import org.projectfloodlight.openflow.protocol.OFMessage;
+import java.util.concurrent.CompletableFuture;
+
/**
* Test adapter for the OpenFlow controller interface.
*/
@@ -82,6 +84,11 @@
}
@Override
+ public CompletableFuture<OFMessage> writeResponse(Dpid dpid, OFMessage msg) {
+ return null;
+ }
+
+ @Override
public void processPacket(Dpid dpid, OFMessage msg) {
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 1b53bbd..cde105f 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -77,6 +77,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -146,6 +147,10 @@
protected ConcurrentMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
new ConcurrentHashMap<>();
+ // Key: dpid, value: map with key: long (XID), value: completable future
+ protected ConcurrentMap<Dpid, ConcurrentMap<Long, CompletableFuture<OFMessage>>> responses =
+ new ConcurrentHashMap<>();
+
protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
protected Set<OpenFlowSwitchListener> ofSwitchListener = new CopyOnWriteArraySet<>();
@@ -285,6 +290,19 @@
}
@Override
+ public CompletableFuture<OFMessage> writeResponse(Dpid dpid, OFMessage msg) {
+ write(dpid, msg);
+
+ ConcurrentMap<Long, CompletableFuture<OFMessage>> xids =
+ responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap());
+
+ CompletableFuture<OFMessage> future = new CompletableFuture();
+ xids.put(msg.getXid(), future);
+
+ return future;
+ }
+
+ @Override
public void processPacket(Dpid dpid, OFMessage msg) {
Collection<OFFlowStatsEntry> flowStats;
Collection<OFTableStatsEntry> tableStats;
@@ -294,6 +312,12 @@
OpenFlowSwitch sw = this.getSwitch(dpid);
+ // Check if someone is waiting for this message
+ ConcurrentMap<Long, CompletableFuture<OFMessage>> xids = responses.get(dpid);
+ if (xids != null && xids.containsKey(msg.getXid())) {
+ xids.remove(msg.getXid()).complete(msg);
+ }
+
switch (msg.getType()) {
case PORT_STATUS:
for (OpenFlowSwitchListener l : ofSwitchListener) {