blob: 5e8900acf0445822b532ea40c8c683fbcf74bf9d [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.primitiveperf;
17
18import java.util.Dictionary;
19import java.util.List;
20import java.util.Map;
21import java.util.Timer;
22import java.util.TimerTask;
23import java.util.TreeMap;
24import java.util.UUID;
25import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.atomic.AtomicLong;
29import java.util.function.Consumer;
30
31import com.google.common.collect.Lists;
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.ControllerNode;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
50import org.osgi.service.component.ComponentContext;
51import org.slf4j.Logger;
52
53import static com.google.common.base.Strings.isNullOrEmpty;
54import static java.lang.System.currentTimeMillis;
55import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
56import static org.onlab.util.Tools.get;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Application to test sustained primitive throughput.
62 */
63@Component(immediate = true)
64@Service(value = PrimitivePerfApp.class)
65public class PrimitivePerfApp {
66
67 private final Logger log = getLogger(getClass());
68
69 private static final int DEFAULT_NUM_CLIENTS = 64;
70 private static final int DEFAULT_WRITE_PERCENTAGE = 100;
71
72 private static final int REPORT_PERIOD = 1_000; //ms
73
74 private static final String START = "start";
75 private static final String STOP = "stop";
76 private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
77
78 @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
79 label = "Number of clients to use to submit writes")
80 private int numClients = DEFAULT_NUM_CLIENTS;
81
82 @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
83 label = "Percentage of operations to perform as writes")
84 private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
85
86 @Reference(cardinality = MANDATORY_UNARY)
87 protected ClusterService clusterService;
88
89 @Reference(cardinality = MANDATORY_UNARY)
90 protected StorageService storageService;
91
92 @Reference(cardinality = MANDATORY_UNARY)
93 protected ComponentConfigService configService;
94
95 @Reference(cardinality = MANDATORY_UNARY)
96 protected PrimitivePerfCollector sampleCollector;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService communicationService;
100
101 private ExecutorService messageHandlingExecutor;
102
103 private ExecutorService workers;
104 private boolean stopped = true;
105
106 private Timer reportTimer;
107
108 private NodeId nodeId;
109 private TimerTask reporterTask;
110
111 private long startTime;
112 private long currentStartTime;
113 private AtomicLong overallCounter;
114 private AtomicLong currentCounter;
115
116 @Activate
117 public void activate(ComponentContext context) {
118 configService.registerProperties(getClass());
119
120 nodeId = clusterService.getLocalNode().id();
121
122 reportTimer = new Timer("onos-primitive-perf-reporter");
123
124 messageHandlingExecutor = Executors.newSingleThreadExecutor(
125 groupedThreads("onos/perf", "command-handler"));
126
127 communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
128 messageHandlingExecutor);
129
130 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
131 modify(context);
132 }
133
134 @Deactivate
135 public void deactivate() {
136 stopTestRun();
137
138 configService.unregisterProperties(getClass(), false);
139 messageHandlingExecutor.shutdown();
140 communicationService.removeSubscriber(CONTROL);
141
142 if (reportTimer != null) {
143 reportTimer.cancel();
144 reportTimer = null;
145 }
146 }
147
148 @Modified
149 public void modify(ComponentContext context) {
150 if (context == null) {
151 logConfig("Reconfigured");
152 return;
153 }
154
155 Dictionary<?, ?> properties = context.getProperties();
156 int newNumClients;
157 try {
158 String s = get(properties, "numClients");
159 newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
160 } catch (NumberFormatException | ClassCastException e) {
161 log.warn("Malformed configuration detected; using defaults", e);
162 newNumClients = DEFAULT_NUM_CLIENTS;
163 }
164
165 int newWritePercentage;
166 try {
167 String s = get(properties, "writePercentage");
168 newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
169 } catch (NumberFormatException | ClassCastException e) {
170 log.warn("Malformed configuration detected; using defaults", e);
171 newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
172 }
173
174 if (newNumClients != numClients || newWritePercentage != writePercentage) {
175 numClients = newNumClients;
176 writePercentage = newWritePercentage;
177 logConfig("Reconfigured");
178 if (!stopped) {
179 stop();
180 start();
181 }
182 }
183 }
184
185 public void start() {
186 if (stopped) {
187 stopped = false;
188 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
189 startTestRun();
190 }
191 }
192
193 public void stop() {
194 if (!stopped) {
195 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
196 stopTestRun();
197 }
198 }
199
200 private void logConfig(String prefix) {
201 log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
202 }
203
204 private void startTestRun() {
205 sampleCollector.clearSamples();
206
207 startTime = System.currentTimeMillis();
208 currentStartTime = startTime;
209 currentCounter = new AtomicLong();
210 overallCounter = new AtomicLong();
211
212 reporterTask = new ReporterTask();
213 reportTimer.scheduleAtFixedRate(reporterTask,
214 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
215 REPORT_PERIOD);
216
217 stopped = false;
218
219 Map<String, ControllerNode> nodes = new TreeMap<>();
220 for (ControllerNode node : clusterService.getNodes()) {
221 nodes.put(node.id().id(), node);
222 }
223
224 // Compute the index of the local node in a sorted list of nodes.
225 List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
226 int nodeCount = nodes.size();
227 int index = sortedNodes.indexOf(nodeId.id());
228
229 // Count the number of workers assigned to this node.
230 int workerCount = 0;
231 for (int i = 1; i <= numClients; i++) {
232 if (i % nodeCount == index) {
233 workerCount++;
234 }
235 }
236
237 // Create a worker pool and start the workers for this node.
Jordan Halterman404e2f92017-08-29 10:42:13 -0700238 if (workerCount > 0) {
239 workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
240 for (int i = 0; i < workerCount; i++) {
241 workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
242 }
Jordan Halterman29718e62017-07-20 13:24:33 -0700243 }
244 log.info("Started test run");
245 }
246
247 private void stopTestRun() {
248 if (reporterTask != null) {
249 reporterTask.cancel();
250 reporterTask = null;
251 }
252
Jordan Halterman404e2f92017-08-29 10:42:13 -0700253 if (workers != null) {
254 workers.shutdown();
255 try {
256 workers.awaitTermination(10, TimeUnit.MILLISECONDS);
257 } catch (InterruptedException e) {
258 log.warn("Failed to stop worker", e);
259 }
Jordan Halterman29718e62017-07-20 13:24:33 -0700260 }
261
262 sampleCollector.recordSample(0, 0);
263 sampleCollector.recordSample(0, 0);
264 stopped = true;
265
266 log.info("Stopped test run");
267 }
268
269 // Submits primitive operations.
270 final class Runner implements Runnable {
271 private final String key;
272 private final String value;
273 private ConsistentMap<String, String> map;
274
275 private Runner(String key, String value) {
276 this.key = key;
277 this.value = value;
278 }
279
280 @Override
281 public void run() {
282 setup();
283 while (!stopped) {
284 try {
285 submit();
286 } catch (Exception e) {
287 log.warn("Exception during cycle", e);
288 }
289 }
290 teardown();
291 }
292
293 private void setup() {
294 map = storageService.<String, String>consistentMapBuilder()
295 .withName("perf-test")
296 .withSerializer(Serializer.using(KryoNamespaces.BASIC))
297 .build();
298 }
299
300 private void submit() {
301 if (currentCounter.incrementAndGet() % 100 < writePercentage) {
302 map.put(key, value);
303 } else {
304 map.get(key);
305 }
306 }
307
308 private void teardown() {
309 map.destroy();
310 }
311 }
312
313 private class InternalControl implements Consumer<String> {
314 @Override
315 public void accept(String cmd) {
316 log.info("Received command {}", cmd);
317 if (cmd.equals(START)) {
318 startTestRun();
319 } else {
320 stopTestRun();
321 }
322 }
323 }
324
325 private class ReporterTask extends TimerTask {
326 @Override
327 public void run() {
328 long endTime = System.currentTimeMillis();
329 long overallTime = endTime - startTime;
330 long currentTime = endTime - currentStartTime;
331 long currentCount = currentCounter.getAndSet(0);
332 long overallCount = overallCounter.addAndGet(currentCount);
333 sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
334 currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
335 currentStartTime = System.currentTimeMillis();
336 }
337 }
338
339}