blob: 7af34afa1217ecdd8615c502159da188995e88bc [file] [log] [blame]
Jordan Halterman29718e62017-07-20 13:24:33 -07001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.primitiveperf;
17
18import java.util.Dictionary;
19import java.util.List;
20import java.util.Map;
21import java.util.Timer;
22import java.util.TimerTask;
23import java.util.TreeMap;
24import java.util.UUID;
25import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.atomic.AtomicLong;
29import java.util.function.Consumer;
30
31import com.google.common.collect.Lists;
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.ControllerNode;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
50import org.osgi.service.component.ComponentContext;
51import org.slf4j.Logger;
52
53import static com.google.common.base.Strings.isNullOrEmpty;
54import static java.lang.System.currentTimeMillis;
55import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
56import static org.onlab.util.Tools.get;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Application to test sustained primitive throughput.
62 */
63@Component(immediate = true)
64@Service(value = PrimitivePerfApp.class)
65public class PrimitivePerfApp {
66
67 private final Logger log = getLogger(getClass());
68
69 private static final int DEFAULT_NUM_CLIENTS = 64;
70 private static final int DEFAULT_WRITE_PERCENTAGE = 100;
71
72 private static final int REPORT_PERIOD = 1_000; //ms
73
74 private static final String START = "start";
75 private static final String STOP = "stop";
76 private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
77
78 @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
79 label = "Number of clients to use to submit writes")
80 private int numClients = DEFAULT_NUM_CLIENTS;
81
82 @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
83 label = "Percentage of operations to perform as writes")
84 private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
85
86 @Reference(cardinality = MANDATORY_UNARY)
87 protected ClusterService clusterService;
88
89 @Reference(cardinality = MANDATORY_UNARY)
90 protected StorageService storageService;
91
92 @Reference(cardinality = MANDATORY_UNARY)
93 protected ComponentConfigService configService;
94
95 @Reference(cardinality = MANDATORY_UNARY)
96 protected PrimitivePerfCollector sampleCollector;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService communicationService;
100
101 private ExecutorService messageHandlingExecutor;
102
103 private ExecutorService workers;
104 private boolean stopped = true;
105
106 private Timer reportTimer;
107
108 private NodeId nodeId;
109 private TimerTask reporterTask;
110
111 private long startTime;
112 private long currentStartTime;
113 private AtomicLong overallCounter;
114 private AtomicLong currentCounter;
115
116 @Activate
117 public void activate(ComponentContext context) {
118 configService.registerProperties(getClass());
119
120 nodeId = clusterService.getLocalNode().id();
121
122 reportTimer = new Timer("onos-primitive-perf-reporter");
123
124 messageHandlingExecutor = Executors.newSingleThreadExecutor(
125 groupedThreads("onos/perf", "command-handler"));
126
127 communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
128 messageHandlingExecutor);
129
130 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
131 modify(context);
132 }
133
134 @Deactivate
135 public void deactivate() {
136 stopTestRun();
137
138 configService.unregisterProperties(getClass(), false);
139 messageHandlingExecutor.shutdown();
140 communicationService.removeSubscriber(CONTROL);
141
142 if (reportTimer != null) {
143 reportTimer.cancel();
144 reportTimer = null;
145 }
146 }
147
148 @Modified
149 public void modify(ComponentContext context) {
150 if (context == null) {
151 logConfig("Reconfigured");
152 return;
153 }
154
155 Dictionary<?, ?> properties = context.getProperties();
156 int newNumClients;
157 try {
158 String s = get(properties, "numClients");
159 newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
160 } catch (NumberFormatException | ClassCastException e) {
161 log.warn("Malformed configuration detected; using defaults", e);
162 newNumClients = DEFAULT_NUM_CLIENTS;
163 }
164
165 int newWritePercentage;
166 try {
167 String s = get(properties, "writePercentage");
168 newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
169 } catch (NumberFormatException | ClassCastException e) {
170 log.warn("Malformed configuration detected; using defaults", e);
171 newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
172 }
173
174 if (newNumClients != numClients || newWritePercentage != writePercentage) {
175 numClients = newNumClients;
176 writePercentage = newWritePercentage;
177 logConfig("Reconfigured");
178 if (!stopped) {
179 stop();
180 start();
181 }
182 }
183 }
184
185 public void start() {
186 if (stopped) {
187 stopped = false;
188 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
189 startTestRun();
190 }
191 }
192
193 public void stop() {
194 if (!stopped) {
195 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
196 stopTestRun();
197 }
198 }
199
200 private void logConfig(String prefix) {
201 log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
202 }
203
204 private void startTestRun() {
205 sampleCollector.clearSamples();
206
207 startTime = System.currentTimeMillis();
208 currentStartTime = startTime;
209 currentCounter = new AtomicLong();
210 overallCounter = new AtomicLong();
211
212 reporterTask = new ReporterTask();
213 reportTimer.scheduleAtFixedRate(reporterTask,
214 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
215 REPORT_PERIOD);
216
217 stopped = false;
218
219 Map<String, ControllerNode> nodes = new TreeMap<>();
220 for (ControllerNode node : clusterService.getNodes()) {
221 nodes.put(node.id().id(), node);
222 }
223
224 // Compute the index of the local node in a sorted list of nodes.
225 List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
226 int nodeCount = nodes.size();
227 int index = sortedNodes.indexOf(nodeId.id());
228
229 // Count the number of workers assigned to this node.
230 int workerCount = 0;
231 for (int i = 1; i <= numClients; i++) {
232 if (i % nodeCount == index) {
233 workerCount++;
234 }
235 }
236
237 // Create a worker pool and start the workers for this node.
238 workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
239 for (int i = 0; i < workerCount; i++) {
240 workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
241 }
242 log.info("Started test run");
243 }
244
245 private void stopTestRun() {
246 if (reporterTask != null) {
247 reporterTask.cancel();
248 reporterTask = null;
249 }
250
251 try {
252 workers.awaitTermination(10, TimeUnit.MILLISECONDS);
253 } catch (InterruptedException e) {
254 log.warn("Failed to stop worker", e);
255 }
256
257 sampleCollector.recordSample(0, 0);
258 sampleCollector.recordSample(0, 0);
259 stopped = true;
260
261 log.info("Stopped test run");
262 }
263
264 // Submits primitive operations.
265 final class Runner implements Runnable {
266 private final String key;
267 private final String value;
268 private ConsistentMap<String, String> map;
269
270 private Runner(String key, String value) {
271 this.key = key;
272 this.value = value;
273 }
274
275 @Override
276 public void run() {
277 setup();
278 while (!stopped) {
279 try {
280 submit();
281 } catch (Exception e) {
282 log.warn("Exception during cycle", e);
283 }
284 }
285 teardown();
286 }
287
288 private void setup() {
289 map = storageService.<String, String>consistentMapBuilder()
290 .withName("perf-test")
291 .withSerializer(Serializer.using(KryoNamespaces.BASIC))
292 .build();
293 }
294
295 private void submit() {
296 if (currentCounter.incrementAndGet() % 100 < writePercentage) {
297 map.put(key, value);
298 } else {
299 map.get(key);
300 }
301 }
302
303 private void teardown() {
304 map.destroy();
305 }
306 }
307
308 private class InternalControl implements Consumer<String> {
309 @Override
310 public void accept(String cmd) {
311 log.info("Received command {}", cmd);
312 if (cmd.equals(START)) {
313 startTestRun();
314 } else {
315 stopTestRun();
316 }
317 }
318 }
319
320 private class ReporterTask extends TimerTask {
321 @Override
322 public void run() {
323 long endTime = System.currentTimeMillis();
324 long overallTime = endTime - startTime;
325 long currentTime = endTime - currentStartTime;
326 long currentCount = currentCounter.getAndSet(0);
327 long overallCount = overallCounter.addAndGet(currentCount);
328 sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
329 currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
330 currentStartTime = System.currentTimeMillis();
331 }
332 }
333
334}