Bug fixes for ONOS-3509

- Forwarding behavior added to {Device,Link}Store by ONOS-490
  cauesed false update information sent from ONOS node, which has been detached from the cluster,
  to be accepted by rest of the cluster after the detached node has rejoined cluster.

- Fix for periodic mastership check was left out
  when MastershipService#requestRoleFor(..) return value was changed to Future.

- Fix for triggerProbe() related messages getting dropped,
  right after STANDBY -> MASTER role change.

- Local state (connectedDevices) was preventing
  vertical (Core -> switch) Mastership state synchronization.

- Various debug log, comment added during investigation.

Change-Id: I777beadf04db8a879830a07bfdc7ab0e2279f190
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index faf6dd4..d1fe670 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -31,20 +31,23 @@
      */
     public enum Type {
         /**
-         * Signifies that the leader has been elected. The event subject is the
-         * new leader.
+         * Signifies that the leader has been elected.
+         * The event subject is the new leader.
+         * This event does not guarantee accurate candidate information.
          */
         LEADER_ELECTED,
 
         /**
-         * Signifies that the leader has been re-elected. The event subject is the
-         * leader.
+         * Signifies that the leader has been re-elected.
+         * The event subject is the leader.
+         * This event does not guarantee accurate candidate information.
          */
         LEADER_REELECTED,
 
         /**
-         * Signifies that the leader has been booted and lost leadership. The
-         * event subject is the former leader.
+         * Signifies that the leader has been booted and lost leadership.
+         * The event subject is the former leader.
+         * This event does not guarantee accurate candidate information.
          */
         LEADER_BOOTED,
 
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 03281be..4ee9a00 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -491,9 +491,12 @@
                 if (Objects.equals(requested, mastershipService.getLocalRole(deviceId))) {
                     return;
                 } else {
-                    return;
-                    // FIXME roleManager got the device to comply, but doesn't agree with
+                    log.warn("Role mismatch on {}. set to {}, but store demands {}",
+                             deviceId, response, mastershipService.getLocalRole(deviceId));
+                    // roleManager got the device to comply, but doesn't agree with
                     // the store; use the store's view, then try to reassert.
+                    backgroundService.submit(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId)));
+                    return;
                 }
             } else {
                 // we didn't get back what we asked for. Reelect someone else.
@@ -547,6 +550,7 @@
         provider.roleChanged(deviceId, newRole);
 
         if (newRole.equals(MastershipRole.MASTER)) {
+            log.debug("sent TriggerProbe({})", deviceId);
             // only trigger event when request was sent to provider
             provider.triggerProbe(deviceId);
         }
@@ -565,12 +569,19 @@
 
         MastershipRole myNextRole = nextRole;
         if (myNextRole == NONE) {
-            mastershipService.requestRoleFor(did);
-            MastershipTerm term = termService.getMastershipTerm(did);
-            if (term != null && localNodeId.equals(term.master())) {
-                myNextRole = MASTER;
-            } else {
-                myNextRole = STANDBY;
+            try {
+                mastershipService.requestRoleFor(did).get();
+                MastershipTerm term = termService.getMastershipTerm(did);
+                if (term != null && localNodeId.equals(term.master())) {
+                    myNextRole = MASTER;
+                } else {
+                    myNextRole = STANDBY;
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted waiting for Mastership", e);
+            } catch (ExecutionException e) {
+                log.error("Encountered an error waiting for Mastership", e);
             }
         }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 3bb6a70..b2ee832 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.cluster.impl;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 
@@ -163,7 +164,7 @@
     @Override
     public State getState(NodeId nodeId) {
         checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return nodeStates.get(nodeId);
+        return MoreObjects.firstNonNull(nodeStates.get(nodeId), State.INACTIVE);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 1882b1b..704df83 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -96,17 +96,26 @@
     protected EventDeliveryService eventDispatcher;
 
     private final Logger log = getLogger(getClass());
+
     private ScheduledExecutorService electionRunner;
     private ScheduledExecutorService lockExecutor;
     private ScheduledExecutorService staleLeadershipPurgeExecutor;
     private ScheduledExecutorService leadershipRefresher;
 
+    // leader for each topic
     private ConsistentMap<String, NodeId> leaderMap;
+    // list of candidates (includes chosen leader) for each topic
     private ConsistentMap<String, List<NodeId>> candidateMap;
 
     private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
+
+    // cached copy of leaderMap
+    // Note: Map value, Leadership, does not contain proper candidates info
     private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+    // cached copy of candidateMap
+    // Note: Map value, Leadership, does not contain proper leader info
     private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
+
     private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
 
     private NodeId localNodeId;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
index 2dae55b..2631fff 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/ECDeviceStore.java
@@ -286,6 +286,11 @@
             deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
             return refreshDeviceCache(providerId, deviceId);
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
             return Futures.getUnchecked(
                     clusterCommunicator.sendAndReceive(deviceInjectedEvent,
@@ -413,6 +418,11 @@
             });
             deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
             if (master == null) {
                 return Collections.emptyList();
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index a9a9098..03e28f1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -330,6 +330,11 @@
             }
 
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
             // FIXME Temporary hack for NPE (ONOS-1171).
             // Proper fix is to implement forwarding to master on ConfigProvider
             // redo ONOS-490
@@ -579,6 +584,11 @@
             }
 
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
             // FIXME Temporary hack for NPE (ONOS-1171).
             // Proper fix is to implement forwarding to master on ConfigProvider
             // redo ONOS-490
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
index 243caf8..3d3ff9a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
@@ -218,6 +218,13 @@
             linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v  , linkDescription));
             return refreshLinkCache(linkKey);
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
+            // Temporary hack for NPE (ONOS-1171).
+            // Proper fix is to implement forwarding to master on ConfigProvider
             if (dstNodeId == null) {
                 return null;
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 47aa85c..06402cf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -323,6 +323,11 @@
             }
 
         } else {
+            // Only forward for ConfigProvider
+            // Forwarding was added as a workaround for ONOS-490
+            if (!providerId.equals("cfg")) {
+                return null;
+            }
             // FIXME Temporary hack for NPE (ONOS-1171).
             // Proper fix is to implement forwarding to master on ConfigProvider
             // redo ONOS-490
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index c6fc693..44fbea0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -159,20 +159,12 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        if (connectedDevices.add(deviceId)) {
-            return leadershipService.runForLeadership(leadershipTopic)
-                                    .thenApply(leadership -> {
-                                        return Objects.equal(localNodeId, leadership.leader())
-                                                ? MastershipRole.MASTER : MastershipRole.STANDBY;
-                                    });
-        } else {
-            NodeId leader = leadershipService.getLeader(leadershipTopic);
-            if (Objects.equal(localNodeId, leader)) {
-                return CompletableFuture.completedFuture(MastershipRole.MASTER);
-            } else {
-                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
-            }
-        }
+        connectedDevices.add(deviceId);
+        return leadershipService.runForLeadership(leadershipTopic)
+                .thenApply(leadership -> {
+                    return Objects.equal(localNodeId, leadership.leader())
+                            ? MastershipRole.MASTER : MastershipRole.STANDBY;
+                });
     }
 
     @Override
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 098ff07..a877a66 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -97,6 +97,9 @@
     protected ExecutorService executorMsgs =
             Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
 
+    // messagesPendingMastership is used as synchronization variable for
+    // all mastership related changes. In this block, mastership (including
+    // role update) will have either occurred or not.
     private final AtomicReference<List<OFMessage>> messagesPendingMastership
             = new AtomicReference<>();
 
@@ -275,6 +278,8 @@
     public final void handleMessage(OFMessage m) {
         if (this.role == RoleState.MASTER || m instanceof OFPortStatus) {
             this.agent.processMessage(dpid, m);
+        } else {
+            log.trace("Dropping received message {}, was not MASTER", m);
         }
     }
 
@@ -309,7 +314,8 @@
         synchronized (messagesPendingMastership) {
             List<OFMessage> messages = messagesPendingMastership.get();
             if (messages != null) {
-                this.sendMsg(messages);
+                // Cannot use sendMsg here. It will only append to pending list.
+                sendMsgsOnChannel(messages);
                 log.debug("Sending {} pending messages to switch {}",
                           messages.size(), dpid);
                 messagesPendingMastership.set(null);
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index f9a6059..5bca54e 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -73,12 +73,12 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
 import static org.onlab.util.Tools.groupedThreads;
 
 @Component(immediate = true)
@@ -118,11 +118,11 @@
     private final ExecutorService executorBarrier =
         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
 
-    protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
+    protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
             new ConcurrentHashMap<>();
-    protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
+    protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
             new ConcurrentHashMap<>();
-    protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
+    protected ConcurrentMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
             new ConcurrentHashMap<>();
 
     protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
@@ -280,6 +280,7 @@
             executorMsgs.submit(new OFMessageHandler(dpid, msg));
             break;
         case ERROR:
+            log.debug("Received error message from {}: {}", dpid, msg);
             executorMsgs.submit(new OFMessageHandler(dpid, msg));
             break;
         case STATS_REPLY:
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
index 981686b..9885521 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
@@ -64,6 +64,7 @@
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceEvent.Type;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -568,6 +569,9 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
+            if (event.type() == Type.PORT_STATS_UPDATED) {
+                return;
+            }
             Device device = event.subject();
             Port port = event.port();
             if (device == null) {
diff --git a/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java b/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
index 0950166..a1386ae 100644
--- a/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
+++ b/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
@@ -268,7 +268,7 @@
                 LOG.error("Unknown Mastership state : {}", newRole);
 
         }
-        LOG.debug("Accepting mastership role change for device {}", deviceId);
+        LOG.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
     }
 
 
@@ -297,7 +297,7 @@
     }
 
     private void pushPortMetrics(Dpid dpid, List<OFPortStatsEntry> portStatsEntries) {
-        DeviceId deviceId = DeviceId.deviceId(dpid.uri(dpid));
+        DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
         Collection<PortStatistics> stats = buildPortStatistics(deviceId, portStatsEntries);
         providerService.updatePortStatistics(deviceId, stats);
     }
@@ -434,6 +434,7 @@
 
         @Override
         public void switchChanged(Dpid dpid) {
+            LOG.debug("switchChanged({})", dpid);
             if (providerService == null) {
                 return;
             }
@@ -442,17 +443,21 @@
             if (sw == null) {
                 return;
             }
-            providerService.updatePorts(did, buildPortDescriptions(sw));
+            final List<PortDescription> ports = buildPortDescriptions(sw);
+            LOG.debug("switchChanged({}) {}", did, ports);
+            providerService.updatePorts(did, ports);
         }
 
         @Override
         public void portChanged(Dpid dpid, OFPortStatus status) {
+            LOG.debug("portChanged({},{})", dpid, status);
             PortDescription portDescription = buildPortDescription(status);
             providerService.portStatusChanged(deviceId(uri(dpid)), portDescription);
         }
 
         @Override
         public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
+            LOG.debug("receivedRoleReply({},{},{})", dpid, requested, response);
             MastershipRole request = roleOf(requested);
             MastershipRole reply = roleOf(response);
             providerService.receivedRoleReply(deviceId(uri(dpid)), request, reply);
@@ -503,7 +508,7 @@
                     LOG.debug("Ports Of{}", portsOf);
                     portsOf.forEach(
                         op -> {
-                            portDescs.add(buildPortDescription(type, (OFObject) op));
+                            portDescs.add(buildPortDescription(type, op));
                         }
                      );
                     });
diff --git a/providers/openflow/group/src/main/java/org/onosproject/provider/of/group/impl/OpenFlowGroupProvider.java b/providers/openflow/group/src/main/java/org/onosproject/provider/of/group/impl/OpenFlowGroupProvider.java
index b87d37b..2801f12 100644
--- a/providers/openflow/group/src/main/java/org/onosproject/provider/of/group/impl/OpenFlowGroupProvider.java
+++ b/providers/openflow/group/src/main/java/org/onosproject/provider/of/group/impl/OpenFlowGroupProvider.java
@@ -353,10 +353,12 @@
                 return;
             }
             if (isGroupSupported(sw)) {
-                GroupStatsCollector gsc = new GroupStatsCollector(
-                        controller.getSwitch(dpid), POLL_INTERVAL);
+                GroupStatsCollector gsc = new GroupStatsCollector(sw, POLL_INTERVAL);
                 gsc.start();
-                collectors.put(dpid, gsc);
+                GroupStatsCollector prevGsc = collectors.put(dpid, gsc);
+                if (prevGsc != null) {
+                    prevGsc.stop();
+                }
             }
 
             //figure out race condition