[VOL-4343] Processing status of ports in order with mastership and connection/disconnection to avoid inconsisten state

Change-Id: I731866b358fd4b6a7cfd296051e11f3d8690c5a7
(cherry picked from commit b0b93ac609e7860d5fd15703a50a0180fbf7a176)
(cherry picked from commit 727ed68ed3edc4512e353af814abe327ee25f143)
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 0ed09e5..53a312d 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -628,7 +628,8 @@
             checkNotNull(deviceId, DEVICE_ID_NULL);
             checkValidity();
             deviceLocalStatus.put(deviceId, new LocalStatus(false, Instant.now()));
-            log.info("Device {} disconnected from this node", deviceId);
+            log.info("Device {} disconnected from this node: {}", deviceId,
+                     clusterService.getLocalNode().id());
 
             List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
                     .map(desc -> ensurePortEnabledState(desc, false))
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 84d9deb..1b0a449 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -19,10 +19,12 @@
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.packet.Ethernet.TYPE_BSN;
 import static org.onlab.packet.Ethernet.TYPE_LLDP;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openflow.controller.Dpid.uri;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -44,10 +46,13 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.onlab.util.GroupedThreadFactory;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onlab.osgi.DefaultServiceDirectory;
@@ -197,7 +202,6 @@
     private List<Set<OpenFlowClassifier>> messageClassifiersMapProducer =
             new CopyOnWriteArrayList<Set<OpenFlowClassifier>>();
 
-
     /**
      * Lock held by take, poll, etc.
      */
@@ -213,7 +217,6 @@
      */
     private final AtomicInteger totalCount = new AtomicInteger();
 
-
     /**
      * Single thread executor for OFMessage dispatching.
      *
@@ -236,10 +239,12 @@
     private final Deque<OFMessage> dispatchBacklog;
 
     /**
-     * Port Status executor to offload from the main thread the processing of port
-     * status OF messages.
+     * Executor for runtime status events to offload from the main thread the
+     * processing of port status, mastership and connection OF messages.
+     * Executor is instantiated as a single thread executor guaranteeing processing
+     * of device status messages in order.
      */
-    protected ExecutorService portStatusExecutor;
+    protected ExecutorService runtimeStatusExecutor;
 
     /**
      * Create a new unconnected OFChannelHandler.
@@ -252,8 +257,17 @@
         this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
         this.portDescReplies = new ArrayList<>();
         duplicateDpidFound = Boolean.FALSE;
-        portStatusExecutor = newSingleThreadExecutor(
-                groupedThreads("onos/of-channel-handler", "port-status-%d", log));
+        String groupName = "onos/of-channel-handler";
+        String pattern = "runtime-status-%d";
+        ThreadFactory factory = new ThreadFactoryBuilder()
+                .setThreadFactory(groupedThreadFactory(groupName))
+                .setNameFormat(groupName.replace(GroupedThreadFactory.DELIMITER, "-") + "-" + pattern)
+                .setUncaughtExceptionHandler((t, e) -> {
+                    log.error("Exception on " + t.getName(), e);
+                    throw new UncheckedIOException(new IOException(e));
+                }).build();
+        runtimeStatusExecutor = newSingleThreadExecutor(
+                factory);
         //Initialize queues and classifiers
         dispatchBacklog = new LinkedBlockingDeque<>(BACKLOG_READ_BUFFER_DEFAULT);
         for (int i = 0; i < NUM_OF_QUEUES; i++) {
@@ -267,15 +281,11 @@
         }
     }
 
-
-
     // XXX S consider if necessary
     public void disconnectSwitch() {
         sw.disconnectSwitch();
     }
 
-
-
     //*************************
     //  Channel State Machine
     //*************************
@@ -376,7 +386,6 @@
             }
         },
 
-
         /**
          * We are waiting for a features reply message. Once we receive it, the
          * behavior depends on whether this is a 1.0 or 1.3 switch. For 1.0,
@@ -570,7 +579,6 @@
             }
         },
 
-
         /**
          * We are waiting for a OFDescriptionStat message from the switch.
          * Once we receive any stat message we try to parse it. If it's not
@@ -675,7 +683,6 @@
             }
         },
 
-
         /**
          * We are waiting for the respective switch driver to complete its
          * configuration. Notice that we do not consider this to be part of the main
@@ -692,8 +699,6 @@
                 // will never be called. We override processOFMessage
             }
 
-
-
             @Override
             void processOFMessage(OFChannelHandler h, OFMessage m)
                     throws IOException, SwitchStateException {
@@ -827,7 +832,6 @@
             }
         },
 
-
         /**
          * This controller is in MASTER role for this switch. We enter this state
          * after requesting and winning control from the global registry.
@@ -846,20 +850,19 @@
                     throws IOException, SwitchStateException {
                 // if we get here, then the error message is for something else
                 if (m.getErrType() == OFErrorType.BAD_REQUEST &&
-                        (((OFBadRequestErrorMsg) m).getCode() ==
-                           OFBadRequestCode.EPERM ||
-                        ((OFBadRequestErrorMsg) m).getCode() ==
-                           OFBadRequestCode.IS_SLAVE)) {
+                        (((OFBadRequestErrorMsg) m).getCode() == OFBadRequestCode.EPERM ||
+                        ((OFBadRequestErrorMsg) m).getCode() == OFBadRequestCode.IS_SLAVE)) {
                     // We are the master controller and the switch returned
                     // a permission error. This is a likely indicator that
-                    // the switch thinks we are slave. Reassert our
-                    // role
+                    // the switch thinks we are slave. Reassert our role
                     // FIXME: this could be really bad during role transitions
                     // if two controllers are master (even if its only for
                     // a brief period). We might need to see if these errors
                     // persist before we reassert
-
-                    h.sw.reassertRole();
+                    // Scheduling in the executor to keep in line with other status events.
+                    h.runtimeStatusExecutor.submit(() -> {
+                        h.sw.reassertRole();
+                    });
                 } else if (m.getErrType() == OFErrorType.FLOW_MOD_FAILED &&
                         ((OFFlowModFailedErrorMsg) m).getCode() ==
                         OFFlowModFailedCode.ALL_TABLES_FULL) {
@@ -910,32 +913,39 @@
             @Override
             void processOFRoleReply(OFChannelHandler h, OFRoleReply m)
                     throws SwitchStateException {
-                h.sw.handleRole(m);
+                h.runtimeStatusExecutor.execute(() -> {
+                    try {
+                        h.sw.handleRole(m);
+                    } catch (SwitchStateException e) {
+                        log.error("SwitchStateException while processing " +
+                                          "role reply message {}", m, e);
+                        log.error("Disconnecting switch {} due to switch state error: {}",
+                                  h.getSwitchInfoString(), e.getMessage());
+                        h.channel.close();
+                    }
+                });
             }
 
             @Override
             void processOFPortStatus(OFChannelHandler h, OFPortStatus m)
                     throws SwitchStateException {
                 // Handing over processing of port status messages to a thread to avoid
-                // getting blocked on the main thread and resulting other OF
-                // message being delayed.
-                // Ordering of the port status messages is guaranteed by portStatsExecutor
-                // being a single threaded executor.
-                // This executor will execute concurrently to the netty thread;
-                // meaning that the order is no more guaranteed like it was in the
-                // past between port status handling and the other events handled
-                // inline to the netty thread.
-                // This also remove guarantees of ordered processing of ROLE_CHANGED
-                // during active state, this should have no effect given that mastership
-                // is ignored here: https://github.com/opennetworkinglab/onos/blob/master/
-                // protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/
-                // driver/AbstractOpenFlowSwitch.java#L279
-                h.portStatusExecutor.submit(() -> {
+                // getting blocked on the main thread and resulting other OF message being delayed.
+                // Ordering of the status messages is guaranteed by runtimeStatsExecutor being a single
+                // threaded executor. This executor will execute concurrently to the netty thread; meaning
+                // that the order is no  more guaranteed like it was in the past between different
+                // status handling messages and other messages: statistics (port, flows, meters, groups)
+                // barriers, idle, features, packet-ins handled inline to the netty thread. This executor
+                // will only apply to messages during the ACTIVE state of the connection.
+                h.runtimeStatusExecutor.execute(() -> {
                     try {
                         handlePortStatusMessage(h, m, true);
                     } catch (SwitchStateException e) {
                         log.error("SwitchStateException while processing " +
                                           "port status message {}", m, e);
+                        log.error("Disconnecting switch {} due to switch state error: {}",
+                                  h.getSwitchInfoString(), e.getMessage());
+                        h.channel.close();
                     }
                 });
                 //h.dispatchMessage(m);
@@ -1079,8 +1089,6 @@
             h.channel.disconnect();
         }
 
-
-
         /**
          * Handles all pending port status messages before a switch is declared
          * activated in MASTER or EQUAL role. Note that since this handling
@@ -1155,7 +1163,6 @@
             h.sw.handleMessage(m);
         }
 
-
         /**
          * Process an OF message received on the channel and
          * update state accordingly.
@@ -1359,8 +1366,6 @@
         }
     }
 
-
-
     //*************************
     //  Channel handler methods
     //*************************
@@ -1400,7 +1405,6 @@
     @Override
     public void channelInactive(ChannelHandlerContext ctx)
             throws Exception {
-
         log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
                  getSwitchInfoString());
 
@@ -1409,29 +1413,31 @@
             dispatcher = null;
         }
 
-         if (thisdpid != 0) {
-             if (!duplicateDpidFound) {
-                 // if the disconnected switch (on this ChannelHandler)
-                 // was not one with a duplicate-dpid, it is safe to remove all
-                 // state for it at the controller. Notice that if the disconnected
-                 // switch was a duplicate-dpid, calling the method below would clear
-                 // all state for the original switch (with the same dpid),
-                 // which we obviously don't want.
-                 log.info("{}:removal called", getSwitchInfoString());
-                 if (sw != null) {
-                     sw.removeConnectedSwitch();
-                 }
-             } else {
-                 // A duplicate was disconnected on this ChannelHandler,
-                 // this is the same switch reconnecting, but the original state was
-                 // not cleaned up - XXX check liveness of original ChannelHandler
-                 log.info("{}:duplicate found", getSwitchInfoString());
-                 duplicateDpidFound = Boolean.FALSE;
-             }
-         } else {
-             log.warn("no dpid in channelHandler registered for "
-                     + "disconnected switch {}", getSwitchInfoString());
-         }
+        if (thisdpid != 0) {
+            if (!duplicateDpidFound) {
+                // if the disconnected switch (on this ChannelHandler)
+                // was not one with a duplicate-dpid, it is safe to remove all
+                // state for it at the controller. Notice that if the disconnected
+                // switch was a duplicate-dpid, calling the method below would clear
+                // all state for the original switch (with the same dpid),
+                // which we obviously don't want.
+                runtimeStatusExecutor.submit(() -> {
+                    log.info("{}:removal called", getSwitchInfoString());
+                    if (sw != null) {
+                        sw.removeConnectedSwitch();
+                    }
+                });
+            } else {
+                // A duplicate was disconnected on this ChannelHandler,
+                // this is the same switch reconnecting, but the original state was
+                // not cleaned up - XXX check liveness of original ChannelHandler
+                log.info("{}:duplicate found", getSwitchInfoString());
+                duplicateDpidFound = Boolean.FALSE;
+            }
+        } else {
+            log.warn("no dpid in channelHandler registered for "
+                             + "disconnected switch {}", getSwitchInfoString());
+        }
     }
 
     @Override