blob: 256edfccb2f8c0a9887b364d63c0795ee103cef7 [file] [log] [blame]
Jordan Halterman29718e62017-07-20 13:24:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman29718e62017-07-20 13:24:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.primitiveperf;
17
18import java.util.Dictionary;
19import java.util.List;
20import java.util.Map;
21import java.util.Timer;
22import java.util.TimerTask;
23import java.util.TreeMap;
24import java.util.UUID;
25import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.atomic.AtomicLong;
29import java.util.function.Consumer;
30
31import com.google.common.collect.Lists;
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.ControllerNode;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
50import org.osgi.service.component.ComponentContext;
51import org.slf4j.Logger;
52
53import static com.google.common.base.Strings.isNullOrEmpty;
54import static java.lang.System.currentTimeMillis;
55import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
56import static org.onlab.util.Tools.get;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Application to test sustained primitive throughput.
62 */
63@Component(immediate = true)
64@Service(value = PrimitivePerfApp.class)
65public class PrimitivePerfApp {
66
67 private final Logger log = getLogger(getClass());
68
69 private static final int DEFAULT_NUM_CLIENTS = 64;
70 private static final int DEFAULT_WRITE_PERCENTAGE = 100;
71
72 private static final int REPORT_PERIOD = 1_000; //ms
73
74 private static final String START = "start";
75 private static final String STOP = "stop";
76 private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
77
78 @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
79 label = "Number of clients to use to submit writes")
80 private int numClients = DEFAULT_NUM_CLIENTS;
81
82 @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
83 label = "Percentage of operations to perform as writes")
84 private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
85
86 @Reference(cardinality = MANDATORY_UNARY)
87 protected ClusterService clusterService;
88
89 @Reference(cardinality = MANDATORY_UNARY)
90 protected StorageService storageService;
91
92 @Reference(cardinality = MANDATORY_UNARY)
93 protected ComponentConfigService configService;
94
95 @Reference(cardinality = MANDATORY_UNARY)
96 protected PrimitivePerfCollector sampleCollector;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService communicationService;
100
101 private ExecutorService messageHandlingExecutor;
102
103 private ExecutorService workers;
104 private boolean stopped = true;
105
106 private Timer reportTimer;
107
108 private NodeId nodeId;
109 private TimerTask reporterTask;
110
111 private long startTime;
112 private long currentStartTime;
113 private AtomicLong overallCounter;
114 private AtomicLong currentCounter;
115
116 @Activate
117 public void activate(ComponentContext context) {
118 configService.registerProperties(getClass());
119
120 nodeId = clusterService.getLocalNode().id();
121
122 reportTimer = new Timer("onos-primitive-perf-reporter");
123
124 messageHandlingExecutor = Executors.newSingleThreadExecutor(
125 groupedThreads("onos/perf", "command-handler"));
126
127 communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
128 messageHandlingExecutor);
129
130 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
131 modify(context);
132 }
133
134 @Deactivate
135 public void deactivate() {
136 stopTestRun();
137
138 configService.unregisterProperties(getClass(), false);
139 messageHandlingExecutor.shutdown();
140 communicationService.removeSubscriber(CONTROL);
141
142 if (reportTimer != null) {
143 reportTimer.cancel();
144 reportTimer = null;
145 }
146 }
147
148 @Modified
149 public void modify(ComponentContext context) {
150 if (context == null) {
151 logConfig("Reconfigured");
152 return;
153 }
154
155 Dictionary<?, ?> properties = context.getProperties();
156 int newNumClients;
157 try {
158 String s = get(properties, "numClients");
159 newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
160 } catch (NumberFormatException | ClassCastException e) {
161 log.warn("Malformed configuration detected; using defaults", e);
162 newNumClients = DEFAULT_NUM_CLIENTS;
163 }
164
165 int newWritePercentage;
166 try {
167 String s = get(properties, "writePercentage");
168 newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
169 } catch (NumberFormatException | ClassCastException e) {
170 log.warn("Malformed configuration detected; using defaults", e);
171 newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
172 }
173
174 if (newNumClients != numClients || newWritePercentage != writePercentage) {
175 numClients = newNumClients;
176 writePercentage = newWritePercentage;
177 logConfig("Reconfigured");
178 if (!stopped) {
179 stop();
180 start();
181 }
182 }
183 }
184
185 public void start() {
186 if (stopped) {
187 stopped = false;
188 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
189 startTestRun();
190 }
191 }
192
193 public void stop() {
194 if (!stopped) {
195 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
196 stopTestRun();
197 }
198 }
199
200 private void logConfig(String prefix) {
201 log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
202 }
203
204 private void startTestRun() {
205 sampleCollector.clearSamples();
206
207 startTime = System.currentTimeMillis();
208 currentStartTime = startTime;
209 currentCounter = new AtomicLong();
210 overallCounter = new AtomicLong();
211
212 reporterTask = new ReporterTask();
213 reportTimer.scheduleAtFixedRate(reporterTask,
214 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
215 REPORT_PERIOD);
216
217 stopped = false;
218
219 Map<String, ControllerNode> nodes = new TreeMap<>();
220 for (ControllerNode node : clusterService.getNodes()) {
221 nodes.put(node.id().id(), node);
222 }
223
224 // Compute the index of the local node in a sorted list of nodes.
225 List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
226 int nodeCount = nodes.size();
227 int index = sortedNodes.indexOf(nodeId.id());
228
229 // Count the number of workers assigned to this node.
230 int workerCount = 0;
231 for (int i = 1; i <= numClients; i++) {
232 if (i % nodeCount == index) {
233 workerCount++;
234 }
235 }
236
237 // Create a worker pool and start the workers for this node.
Jordan Haltermanf5333892017-08-29 10:42:13 -0700238 if (workerCount > 0) {
239 workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
240 for (int i = 0; i < workerCount; i++) {
241 workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
242 }
Jordan Halterman29718e62017-07-20 13:24:33 -0700243 }
244 log.info("Started test run");
245 }
246
247 private void stopTestRun() {
248 if (reporterTask != null) {
249 reporterTask.cancel();
250 reporterTask = null;
251 }
252
Jordan Haltermanf5333892017-08-29 10:42:13 -0700253 if (workers != null) {
254 workers.shutdown();
255 try {
256 workers.awaitTermination(10, TimeUnit.MILLISECONDS);
257 } catch (InterruptedException e) {
258 log.warn("Failed to stop worker", e);
Ray Milkey5c7d4882018-02-05 14:50:39 -0800259 Thread.currentThread().interrupt();
Jordan Haltermanf5333892017-08-29 10:42:13 -0700260 }
Jordan Halterman29718e62017-07-20 13:24:33 -0700261 }
262
263 sampleCollector.recordSample(0, 0);
264 sampleCollector.recordSample(0, 0);
265 stopped = true;
266
267 log.info("Stopped test run");
268 }
269
270 // Submits primitive operations.
271 final class Runner implements Runnable {
272 private final String key;
273 private final String value;
274 private ConsistentMap<String, String> map;
275
276 private Runner(String key, String value) {
277 this.key = key;
278 this.value = value;
279 }
280
281 @Override
282 public void run() {
283 setup();
284 while (!stopped) {
285 try {
286 submit();
287 } catch (Exception e) {
288 log.warn("Exception during cycle", e);
289 }
290 }
291 teardown();
292 }
293
294 private void setup() {
295 map = storageService.<String, String>consistentMapBuilder()
296 .withName("perf-test")
297 .withSerializer(Serializer.using(KryoNamespaces.BASIC))
298 .build();
299 }
300
301 private void submit() {
302 if (currentCounter.incrementAndGet() % 100 < writePercentage) {
303 map.put(key, value);
304 } else {
305 map.get(key);
306 }
307 }
308
309 private void teardown() {
310 map.destroy();
311 }
312 }
313
314 private class InternalControl implements Consumer<String> {
315 @Override
316 public void accept(String cmd) {
317 log.info("Received command {}", cmd);
318 if (cmd.equals(START)) {
319 startTestRun();
320 } else {
321 stopTestRun();
322 }
323 }
324 }
325
326 private class ReporterTask extends TimerTask {
327 @Override
328 public void run() {
329 long endTime = System.currentTimeMillis();
330 long overallTime = endTime - startTime;
331 long currentTime = endTime - currentStartTime;
332 long currentCount = currentCounter.getAndSet(0);
333 long overallCount = overallCounter.addAndGet(currentCount);
334 sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
335 currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
336 currentStartTime = System.currentTimeMillis();
337 }
338 }
339
340}