blob: cbf528e25a1a02b11ef3ff096663a863817b3cef [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.primitiveperf;

import com.google.common.collect.ImmutableList;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
import static org.slf4j.LoggerFactory.getLogger;
42
43/**
44 * Collects and distributes performance samples.
45 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070046@Component(immediate = true, service = PrimitivePerfCollector.class)
Jordan Halterman29718e62017-07-20 13:24:33 -070047public class PrimitivePerfCollector {
48
49 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
50 private final Logger log = getLogger(getClass());
51
52 private static final int MAX_SAMPLES = 1_000;
53
54 private final List<Sample> samples = new LinkedList<>();
55
56 private static final MessageSubject SAMPLE = new MessageSubject("primitive-perf-sample");
57
Ray Milkeyd84f89b2018-08-17 14:54:17 -070058 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman29718e62017-07-20 13:24:33 -070059 protected ClusterCommunicationService communicationService;
60
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman29718e62017-07-20 13:24:33 -070062 protected ClusterService clusterService;
63
64 // Auxiliary structures used to accrue data for normalized time interval
65 // across all nodes.
66 private long newestTime;
67 private Sample overall;
68 private Sample current;
69
70 private ControllerNode[] nodes;
71 private Map<NodeId, Integer> nodeToIndex;
72
73 private NodeId nodeId;
74
75 @Activate
76 public void activate() {
77 nodeId = clusterService.getLocalNode().id();
78
79 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
80 getPoolThreadExecutor());
81
82 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
83 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
84
85 nodeToIndex = new HashMap<>();
86 for (int i = 0; i < nodes.length; i++) {
87 nodeToIndex.put(nodes[i].id(), i);
88 }
89
90 clearSamples();
91 log.info("Started");
92 }
93
94 @Deactivate
95 public void deactivate() {
96 communicationService.removeSubscriber(SAMPLE);
97 log.info("Stopped");
98 }
99
100 /**
101 * Clears all previously accumulated data.
102 */
103 public synchronized void clearSamples() {
104 newestTime = 0;
105 overall = new Sample(0, nodes.length);
106 current = new Sample(0, nodes.length);
107 samples.clear();
108 }
109
110
111 /**
112 * Records a sample point of data about primitive operation rate.
113 *
114 * @param overallRate overall rate
115 * @param currentRate current rate
116 */
117 public void recordSample(double overallRate, double currentRate) {
118 long now = System.currentTimeMillis();
119 addSample(now, nodeId, overallRate, currentRate);
120 broadcastSample(now, nodeId, overallRate, currentRate);
121 }
122
123 /**
124 * Returns set of node ids as headers.
125 *
126 * @return node id headers
127 */
128 public List<String> getSampleHeaders() {
129 List<String> headers = new ArrayList<>();
130 for (ControllerNode node : nodes) {
131 headers.add(node.id().toString());
132 }
133 return headers;
134 }
135
136 /**
137 * Returns set of all accumulated samples normalized to the local set of
138 * samples.
139 *
140 * @return accumulated samples
141 */
142 public synchronized List<Sample> getSamples() {
143 return ImmutableList.copyOf(samples);
144 }
145
146 /**
147 * Returns overall throughput performance for each of the cluster nodes.
148 *
149 * @return overall primitive throughput
150 */
151 public synchronized Sample getOverall() {
152 return overall;
153 }
154
155 // Records a new sample to our collection of samples
156 private synchronized void addSample(long time, NodeId nodeId,
157 double overallRate, double currentRate) {
158 Sample fullSample = createCurrentSampleIfNeeded(time);
159 setSampleData(current, nodeId, currentRate);
160 setSampleData(overall, nodeId, overallRate);
161 pruneSamplesIfNeeded();
162 }
163
164 private Sample createCurrentSampleIfNeeded(long time) {
165 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
166 if (oldSample != null) {
167 newestTime = time;
168 current = new Sample(time, nodes.length);
169 if (oldSample.time > 0) {
170 samples.add(oldSample);
171 }
172 }
173 return oldSample;
174 }
175
176 private void setSampleData(Sample sample, NodeId nodeId, double data) {
177 Integer index = nodeToIndex.get(nodeId);
178 if (index != null) {
179 sample.data[index] = data;
180 }
181 }
182
183 private void pruneSamplesIfNeeded() {
184 if (samples.size() > MAX_SAMPLES) {
185 samples.remove(0);
186 }
187 }
188
189 // Performance data sample.
190 static class Sample {
191 final long time;
192 final double[] data;
193
194 public Sample(long time, int nodeCount) {
195 this.time = time;
196 this.data = new double[nodeCount];
197 Arrays.fill(data, -1);
198 }
199
200 public boolean isComplete() {
201 for (int i = 0; i < data.length; i++) {
202 if (data[i] < 0) {
203 return false;
204 }
205 }
206 return true;
207 }
208 }
209
210 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
211 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
212 communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
213 }
214
215 private class InternalSampleCollector implements ClusterMessageHandler {
216 @Override
217 public void handle(ClusterMessage message) {
218 String[] fields = new String(message.payload()).split("\\|");
219 log.debug("Received sample from {}: {}", message.sender(), fields);
220 addSample(Long.parseLong(fields[0]), message.sender(),
221 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
222 }
223 }
224}