blob: 37a36c3d883dc827ac87393352a4831a40ee700f [file] [log] [blame]
Madan Jampani5551f522016-02-23 18:27:19 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5551f522016-02-23 18:27:19 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.loadtest;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.onlab.util.Tools.get;
20import static org.slf4j.LoggerFactory.getLogger;
21
22import java.util.Dictionary;
Madan Jampaniee3736e2016-03-29 23:40:39 -070023import java.util.List;
Madan Jampani5551f522016-02-23 18:27:19 -080024import java.util.concurrent.Executors;
Madan Jampaniee3736e2016-03-29 23:40:39 -070025import java.util.concurrent.ScheduledExecutorService;
Madan Jampani5551f522016-02-23 18:27:19 -080026import java.util.concurrent.Semaphore;
Madan Jampaniee3736e2016-03-29 23:40:39 -070027import java.util.concurrent.TimeUnit;
Madan Jampani5551f522016-02-23 18:27:19 -080028import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampaniee3736e2016-03-29 23:40:39 -070029import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani5551f522016-02-23 18:27:19 -080030import java.util.concurrent.atomic.AtomicLong;
Madan Jampaniee3736e2016-03-29 23:40:39 -070031import java.util.stream.Collectors;
Madan Jampani5551f522016-02-23 18:27:19 -080032
Madan Jampaniee3736e2016-03-29 23:40:39 -070033import org.apache.commons.lang.math.RandomUtils;
Madan Jampani5551f522016-02-23 18:27:19 -080034import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Modified;
38import org.apache.felix.scr.annotations.Property;
39import org.apache.felix.scr.annotations.Reference;
40import org.apache.felix.scr.annotations.ReferenceCardinality;
41import org.apache.felix.scr.annotations.Service;
Madan Jampaniee3736e2016-03-29 23:40:39 -070042import org.onlab.util.Tools;
Madan Jampani5551f522016-02-23 18:27:19 -080043import org.onosproject.cfg.ComponentConfigService;
44import org.onosproject.core.ApplicationId;
45import org.onosproject.core.CoreService;
46import org.onosproject.store.service.AsyncAtomicCounter;
47import org.onosproject.store.service.StorageService;
48import org.osgi.service.component.ComponentContext;
49import org.slf4j.Logger;
50
Madan Jampaniee3736e2016-03-29 23:40:39 -070051import com.google.common.collect.Lists;
Madan Jampani5551f522016-02-23 18:27:19 -080052import com.google.common.util.concurrent.RateLimiter;
53
54/**
55 * Simple application for load testing distributed consensus.
56 * <p>
57 * This application simply increments as {@link AsyncAtomicCounter} at a configurable rate.
58 */
59@Component(immediate = true)
60@Service(value = DistributedConsensusLoadTest.class)
61public class DistributedConsensusLoadTest {
62
63 private final Logger log = getLogger(getClass());
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ComponentConfigService configService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected StorageService storageService;
70
71 private ApplicationId appId;
72
73 private AtomicBoolean stopped = new AtomicBoolean(false);
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected CoreService coreService;
77
78 private static final int DEFAULT_RATE = 100;
Madan Jampaniee3736e2016-03-29 23:40:39 -070079 private static final int TOTAL_COUNTERS = 50;
Madan Jampani5551f522016-02-23 18:27:19 -080080
81 @Property(name = "rate", intValue = DEFAULT_RATE,
82 label = "Total number of increments per second to the atomic counter")
83 protected int rate = 0;
84
Madan Jampaniee3736e2016-03-29 23:40:39 -070085 private final AtomicLong previousReportTime = new AtomicLong(0);
86 private final AtomicLong previousCount = new AtomicLong(0);
87 private final AtomicInteger increments = new AtomicInteger(0);
88 private final List<AsyncAtomicCounter> counters = Lists.newArrayList();
89 private final ScheduledExecutorService runner = Executors.newSingleThreadScheduledExecutor();
90 private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
Madan Jampani5551f522016-02-23 18:27:19 -080091
92 @Activate
93 public void activate(ComponentContext context) {
94 configService.registerProperties(getClass());
95 appId = coreService.registerApplication("org.onosproject.loadtest");
96 log.info("Started with {}", appId);
Madan Jampaniee3736e2016-03-29 23:40:39 -070097 for (int i = 0; i < TOTAL_COUNTERS; ++i) {
Madan Jampanid5714e02016-04-19 14:15:20 -070098 AsyncAtomicCounter counter =
99 storageService.getAsyncAtomicCounter(String.format("onos-app-loadtest-counter-%d", i));
Madan Jampaniee3736e2016-03-29 23:40:39 -0700100 counters.add(counter);
101 }
102 reporter.scheduleWithFixedDelay(() -> {
103 Tools.allOf(counters.stream()
104 .map(AsyncAtomicCounter::get)
105 .collect(Collectors.toList()))
106 .whenComplete((r, e) -> {
107 if (e == null) {
108 long newCount = r.stream().reduce(Long::sum).get();
109 long currentTime = System.currentTimeMillis();
110 long delta = currentTime - previousReportTime.getAndSet(currentTime);
111 long rate = (newCount - previousCount.getAndSet(newCount)) * 1000 / delta;
112 log.info("{} updates per second", rate);
113 } else {
114 log.warn(e.getMessage());
115 }
116 });
117 }, 5, 5, TimeUnit.SECONDS);
Madan Jampani5551f522016-02-23 18:27:19 -0800118 modified(null);
119 }
120
121 private void startTest() {
122 stopped.set(false);
123 RateLimiter limiter = RateLimiter.create(rate);
124 Semaphore s = new Semaphore(100);
125 while (!stopped.get()) {
126 limiter.acquire();
127 s.acquireUninterruptibly();
Madan Jampaniee3736e2016-03-29 23:40:39 -0700128 counters.get(RandomUtils.nextInt(TOTAL_COUNTERS)).incrementAndGet().whenComplete((r, e) -> {
Madan Jampani5551f522016-02-23 18:27:19 -0800129 s.release();
Madan Jampani5551f522016-02-23 18:27:19 -0800130 if (e == null) {
Madan Jampaniee3736e2016-03-29 23:40:39 -0700131 increments.incrementAndGet();
Madan Jampani5551f522016-02-23 18:27:19 -0800132 }
133 });
134 }
135 }
136
137 private void stopTest() {
138 stopped.set(true);
139 }
140
141 @Deactivate
142 public void deactivate(ComponentContext context) {
143 configService.unregisterProperties(getClass(), false);
144 stopTest();
Madan Jampaniee3736e2016-03-29 23:40:39 -0700145 runner.shutdown();
146 reporter.shutdown();
Madan Jampani5551f522016-02-23 18:27:19 -0800147 log.info("Stopped");
148 }
149
150 @Modified
151 public void modified(ComponentContext context) {
152 int newRate = DEFAULT_RATE;
153 if (context != null) {
154 Dictionary properties = context.getProperties();
155 try {
156 String s = get(properties, "rate");
157 newRate = isNullOrEmpty(s)
158 ? rate : Integer.parseInt(s.trim());
159 } catch (Exception e) {
160 return;
161 }
162 }
163 if (newRate != rate) {
Madan Jampaniee3736e2016-03-29 23:40:39 -0700164 log.info("Per node rate changed to {}", newRate);
Madan Jampani5551f522016-02-23 18:27:19 -0800165 rate = newRate;
166 stopTest();
Madan Jampaniee3736e2016-03-29 23:40:39 -0700167 runner.execute(this::startTest);
Madan Jampani5551f522016-02-23 18:27:19 -0800168 }
169 }
170}