blob: 90eb9d78214b68402d342221348cd8d0f2b373ff [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ImmutableList;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
31import org.onosproject.store.cluster.messaging.MessageSubject;
32import org.slf4j.Logger;
33
34import java.util.ArrayList;
35import java.util.Arrays;
36import java.util.HashMap;
37import java.util.LinkedList;
38import java.util.List;
39import java.util.Map;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42
43import static org.onlab.util.Tools.groupedThreads;
44import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * Collects and distributes performance samples.
48 */
49@Component(immediate = true)
50@Service(value = IntentPerfCollector.class)
51public class IntentPerfCollector {
52
53 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
54 private final Logger log = getLogger(getClass());
55
56 private static final int MAX_SAMPLES = 1_000;
57
58 private final List<Sample> samples = new LinkedList<>();
59
60 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ClusterCommunicationService communicationService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterService clusterService;
67
68 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
69 protected IntentPerfUi ui;
70
71 // Auxiliary structures used to accrue data for normalized time interval
72 // across all nodes.
73 private long newestTime;
74 private Sample overall;
75 private Sample current;
76
77 private ControllerNode[] nodes;
78 private Map<NodeId, Integer> nodeToIndex;
79
80 private NodeId nodeId;
81 private ExecutorService messageHandlingExecutor;
82
83 @Activate
84 public void activate() {
85 nodeId = clusterService.getLocalNode().id();
86
87 // TODO: replace with shared executor
88 messageHandlingExecutor = Executors.newSingleThreadExecutor(
89 groupedThreads("onos/perf", "message-handler"));
90
91 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
92 messageHandlingExecutor);
93
94 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
95 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
96
97 nodeToIndex = new HashMap<>();
98 for (int i = 0; i < nodes.length; i++) {
99 nodeToIndex.put(nodes[i].id(), i);
100 }
101
102 clearSamples();
103 log.info("Started");
104 }
105
106 @Deactivate
107 public void deactivate() {
108 messageHandlingExecutor.shutdown();
109 communicationService.removeSubscriber(SAMPLE);
110 log.info("Stopped");
111 }
112
113 /**
114 * Clears all previously accumulated data.
115 */
116 public void clearSamples() {
117 newestTime = 0;
118 overall = new Sample(0, nodes.length);
119 current = new Sample(0, nodes.length);
120 samples.clear();
121 }
122
123
124 /**
125 * Records a sample point of data about intent operation rate.
126 *
127 * @param overallRate overall rate
128 * @param currentRate current rate
129 */
130 public void recordSample(double overallRate, double currentRate) {
131 long now = System.currentTimeMillis();
132 addSample(now, nodeId, overallRate, currentRate);
133 broadcastSample(now, nodeId, overallRate, currentRate);
134 }
135
136 /**
137 * Returns set of node ids as headers.
138 *
139 * @return node id headers
140 */
141 public List<String> getSampleHeaders() {
142 List<String> headers = new ArrayList<>();
143 for (ControllerNode node : nodes) {
144 headers.add(node.id().toString());
145 }
146 return headers;
147 }
148
149 /**
150 * Returns set of all accumulated samples normalized to the local set of
151 * samples.
152 *
153 * @return accumulated samples
154 */
155 public synchronized List<Sample> getSamples() {
156 return ImmutableList.copyOf(samples);
157 }
158
159 /**
160 * Returns overall throughput performance for each of the cluster nodes.
161 *
162 * @return overall intent throughput
163 */
164 public synchronized Sample getOverall() {
165 return overall;
166 }
167
168 // Records a new sample to our collection of samples
169 private synchronized void addSample(long time, NodeId nodeId,
170 double overallRate, double currentRate) {
171 Sample fullSample = createCurrentSampleIfNeeded(time);
172 setSampleData(current, nodeId, currentRate);
173 setSampleData(overall, nodeId, overallRate);
174 pruneSamplesIfNeeded();
175
176 if (fullSample != null && ui != null) {
177 ui.reportSample(fullSample);
178 }
179 }
180
181 private Sample createCurrentSampleIfNeeded(long time) {
182 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
183 if (oldSample != null) {
184 newestTime = time;
185 current = new Sample(time, nodes.length);
186 if (oldSample.time > 0) {
187 samples.add(oldSample);
188 }
189 }
190 return oldSample;
191 }
192
193 private void setSampleData(Sample sample, NodeId nodeId, double data) {
194 Integer index = nodeToIndex.get(nodeId);
195 if (index != null) {
196 sample.data[index] = data;
197 }
198 }
199
200 private void pruneSamplesIfNeeded() {
201 if (samples.size() > MAX_SAMPLES) {
202 samples.remove(0);
203 }
204 }
205
206 // Performance data sample.
207 static class Sample {
208 final long time;
209 final double[] data;
210
211 public Sample(long time, int nodeCount) {
212 this.time = time;
213 this.data = new double[nodeCount];
214 Arrays.fill(data, -1);
215 }
216
217 public boolean isComplete() {
218 for (int i = 0; i < data.length; i++) {
219 if (data[i] < 0) {
220 return false;
221 }
222 }
223 return true;
224 }
225 }
226
227 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
228 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
229 communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
230 }
231
232 private class InternalSampleCollector implements ClusterMessageHandler {
233 @Override
234 public void handle(ClusterMessage message) {
235 String[] fields = new String(message.payload()).split("\\|");
236 log.debug("Received sample from {}: {}", message.sender(), fields);
237 addSample(Long.parseLong(fields[0]), message.sender(),
238 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
239 }
240 }
241}