blob: 8683f3fde98388b9c5e9c0b11d57ed06902cce0c [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ImmutableList;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
31import org.onosproject.store.cluster.messaging.MessageSubject;
32import org.slf4j.Logger;
33
34import java.util.ArrayList;
35import java.util.Arrays;
36import java.util.HashMap;
37import java.util.LinkedList;
38import java.util.List;
39import java.util.Map;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42
43import static org.onlab.util.Tools.groupedThreads;
44import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * Collects and distributes performance samples.
48 */
49@Component(immediate = true)
50@Service(value = IntentPerfCollector.class)
51public class IntentPerfCollector {
52
53 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
54 private final Logger log = getLogger(getClass());
55
56 private static final int MAX_SAMPLES = 1_000;
57
58 private final List<Sample> samples = new LinkedList<>();
59
60 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ClusterCommunicationService communicationService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterService clusterService;
67
Thomas Vachuska95aadff2015-03-26 11:45:41 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Brian O'Connora468e902015-03-18 16:43:49 -070069 protected IntentPerfUi ui;
70
71 // Auxiliary structures used to accrue data for normalized time interval
72 // across all nodes.
73 private long newestTime;
74 private Sample overall;
75 private Sample current;
76
77 private ControllerNode[] nodes;
78 private Map<NodeId, Integer> nodeToIndex;
79
80 private NodeId nodeId;
81 private ExecutorService messageHandlingExecutor;
82
83 @Activate
84 public void activate() {
85 nodeId = clusterService.getLocalNode().id();
86
87 // TODO: replace with shared executor
88 messageHandlingExecutor = Executors.newSingleThreadExecutor(
89 groupedThreads("onos/perf", "message-handler"));
90
91 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
92 messageHandlingExecutor);
93
94 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
95 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
96
97 nodeToIndex = new HashMap<>();
98 for (int i = 0; i < nodes.length; i++) {
99 nodeToIndex.put(nodes[i].id(), i);
100 }
101
Thomas Vachuska95aadff2015-03-26 11:45:41 -0700102 ui.setHeaders(getSampleHeaders());
Brian O'Connora468e902015-03-18 16:43:49 -0700103 clearSamples();
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 messageHandlingExecutor.shutdown();
110 communicationService.removeSubscriber(SAMPLE);
111 log.info("Stopped");
112 }
113
114 /**
115 * Clears all previously accumulated data.
116 */
117 public void clearSamples() {
118 newestTime = 0;
119 overall = new Sample(0, nodes.length);
120 current = new Sample(0, nodes.length);
121 samples.clear();
122 }
123
124
125 /**
126 * Records a sample point of data about intent operation rate.
127 *
128 * @param overallRate overall rate
129 * @param currentRate current rate
130 */
131 public void recordSample(double overallRate, double currentRate) {
132 long now = System.currentTimeMillis();
133 addSample(now, nodeId, overallRate, currentRate);
134 broadcastSample(now, nodeId, overallRate, currentRate);
135 }
136
137 /**
138 * Returns set of node ids as headers.
139 *
140 * @return node id headers
141 */
142 public List<String> getSampleHeaders() {
143 List<String> headers = new ArrayList<>();
144 for (ControllerNode node : nodes) {
145 headers.add(node.id().toString());
146 }
147 return headers;
148 }
149
150 /**
151 * Returns set of all accumulated samples normalized to the local set of
152 * samples.
153 *
154 * @return accumulated samples
155 */
156 public synchronized List<Sample> getSamples() {
157 return ImmutableList.copyOf(samples);
158 }
159
160 /**
161 * Returns overall throughput performance for each of the cluster nodes.
162 *
163 * @return overall intent throughput
164 */
165 public synchronized Sample getOverall() {
166 return overall;
167 }
168
169 // Records a new sample to our collection of samples
170 private synchronized void addSample(long time, NodeId nodeId,
171 double overallRate, double currentRate) {
172 Sample fullSample = createCurrentSampleIfNeeded(time);
173 setSampleData(current, nodeId, currentRate);
174 setSampleData(overall, nodeId, overallRate);
175 pruneSamplesIfNeeded();
176
177 if (fullSample != null && ui != null) {
178 ui.reportSample(fullSample);
179 }
180 }
181
182 private Sample createCurrentSampleIfNeeded(long time) {
183 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
184 if (oldSample != null) {
185 newestTime = time;
186 current = new Sample(time, nodes.length);
187 if (oldSample.time > 0) {
188 samples.add(oldSample);
189 }
190 }
191 return oldSample;
192 }
193
194 private void setSampleData(Sample sample, NodeId nodeId, double data) {
195 Integer index = nodeToIndex.get(nodeId);
196 if (index != null) {
197 sample.data[index] = data;
198 }
199 }
200
201 private void pruneSamplesIfNeeded() {
202 if (samples.size() > MAX_SAMPLES) {
203 samples.remove(0);
204 }
205 }
206
207 // Performance data sample.
208 static class Sample {
209 final long time;
210 final double[] data;
211
212 public Sample(long time, int nodeCount) {
213 this.time = time;
214 this.data = new double[nodeCount];
215 Arrays.fill(data, -1);
216 }
217
218 public boolean isComplete() {
219 for (int i = 0; i < data.length; i++) {
220 if (data[i] < 0) {
221 return false;
222 }
223 }
224 return true;
225 }
226 }
227
228 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
229 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
230 communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
231 }
232
233 private class InternalSampleCollector implements ClusterMessageHandler {
234 @Override
235 public void handle(ClusterMessage message) {
236 String[] fields = new String(message.payload()).split("\\|");
237 log.debug("Received sample from {}: {}", message.sender(), fields);
238 addSample(Long.parseLong(fields[0]), message.sender(),
239 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
240 }
241 }
242}