Added creationTime to Versioned object. This enables supporting a electedTime in leadership, which in turn helps us track how stable leadership terms are.
Change-Id: Ib051027625324646152ed85535ba337e95f8a061
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index c0e1a3c..c4605fb 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -19,6 +19,7 @@
import java.util.Map;
import org.apache.karaf.shell.commands.Command;
+import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipService;
@@ -30,15 +31,16 @@
description = "Finds the leader for particular topic.")
public class LeaderCommand extends AbstractShellCommand {
- private static final String FMT = "%-20s | %-15s | %-6s |";
+ private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
@Override
protected void execute() {
LeadershipService leaderService = get(LeadershipService.class);
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
- print("-------------------------------------------------");
- print(FMT, "Topic", "Leader", "Epoch");
- print("-------------------------------------------------");
+ print("--------------------------------------------------------------");
+ print(FMT, "Topic", "Leader", "Epoch", "Elected");
+ print("--------------------------------------------------------------");
+
Comparator<Leadership> leadershipComparator =
(e1, e2) -> {
@@ -57,8 +59,11 @@
leaderBoard.values()
.stream()
.sorted(leadershipComparator)
- .forEach(l -> print(FMT, l.topic(), l.leader(), l.epoch()));
- print("-------------------------------------------------");
+ .forEach(l -> print(FMT,
+ l.topic(),
+ l.leader(),
+ l.epoch(),
+ Tools.timeAgo(l.electedTime())));
+ print("--------------------------------------------------------------");
}
-
-}
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
index 86c951e..5f3a9ef 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
@@ -17,6 +17,8 @@
import java.util.Objects;
+import org.joda.time.DateTime;
+
import com.google.common.base.MoreObjects;
/**
@@ -27,11 +29,13 @@
private final String topic;
private final NodeId leader;
private final long epoch;
+ private final long electedTime;
- public Leadership(String topic, NodeId leader, long epoch) {
+ public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
this.topic = topic;
this.leader = leader;
this.epoch = epoch;
+ this.electedTime = electedTime;
}
/**
@@ -52,12 +56,31 @@
/**
* The epoch when the leadership was assumed.
+ * <p>
+ * Comparing epochs is only appropriate for leadership
+ * events for the same topic. The system guarantees that
+ * for any given topic the epoch for a new term is higher
+ * (not necessarily by 1) than the epoch for any previous term.
* @return leadership epoch
*/
public long epoch() {
return epoch;
}
+ /**
+ * The system time when the term started.
+ * <p>
+ * The elected time is initially set on the node coordinating
+ * the leader election using its local system time. Due to possible
+ * clock skew, relying on this value for determining event ordering
+ * is discouraged. Epoch is more appropriate for determining
+ * event ordering.
+ * @return elected time.
+ */
+ public long electedTime() {
+ return electedTime;
+ }
+
@Override
public int hashCode() {
return Objects.hash(topic, leader, epoch);
@@ -72,7 +95,8 @@
final Leadership other = (Leadership) obj;
return Objects.equals(this.topic, other.topic) &&
Objects.equals(this.leader, other.leader) &&
- Objects.equals(this.epoch, other.epoch);
+ Objects.equals(this.epoch, other.epoch) &&
+ Objects.equals(this.electedTime, other.electedTime);
}
return false;
}
@@ -83,6 +107,7 @@
.add("topic", topic)
.add("leader", leader)
.add("epoch", epoch)
+ .add("electedTime", new DateTime(electedTime))
.toString();
}
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
index ee30689..ca9e2ca 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -16,6 +16,8 @@
package org.onosproject.store.service;
+import org.joda.time.DateTime;
+
import com.google.common.base.MoreObjects;
/**
@@ -27,6 +29,20 @@
private final V value;
private final long version;
+ private final long creationTime;
+
+ /**
+ * Constructs a new versioned value.
+ * @param value value
+ * @param version version
+ * @param creationTime milliseconds of the creation event
+ * from the Java epoch of 1970-01-01T00:00:00Z
+ */
+ public Versioned(V value, long version, long creationTime) {
+ this.value = value;
+ this.version = version;
+ this.creationTime = System.currentTimeMillis();
+ }
/**
* Constructs a new versioned value.
@@ -34,8 +50,7 @@
* @param version version
*/
public Versioned(V value, long version) {
- this.value = value;
- this.version = version;
+ this(value, version, System.currentTimeMillis());
}
/**
@@ -56,11 +71,26 @@
return version;
}
+ /**
+ * Returns the system time when this version was created.
+ * <p>
+ * Care should be taken when relying on creationTime to
+ * implement any behavior in a distributed setting. Due
+ * to the possibility of clock skew it is likely that
+ * even creationTimes of causally related versions can be
+ * out or order.
+ * @return creation time
+ */
+ public long creationTime() {
+ return creationTime;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("value", value)
.add("version", version)
+ .add("creationTime", new DateTime(creationTime))
.toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index b48ce21..d4b46ab 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -170,7 +170,8 @@
if (topic != null) {
return new Leadership(topic.topicName(),
topic.leader(),
- topic.term());
+ topic.term(),
+ 0);
}
return null;
}
@@ -215,7 +216,8 @@
for (Topic topic : topics.values()) {
Leadership leadership = new Leadership(topic.topicName(),
topic.leader(),
- topic.term());
+ topic.term(),
+ 0);
result.put(topic.topicName(), leadership);
}
return result;
@@ -412,7 +414,7 @@
//
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm));
+ new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
// Dispatch to all instances
clusterCommunicator.broadcastIncludeSelf(
@@ -431,7 +433,7 @@
topicName, leader);
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, leader, myLastLeaderTerm));
+ new Leadership(topicName, leader, myLastLeaderTerm, 0));
// Dispatch only to the local listener(s)
eventDispatcher.post(leadershipEvent);
leader = null;
@@ -486,8 +488,8 @@
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm));
+ LeadershipEvent.Type.LEADER_ELECTED,
+ new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -514,8 +516,8 @@
leader = null;
}
leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm));
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index 83eea18..0ceb566 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -105,7 +105,13 @@
public Versioned<V> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
- return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
+ if (value == null) {
+ return null;
+ }
+ return new Versioned<>(
+ serializer.decode(value.value()),
+ value.version(),
+ value.creationTime());
}
@Override
@@ -114,16 +120,26 @@
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> previousValue =
complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
- return (previousValue != null) ?
- new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
-
+ if (previousValue == null) {
+ return null;
+ }
+ return new Versioned<>(
+ serializer.decode(previousValue.value()),
+ previousValue.version(),
+ previousValue.creationTime());
}
@Override
public Versioned<V> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
- return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
+ if (value == null) {
+ return null;
+ }
+ return new Versioned<>(
+ serializer.decode(value.value()),
+ value.version(),
+ value.creationTime());
}
@Override
@@ -143,7 +159,7 @@
public Collection<Versioned<V>> values() {
return Collections.unmodifiableList(complete(proxy.values(name))
.stream()
- .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
+ .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
.collect(Collectors.toList()));
}
@@ -161,8 +177,13 @@
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)));
- return (existingValue != null) ?
- new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
+ if (existingValue == null) {
+ return null;
+ }
+ return new Versioned<>(
+ serializer.decode(existingValue.value()),
+ existingValue.version(),
+ existingValue.creationTime());
}
@Override
@@ -212,6 +233,7 @@
dK(e.getKey()),
new Versioned<>(
serializer.decode(e.getValue().value()),
- e.getValue().version()));
+ e.getValue().version(),
+ e.getValue().creationTime()));
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 0d38dd3..5bbb4cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -227,7 +227,7 @@
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
- notifyNewLeader(path, localNodeId, currentLeader.version());
+ notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
} else {
// someone else has leadership. will retry after sometime.
retry(path);
@@ -237,7 +237,7 @@
log.info("Assumed leadership for {}", path);
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = lockMap.get(path);
- notifyNewLeader(path, localNodeId, newLeader.version());
+ notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
} else {
// someone beat us to it.
retry(path);
@@ -249,8 +249,8 @@
}
}
- private void notifyNewLeader(String path, NodeId leader, long epoch) {
- Leadership newLeadership = new Leadership(path, leader, epoch);
+ private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
+ Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
@@ -271,8 +271,8 @@
}
}
- private void notifyRemovedLeader(String path, NodeId leader, long epoch) {
- Leadership oldLeadership = new Leadership(path, leader, epoch);
+ private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
+ Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
@@ -346,12 +346,13 @@
String path = entry.getKey();
NodeId nodeId = entry.getValue().value();
long epoch = entry.getValue().version();
+ long creationTime = entry.getValue().creationTime();
if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
- notifyRemovedLeader(path, nodeId, epoch);
+ notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
@@ -362,7 +363,7 @@
try {
if (lockMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
- notifyRemovedLeader(path, nodeId, epoch);
+ notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index d61772e..35654f2 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -61,7 +61,7 @@
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
- return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0) : null;
+ return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null;
}
@Override
@@ -79,7 +79,7 @@
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
- new Leadership(path, clusterService.getLocalNode().id(), 0)));
+ new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
}
@@ -88,7 +88,7 @@
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
- new Leadership(path, clusterService.getLocalNode().id(), 0)));
+ new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 8b045f1..533563b 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -18,6 +18,7 @@
import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.slf4j.Logger;
import java.io.BufferedReader;
@@ -239,6 +240,29 @@
}
}
+ /**
+ * Returns a human friendly time ago string for a specified system time.
+ * @param unixTime system time in millis
+ * @return human friendly time ago
+ */
+ public static String timeAgo(long unixTime) {
+ long deltaMillis = System.currentTimeMillis() - unixTime;
+ long secondsSince = (long) (deltaMillis / 1000.0);
+ long minsSince = (long) (deltaMillis / (1000.0 * 60));
+ long hoursSince = (long) (deltaMillis / (1000.0 * 60 * 60));
+ long daysSince = (long) (deltaMillis / (1000.0 * 60 * 60 * 24));
+ if (daysSince > 0) {
+ return String.format("%dd ago", daysSince);
+ } else if (hoursSince > 0) {
+ return String.format("%dh ago", hoursSince);
+ } else if (minsSince > 0) {
+ return String.format("%dm ago", minsSince);
+ } else if (secondsSince > 0) {
+ return String.format("%ds ago", secondsSince);
+ } else {
+ return "just now";
+ }
+ }
/**
* Copies the specified directory path. Use with great caution since