blob: 333fa4ec31d7abdbe130cfcb53a8339bab84e518 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.intentperf;
17
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * Collects and distributes performance samples.
48 */
49@Component(immediate = true)
50@Service(value = IntentPerfCollector.class)
51public class IntentPerfCollector {
52
53 private static final long SAMPLE_WINDOW = 5_000;
54 private final Logger log = getLogger(getClass());
55
56 private static final int MAX_SAMPLES = 1_000;
57
58 private final List<Sample> samples = new LinkedList<>();
59
60 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ClusterCommunicationService communicationService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterService clusterService;
67
68 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
69 protected IntentPerfUi ui;
70
71 // Auxiliary structures used to accrue data for normalized time interval
72 // across all nodes.
73 private long newestTime;
74 private Sample overall;
75 private Sample current;
76
77 private ControllerNode[] nodes;
78 private Map<NodeId, Integer> nodeToIndex;
79
80 private NodeId nodeId;
81 private ExecutorService messageHandlingExecutor;
82
83 @Activate
84 public void activate() {
85 this.nodeId = clusterService.getLocalNode().id();
86 this.newestTime = 0;
87
88 messageHandlingExecutor = Executors.newSingleThreadExecutor(
89 groupedThreads("onos/perf", "message-handler"));
90
91 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
92 messageHandlingExecutor);
93
94 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
95 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
96
97 nodeToIndex = new HashMap<>();
98 for (int i = 0; i < nodes.length; i++) {
99 nodeToIndex.put(nodes[i].id(), i);
100 }
101 overall = new Sample(0, nodes.length);
102 current = new Sample(0, nodes.length);
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 messageHandlingExecutor.shutdown();
110 communicationService.removeSubscriber(SAMPLE);
111 log.info("Stopped");
112 }
113
114 /**
115 * Records a sample point of data about intent operation rate.
116 *
117 * @param overallRate overall rate
118 * @param currentRate current rate
119 */
120 public void recordSample(double overallRate, double currentRate) {
121 try {
122 long now = System.currentTimeMillis();
123 addSample(now, nodeId, overallRate, currentRate);
124 broadcastSample(now, nodeId, overallRate, currentRate);
125 } catch (Exception e) {
126 log.error("Boom!", e);
127 }
128 }
129
130 /**
131 * Returns set of node ids as headers.
132 *
133 * @return node id headers
134 */
135 public List<String> getSampleHeaders() {
136 List<String> headers = new ArrayList<>();
137 for (ControllerNode node : nodes) {
138 headers.add(node.id().toString());
139 }
140 return headers;
141 }
142
143 /**
144 * Returns set of all accumulated samples normalized to the local set of
145 * samples.
146 *
147 * @return accumulated samples
148 */
149 public synchronized List<Sample> getSamples() {
150 return ImmutableList.copyOf(samples);
151 }
152
153 /**
154 * Returns overall throughput performance for each of the cluster nodes.
155 *
156 * @return overall intent throughput
157 */
158 public synchronized Sample getOverall() {
159 return overall;
160 }
161
162 // Records a new sample to our collection of samples
163 private synchronized void addSample(long time, NodeId nodeId,
164 double overallRate, double currentRate) {
165 Sample fullSample = createCurrentSampleIfNeeded(time);
166 setSampleData(current, nodeId, currentRate);
167 setSampleData(overall, nodeId, overallRate);
168 pruneSamplesIfNeeded();
169
170 if (fullSample != null && ui != null) {
171 ui.reportSample(fullSample);
172 }
173 }
174
175 private Sample createCurrentSampleIfNeeded(long time) {
176 Sample oldSample = time - newestTime > SAMPLE_WINDOW || current.isComplete() ? current : null;
177 if (oldSample != null) {
178 newestTime = time;
179 current = new Sample(time, nodes.length);
180 if (oldSample.time > 0) {
181 samples.add(oldSample);
182 }
183 }
184 return oldSample;
185 }
186
187 private void setSampleData(Sample sample, NodeId nodeId, double data) {
188 Integer index = nodeToIndex.get(nodeId);
189 if (index != null) {
190 sample.data[index] = data;
191 }
192 }
193
194 private void pruneSamplesIfNeeded() {
195 if (samples.size() > MAX_SAMPLES) {
196 samples.remove(0);
197 }
198 }
199
200 // Performance data sample.
201 static class Sample {
202 final long time;
203 final double[] data;
204
205 public Sample(long time, int nodeCount) {
206 this.time = time;
207 this.data = new double[nodeCount];
208 Arrays.fill(data, -1);
209 }
210
211 public boolean isComplete() {
212 for (int i = 0; i < data.length; i++) {
213 if (data[i] < 0) {
214 return false;
215 }
216 }
217 return true;
218 }
219 }
220
221 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
222 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
223 communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
224 }
225
226 private class InternalSampleCollector implements ClusterMessageHandler {
227 @Override
228 public void handle(ClusterMessage message) {
229 String[] fields = new String(message.payload()).split("\\|");
230 log.info("Received sample from {}: {}", message.sender(), fields);
231 addSample(Long.parseLong(fields[0]), message.sender(),
232 Double.parseDouble(fields[1]), Double.parseDouble(fields[1]));
233 }
234 }
235}