ONOS-1326: Added support for observing when node liveness status was last updated. Useful for detecting/debugging stability issues.
Change-Id: I8ffebcf3a09a51c6e3e7526986a0f05530ed757f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 213d0f4..90fbd60 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -38,6 +38,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
@@ -99,6 +100,7 @@
private Set<ControllerNode> seedNodes;
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+ private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private NettyMessagingService messagingService = new NettyMessagingService();
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
@@ -131,7 +133,7 @@
seedNodes.forEach(node -> {
allNodes.put(node.id(), node);
- nodeStates.put(node.id(), State.INACTIVE);
+ updateState(node.id(), State.INACTIVE);
});
establishSelfIdentity();
@@ -216,7 +218,7 @@
checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
- nodeStates.put(nodeId, State.INACTIVE);
+ updateState(nodeId, State.INACTIVE);
delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
return node;
}
@@ -231,12 +233,18 @@
}
}
+ /** Records {@code newState} for the given node and stamps the current time as its last-updated time. */
+ private void updateState(NodeId nodeId, State newState) {
+ nodeStates.put(nodeId, newState);
+ nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
+ }
+
private void establishSelfIdentity() {
try {
IpAddress ip = findLocalIp();
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
allNodes.put(localNode.id(), localNode);
- nodeStates.put(localNode.id(), State.ACTIVE);
+ updateState(localNode.id(), State.ACTIVE);
log.info("Local Node: {}", localNode);
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
@@ -256,12 +264,12 @@
double phi = failureDetector.phi(node.id());
if (phi >= PHI_FAILURE_THRESHOLD) {
if (currentState == State.ACTIVE) {
- nodeStates.put(node.id(), State.INACTIVE);
+ updateState(node.id(), State.INACTIVE);
notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
}
} else {
if (currentState == State.INACTIVE) {
- nodeStates.put(node.id(), State.ACTIVE);
+ updateState(node.id(), State.ACTIVE);
notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
}
}
@@ -334,4 +342,8 @@
}
}
+ @Override
+ public DateTime getLastUpdated(NodeId nodeId) {
+ return nodeStateLastUpdatedTimes.get(nodeId);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
index d32f7a8..55d877d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -18,6 +18,7 @@
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
@@ -28,6 +29,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
@@ -63,6 +65,7 @@
private String listenerId;
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+ private final Map<NodeId, DateTime> lastUpdatedTimes = Maps.newConcurrentMap();
private String nodesListenerId;
@@ -123,6 +126,11 @@
}
@Override
+ public DateTime getLastUpdated(NodeId nodeId) {
+ return lastUpdatedTimes.get(nodeId);
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
@@ -139,7 +147,7 @@
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
- states.put(node.id(), State.ACTIVE);
+ updateState(node.id(), State.ACTIVE);
return node;
}
@@ -153,6 +161,12 @@
return IpAddress.valueOf(member.getSocketAddress().getAddress());
}
+ /** Records {@code newState} for the given node and stamps the current time as its last-updated time. */
+ private void updateState(NodeId nodeId, State newState) {
+ states.put(nodeId, newState);
+ lastUpdatedTimes.put(nodeId, DateTime.now());
+ }
+
// Interceptor for membership events.
private class InternalMembershipListener implements MembershipListener {
@Override
@@ -166,7 +180,7 @@
public void memberRemoved(MembershipEvent membershipEvent) {
log.info("Member {} removed", membershipEvent.getMember());
NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
- states.put(nodeId, State.INACTIVE);
+ updateState(nodeId, State.INACTIVE);
notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
}