blob: c941bf6ab0e798d55360dff0593807aa4280707c [file] [log] [blame]
Jordan Halterman5d0bbb72017-04-24 13:48:55 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman5d0bbb72017-04-24 13:48:55 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.transactionperf;
17
import java.util.Comparator;
import java.util.Dictionary;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Application for measuring transaction performance.
55 */
56@Component(immediate = true, enabled = true)
57@Service(value = TransactionPerfApp.class)
58public class TransactionPerfApp {
59 private final Logger log = getLogger(getClass());
60
61 @Reference(cardinality = MANDATORY_UNARY)
62 protected StorageService storageService;
63
64 @Reference(cardinality = MANDATORY_UNARY)
65 protected ClusterService clusterService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected ComponentConfigService configService;
69
70 private static final String DEFAULT_MAP_NAME = "transaction-perf";
71 private static final double DEFAULT_READ_PERCENTAGE = .9;
72 private static final int DEFAULT_TOTAL_OPERATIONS = 1000;
73 private static final boolean DEFAULT_WITH_CONTENTION = false;
74 private static final boolean DEFAULT_WITH_RETRIES = false;
75 private static final int DEFAULT_REPORT_INTERVAL_SECONDS = 1;
76 private static final String KEY_PREFIX = "key";
77
78 @Property(name = "mapName", value = DEFAULT_MAP_NAME,
79 label = "The name of the map to use for testing")
80 protected String mapName = DEFAULT_MAP_NAME;
81
82 @Property(name = "readPercentage", doubleValue = DEFAULT_READ_PERCENTAGE,
83 label = "Percentage of reads to perform")
84 protected double readPercentage = DEFAULT_READ_PERCENTAGE;
85
86 @Property(name = "totalOperationsPerTransaction", intValue = DEFAULT_TOTAL_OPERATIONS,
87 label = "Number of operations to perform within each transaction")
88 protected int totalOperationsPerTransaction = DEFAULT_TOTAL_OPERATIONS;
89
90 @Property(name = "withContention", boolValue = DEFAULT_WITH_CONTENTION,
91 label = "Whether to test transactions with contention from all nodes")
92 protected boolean withContention = DEFAULT_WITH_CONTENTION;
93
94 @Property(name = "withRetries", boolValue = DEFAULT_WITH_RETRIES,
95 label = "Whether to retry transactions until success")
96 protected boolean withRetries = DEFAULT_WITH_RETRIES;
97
98 @Property(name = "reportIntervalSeconds", intValue = DEFAULT_REPORT_INTERVAL_SECONDS,
99 label = "The frequency with which to report performance in seconds")
100 protected int reportIntervalSeconds = 1;
101
102 private ExecutorService testRunner =
103 Executors.newSingleThreadExecutor(Tools.groupedThreads("app/transaction-perf-test-runner", ""));
104
105 private ScheduledExecutorService reporter =
106 Executors.newSingleThreadScheduledExecutor(
107 groupedThreads("onos/transaction-perf-test", "reporter"));
108
109 private Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
110
111 private AtomicInteger attempted = new AtomicInteger(0);
112 private AtomicInteger succeeded = new AtomicInteger(0);
113 private AtomicInteger iteration = new AtomicInteger(0);
114
115 @Activate
116 public void activate(ComponentContext context) {
117 configService.registerProperties(getClass());
118 if (isParticipant()) {
119 startTest();
120 reporter.scheduleWithFixedDelay(this::reportPerformance,
121 reportIntervalSeconds,
122 reportIntervalSeconds,
123 TimeUnit.SECONDS);
124 logConfig("Started");
125 }
126 }
127
128 @Modified
129 public void modified(ComponentContext context) {
130 if (context == null) {
131 mapName = DEFAULT_MAP_NAME;
132 readPercentage = DEFAULT_READ_PERCENTAGE;
133 totalOperationsPerTransaction = DEFAULT_TOTAL_OPERATIONS;
134 withContention = DEFAULT_WITH_CONTENTION;
135 withRetries = DEFAULT_WITH_RETRIES;
136 reportIntervalSeconds = DEFAULT_REPORT_INTERVAL_SECONDS;
137 return;
138 }
139
140 Dictionary properties = context.getProperties();
141
142 String newMapName = mapName;
143 double newReadPercentage = readPercentage;
144 int newTotalOperationsPerTransaction = totalOperationsPerTransaction;
145 boolean newWithContention = withContention;
146 boolean newWithRetries = withRetries;
147 int newReportIntervalSeconds = reportIntervalSeconds;
148
149 try {
150 String s;
151
152 s = get(properties, "mapName");
153 if (!isNullOrEmpty(s)) {
154 newMapName = s;
155 }
156
157 s = get(properties, "readPercentage");
158 if (!isNullOrEmpty(s)) {
159 newReadPercentage = Double.parseDouble(s);
160 }
161
162 s = get(properties, "totalOperationsPerTransaction");
163 if (!isNullOrEmpty(s)) {
164 newTotalOperationsPerTransaction = Integer.parseInt(s);
165 }
166
167 s = get(properties, "withContention");
168 if (!isNullOrEmpty(s)) {
169 newWithContention = Boolean.parseBoolean(s);
170 }
171
172 s = get(properties, "withRetries");
173 if (!isNullOrEmpty(s)) {
174 newWithRetries = Boolean.parseBoolean(s);
175 }
176
177 s = get(properties, "reportIntervalSeconds");
178 if (!isNullOrEmpty(s)) {
179 newReportIntervalSeconds = Integer.parseInt(s);
180 }
181 } catch (NumberFormatException | ClassCastException e) {
182 return;
183 }
184
185 boolean modified = newMapName != mapName
186 || newReadPercentage != readPercentage
187 || newTotalOperationsPerTransaction != totalOperationsPerTransaction
188 || newWithContention != withContention
189 || newWithRetries != withRetries
190 || newReportIntervalSeconds != reportIntervalSeconds;
191
192 // If no configuration options have changed, skip restarting the test.
193 if (!modified) {
194 return;
195 }
196
197 mapName = newMapName;
198 readPercentage = newReadPercentage;
199 totalOperationsPerTransaction = newTotalOperationsPerTransaction;
200 withContention = newWithContention;
201 withRetries = newWithRetries;
202 reportIntervalSeconds = newReportIntervalSeconds;
203
204 // Restart the test.
205 stopTest();
206 testRunner = Executors.newSingleThreadExecutor(
207 groupedThreads("app/transaction-perf-test-runner", ""));
208 reporter = Executors.newSingleThreadScheduledExecutor(
209 groupedThreads("onos/transaction-perf-test", "reporter"));
210 startTest();
211 reporter.scheduleWithFixedDelay(this::reportPerformance,
212 reportIntervalSeconds,
213 reportIntervalSeconds,
214 TimeUnit.SECONDS);
215 logConfig("Restarted");
216 }
217
218 @Deactivate
219 public void deactivate(ComponentContext context) {
220 configService.unregisterProperties(getClass(), false);
221 stopTest();
222 log.info("Stopped");
223 }
224
225 private void logConfig(String prefix) {
226 log.info("{} with mapName = {}; readPercentage = {}; totalOperationsPerTransaction = {}; " +
227 "withContention = {}; withRetries = {}; reportIntervalSeconds = {}",
228 prefix, mapName, readPercentage, totalOperationsPerTransaction,
229 withContention, withRetries, reportIntervalSeconds);
230 }
231
232 /**
233 * Returns whether this node is a participant in the test.
234 *
235 * @return whether this node is a participant in the test
236 */
237 private boolean isParticipant() {
238 return withContention || clusterService.getLocalNode().id().equals(clusterService.getNodes().stream()
239 .map(ControllerNode::id)
240 .min(Comparator.naturalOrder()).get());
241 }
242
243 /**
244 * Initializes the map.
245 */
246 private void initializeMap() {
247 TransactionContext context = storageService.transactionContextBuilder().build();
248 context.begin();
249 try {
250 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
251 for (int i = 0; i < totalOperationsPerTransaction; i++) {
252 map.put(KEY_PREFIX + i, KEY_PREFIX + i);
253 }
254 context.commit().join();
255 } catch (Exception e) {
256 context.abort();
257 log.warn("An exception occurred during initialization: {}", e);
258 }
259 }
260
261 /**
262 * Starts the test.
263 */
264 private void startTest() {
265 logConfig("Started");
266 initializeMap();
267 runTest(iteration.getAndIncrement());
268 }
269
270 /**
271 * Runs the test.
272 */
273 private void runTest(int iteration) {
274 testRunner.execute(() -> {
275 // Attempt the transaction until successful if retries are enabled.
276 CommitStatus status = null;
277 do {
278 TransactionContext context = storageService.transactionContextBuilder().build();
279 context.begin();
280
281 try {
282 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
283 int reads = (int) (totalOperationsPerTransaction * readPercentage);
284 for (int i = 0; i < reads; i++) {
285 map.get(KEY_PREFIX + i);
286 }
287
288 int writes = (int) (totalOperationsPerTransaction * (1 - readPercentage));
289 for (int i = 0; i < writes; i++) {
290 map.put(KEY_PREFIX + i, KEY_PREFIX + iteration + i);
291 }
292
293 status = context.commit().join();
294 attempted.incrementAndGet();
295 } catch (Exception e) {
296 context.abort();
297 log.warn("An exception occurred during a transaction: {}", e);
298 }
299 } while (withRetries && status != CommitStatus.SUCCESS);
300
301 // If the transaction was successful, increment succeeded transactions.
302 if (status == CommitStatus.SUCCESS) {
303 succeeded.incrementAndGet();
304 }
305
306 runTest(this.iteration.getAndIncrement());
307 });
308 }
309
310 /**
311 * Reports transaction performance.
312 */
313 private void reportPerformance() {
314 log.info("Attempted: {} Succeeded: {} Total iterations: {}",
315 attempted.getAndSet(0),
316 succeeded.getAndSet(0),
317 iteration.get());
318 }
319
320 /**
321 * Stops a test.
322 */
323 private void stopTest() {
324 testRunner.shutdown();
325 reporter.shutdown();
326 }
327}