Remove unused distributed cluster store
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cfg.ConfigProperty;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.ControllerNode.State;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.Node;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-import java.time.Instant;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import static;
-import static;
-import static;
-import static;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_READY;
-import static org.slf4j.LoggerFactory.getLogger;
-@Component(enabled = false)
- * Distributed cluster nodes store that employs an accrual failure
- * detector to identify cluster member up/down status.
- */
-public class DistributedClusterStore
- extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
- private static final Logger log = getLogger(DistributedClusterStore.class);
- public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
- private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
- @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
- label = "Interval time to send heartbeat to other controller nodes (millisecond)")
- private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
- private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
- @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
- label = "the value of Phi threshold to detect accrual failure")
- private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
- private static final long DEFAULT_MIN_STANDARD_DEVIATION_MILLIS = 50;
- @Property(name = "minStandardDeviationMillis", longValue = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS,
- label = "The minimum standard deviation to take into account when computing the Phi value")
- private long minStandardDeviationMillis = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS;
- private static final Serializer SERIALIZER = Serializer.using(
- KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(HeartbeatMessage.class)
- .build("ClusterStore"));
- private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
- private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
- private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
- private final Map<NodeId, Version> nodeVersions = Maps.newConcurrentMap();
- private final Map<NodeId, Instant> nodeLastUpdatedTimes = Maps.newConcurrentMap();
- private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
- private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
- private PhiAccrualFailureDetector failureDetector;
- private ControllerNode localNode;
- private Version localVersion;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
- // This must be optional to avoid a cyclic dependency
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
- bind = "bindComponentConfigService",
- unbind = "unbindComponentConfigService",
- policy = ReferencePolicy.DYNAMIC)
- protected ComponentConfigService cfgService;
- /**
- * Hook for wiring up optional reference to a service.
- *
- * @param service service being announced
- */
- protected void bindComponentConfigService(ComponentConfigService service) {
- if (cfgService == null) {
- cfgService = service;
- cfgService.registerProperties(getClass());
- readComponentConfiguration();
- }
- }
- /**
- * Hook for unwiring optional reference to a service.
- *
- * @param service service being withdrawn
- */
- protected void unbindComponentConfigService(ComponentConfigService service) {
- if (cfgService == service) {
- cfgService.unregisterProperties(getClass(), false);
- cfgService = null;
- }
- }
- @Activate
- public void activate() {
- localNode = clusterMetadataService.getLocalNode();
- localVersion = versionService.version();
- nodeVersions.put(, localVersion);
- messagingService.registerHandler(HEARTBEAT_MESSAGE,
- new HeartbeatMessageHandler(), heartBeatMessageHandler);
- failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- heartbeatInterval, TimeUnit.MILLISECONDS);
- }
- @Deactivate
- public void deactivate() {
- messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
- heartBeatSender.shutdownNow();
- heartBeatMessageHandler.shutdownNow();
- }
- @Modified
- public void modified(ComponentContext context) {
- readComponentConfiguration();
- }
- @Override
- public void setDelegate(ClusterStoreDelegate delegate) {
- checkNotNull(delegate, "Delegate cannot be null");
- this.delegate = delegate;
- }
- @Override
- public void unsetDelegate(ClusterStoreDelegate delegate) {
- this.delegate = null;
- }
- @Override
- public boolean hasDelegate() {
- return this.delegate != null;
- }
- @Override
- public ControllerNode getLocalNode() {
- return localNode;
- }
- @Override
- public Set<ControllerNode> getNodes() {
- return ImmutableSet.copyOf(allNodes.values());
- }
- @Override
- public Set<Node> getStorageNodes() {
- return ImmutableSet.of();
- }
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return allNodes.get(nodeId);
- }
- @Override
- public State getState(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return firstNonNull(nodeStates.get(nodeId), State.INACTIVE);
- }
- @Override
- public Version getVersion(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return nodeVersions.get(nodeId);
- }
- @Override
- public void markFullyStarted(boolean started) {
- updateNode(, started ? State.READY : State.ACTIVE, null);
- }
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- addNode(node);
- return node;
- }
- @Override
- public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- ControllerNode node = allNodes.remove(nodeId);
- if (node != null) {
- nodeStates.remove(nodeId);
- nodeVersions.remove(nodeId);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
- }
- }
- private void addNode(ControllerNode node) {
- allNodes.put(, node);
- updateNode(, node.equals(localNode) ? State.ACTIVE : State.INACTIVE, null);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
- }
- private void updateNode(NodeId nodeId, State newState, Version newVersion) {
- State currentState = nodeStates.get(nodeId);
- Version currentVersion = nodeVersions.get(nodeId);
- if (!Objects.equals(currentState, newState)
- || (newVersion != null && !Objects.equals(currentVersion, newVersion))) {
- nodeStates.put(nodeId, newState);
- if (newVersion != null) {
- nodeVersions.put(nodeId, newVersion);
- }
- nodeLastUpdatedTimes.put(nodeId,;
- notifyChange(nodeId, currentState, newState, currentVersion, newVersion);
- }
- }
- private void heartbeat() {
- try {
- Set<ControllerNode> peers = allNodes.values()
- .stream()
- .filter(node -> !(
- .collect(Collectors.toSet());
- State state = nodeStates.get(;
- byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state, localVersion));
- peers.forEach((node) -> {
- heartbeatToPeer(hbMessagePayload, node);
- State currentState = nodeStates.get(;
- double phi = failureDetector.phi(;
- if (phi >= phiFailureThreshold) {
- if (currentState.isActive()) {
- updateNode(, State.INACTIVE, null);
- failureDetector.reset(;
- }
- } else {
- if (currentState == State.INACTIVE) {
- updateNode(, State.ACTIVE, null);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failed to send heartbeat", e);
- }
- }
- private void notifyChange(NodeId nodeId, State oldState, State newState, Version oldVersion, Version newVersion) {
- if (oldState != newState || !Objects.equals(oldVersion, newVersion)) {
- ControllerNode node = allNodes.get(nodeId);
- // Either this node or that node is no longer part of the same cluster
- if (node == null) {
- log.debug("Could not find node {} in the cluster, ignoring state change", nodeId);
- return;
- }
- ClusterEvent.Type type = newState == State.READY ? INSTANCE_READY :
- notifyDelegate(new ClusterEvent(type, node));
- }
- }
- private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
- Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
- messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
- if (error != null) {
- log.trace("Sending heartbeat to {} failed", remoteEp, error);
- }
- });
- }
- private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
- @Override
- public void accept(Endpoint sender, byte[] message) {
- HeartbeatMessage hb = SERIALIZER.decode(message);
- if (clusterMetadataService.getClusterMetadata().getNodes().contains(hb.source())) {
- // Avoid reporting heartbeats that have been enqueued by setting a minimum interval.
- long heartbeatTime = System.currentTimeMillis();
- long lastHeartbeatTime = failureDetector.getLastHeartbeatTime(hb.source().id());
- if (heartbeatTime - lastHeartbeatTime > heartbeatInterval / 2) {
-, heartbeatTime);
- }
- updateNode(hb.source().id(), hb.state, hb.version);
- }
- }
- }
- private static class HeartbeatMessage {
- private ControllerNode source;
- private State state;
- private Version version;
- public HeartbeatMessage(ControllerNode source, State state, Version version) {
- this.source = source;
- this.state = state != null ? state : State.ACTIVE;
- this.version = version;
- }
- public ControllerNode source() {
- return source;
- }
- }
- @Override
- public Instant getLastUpdatedInstant(NodeId nodeId) {
- return nodeLastUpdatedTimes.get(nodeId);
- }
- /**
- * Extracts properties from the component configuration.
- *
- */
- private void readComponentConfiguration() {
- Set<ConfigProperty> configProperties = cfgService.getProperties(getClass().getName());
- for (ConfigProperty property : configProperties) {
- if ("heartbeatInterval".equals( {
- String s = property.value();
- if (s == null) {
-"Heartbeat interval time is not configured, default value is {}",
- } else {
- int newHeartbeatInterval = isNullOrEmpty(s) ? DEFAULT_HEARTBEAT_INTERVAL
- : Integer.parseInt(s.trim());
- if (newHeartbeatInterval > 0 && heartbeatInterval != newHeartbeatInterval) {
- heartbeatInterval = newHeartbeatInterval;
- restartHeartbeatSender();
- }
-"Configured. Heartbeat interval time is configured to {}",
- heartbeatInterval);
- }
- }
- if ("phiFailureThreshold".equals( {
- String s = property.value();
- if (s == null) {
-"Phi failure threshold is not configured, default value is {}",
- } else {
- int newPhiFailureThreshold = isNullOrEmpty(s) ? DEFAULT_HEARTBEAT_INTERVAL
- : Integer.parseInt(s.trim());
- setPhiFailureThreshold(newPhiFailureThreshold);
-"Configured. Phi failure threshold is configured to {}",
- phiFailureThreshold);
- }
- }
- if ("minStandardDeviationMillis".equals( {
- String s = property.value();
- if (s == null) {
-"Minimum standard deviation is not configured, default value is {}",
- } else {
- long newMinStandardDeviationMillis = isNullOrEmpty(s)
- : Long.parseLong(s.trim());
- setMinStandardDeviationMillis(newMinStandardDeviationMillis);
-"Configured. Minimum standard deviation is configured to {}",
- newMinStandardDeviationMillis);
- }
- }
- }
- }
- /**
- * Sets heartbeat interval between the termination of one execution of heartbeat
- * and the commencement of the next.
- *
- * @param interval term between each heartbeat
- */
- private void setHeartbeatInterval(int interval) {
- try {
- checkArgument(interval > 0, "Interval must be greater than zero");
- heartbeatInterval = interval;
- } catch (IllegalArgumentException e) {
- log.warn(e.getMessage());
- }
- }
- /**
- * Sets Phi failure threshold.
- * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
- *
- * @param threshold
- */
- private void setPhiFailureThreshold(int threshold) {
- phiFailureThreshold = threshold;
- }
- /**
- * Sets the minimum standard deviation milliseconds.
- *
- * @param minStandardDeviationMillis the updated minimum standard deviation
- */
- private void setMinStandardDeviationMillis(long minStandardDeviationMillis) {
- this.minStandardDeviationMillis = minStandardDeviationMillis;
- try {
- failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
- } catch (IllegalArgumentException e) {
- log.warn(e.getMessage());
- this.minStandardDeviationMillis = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS;
- failureDetector = new PhiAccrualFailureDetector(this.minStandardDeviationMillis);
- }
- }
- /**
- * Restarts heartbeatSender executor.
- */
- private void restartHeartbeatSender() {
- try {
- ScheduledExecutorService prevSender = heartBeatSender;
- heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- heartbeatInterval, TimeUnit.MILLISECONDS);
- prevSender.shutdown();
- } catch (Exception e) {
- log.warn(e.getMessage());
- }
- }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/ b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/
deleted file mode 100644
index e651288..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/
+++ /dev/null
@@ -1,128 +0,0 @@
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterMetadataServiceAdapter;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.Version;
-import org.onosproject.core.VersionServiceAdapter;
-import java.util.Set;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.assertFalse;
- * Unit test for DistributedClusterStore.
- */
-public class DistributedClusterStoreTest {
- DistributedClusterStore distributedClusterStore;
- ClusterStore clusterStore;
- NodeId nodeId;
- ControllerNode local;
- private static final NodeId NID1 = new NodeId("foo");
- private static final NodeId NID2 = new NodeId("bar");
- private static final NodeId NID3 = new NodeId("buz");
- private static final IpAddress IP1 = IpAddress.valueOf("");
- private static final IpAddress IP2 = IpAddress.valueOf("");
- private static final IpAddress IP3 = IpAddress.valueOf("");
- private static final int PORT1 = 1;
- private static final int PORT2 = 2;
- private static Set<ControllerNode> nodes;
- private TestDelegate delegate = new TestDelegate();
- private class TestDelegate implements ClusterStoreDelegate {
- private ClusterEvent event;
- @Override
- public void notify(ClusterEvent event) {
- this.event = event;
- }
- }
- @Before
- public void setUp() throws Exception {
- distributedClusterStore = new DistributedClusterStore();
- distributedClusterStore.clusterMetadataService = new ClusterMetadataServiceAdapter() {
- @Override
- public ControllerNode getLocalNode() {
- return new DefaultControllerNode(NID1, IP1);
- }
- };
- distributedClusterStore.messagingService = new NettyMessagingManager();
- distributedClusterStore.cfgService = new ComponentConfigAdapter();
- distributedClusterStore.versionService = new VersionServiceAdapter() {
- @Override
- public Version version() {
- return Version.version("1.1.1");
- }
- };
- distributedClusterStore.activate();
- clusterStore = distributedClusterStore;
- }
- @After
- public void tearDown() throws Exception {
- distributedClusterStore.deactivate();
- }
- @Test
- public void testEmpty() {
- nodeId = new NodeId("newNode");
- assertThat(clusterStore.getNode((nodeId)), is(nullValue()));
- assertFalse(clusterStore.hasDelegate());
- assertThat(clusterStore.getState(nodeId), is(ControllerNode.State.INACTIVE));
- assertThat(clusterStore.getVersion(nodeId), is(nullValue()));
- }
- @Test
- public void addNodes() {
- clusterStore.setDelegate(delegate);
- assertThat(clusterStore.hasDelegate(), is(true));
- clusterStore.addNode(NID1, IP1, PORT1);
- clusterStore.addNode(NID2, IP2, PORT2);
- clusterStore.removeNode(NID1);
- assertThat(clusterStore.getNode(NID1), is(nullValue()));
- clusterStore.addNode(NID3, IP3, PORT2);
- clusterStore.markFullyStarted(true);
- assertThat(clusterStore.getState(clusterStore.getLocalNode().id()),
- is(ControllerNode.State.READY));
- clusterStore.markFullyStarted(false);
- assertThat(clusterStore.getState(clusterStore.getLocalNode().id()),
- is(ControllerNode.State.ACTIVE));
- nodes = clusterStore.getNodes();
- assertThat(nodes.size(), is(2));
- clusterStore.markFullyStarted(true);
- clusterStore.unsetDelegate(delegate);
- assertThat(clusterStore.hasDelegate(), is(false));
- }