ONOS-3350 AbstractOFSwitch race fixes
Protecting callers with synchronized block during
role request and reply methods.
Change-Id: Ie82f84d1d462923c9f410e6950e846ee3b05551c
diff --git a/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java b/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
index 51a2ce4..b6ec574 100644
--- a/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
@@ -29,14 +29,10 @@
/**
* Writes the message to the driver.
- *
- * Note:
- * Calling {@link #sendMsg(OFMessage)} does NOT guarantee the messages to be
- * transmitted on the wire in order, especially during role transition.
- * The messages may be reordered at the switch side.
- *
- * Calling {@link #sendMsg(List)} guarantee the messages inside the list
- * to be transmitted on the wire in order.
+ * <p>
+ * Note: Messages may be silently dropped/lost due to IOExceptions or
+ * role. If this is a concern, then a caller should use barriers.
+ * </p>
*
* @param msg the message to write
*/
@@ -44,6 +40,10 @@
/**
* Writes the OFMessage list to the driver.
+ * <p>
+ * Note: Messages may be silently dropped/lost due to IOExceptions or
+ * role. If this is a concern, then a caller should use barriers.
+ * </p>
*
* @param msgs the messages to be written
*/
diff --git a/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 2c19837..c717419 100644
--- a/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,6 +16,7 @@
package org.onosproject.openflow.controller.driver;
+import com.google.common.collect.Lists;
import org.jboss.netty.channel.Channel;
import org.onlab.packet.IpAddress;
import org.onosproject.net.Device;
@@ -46,6 +47,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -74,12 +76,14 @@
private RoleHandler roleMan;
- protected RoleState role;
+ // TODO this is accessed from multiple threads, but volatile may have performance implications
+ protected volatile RoleState role;
protected OFFeaturesReply features;
protected OFDescStatsReply desc;
- List<OFMessage> messagesPendingMastership;
+ private final AtomicReference<List<OFMessage>> messagesPendingMastership
+ = new AtomicReference<>();
@Override
public void init(Dpid dpid, OFDescStatsReply desc, OFVersion ofv) {
@@ -104,15 +108,53 @@
@Override
public final void sendMsg(List<OFMessage> msgs) {
- if (role == RoleState.MASTER && channel.isConnected()) {
+ /*
+ It is possible that in this block, we transition to SLAVE/EQUAL.
+ If this is the case, the supplied messages will race with the
+ RoleRequest message, and they could be rejected by the switch.
+ In the interest of performance, we will not protect this block with
+ a synchronization primitive, because the message would have just been
+ dropped anyway.
+ */
+ if (role == RoleState.MASTER) {
+ // fast path send when we are master
+
+ sendMsgsOnChannel(msgs);
+ return;
+ }
+ // check to see if mastership transition is in progress
+ synchronized (messagesPendingMastership) {
+ /*
+ messagesPendingMastership is used as synchronization variable for
+ all mastership related changes. In this block, mastership (including
+ role update) will have either occurred or not.
+ */
+ if (role == RoleState.MASTER) {
+ // transition to MASTER complete, send messages
+ sendMsgsOnChannel(msgs);
+ return;
+ }
+
+ List<OFMessage> messages = messagesPendingMastership.get();
+ if (messages != null) {
+ // we are transitioning to MASTER, so add messages to queue
+ messages.addAll(msgs);
+ log.debug("Enqueue message for switch {}. queue size after is {}",
+ dpid, messages.size());
+ } else {
+ // not transitioning to MASTER
+ log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
+ dpid, role, channel.isConnected(), msgs);
+ }
+ }
+ }
+
+ private void sendMsgsOnChannel(List<OFMessage> msgs) {
+ if (channel.isConnected()) {
channel.write(msgs);
- } else if (messagesPendingMastership != null) {
- messagesPendingMastership.addAll(msgs);
- log.debug("Enqueue message for switch {}. queue size after is {}",
- dpid, messagesPendingMastership.size());
} else {
- log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
- dpid, role, channel.isConnected(), msgs);
+ log.warn("Dropping messages for switch {} because channel is not connected: {}",
+ dpid, msgs);
}
}
@@ -120,7 +162,7 @@
public final void sendRoleRequest(OFMessage msg) {
if (msg instanceof OFRoleRequest ||
msg instanceof OFNiciraControllerRoleRequest) {
- channel.write(Collections.singletonList(msg));
+ sendMsgsOnChannel(Collections.singletonList(msg));
return;
}
throw new IllegalArgumentException("Someone is trying to send " +
@@ -130,7 +172,7 @@
@Override
public final void sendHandshakeMessage(OFMessage message) {
if (!this.isDriverHandshakeComplete()) {
- channel.write(Collections.singletonList(message));
+ sendMsgsOnChannel(Collections.singletonList(message));
}
}
@@ -239,11 +281,16 @@
@Override
public final void transitionToMasterSwitch() {
this.agent.transitionToMasterSwitch(dpid);
- if (messagesPendingMastership != null) {
- this.sendMsg(messagesPendingMastership);
- log.debug("Sending {} pending messages to switch {}",
- messagesPendingMastership.size(), dpid);
- messagesPendingMastership = null;
+ synchronized (messagesPendingMastership) {
+ List<OFMessage> messages = messagesPendingMastership.get();
+ if (messages != null) {
+ this.sendMsg(messages);
+ log.debug("Sending {} pending messages to switch {}",
+ messages.size(), dpid);
+ messagesPendingMastership.set(null);
+ }
+ // perform role transition after clearing messages queue
+ this.role = RoleState.MASTER;
}
}
@@ -287,17 +334,27 @@
@Override
public void setRole(RoleState role) {
try {
+ if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
+ // perform role transition to SLAVE/EQUAL before sending role request
+ this.role = role;
+ }
if (this.roleMan.sendRoleRequest(role, RoleRecvStatus.MATCHED_SET_ROLE)) {
log.debug("Sending role {} to switch {}", role, getStringId());
- if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
- this.role = role;
- } else {
- if (messagesPendingMastership == null) {
- log.debug("Initializing new queue for switch {}", dpid);
- messagesPendingMastership = new ArrayList<>();
+ if (role == RoleState.MASTER) {
+ synchronized (messagesPendingMastership) {
+ if (messagesPendingMastership.get() == null) {
+ log.debug("Initializing new message queue for switch {}", dpid);
+ /*
+ The presence of messagesPendingMastership indicates that
+ a switch is currently transitioning to MASTER, but
+ is still awaiting role reply from switch.
+ */
+ messagesPendingMastership.set(Lists.newArrayList());
+ }
}
}
- } else {
+ } else if (role == RoleState.MASTER) {
+ // role request not support; transition switch to MASTER
this.role = role;
}
} catch (IOException e) {
@@ -307,6 +364,7 @@
@Override
public void reassertRole() {
+ // TODO should messages be sent directly or queue during reassertion?
if (this.getRole() == RoleState.MASTER) {
log.warn("Received permission error from switch {} while " +
"being master. Reasserting master role.",
@@ -315,18 +373,15 @@
}
}
-
-
@Override
public void handleRole(OFMessage m) throws SwitchStateException {
RoleReplyInfo rri = roleMan.extractOFRoleReply((OFRoleReply) m);
RoleRecvStatus rrs = roleMan.deliverRoleReply(rri);
if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
if (rri.getRole() == RoleState.MASTER) {
- this.role = rri.getRole();
this.transitionToMasterSwitch();
} else if (rri.getRole() == RoleState.EQUAL ||
- rri.getRole() == RoleState.SLAVE) {
+ rri.getRole() == RoleState.SLAVE) {
this.transitionToEqualSwitch();
}
} else {
@@ -348,10 +403,9 @@
new RoleReplyInfo(r, null, m.getXid()));
if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
if (r == RoleState.MASTER) {
- this.role = r;
this.transitionToMasterSwitch();
} else if (r == RoleState.EQUAL ||
- r == RoleState.SLAVE) {
+ r == RoleState.SLAVE) {
this.transitionToEqualSwitch();
}
} else {
@@ -369,8 +423,6 @@
return true;
}
-
-
@Override
public final void setAgent(OpenFlowAgent ag) {
if (this.agent == null) {
@@ -398,9 +450,8 @@
@Override
public List<OFPortDesc> getPorts() {
return this.ports.stream()
- .flatMap((portReply) -> (portReply.getEntries().stream()))
+ .flatMap(portReply -> portReply.getEntries().stream())
.collect(Collectors.toList());
- //return Collections.unmodifiableList(ports.getEntries());
}
@Override
@@ -408,13 +459,11 @@
return this.desc.getMfrDesc();
}
-
@Override
public String datapathDescription() {
return this.desc.getDpDesc();
}
-
@Override
public String hardwareDescription() {
return this.desc.getHwDesc();
@@ -430,20 +479,15 @@
return this.desc.getSerialNum();
}
-
@Override
public Device.Type deviceType() {
return Device.Type.SWITCH;
}
-
@Override
public String toString() {
return this.getClass().getName() + " [" + ((channel != null)
? channel.getRemoteAddress() : "?")
+ " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
}
-
-
-
}