Setting up monitoring of cluster communication service
Change-Id: I771b23db6920b26b592abc5d5156e9d77cde4f00
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index df4ac5c..ffde400 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -31,6 +31,7 @@
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
@Component(immediate = true)
@Service
@@ -55,6 +57,18 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true); // times (de)serialization and per-subject round trips
+ private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true); // times per-destination-node round trips
+
+ private static final String PRIMITIVE_NAME = "clusterCommunication"; // first constructor argument of both metering agents
+ private static final String SUBJECT_PREFIX = "subject"; // second constructor argument of subjectMeteringAgent
+ private static final String ENDPOINT_PREFIX = "endpoint"; // second constructor argument of endpointMeteringAgent
+
+ private static final String SERIALIZING = "serialization"; // timer name used when an encoder is applied
+ private static final String DESERIALIZING = "deserialization"; // timer name used when a decoder is applied
+ private static final String NODE_PREFIX = "node:"; // prepended to the target node id in endpoint timer names
+ private static final String ROUND_TRIP_SUFFIX = ".rtt"; // appended to round-trip timer names
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@@ -110,7 +124,8 @@
byte[] payload = new ClusterMessage(
localNodeId,
subject,
- encoder.apply(message)).getBytes();
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+ ).getBytes();
return doUnicast(subject, payload, toNodeId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
@@ -125,7 +140,8 @@
byte[] payload = new ClusterMessage(
localNodeId,
subject,
- encoder.apply(message)).getBytes();
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+ .getBytes();
nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@@ -139,8 +155,10 @@
ClusterMessage envelope = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
- encoder.apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+ apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+ thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
@@ -157,7 +175,15 @@
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
+ final MeteringAgent.Context epContext = endpointMeteringAgent.
+ startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+ final MeteringAgent.Context subjectContext = subjectMeteringAgent.
+ startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+ return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
+ whenComplete((bytes, throwable) -> {
+ subjectContext.stop(throwable);
+ epContext.stop(throwable);
+ });
}
@Override
@@ -213,6 +239,40 @@
executor);
}
+ /**
+  * Wraps a function so that every invocation of it is timed by the given metering agent.
+  * <p>
+  * The returned function starts a timer named {@code opName} on each call, delegates to
+  * {@code timedFunction}, and stops the timer exactly once: with {@code null} when the call
+  * returns normally, or with the thrown exception when it fails. A failure is rethrown
+  * unchanged so that callers still observe it.
+  *
+  * @param timedFunction the function to be timed
+  * @param meter the metering agent to be used to time the function
+  * @param opName the opname to be used when starting the meter
+  * @param <A> The param type of the function
+  * @param <B> The return type of the function
+  * @return a function producing the same result as {@code timedFunction} while timing each call
+  * @throws NullPointerException if any argument is null
+  */
+ private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+                                            MeteringAgent meter, String opName) {
+     checkNotNull(timedFunction);
+     checkNotNull(meter);
+     checkNotNull(opName);
+     return input -> {
+         final MeteringAgent.Context context = meter.startTimer(opName);
+         final B result;
+         try {
+             result = timedFunction.apply(input);
+         } catch (Exception e) {
+             // Record the failure and let it propagate; returning from a finally block
+             // here would silently discard the exception and hand back null instead.
+             context.stop(e);
+             throw e;
+         }
+         // Success path: stop outside the try so the timer is never stopped twice.
+         context.stop(null);
+         return result;
+     };
+ }
+
+
private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
@@ -243,7 +303,9 @@
@Override
public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
- return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
+ return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload())).
+ thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
}
}
@@ -258,7 +320,8 @@
@Override
public void accept(Endpoint sender, byte[] bytes) {
- consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
+ consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload()));
}
}
}