blob: cae5455d24bf8f447e8de5027e4d5227c9be67a2 [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ImmutableList;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
31import org.onosproject.store.cluster.messaging.MessageSubject;
32import org.slf4j.Logger;
33
34import java.util.ArrayList;
35import java.util.Arrays;
36import java.util.HashMap;
37import java.util.LinkedList;
38import java.util.List;
39import java.util.Map;
Brian O'Connora468e902015-03-18 16:43:49 -070040
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070041import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
Brian O'Connora468e902015-03-18 16:43:49 -070042import static org.slf4j.LoggerFactory.getLogger;
43
44/**
45 * Collects and distributes performance samples.
46 */
47@Component(immediate = true)
48@Service(value = IntentPerfCollector.class)
49public class IntentPerfCollector {
50
51 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
52 private final Logger log = getLogger(getClass());
53
54 private static final int MAX_SAMPLES = 1_000;
55
56 private final List<Sample> samples = new LinkedList<>();
57
58 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 protected ClusterCommunicationService communicationService;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected ClusterService clusterService;
65
Thomas Vachuska95aadff2015-03-26 11:45:41 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connora468e902015-03-18 16:43:49 -070067 protected IntentPerfUi ui;
68
69 // Auxiliary structures used to accrue data for normalized time interval
70 // across all nodes.
71 private long newestTime;
72 private Sample overall;
73 private Sample current;
74
75 private ControllerNode[] nodes;
76 private Map<NodeId, Integer> nodeToIndex;
77
78 private NodeId nodeId;
Brian O'Connora468e902015-03-18 16:43:49 -070079
80 @Activate
81 public void activate() {
82 nodeId = clusterService.getLocalNode().id();
83
Brian O'Connora468e902015-03-18 16:43:49 -070084 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070085 getPoolThreadExecutor());
Brian O'Connora468e902015-03-18 16:43:49 -070086
87 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
88 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
89
90 nodeToIndex = new HashMap<>();
91 for (int i = 0; i < nodes.length; i++) {
92 nodeToIndex.put(nodes[i].id(), i);
93 }
94
Brian O'Connora468e902015-03-18 16:43:49 -070095 clearSamples();
Thomas Vachuskab967ebf2015-03-28 15:19:30 -070096 ui.setCollector(this);
Brian O'Connora468e902015-03-18 16:43:49 -070097 log.info("Started");
98 }
99
100 @Deactivate
101 public void deactivate() {
Brian O'Connora468e902015-03-18 16:43:49 -0700102 communicationService.removeSubscriber(SAMPLE);
103 log.info("Stopped");
104 }
105
106 /**
107 * Clears all previously accumulated data.
108 */
Satish K57de6cd2015-11-27 12:49:01 +0530109 public synchronized void clearSamples() {
Brian O'Connora468e902015-03-18 16:43:49 -0700110 newestTime = 0;
111 overall = new Sample(0, nodes.length);
112 current = new Sample(0, nodes.length);
113 samples.clear();
114 }
115
116
117 /**
118 * Records a sample point of data about intent operation rate.
119 *
120 * @param overallRate overall rate
121 * @param currentRate current rate
122 */
123 public void recordSample(double overallRate, double currentRate) {
124 long now = System.currentTimeMillis();
125 addSample(now, nodeId, overallRate, currentRate);
126 broadcastSample(now, nodeId, overallRate, currentRate);
127 }
128
129 /**
130 * Returns set of node ids as headers.
131 *
132 * @return node id headers
133 */
134 public List<String> getSampleHeaders() {
135 List<String> headers = new ArrayList<>();
136 for (ControllerNode node : nodes) {
137 headers.add(node.id().toString());
138 }
139 return headers;
140 }
141
142 /**
143 * Returns set of all accumulated samples normalized to the local set of
144 * samples.
145 *
146 * @return accumulated samples
147 */
148 public synchronized List<Sample> getSamples() {
149 return ImmutableList.copyOf(samples);
150 }
151
152 /**
153 * Returns overall throughput performance for each of the cluster nodes.
154 *
155 * @return overall intent throughput
156 */
157 public synchronized Sample getOverall() {
158 return overall;
159 }
160
161 // Records a new sample to our collection of samples
162 private synchronized void addSample(long time, NodeId nodeId,
163 double overallRate, double currentRate) {
164 Sample fullSample = createCurrentSampleIfNeeded(time);
165 setSampleData(current, nodeId, currentRate);
166 setSampleData(overall, nodeId, overallRate);
167 pruneSamplesIfNeeded();
168
169 if (fullSample != null && ui != null) {
170 ui.reportSample(fullSample);
171 }
172 }
173
174 private Sample createCurrentSampleIfNeeded(long time) {
175 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
176 if (oldSample != null) {
177 newestTime = time;
178 current = new Sample(time, nodes.length);
179 if (oldSample.time > 0) {
180 samples.add(oldSample);
181 }
182 }
183 return oldSample;
184 }
185
186 private void setSampleData(Sample sample, NodeId nodeId, double data) {
187 Integer index = nodeToIndex.get(nodeId);
188 if (index != null) {
189 sample.data[index] = data;
190 }
191 }
192
193 private void pruneSamplesIfNeeded() {
194 if (samples.size() > MAX_SAMPLES) {
195 samples.remove(0);
196 }
197 }
198
199 // Performance data sample.
200 static class Sample {
201 final long time;
202 final double[] data;
203
204 public Sample(long time, int nodeCount) {
205 this.time = time;
206 this.data = new double[nodeCount];
207 Arrays.fill(data, -1);
208 }
209
210 public boolean isComplete() {
211 for (int i = 0; i < data.length; i++) {
212 if (data[i] < 0) {
213 return false;
214 }
215 }
216 return true;
217 }
218 }
219
220 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
221 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700222 communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
Brian O'Connora468e902015-03-18 16:43:49 -0700223 }
224
225 private class InternalSampleCollector implements ClusterMessageHandler {
226 @Override
227 public void handle(ClusterMessage message) {
228 String[] fields = new String(message.payload()).split("\\|");
229 log.debug("Received sample from {}: {}", message.sender(), fields);
230 addSample(Long.parseLong(fields[0]), message.sender(),
231 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
232 }
233 }
234}