blob: 45bdfd6a441f42d463fd0de089d1760f2c574024 [file] [log] [blame]
Jordan Halterman5d0bbb72017-04-24 13:48:55 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman5d0bbb72017-04-24 13:48:55 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.transactionperf;
17
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070018import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.ControllerNode;
22import org.onosproject.store.serializers.KryoNamespaces;
23import org.onosproject.store.service.CommitStatus;
24import org.onosproject.store.service.Serializer;
25import org.onosproject.store.service.StorageService;
26import org.onosproject.store.service.TransactionContext;
27import org.onosproject.store.service.TransactionalMap;
28import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.osgi.service.component.annotations.Activate;
30import org.osgi.service.component.annotations.Component;
31import org.osgi.service.component.annotations.Deactivate;
32import org.osgi.service.component.annotations.Modified;
33import org.osgi.service.component.annotations.Reference;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070034import org.slf4j.Logger;
35
Ray Milkeyd84f89b2018-08-17 14:54:17 -070036import java.util.Comparator;
37import java.util.Dictionary;
38import java.util.concurrent.ExecutorService;
39import java.util.concurrent.Executors;
40import java.util.concurrent.ScheduledExecutorService;
41import java.util.concurrent.TimeUnit;
42import java.util.concurrent.atomic.AtomicInteger;
43
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070044import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070045import static org.onlab.util.Tools.get;
46import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070047import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070048import static org.slf4j.LoggerFactory.getLogger;
49
50/**
51 * Application for measuring transaction performance.
52 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070053@Component(immediate = true, service = TransactionPerfApp.class)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070054public class TransactionPerfApp {
55 private final Logger log = getLogger(getClass());
56
Ray Milkeyd84f89b2018-08-17 14:54:17 -070057 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070058 protected StorageService storageService;
59
Ray Milkeyd84f89b2018-08-17 14:54:17 -070060 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070061 protected ClusterService clusterService;
62
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063 @Reference(cardinality = MANDATORY)
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070064 protected ComponentConfigService configService;
65
66 private static final String DEFAULT_MAP_NAME = "transaction-perf";
67 private static final double DEFAULT_READ_PERCENTAGE = .9;
68 private static final int DEFAULT_TOTAL_OPERATIONS = 1000;
69 private static final boolean DEFAULT_WITH_CONTENTION = false;
70 private static final boolean DEFAULT_WITH_RETRIES = false;
71 private static final int DEFAULT_REPORT_INTERVAL_SECONDS = 1;
72 private static final String KEY_PREFIX = "key";
73
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074 //@Property(name = "mapName", value = DEFAULT_MAP_NAME,
75 // label = "The name of the map to use for testing")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070076 protected String mapName = DEFAULT_MAP_NAME;
77
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 //@Property(name = "readPercentage", doubleValue = DEFAULT_READ_PERCENTAGE,
79 // label = "Percentage of reads to perform")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070080 protected double readPercentage = DEFAULT_READ_PERCENTAGE;
81
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082 //@Property(name = "totalOperationsPerTransaction", intValue = DEFAULT_TOTAL_OPERATIONS,
83 // label = "Number of operations to perform within each transaction")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070084 protected int totalOperationsPerTransaction = DEFAULT_TOTAL_OPERATIONS;
85
Ray Milkeyd84f89b2018-08-17 14:54:17 -070086 //@Property(name = "withContention", boolValue = DEFAULT_WITH_CONTENTION,
87 // label = "Whether to test transactions with contention from all nodes")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070088 protected boolean withContention = DEFAULT_WITH_CONTENTION;
89
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 //@Property(name = "withRetries", boolValue = DEFAULT_WITH_RETRIES,
91 // label = "Whether to retry transactions until success")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070092 protected boolean withRetries = DEFAULT_WITH_RETRIES;
93
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 //@Property(name = "reportIntervalSeconds", intValue = DEFAULT_REPORT_INTERVAL_SECONDS,
95 // label = "The frequency with which to report performance in seconds")
Jordan Halterman5d0bbb72017-04-24 13:48:55 -070096 protected int reportIntervalSeconds = 1;
97
98 private ExecutorService testRunner =
99 Executors.newSingleThreadExecutor(Tools.groupedThreads("app/transaction-perf-test-runner", ""));
100
101 private ScheduledExecutorService reporter =
102 Executors.newSingleThreadScheduledExecutor(
103 groupedThreads("onos/transaction-perf-test", "reporter"));
104
105 private Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
106
107 private AtomicInteger attempted = new AtomicInteger(0);
108 private AtomicInteger succeeded = new AtomicInteger(0);
109 private AtomicInteger iteration = new AtomicInteger(0);
110
111 @Activate
112 public void activate(ComponentContext context) {
113 configService.registerProperties(getClass());
114 if (isParticipant()) {
115 startTest();
116 reporter.scheduleWithFixedDelay(this::reportPerformance,
117 reportIntervalSeconds,
118 reportIntervalSeconds,
119 TimeUnit.SECONDS);
120 logConfig("Started");
121 }
122 }
123
124 @Modified
125 public void modified(ComponentContext context) {
126 if (context == null) {
127 mapName = DEFAULT_MAP_NAME;
128 readPercentage = DEFAULT_READ_PERCENTAGE;
129 totalOperationsPerTransaction = DEFAULT_TOTAL_OPERATIONS;
130 withContention = DEFAULT_WITH_CONTENTION;
131 withRetries = DEFAULT_WITH_RETRIES;
132 reportIntervalSeconds = DEFAULT_REPORT_INTERVAL_SECONDS;
133 return;
134 }
135
136 Dictionary properties = context.getProperties();
137
138 String newMapName = mapName;
139 double newReadPercentage = readPercentage;
140 int newTotalOperationsPerTransaction = totalOperationsPerTransaction;
141 boolean newWithContention = withContention;
142 boolean newWithRetries = withRetries;
143 int newReportIntervalSeconds = reportIntervalSeconds;
144
145 try {
146 String s;
147
148 s = get(properties, "mapName");
149 if (!isNullOrEmpty(s)) {
150 newMapName = s;
151 }
152
153 s = get(properties, "readPercentage");
154 if (!isNullOrEmpty(s)) {
155 newReadPercentage = Double.parseDouble(s);
156 }
157
158 s = get(properties, "totalOperationsPerTransaction");
159 if (!isNullOrEmpty(s)) {
160 newTotalOperationsPerTransaction = Integer.parseInt(s);
161 }
162
163 s = get(properties, "withContention");
164 if (!isNullOrEmpty(s)) {
165 newWithContention = Boolean.parseBoolean(s);
166 }
167
168 s = get(properties, "withRetries");
169 if (!isNullOrEmpty(s)) {
170 newWithRetries = Boolean.parseBoolean(s);
171 }
172
173 s = get(properties, "reportIntervalSeconds");
174 if (!isNullOrEmpty(s)) {
175 newReportIntervalSeconds = Integer.parseInt(s);
176 }
177 } catch (NumberFormatException | ClassCastException e) {
178 return;
179 }
180
181 boolean modified = newMapName != mapName
182 || newReadPercentage != readPercentage
183 || newTotalOperationsPerTransaction != totalOperationsPerTransaction
184 || newWithContention != withContention
185 || newWithRetries != withRetries
186 || newReportIntervalSeconds != reportIntervalSeconds;
187
188 // If no configuration options have changed, skip restarting the test.
189 if (!modified) {
190 return;
191 }
192
193 mapName = newMapName;
194 readPercentage = newReadPercentage;
195 totalOperationsPerTransaction = newTotalOperationsPerTransaction;
196 withContention = newWithContention;
197 withRetries = newWithRetries;
198 reportIntervalSeconds = newReportIntervalSeconds;
199
200 // Restart the test.
201 stopTest();
202 testRunner = Executors.newSingleThreadExecutor(
203 groupedThreads("app/transaction-perf-test-runner", ""));
204 reporter = Executors.newSingleThreadScheduledExecutor(
205 groupedThreads("onos/transaction-perf-test", "reporter"));
206 startTest();
207 reporter.scheduleWithFixedDelay(this::reportPerformance,
208 reportIntervalSeconds,
209 reportIntervalSeconds,
210 TimeUnit.SECONDS);
211 logConfig("Restarted");
212 }
213
214 @Deactivate
215 public void deactivate(ComponentContext context) {
216 configService.unregisterProperties(getClass(), false);
217 stopTest();
218 log.info("Stopped");
219 }
220
221 private void logConfig(String prefix) {
222 log.info("{} with mapName = {}; readPercentage = {}; totalOperationsPerTransaction = {}; " +
223 "withContention = {}; withRetries = {}; reportIntervalSeconds = {}",
224 prefix, mapName, readPercentage, totalOperationsPerTransaction,
225 withContention, withRetries, reportIntervalSeconds);
226 }
227
228 /**
229 * Returns whether this node is a participant in the test.
230 *
231 * @return whether this node is a participant in the test
232 */
233 private boolean isParticipant() {
234 return withContention || clusterService.getLocalNode().id().equals(clusterService.getNodes().stream()
235 .map(ControllerNode::id)
236 .min(Comparator.naturalOrder()).get());
237 }
238
239 /**
240 * Initializes the map.
241 */
242 private void initializeMap() {
243 TransactionContext context = storageService.transactionContextBuilder().build();
244 context.begin();
245 try {
246 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
247 for (int i = 0; i < totalOperationsPerTransaction; i++) {
248 map.put(KEY_PREFIX + i, KEY_PREFIX + i);
249 }
250 context.commit().join();
251 } catch (Exception e) {
252 context.abort();
253 log.warn("An exception occurred during initialization: {}", e);
254 }
255 }
256
257 /**
258 * Starts the test.
259 */
260 private void startTest() {
261 logConfig("Started");
262 initializeMap();
263 runTest(iteration.getAndIncrement());
264 }
265
266 /**
267 * Runs the test.
268 */
269 private void runTest(int iteration) {
270 testRunner.execute(() -> {
271 // Attempt the transaction until successful if retries are enabled.
272 CommitStatus status = null;
273 do {
274 TransactionContext context = storageService.transactionContextBuilder().build();
275 context.begin();
276
277 try {
278 TransactionalMap<String, String> map = context.getTransactionalMap(mapName, serializer);
279 int reads = (int) (totalOperationsPerTransaction * readPercentage);
280 for (int i = 0; i < reads; i++) {
281 map.get(KEY_PREFIX + i);
282 }
283
284 int writes = (int) (totalOperationsPerTransaction * (1 - readPercentage));
285 for (int i = 0; i < writes; i++) {
286 map.put(KEY_PREFIX + i, KEY_PREFIX + iteration + i);
287 }
288
289 status = context.commit().join();
290 attempted.incrementAndGet();
291 } catch (Exception e) {
292 context.abort();
293 log.warn("An exception occurred during a transaction: {}", e);
294 }
295 } while (withRetries && status != CommitStatus.SUCCESS);
296
297 // If the transaction was successful, increment succeeded transactions.
298 if (status == CommitStatus.SUCCESS) {
299 succeeded.incrementAndGet();
300 }
301
302 runTest(this.iteration.getAndIncrement());
303 });
304 }
305
306 /**
307 * Reports transaction performance.
308 */
309 private void reportPerformance() {
310 log.info("Attempted: {} Succeeded: {} Total iterations: {}",
311 attempted.getAndSet(0),
312 succeeded.getAndSet(0),
313 iteration.get());
314 }
315
316 /**
317 * Stops a test.
318 */
319 private void stopTest() {
320 testRunner.shutdown();
321 reporter.shutdown();
322 }
323}