Add the ability to track whether a node has all of its components fully running.
Change-Id: Ib2b90c7a842976a3b3a9711367fa1eed43103b17
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index b537517..5c34270 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -18,7 +18,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -90,6 +89,7 @@
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
+
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -168,6 +168,11 @@
}
@Override
+ public void markFullyStarted(boolean started) {
+ updateState(localNode.id(), started ? State.READY : State.ACTIVE); // sent to peers in heartbeats
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
addNode(node);
@@ -201,13 +206,14 @@
.stream()
.filter(node -> !(node.id().equals(localNode.id())))
.collect(Collectors.toSet());
- byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+ State state = nodeStates.get(localNode.id());
+ byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state, peers));
peers.forEach((node) -> {
heartbeatToPeer(hbMessagePayload, node);
State currentState = nodeStates.get(node.id());
double phi = failureDetector.phi(node.id());
if (phi >= PHI_FAILURE_THRESHOLD) {
- if (currentState == State.ACTIVE) {
+ if (currentState != null && currentState.isActive()) { // null until a state is recorded
updateState(node.id(), State.INACTIVE);
- notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+ notifyStateChange(node.id(), currentState, State.INACTIVE); // old state may be READY
}
@@ -225,7 +231,7 @@
private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
ControllerNode node = allNodes.get(nodeId);
- if (newState == State.ACTIVE) {
+ if (newState.isActive()) { // NOTE(review): assumes READY also counts as active - confirm
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
} else {
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
@@ -246,6 +252,7 @@
public void accept(Endpoint sender, byte[] message) {
HeartbeatMessage hb = SERIALIZER.decode(message);
failureDetector.report(hb.source().id());
+ updateState(hb.source().id(), hb.state); // NOTE(review): not paired with notifyStateChange; confirm
hb.knownPeers().forEach(node -> {
allNodes.put(node.id(), node);
});
@@ -254,10 +261,12 @@
private static class HeartbeatMessage {
private ControllerNode source;
+ private State state; // sender's own state at the time the heartbeat was built
private Set<ControllerNode> knownPeers;
- public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+ public HeartbeatMessage(ControllerNode source, State state, Set<ControllerNode> members) {
this.source = source;
+ this.state = state != null ? state : State.ACTIVE; // no local state recorded yet: report ACTIVE
this.knownPeers = ImmutableSet.copyOf(members);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index f859ce4..0ab9704 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -22,7 +22,6 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
@@ -30,10 +29,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,7 +172,7 @@
private void rebalance() {
int activeNodes = (int) clusterService.getNodes()
.stream()
- .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
+ .map(node -> clusterService.getState(node.id())).filter(s -> s != null && s.isActive())
.count();
int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);