blob: 6ea85d0d8168a0c904489495797c67ae94488be3 [file] [log] [blame]
Jordan Halterman29718e62017-07-20 13:24:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman29718e62017-07-20 13:24:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.primitiveperf;
17
18import java.util.ArrayList;
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.LinkedList;
22import java.util.List;
23import java.util.Map;
24
25import com.google.common.collect.ImmutableList;
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.ControllerNode;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.slf4j.Logger;
40
41import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
42import static org.slf4j.LoggerFactory.getLogger;
43
44/**
45 * Collects and distributes performance samples.
46 */
47@Component(immediate = true)
48@Service(value = PrimitivePerfCollector.class)
49public class PrimitivePerfCollector {
50
51 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
52 private final Logger log = getLogger(getClass());
53
54 private static final int MAX_SAMPLES = 1_000;
55
56 private final List<Sample> samples = new LinkedList<>();
57
58 private static final MessageSubject SAMPLE = new MessageSubject("primitive-perf-sample");
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 protected ClusterCommunicationService communicationService;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected ClusterService clusterService;
65
66 // Auxiliary structures used to accrue data for normalized time interval
67 // across all nodes.
68 private long newestTime;
69 private Sample overall;
70 private Sample current;
71
72 private ControllerNode[] nodes;
73 private Map<NodeId, Integer> nodeToIndex;
74
75 private NodeId nodeId;
76
77 @Activate
78 public void activate() {
79 nodeId = clusterService.getLocalNode().id();
80
81 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
82 getPoolThreadExecutor());
83
84 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
85 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
86
87 nodeToIndex = new HashMap<>();
88 for (int i = 0; i < nodes.length; i++) {
89 nodeToIndex.put(nodes[i].id(), i);
90 }
91
92 clearSamples();
93 log.info("Started");
94 }
95
96 @Deactivate
97 public void deactivate() {
98 communicationService.removeSubscriber(SAMPLE);
99 log.info("Stopped");
100 }
101
102 /**
103 * Clears all previously accumulated data.
104 */
105 public synchronized void clearSamples() {
106 newestTime = 0;
107 overall = new Sample(0, nodes.length);
108 current = new Sample(0, nodes.length);
109 samples.clear();
110 }
111
112
113 /**
114 * Records a sample point of data about primitive operation rate.
115 *
116 * @param overallRate overall rate
117 * @param currentRate current rate
118 */
119 public void recordSample(double overallRate, double currentRate) {
120 long now = System.currentTimeMillis();
121 addSample(now, nodeId, overallRate, currentRate);
122 broadcastSample(now, nodeId, overallRate, currentRate);
123 }
124
125 /**
126 * Returns set of node ids as headers.
127 *
128 * @return node id headers
129 */
130 public List<String> getSampleHeaders() {
131 List<String> headers = new ArrayList<>();
132 for (ControllerNode node : nodes) {
133 headers.add(node.id().toString());
134 }
135 return headers;
136 }
137
138 /**
139 * Returns set of all accumulated samples normalized to the local set of
140 * samples.
141 *
142 * @return accumulated samples
143 */
144 public synchronized List<Sample> getSamples() {
145 return ImmutableList.copyOf(samples);
146 }
147
148 /**
149 * Returns overall throughput performance for each of the cluster nodes.
150 *
151 * @return overall primitive throughput
152 */
153 public synchronized Sample getOverall() {
154 return overall;
155 }
156
157 // Records a new sample to our collection of samples
158 private synchronized void addSample(long time, NodeId nodeId,
159 double overallRate, double currentRate) {
160 Sample fullSample = createCurrentSampleIfNeeded(time);
161 setSampleData(current, nodeId, currentRate);
162 setSampleData(overall, nodeId, overallRate);
163 pruneSamplesIfNeeded();
164 }
165
166 private Sample createCurrentSampleIfNeeded(long time) {
167 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
168 if (oldSample != null) {
169 newestTime = time;
170 current = new Sample(time, nodes.length);
171 if (oldSample.time > 0) {
172 samples.add(oldSample);
173 }
174 }
175 return oldSample;
176 }
177
178 private void setSampleData(Sample sample, NodeId nodeId, double data) {
179 Integer index = nodeToIndex.get(nodeId);
180 if (index != null) {
181 sample.data[index] = data;
182 }
183 }
184
185 private void pruneSamplesIfNeeded() {
186 if (samples.size() > MAX_SAMPLES) {
187 samples.remove(0);
188 }
189 }
190
191 // Performance data sample.
192 static class Sample {
193 final long time;
194 final double[] data;
195
196 public Sample(long time, int nodeCount) {
197 this.time = time;
198 this.data = new double[nodeCount];
199 Arrays.fill(data, -1);
200 }
201
202 public boolean isComplete() {
203 for (int i = 0; i < data.length; i++) {
204 if (data[i] < 0) {
205 return false;
206 }
207 }
208 return true;
209 }
210 }
211
212 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
213 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
214 communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
215 }
216
217 private class InternalSampleCollector implements ClusterMessageHandler {
218 @Override
219 public void handle(ClusterMessage message) {
220 String[] fields = new String(message.payload()).split("\\|");
221 log.debug("Received sample from {}: {}", message.sender(), fields);
222 addSample(Long.parseLong(fields[0]), message.sender(),
223 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
224 }
225 }
226}