[VOL-4343] Process port status, mastership and connection/disconnection events in order to avoid inconsistent state
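
Port status, role reply, reassertRole and switch-removal handling are now
funnelled through a single-thread executor (runtimeStatusExecutor), so device
status events are applied in the order they were received instead of racing
between the netty thread and the previous port-status-only executor.

For reviewers, a minimal standalone sketch (not ONOS code; the class, thread
and message names below are illustrative only) of the ordering property this
change relies on: tasks submitted to a single-thread executor run strictly in
submission order, even when they are handed off from an I/O thread.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class StatusOrderingSketch {
        public static void main(String[] args) throws InterruptedException {
            // Single worker thread: the task queue is drained in FIFO order.
            ExecutorService statusExecutor = Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "status-sketch"));

            // Simulate the I/O thread offloading status events as they arrive.
            statusExecutor.execute(() -> System.out.println("1: port status (port up)"));
            statusExecutor.execute(() -> System.out.println("2: role reply (MASTER)"));
            statusExecutor.execute(() -> System.out.println("3: channel inactive, switch removal"));

            // Always prints 1, 2, 3: status updates cannot interleave or reorder.
            statusExecutor.shutdown();
            statusExecutor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }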
Change-Id: I731866b358fd4b6a7cfd296051e11f3d8690c5a7
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 0ed09e5..53a312d 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -628,7 +628,8 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
deviceLocalStatus.put(deviceId, new LocalStatus(false, Instant.now()));
- log.info("Device {} disconnected from this node", deviceId);
+ log.info("Device {} disconnected from this node: {}", deviceId,
+ clusterService.getLocalNode().id());
List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
.map(desc -> ensurePortEnabledState(desc, false))
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 84d9deb..1b0a449 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -19,10 +19,12 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.Ethernet.TYPE_BSN;
import static org.onlab.packet.Ethernet.TYPE_LLDP;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.openflow.controller.Dpid.uri;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -44,10 +46,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.onlab.util.GroupedThreadFactory;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.onlab.osgi.DefaultServiceDirectory;
@@ -197,7 +202,6 @@
private List<Set<OpenFlowClassifier>> messageClassifiersMapProducer =
new CopyOnWriteArrayList<Set<OpenFlowClassifier>>();
-
/**
* Lock held by take, poll, etc.
*/
@@ -213,7 +217,6 @@
*/
private final AtomicInteger totalCount = new AtomicInteger();
-
/**
* Single thread executor for OFMessage dispatching.
*
@@ -236,10 +239,12 @@
private final Deque<OFMessage> dispatchBacklog;
/**
- * Port Status executor to offload from the main thread the processing of port
- * status OF messages.
+ * Executor for runtime status events, used to offload from the main thread the
+ * processing of port status, mastership and connection OF messages.
+ * It is instantiated as a single-thread executor, which guarantees that device
+ * status messages are processed in order.
*/
- protected ExecutorService portStatusExecutor;
+ protected ExecutorService runtimeStatusExecutor;
/**
* Create a new unconnected OFChannelHandler.
@@ -252,8 +257,17 @@
this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
this.portDescReplies = new ArrayList<>();
duplicateDpidFound = Boolean.FALSE;
- portStatusExecutor = newSingleThreadExecutor(
- groupedThreads("onos/of-channel-handler", "port-status-%d", log));
+ String groupName = "onos/of-channel-handler";
+ String pattern = "runtime-status-%d";
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setThreadFactory(groupedThreadFactory(groupName))
+ .setNameFormat(groupName.replace(GroupedThreadFactory.DELIMITER, "-") + "-" + pattern)
+ .setUncaughtExceptionHandler((t, e) -> {
+ log.error("Exception on " + t.getName(), e);
+ throw new UncheckedIOException(new IOException(e));
+ }).build();
+ runtimeStatusExecutor = newSingleThreadExecutor(factory);
//Initialize queues and classifiers
dispatchBacklog = new LinkedBlockingDeque<>(BACKLOG_READ_BUFFER_DEFAULT);
for (int i = 0; i < NUM_OF_QUEUES; i++) {
@@ -267,15 +281,11 @@
}
}
-
-
// XXX S consider if necessary
public void disconnectSwitch() {
sw.disconnectSwitch();
}
-
-
//*************************
// Channel State Machine
//*************************
@@ -376,7 +386,6 @@
}
},
-
/**
* We are waiting for a features reply message. Once we receive it, the
* behavior depends on whether this is a 1.0 or 1.3 switch. For 1.0,
@@ -570,7 +579,6 @@
}
},
-
/**
* We are waiting for a OFDescriptionStat message from the switch.
* Once we receive any stat message we try to parse it. If it's not
@@ -675,7 +683,6 @@
}
},
-
/**
* We are waiting for the respective switch driver to complete its
* configuration. Notice that we do not consider this to be part of the main
@@ -692,8 +699,6 @@
// will never be called. We override processOFMessage
}
-
-
@Override
void processOFMessage(OFChannelHandler h, OFMessage m)
throws IOException, SwitchStateException {
@@ -827,7 +832,6 @@
}
},
-
/**
* This controller is in MASTER role for this switch. We enter this state
* after requesting and winning control from the global registry.
@@ -846,20 +850,19 @@
throws IOException, SwitchStateException {
// if we get here, then the error message is for something else
if (m.getErrType() == OFErrorType.BAD_REQUEST &&
- (((OFBadRequestErrorMsg) m).getCode() ==
- OFBadRequestCode.EPERM ||
- ((OFBadRequestErrorMsg) m).getCode() ==
- OFBadRequestCode.IS_SLAVE)) {
+ (((OFBadRequestErrorMsg) m).getCode() == OFBadRequestCode.EPERM ||
+ ((OFBadRequestErrorMsg) m).getCode() == OFBadRequestCode.IS_SLAVE)) {
// We are the master controller and the switch returned
// a permission error. This is a likely indicator that
- // the switch thinks we are slave. Reassert our
- // role
+ // the switch thinks we are slave. Reassert our role
// FIXME: this could be really bad during role transitions
// if two controllers are master (even if its only for
// a brief period). We might need to see if these errors
// persist before we reassert
-
- h.sw.reassertRole();
+ // Schedule on the runtime status executor to preserve ordering with the other status events.
+ h.runtimeStatusExecutor.submit(() -> {
+ h.sw.reassertRole();
+ });
} else if (m.getErrType() == OFErrorType.FLOW_MOD_FAILED &&
((OFFlowModFailedErrorMsg) m).getCode() ==
OFFlowModFailedCode.ALL_TABLES_FULL) {
@@ -910,32 +913,39 @@
@Override
void processOFRoleReply(OFChannelHandler h, OFRoleReply m)
throws SwitchStateException {
- h.sw.handleRole(m);
+ h.runtimeStatusExecutor.execute(() -> {
+ try {
+ h.sw.handleRole(m);
+ } catch (SwitchStateException e) {
+ log.error("SwitchStateException while processing " +
+ "role reply message {}", m, e);
+ log.error("Disconnecting switch {} due to switch state error: {}",
+ h.getSwitchInfoString(), e.getMessage());
+ h.channel.close();
+ }
+ });
}
@Override
void processOFPortStatus(OFChannelHandler h, OFPortStatus m)
throws SwitchStateException {
// Handing over processing of port status messages to a thread to avoid
- // getting blocked on the main thread and resulting other OF
- // message being delayed.
- // Ordering of the port status messages is guaranteed by portStatsExecutor
- // being a single threaded executor.
- // This executor will execute concurrently to the netty thread;
- // meaning that the order is no more guaranteed like it was in the
- // past between port status handling and the other events handled
- // inline to the netty thread.
- // This also remove guarantees of ordered processing of ROLE_CHANGED
- // during active state, this should have no effect given that mastership
- // is ignored here: https://github.com/opennetworkinglab/onos/blob/master/
- // protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/
- // driver/AbstractOpenFlowSwitch.java#L279
- h.portStatusExecutor.submit(() -> {
+ // getting blocked on the main thread and resulting in other OF messages being delayed.
+ // Ordering of the status messages is guaranteed by runtimeStatusExecutor being a
+ // single-threaded executor. This executor will execute concurrently with the netty thread,
+ // meaning that ordering is no longer guaranteed, as it was in the past, between the
+ // status messages and the other messages handled inline on the netty thread: statistics
+ // (port, flow, meter, group), barriers, idle, features and packet-ins. This executor
+ // only applies to messages received during the ACTIVE state of the connection.
+ h.runtimeStatusExecutor.execute(() -> {
try {
handlePortStatusMessage(h, m, true);
} catch (SwitchStateException e) {
log.error("SwitchStateException while processing " +
"port status message {}", m, e);
+ log.error("Disconnecting switch {} due to switch state error: {}",
+ h.getSwitchInfoString(), e.getMessage());
+ h.channel.close();
}
});
//h.dispatchMessage(m);
@@ -1079,8 +1089,6 @@
h.channel.disconnect();
}
-
-
/**
* Handles all pending port status messages before a switch is declared
* activated in MASTER or EQUAL role. Note that since this handling
@@ -1155,7 +1163,6 @@
h.sw.handleMessage(m);
}
-
/**
* Process an OF message received on the channel and
* update state accordingly.
@@ -1359,8 +1366,6 @@
}
}
-
-
//*************************
// Channel handler methods
//*************************
@@ -1400,7 +1405,6 @@
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
-
log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
getSwitchInfoString());
@@ -1409,29 +1413,31 @@
dispatcher = null;
}
- if (thisdpid != 0) {
- if (!duplicateDpidFound) {
- // if the disconnected switch (on this ChannelHandler)
- // was not one with a duplicate-dpid, it is safe to remove all
- // state for it at the controller. Notice that if the disconnected
- // switch was a duplicate-dpid, calling the method below would clear
- // all state for the original switch (with the same dpid),
- // which we obviously don't want.
- log.info("{}:removal called", getSwitchInfoString());
- if (sw != null) {
- sw.removeConnectedSwitch();
- }
- } else {
- // A duplicate was disconnected on this ChannelHandler,
- // this is the same switch reconnecting, but the original state was
- // not cleaned up - XXX check liveness of original ChannelHandler
- log.info("{}:duplicate found", getSwitchInfoString());
- duplicateDpidFound = Boolean.FALSE;
- }
- } else {
- log.warn("no dpid in channelHandler registered for "
- + "disconnected switch {}", getSwitchInfoString());
- }
+ if (thisdpid != 0) {
+ if (!duplicateDpidFound) {
+ // if the disconnected switch (on this ChannelHandler)
+ // was not one with a duplicate-dpid, it is safe to remove all
+ // state for it at the controller. Notice that if the disconnected
+ // switch was a duplicate-dpid, calling the method below would clear
+ // all state for the original switch (with the same dpid),
+ // which we obviously don't want.
+ runtimeStatusExecutor.submit(() -> {
+ log.info("{}:removal called", getSwitchInfoString());
+ if (sw != null) {
+ sw.removeConnectedSwitch();
+ }
+ });
+ } else {
+ // A duplicate was disconnected on this ChannelHandler,
+ // this is the same switch reconnecting, but the original state was
+ // not cleaned up - XXX check liveness of original ChannelHandler
+ log.info("{}:duplicate found", getSwitchInfoString());
+ duplicateDpidFound = Boolean.FALSE;
+ }
+ } else {
+ log.warn("no dpid in channelHandler registered for "
+ + "disconnected switch {}", getSwitchInfoString());
+ }
}
@Override