blob: 763f5deda8f1b1b0bfa73b3e2065427dc6f85818 [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Brian O'Connora468e902015-03-18 16:43:49 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ImmutableList;
Brian O'Connora468e902015-03-18 16:43:49 -070019import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.ControllerNode;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
23import org.onosproject.store.cluster.messaging.ClusterMessage;
24import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
25import org.onosproject.store.cluster.messaging.MessageSubject;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070026import org.osgi.service.component.annotations.Activate;
27import org.osgi.service.component.annotations.Component;
28import org.osgi.service.component.annotations.Deactivate;
29import org.osgi.service.component.annotations.Reference;
30import org.osgi.service.component.annotations.ReferenceCardinality;
Brian O'Connora468e902015-03-18 16:43:49 -070031import org.slf4j.Logger;
32
33import java.util.ArrayList;
34import java.util.Arrays;
35import java.util.HashMap;
36import java.util.LinkedList;
37import java.util.List;
38import java.util.Map;
Brian O'Connora468e902015-03-18 16:43:49 -070039
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070040import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
Brian O'Connora468e902015-03-18 16:43:49 -070041import static org.slf4j.LoggerFactory.getLogger;
42
43/**
44 * Collects and distributes performance samples.
45 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070046@Component(immediate = true, service = IntentPerfCollector.class)
Brian O'Connora468e902015-03-18 16:43:49 -070047public class IntentPerfCollector {
48
49 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
50 private final Logger log = getLogger(getClass());
51
52 private static final int MAX_SAMPLES = 1_000;
53
54 private final List<Sample> samples = new LinkedList<>();
55
56 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
57
Ray Milkeyd84f89b2018-08-17 14:54:17 -070058 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -070059 protected ClusterCommunicationService communicationService;
60
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -070062 protected ClusterService clusterService;
63
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -070065 protected IntentPerfUi ui;
66
67 // Auxiliary structures used to accrue data for normalized time interval
68 // across all nodes.
69 private long newestTime;
70 private Sample overall;
71 private Sample current;
72
73 private ControllerNode[] nodes;
74 private Map<NodeId, Integer> nodeToIndex;
75
76 private NodeId nodeId;
Brian O'Connora468e902015-03-18 16:43:49 -070077
78 @Activate
79 public void activate() {
80 nodeId = clusterService.getLocalNode().id();
81
Brian O'Connora468e902015-03-18 16:43:49 -070082 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070083 getPoolThreadExecutor());
Brian O'Connora468e902015-03-18 16:43:49 -070084
85 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
86 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
87
88 nodeToIndex = new HashMap<>();
89 for (int i = 0; i < nodes.length; i++) {
90 nodeToIndex.put(nodes[i].id(), i);
91 }
92
Brian O'Connora468e902015-03-18 16:43:49 -070093 clearSamples();
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070094 ui.setCollector(this);
Brian O'Connora468e902015-03-18 16:43:49 -070095 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
Brian O'Connora468e902015-03-18 16:43:49 -0700100 communicationService.removeSubscriber(SAMPLE);
101 log.info("Stopped");
102 }
103
104 /**
105 * Clears all previously accumulated data.
106 */
Satish K57de6cd2015-11-27 12:49:01 +0530107 public synchronized void clearSamples() {
Brian O'Connora468e902015-03-18 16:43:49 -0700108 newestTime = 0;
109 overall = new Sample(0, nodes.length);
110 current = new Sample(0, nodes.length);
111 samples.clear();
112 }
113
114
115 /**
116 * Records a sample point of data about intent operation rate.
117 *
118 * @param overallRate overall rate
119 * @param currentRate current rate
120 */
121 public void recordSample(double overallRate, double currentRate) {
122 long now = System.currentTimeMillis();
123 addSample(now, nodeId, overallRate, currentRate);
124 broadcastSample(now, nodeId, overallRate, currentRate);
125 }
126
127 /**
128 * Returns set of node ids as headers.
129 *
130 * @return node id headers
131 */
132 public List<String> getSampleHeaders() {
133 List<String> headers = new ArrayList<>();
134 for (ControllerNode node : nodes) {
135 headers.add(node.id().toString());
136 }
137 return headers;
138 }
139
140 /**
141 * Returns set of all accumulated samples normalized to the local set of
142 * samples.
143 *
144 * @return accumulated samples
145 */
146 public synchronized List<Sample> getSamples() {
147 return ImmutableList.copyOf(samples);
148 }
149
150 /**
151 * Returns overall throughput performance for each of the cluster nodes.
152 *
153 * @return overall intent throughput
154 */
155 public synchronized Sample getOverall() {
156 return overall;
157 }
158
159 // Records a new sample to our collection of samples
160 private synchronized void addSample(long time, NodeId nodeId,
161 double overallRate, double currentRate) {
162 Sample fullSample = createCurrentSampleIfNeeded(time);
163 setSampleData(current, nodeId, currentRate);
164 setSampleData(overall, nodeId, overallRate);
165 pruneSamplesIfNeeded();
166
167 if (fullSample != null && ui != null) {
168 ui.reportSample(fullSample);
169 }
170 }
171
172 private Sample createCurrentSampleIfNeeded(long time) {
173 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
174 if (oldSample != null) {
175 newestTime = time;
176 current = new Sample(time, nodes.length);
177 if (oldSample.time > 0) {
178 samples.add(oldSample);
179 }
180 }
181 return oldSample;
182 }
183
184 private void setSampleData(Sample sample, NodeId nodeId, double data) {
185 Integer index = nodeToIndex.get(nodeId);
186 if (index != null) {
187 sample.data[index] = data;
188 }
189 }
190
191 private void pruneSamplesIfNeeded() {
192 if (samples.size() > MAX_SAMPLES) {
193 samples.remove(0);
194 }
195 }
196
197 // Performance data sample.
198 static class Sample {
199 final long time;
200 final double[] data;
201
202 public Sample(long time, int nodeCount) {
203 this.time = time;
204 this.data = new double[nodeCount];
205 Arrays.fill(data, -1);
206 }
207
208 public boolean isComplete() {
209 for (int i = 0; i < data.length; i++) {
210 if (data[i] < 0) {
211 return false;
212 }
213 }
214 return true;
215 }
216 }
217
218 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
219 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700220 communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
Brian O'Connora468e902015-03-18 16:43:49 -0700221 }
222
223 private class InternalSampleCollector implements ClusterMessageHandler {
224 @Override
225 public void handle(ClusterMessage message) {
226 String[] fields = new String(message.payload()).split("\\|");
227 log.debug("Received sample from {}: {}", message.sender(), fields);
228 addSample(Long.parseLong(fields[0]), message.sender(),
229 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
230 }
231 }
232}