ONOS-3350 AbstractOFSwitch race fixes

Protecting callers with synchronized block during
role request and reply methods.

Change-Id: Ie82f84d1d462923c9f410e6950e846ee3b05551c
diff --git a/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java b/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
index 51a2ce4..b6ec574 100644
--- a/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
@@ -29,14 +29,10 @@
 
     /**
      * Writes the message to the driver.
-     *
-     * Note:
-     * Calling {@link #sendMsg(OFMessage)} does NOT guarantee the messages to be
-     * transmitted on the wire in order, especially during role transition.
-     * The messages may be reordered at the switch side.
-     *
-     * Calling {@link #sendMsg(List)} guarantee the messages inside the list
-     * to be transmitted on the wire in order.
+     * <p>
+     * Note: Messages may be silently dropped/lost due to IOExceptions or
+     * role. If this is a concern, then a caller should use barriers.
+     * </p>
      *
      * @param msg the message to write
      */
@@ -44,6 +40,10 @@
 
     /**
      * Writes the OFMessage list to the driver.
+     * <p>
+     * Note: Messages may be silently dropped/lost due to IOExceptions or
+     * role. If this is a concern, then a caller should use barriers.
+     * </p>
      *
      * @param msgs the messages to be written
      */
diff --git a/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 2c19837..c717419 100644
--- a/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.openflow.controller.driver;
 
+import com.google.common.collect.Lists;
 import org.jboss.netty.channel.Channel;
 import org.onlab.packet.IpAddress;
 import org.onosproject.net.Device;
@@ -46,6 +47,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -74,12 +76,14 @@
 
     private RoleHandler roleMan;
 
-    protected RoleState role;
+    // TODO this is accessed from multiple threads, but volatile may have performance implications
+    protected volatile RoleState role;
 
     protected OFFeaturesReply features;
     protected OFDescStatsReply desc;
 
-    List<OFMessage> messagesPendingMastership;
+    private final AtomicReference<List<OFMessage>> messagesPendingMastership
+            = new AtomicReference<>();
 
     @Override
     public void init(Dpid dpid, OFDescStatsReply desc, OFVersion ofv) {
@@ -104,15 +108,53 @@
 
     @Override
     public final void sendMsg(List<OFMessage> msgs) {
-        if (role == RoleState.MASTER && channel.isConnected()) {
+        /*
+           It is possible that in this block, we transition to SLAVE/EQUAL.
+           If this is the case, the supplied messages will race with the
+           RoleRequest message, and they could be rejected by the switch.
+           In the interest of performance, we will not protect this block with
+           a synchronization primitive, because the message would have just been
+           dropped anyway.
+        */
+        if (role == RoleState.MASTER) {
+            // fast path send when we are master
+
+            sendMsgsOnChannel(msgs);
+            return;
+        }
+        // check to see if mastership transition is in progress
+        synchronized (messagesPendingMastership) {
+            /*
+               messagesPendingMastership is used as synchronization variable for
+               all mastership related changes. In this block, mastership (including
+               role update) will have either occurred or not.
+            */
+            if (role == RoleState.MASTER) {
+                // transition to MASTER complete, send messages
+                sendMsgsOnChannel(msgs);
+                return;
+            }
+
+            List<OFMessage> messages = messagesPendingMastership.get();
+            if (messages != null) {
+                // we are transitioning to MASTER, so add messages to queue
+                messages.addAll(msgs);
+                log.debug("Enqueue message for switch {}. queue size after is {}",
+                          dpid, messages.size());
+            } else {
+                // not transitioning to MASTER
+                log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
+                         dpid, role, channel.isConnected(), msgs);
+            }
+        }
+    }
+
+    private void sendMsgsOnChannel(List<OFMessage> msgs) {
+        if (channel.isConnected()) {
             channel.write(msgs);
-        } else if (messagesPendingMastership != null) {
-            messagesPendingMastership.addAll(msgs);
-            log.debug("Enqueue message for switch {}. queue size after is {}",
-                      dpid, messagesPendingMastership.size());
         } else {
-            log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
-                     dpid, role, channel.isConnected(), msgs);
+            log.warn("Dropping messages for switch {} because channel is not connected: {}",
+                     dpid, msgs);
         }
     }
 
@@ -120,7 +162,7 @@
     public final void sendRoleRequest(OFMessage msg) {
         if (msg instanceof OFRoleRequest ||
                 msg instanceof OFNiciraControllerRoleRequest) {
-            channel.write(Collections.singletonList(msg));
+            sendMsgsOnChannel(Collections.singletonList(msg));
             return;
         }
         throw new IllegalArgumentException("Someone is trying to send " +
@@ -130,7 +172,7 @@
     @Override
     public final void sendHandshakeMessage(OFMessage message) {
         if (!this.isDriverHandshakeComplete()) {
-            channel.write(Collections.singletonList(message));
+            sendMsgsOnChannel(Collections.singletonList(message));
         }
     }
 
@@ -239,11 +281,16 @@
     @Override
     public final void transitionToMasterSwitch() {
         this.agent.transitionToMasterSwitch(dpid);
-        if (messagesPendingMastership != null) {
-            this.sendMsg(messagesPendingMastership);
-            log.debug("Sending {} pending messages to switch {}",
-                     messagesPendingMastership.size(), dpid);
-            messagesPendingMastership = null;
+        synchronized (messagesPendingMastership) {
+            List<OFMessage> messages = messagesPendingMastership.get();
+            if (messages != null) {
+                this.sendMsg(messages);
+                log.debug("Sending {} pending messages to switch {}",
+                          messages.size(), dpid);
+                messagesPendingMastership.set(null);
+            }
+            // perform role transition after clearing messages queue
+            this.role = RoleState.MASTER;
         }
     }
 
@@ -287,17 +334,27 @@
     @Override
     public void setRole(RoleState role) {
         try {
+            if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
+                // perform role transition to SLAVE/EQUAL before sending role request
+                this.role = role;
+            }
             if (this.roleMan.sendRoleRequest(role, RoleRecvStatus.MATCHED_SET_ROLE)) {
                 log.debug("Sending role {} to switch {}", role, getStringId());
-                if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
-                    this.role = role;
-                } else {
-                    if (messagesPendingMastership == null) {
-                        log.debug("Initializing new queue for switch {}", dpid);
-                        messagesPendingMastership = new ArrayList<>();
+                if (role == RoleState.MASTER) {
+                    synchronized (messagesPendingMastership) {
+                        if (messagesPendingMastership.get() == null) {
+                            log.debug("Initializing new message queue for switch {}", dpid);
+                            /*
+                               The presence of messagesPendingMastership indicates that
+                               a switch is currently transitioning to MASTER, but
+                               is still awaiting role reply from switch.
+                            */
+                            messagesPendingMastership.set(Lists.newArrayList());
+                        }
                     }
                 }
-            } else {
+            } else if (role == RoleState.MASTER) {
+                // role request not support; transition switch to MASTER
                 this.role = role;
             }
         } catch (IOException e) {
@@ -307,6 +364,7 @@
 
     @Override
     public void reassertRole() {
+        // TODO should messages be sent directly or queue during reassertion?
         if (this.getRole() == RoleState.MASTER) {
             log.warn("Received permission error from switch {} while " +
                     "being master. Reasserting master role.",
@@ -315,18 +373,15 @@
         }
     }
 
-
-
     @Override
     public void handleRole(OFMessage m) throws SwitchStateException {
         RoleReplyInfo rri = roleMan.extractOFRoleReply((OFRoleReply) m);
         RoleRecvStatus rrs = roleMan.deliverRoleReply(rri);
         if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
             if (rri.getRole() == RoleState.MASTER) {
-                this.role = rri.getRole();
                 this.transitionToMasterSwitch();
             } else if (rri.getRole() == RoleState.EQUAL ||
-                    rri.getRole() == RoleState.SLAVE) {
+                       rri.getRole() == RoleState.SLAVE) {
                 this.transitionToEqualSwitch();
             }
         }  else {
@@ -348,10 +403,9 @@
                 new RoleReplyInfo(r, null, m.getXid()));
         if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
             if (r == RoleState.MASTER) {
-                this.role = r;
                 this.transitionToMasterSwitch();
             } else if (r == RoleState.EQUAL ||
-                    r == RoleState.SLAVE) {
+                       r == RoleState.SLAVE) {
                 this.transitionToEqualSwitch();
             }
         } else {
@@ -369,8 +423,6 @@
         return true;
     }
 
-
-
     @Override
     public final void setAgent(OpenFlowAgent ag) {
         if (this.agent == null) {
@@ -398,9 +450,8 @@
     @Override
     public List<OFPortDesc> getPorts() {
         return this.ports.stream()
-                  .flatMap((portReply) -> (portReply.getEntries().stream()))
+                  .flatMap(portReply -> portReply.getEntries().stream())
                   .collect(Collectors.toList());
-        //return Collections.unmodifiableList(ports.getEntries());
     }
 
     @Override
@@ -408,13 +459,11 @@
         return this.desc.getMfrDesc();
     }
 
-
     @Override
     public String datapathDescription() {
         return this.desc.getDpDesc();
     }
 
-
     @Override
     public String hardwareDescription() {
         return this.desc.getHwDesc();
@@ -430,20 +479,15 @@
         return this.desc.getSerialNum();
     }
 
-
     @Override
     public Device.Type deviceType() {
         return Device.Type.SWITCH;
     }
 
-
     @Override
     public String toString() {
         return this.getClass().getName() + " [" + ((channel != null)
                 ? channel.getRemoteAddress() : "?")
                 + " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
     }
-
-
-
 }