blob: b79ea2ee90bef348a5aa5f2fd0b72bca6e67cc5a [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.transactionperf;
17
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;

import java.util.Comparator;
import java.util.Dictionary;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.transactionperf.OsgiPropertyConstants.MAP_NAME;
import static org.onosproject.transactionperf.OsgiPropertyConstants.MAP_NAME_DEFAULT;
import static org.onosproject.transactionperf.OsgiPropertyConstants.READ_PERCENTAGE;
import static org.onosproject.transactionperf.OsgiPropertyConstants.READ_PERCENTAGE_DEFAULT;
import static org.onosproject.transactionperf.OsgiPropertyConstants.REPORT_INTERVAL_SECONDS;
import static org.onosproject.transactionperf.OsgiPropertyConstants.REPORT_INTERVAL_SECONDS_DEFAULT;
import static org.onosproject.transactionperf.OsgiPropertyConstants.TOTAL_OPERATIONS;
import static org.onosproject.transactionperf.OsgiPropertyConstants.TOTAL_OPERATIONS_DEFAULT;
import static org.onosproject.transactionperf.OsgiPropertyConstants.WITH_CONTENTION;
import static org.onosproject.transactionperf.OsgiPropertyConstants.WITH_CONTENTION_DEFAULT;
import static org.onosproject.transactionperf.OsgiPropertyConstants.WITH_RETRIES;
import static org.onosproject.transactionperf.OsgiPropertyConstants.WITH_RETRIES_DEFAULT;
import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Application for measuring transaction performance.
64 */
Ray Milkey88dd7e22018-10-24 10:04:03 -070065@Component(
66 immediate = true,
67 service = TransactionPerfApp.class,
68 property = {
69 MAP_NAME_DEFAULT + "=" + MAP_NAME,
70 READ_PERCENTAGE + ":Double=" + READ_PERCENTAGE_DEFAULT,
71 TOTAL_OPERATIONS + ":Integer=" + TOTAL_OPERATIONS_DEFAULT,
72 WITH_CONTENTION + ":Boolean=" + WITH_CONTENTION_DEFAULT,
73 WITH_RETRIES + ":Boolean=" + WITH_RETRIES_DEFAULT,
74 REPORT_INTERVAL_SECONDS + ":Integer=" + REPORT_INTERVAL_SECONDS_DEFAULT
75 }
76)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070077public class TransactionPerfApp {
78 private final Logger log = getLogger(getClass());
79
Ray Milkeyd84f89b2018-08-17 14:54:17 -070080 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070081 protected StorageService storageService;
82
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070084 protected ClusterService clusterService;
85
Ray Milkeyd84f89b2018-08-17 14:54:17 -070086 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070087 protected ComponentConfigService configService;
88
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070089 private static final String KEY_PREFIX = "key";
90
Ray Milkey88dd7e22018-10-24 10:04:03 -070091 /** The name of the map to use for testing. */
92 protected String mapName = MAP_NAME_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070093
Ray Milkey88dd7e22018-10-24 10:04:03 -070094 /** Percentage of reads to perform. */
95 protected double readPercentage = READ_PERCENTAGE_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070096
Ray Milkey88dd7e22018-10-24 10:04:03 -070097 /** Number of operations to perform within each transaction. */
98 protected int totalOperationsPerTransaction = TOTAL_OPERATIONS_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070099
Ray Milkey88dd7e22018-10-24 10:04:03 -0700100 /** Whether to test transactions with contention from all nodes. */
101 protected boolean withContention = WITH_CONTENTION_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700102
Ray Milkey88dd7e22018-10-24 10:04:03 -0700103 /** Whether to retry transactions until success. */
104 protected boolean withRetries = WITH_RETRIES_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700105
Ray Milkey88dd7e22018-10-24 10:04:03 -0700106 /** The frequency with which to report performance in seconds. */
107 protected int reportIntervalSeconds = REPORT_INTERVAL_SECONDS_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700108
109 private ExecutorService testRunner =
110 Executors.newSingleThreadExecutor(Tools.groupedThreads("app/transaction-perf-test-runner", ""));
111
112 private ScheduledExecutorService reporter =
113 Executors.newSingleThreadScheduledExecutor(
114 groupedThreads("onos/transaction-perf-test", "reporter"));
115
116 private Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
117
118 private AtomicInteger attempted = new AtomicInteger(0);
119 private AtomicInteger succeeded = new AtomicInteger(0);
120 private AtomicInteger iteration = new AtomicInteger(0);
121
122 @Activate
123 public void activate(ComponentContext context) {
124 configService.registerProperties(getClass());
125 if (isParticipant()) {
126 startTest();
127 reporter.scheduleWithFixedDelay(this::reportPerformance,
128 reportIntervalSeconds,
129 reportIntervalSeconds,
130 TimeUnit.SECONDS);
131 logConfig("Started");
132 }
133 }
134
135 @Modified
136 public void modified(ComponentContext context) {
137 if (context == null) {
Ray Milkey88dd7e22018-10-24 10:04:03 -0700138 mapName = MAP_NAME_DEFAULT;
139 readPercentage = READ_PERCENTAGE_DEFAULT;
140 totalOperationsPerTransaction = TOTAL_OPERATIONS_DEFAULT;
141 withContention = WITH_CONTENTION_DEFAULT;
142 withRetries = WITH_RETRIES_DEFAULT;
143 reportIntervalSeconds = REPORT_INTERVAL_SECONDS_DEFAULT;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700144 return;
145 }
146
147 Dictionary properties = context.getProperties();
148
149 String newMapName = mapName;
150 double newReadPercentage = readPercentage;
151 int newTotalOperationsPerTransaction = totalOperationsPerTransaction;
152 boolean newWithContention = withContention;
153 boolean newWithRetries = withRetries;
154 int newReportIntervalSeconds = reportIntervalSeconds;
155
156 try {
157 String s;
158
Ray Milkey88dd7e22018-10-24 10:04:03 -0700159 s = get(properties, MAP_NAME);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700160 if (!isNullOrEmpty(s)) {
161 newMapName = s;
162 }
163
Ray Milkey88dd7e22018-10-24 10:04:03 -0700164 s = get(properties, READ_PERCENTAGE);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700165 if (!isNullOrEmpty(s)) {
166 newReadPercentage = Double.parseDouble(s);
167 }
168
Ray Milkey88dd7e22018-10-24 10:04:03 -0700169 s = get(properties, TOTAL_OPERATIONS);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700170 if (!isNullOrEmpty(s)) {
171 newTotalOperationsPerTransaction = Integer.parseInt(s);
172 }
173
Ray Milkey88dd7e22018-10-24 10:04:03 -0700174 s = get(properties, WITH_CONTENTION);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700175 if (!isNullOrEmpty(s)) {
176 newWithContention = Boolean.parseBoolean(s);
177 }
178
Ray Milkey88dd7e22018-10-24 10:04:03 -0700179 s = get(properties, WITH_RETRIES);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700180 if (!isNullOrEmpty(s)) {
181 newWithRetries = Boolean.parseBoolean(s);
182 }
183
Ray Milkey88dd7e22018-10-24 10:04:03 -0700184 s = get(properties, REPORT_INTERVAL_SECONDS);
Jordan Halterman5d0bbb72017-04-24 13:48:55 -0700185 if (!isNullOrEmpty(s)) {
186 newReportIntervalSeconds = Integer.parseInt(s);
187 }
188 } catch (NumberFormatException | ClassCastException e) {
189 return;
190 }
191
192 boolean modified = newMapName != mapName
193 || newReadPercentage != readPercentage
194 || newTotalOperationsPerTransaction != totalOperationsPerTransaction
195 || newWithContention != withContention
196 || newWithRetries != withRetries
197 || newReportIntervalSeconds != reportIntervalSeconds;
198
199 // If no configuration options have changed, skip restarting the test.
200 if (!modified) {
201 return;
202 }
203
204 mapName = newMapName;
205 readPercentage = newReadPercentage;
206 totalOperationsPerTransaction = newTotalOperationsPerTransaction;
207 withContention = newWithContention;
208 withRetries = newWithRetries;
209 reportIntervalSeconds = newReportIntervalSeconds;
210
211 // Restart the test.
212 stopTest();
213 testRunner = Executors.newSingleThreadExecutor(
214 groupedThreads("app/transaction-perf-test-runner", ""));
215 reporter = Executors.newSingleThreadScheduledExecutor(
216 groupedThreads("onos/transaction-perf-test", "reporter"));
217 startTest();
218 reporter.scheduleWithFixedDelay(this::reportPerformance,
219 reportIntervalSeconds,
220 reportIntervalSeconds,
221 TimeUnit.SECONDS);
222 logConfig("Restarted");
223 }
224
225 @Deactivate
226 public void deactivate(ComponentContext context) {
227 configService.unregisterProperties(getClass(), false);
228 stopTest();
229 log.info("Stopped");
230 }
231
232 private void logConfig(String prefix) {
233 log.info("{} with mapName = {}; readPercentage = {}; totalOperationsPerTransaction = {}; " +
234 "withContention = {}; withRetries = {}; reportIntervalSeconds = {}",
235 prefix, mapName, readPercentage, totalOperationsPerTransaction,
236 withContention, withRetries, reportIntervalSeconds);
237 }
238
239 /**
240 * Returns whether this node is a participant in the test.
241 *
242 * @return whether this node is a participant in the test
243 */
244 private boolean isParticipant() {
245 return withContention || clusterService.getLocalNode().id().equals(clusterService.getNodes().stream()
246 .map(ControllerNode::id)
247 .min(Comparator.naturalOrder()).get());
248 }
249
250 /**
251 * Initializes the map.
252 */
253 private void initializeMap() {
254 TransactionContext context = storageService.transactionContextBuilder().build();
255 context.begin();
256 try {
257 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
258 for (int i = 0; i < totalOperationsPerTransaction; i++) {
259 map.put(KEY_PREFIX + i, KEY_PREFIX + i);
260 }
261 context.commit().join();
262 } catch (Exception e) {
263 context.abort();
264 log.warn("An exception occurred during initialization: {}", e);
265 }
266 }
267
268 /**
269 * Starts the test.
270 */
271 private void startTest() {
272 logConfig("Started");
273 initializeMap();
274 runTest(iteration.getAndIncrement());
275 }
276
277 /**
278 * Runs the test.
279 */
280 private void runTest(int iteration) {
281 testRunner.execute(() -> {
282 // Attempt the transaction until successful if retries are enabled.
283 CommitStatus status = null;
284 do {
285 TransactionContext context = storageService.transactionContextBuilder().build();
286 context.begin();
287
288 try {
289 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
290 int reads = (int) (totalOperationsPerTransaction * readPercentage);
291 for (int i = 0; i < reads; i++) {
292 map.get(KEY_PREFIX + i);
293 }
294
295 int writes = (int) (totalOperationsPerTransaction * (1 - readPercentage));
296 for (int i = 0; i < writes; i++) {
297 map.put(KEY_PREFIX + i, KEY_PREFIX + iteration + i);
298 }
299
300 status = context.commit().join();
301 attempted.incrementAndGet();
302 } catch (Exception e) {
303 context.abort();
304 log.warn("An exception occurred during a transaction: {}", e);
305 }
306 } while (withRetries && status != CommitStatus.SUCCESS);
307
308 // If the transaction was successful, increment succeeded transactions.
309 if (status == CommitStatus.SUCCESS) {
310 succeeded.incrementAndGet();
311 }
312
313 runTest(this.iteration.getAndIncrement());
314 });
315 }
316
317 /**
318 * Reports transaction performance.
319 */
320 private void reportPerformance() {
321 log.info("Attempted: {} Succeeded: {} Total iterations: {}",
322 attempted.getAndSet(0),
323 succeeded.getAndSet(0),
324 iteration.get());
325 }
326
327 /**
328 * Stops a test.
329 */
330 private void stopTest() {
331 testRunner.shutdown();
332 reporter.shutdown();
333 }
334}