blob: b142282dbaf8dc7bd190c38497d97a029c755b00 [file] [log] [blame]
Madan Jampani5551f522016-02-23 18:27:19 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.loadtest;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.onlab.util.Tools.get;
20import static org.slf4j.LoggerFactory.getLogger;
21
22import java.util.Dictionary;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
25import java.util.concurrent.Semaphore;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicLong;
28
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Deactivate;
32import org.apache.felix.scr.annotations.Modified;
33import org.apache.felix.scr.annotations.Property;
34import org.apache.felix.scr.annotations.Reference;
35import org.apache.felix.scr.annotations.ReferenceCardinality;
36import org.apache.felix.scr.annotations.Service;
37import org.onosproject.cfg.ComponentConfigService;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.store.service.AsyncAtomicCounter;
41import org.onosproject.store.service.StorageService;
42import org.osgi.service.component.ComponentContext;
43import org.slf4j.Logger;
44
45import com.google.common.util.concurrent.RateLimiter;
46
47/**
48 * Simple application for load testing distributed consensus.
49 * <p>
50 * This application simply increments as {@link AsyncAtomicCounter} at a configurable rate.
51 */
52@Component(immediate = true)
53@Service(value = DistributedConsensusLoadTest.class)
54public class DistributedConsensusLoadTest {
55
56 private final Logger log = getLogger(getClass());
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected ComponentConfigService configService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 protected StorageService storageService;
63
64 private ApplicationId appId;
65
66 private AtomicBoolean stopped = new AtomicBoolean(false);
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected CoreService coreService;
70
71 private static final int DEFAULT_RATE = 100;
72
73 @Property(name = "rate", intValue = DEFAULT_RATE,
74 label = "Total number of increments per second to the atomic counter")
75 protected int rate = 0;
76
77 private AtomicLong lastValue = new AtomicLong(0);
78 private AtomicLong lastLoggedTime = new AtomicLong(0);
79 private AsyncAtomicCounter counter;
80 private ExecutorService testExecutor = Executors.newSingleThreadExecutor();
81
82 @Activate
83 public void activate(ComponentContext context) {
84 configService.registerProperties(getClass());
85 appId = coreService.registerApplication("org.onosproject.loadtest");
86 log.info("Started with {}", appId);
87 counter = storageService.atomicCounterBuilder()
88 .withName("onos-app-loadtest-counter")
89 .build();
90 modified(null);
91 }
92
93 private void startTest() {
94 stopped.set(false);
95 RateLimiter limiter = RateLimiter.create(rate);
96 Semaphore s = new Semaphore(100);
97 while (!stopped.get()) {
98 limiter.acquire();
99 s.acquireUninterruptibly();
100 counter.incrementAndGet().whenComplete((r, e) -> {
101 s.release();
102 long delta = System.currentTimeMillis() - lastLoggedTime.get();
103 if (e == null) {
104 if (delta > 1000) {
105 long tps = (long) ((r - lastValue.get()) * 1000.0) / delta;
106 lastValue.set(r);
107 lastLoggedTime.set(System.currentTimeMillis());
108 log.info("Rate: {}", tps);
109 }
110 }
111 });
112 }
113 }
114
115 private void stopTest() {
116 stopped.set(true);
117 }
118
119 @Deactivate
120 public void deactivate(ComponentContext context) {
121 configService.unregisterProperties(getClass(), false);
122 stopTest();
123 testExecutor.shutdown();
124 log.info("Stopped");
125 }
126
127 @Modified
128 public void modified(ComponentContext context) {
129 int newRate = DEFAULT_RATE;
130 if (context != null) {
131 Dictionary properties = context.getProperties();
132 try {
133 String s = get(properties, "rate");
134 newRate = isNullOrEmpty(s)
135 ? rate : Integer.parseInt(s.trim());
136 } catch (Exception e) {
137 return;
138 }
139 }
140 if (newRate != rate) {
141 log.info("Rate changed to {}", newRate);
142 rate = newRate;
143 stopTest();
144 testExecutor.execute(this::startTest);
145 }
146 }
147}