Implementation of Hybrid Logical Clock Service.
Change-Id: I52e231433d044f9e6156db7e28bde9fd199118e8
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 710665f..239b43b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -53,6 +53,7 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
@@ -99,6 +100,9 @@
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected HybridLogicalClockService clockService;
+
private Endpoint localEp;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -218,6 +222,7 @@
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
InternalMessage message = new InternalMessage(preamble,
+ clockService.timeNow(),
messageIdGenerator.incrementAndGet(),
localEp,
type,
@@ -264,7 +269,12 @@
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
- InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
+ InternalMessage message = new InternalMessage(preamble,
+ clockService.timeNow(),
+ messageId,
+ localEp,
+ type,
+ payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
@@ -502,6 +512,7 @@
log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
}
+ clockService.recordEventTime(message.time());
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
@@ -538,6 +549,7 @@
private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
InternalMessage response = new InternalMessage(preamble,
+ clockService.timeNow(),
message.id(),
localEp,
REPLY_MESSAGE_TYPE,