blob: 24697933e84fafc30f1386a223c9f417ae928e5d [file] [log] [blame]
Madan Jampani6cc224b2015-04-20 16:56:00 -07001package org.onosproject.messagingperf;
2
3import static com.google.common.base.Strings.isNullOrEmpty;
4import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
5import static org.onlab.util.Tools.get;
6import static org.onlab.util.Tools.groupedThreads;
7import static org.slf4j.LoggerFactory.getLogger;
8
9import java.util.Dictionary;
10import java.util.List;
11import java.util.Objects;
12import java.util.Set;
13import java.util.concurrent.CompletableFuture;
14import java.util.concurrent.Executor;
15import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
17import java.util.concurrent.ScheduledExecutorService;
18import java.util.concurrent.TimeUnit;
19import java.util.concurrent.atomic.AtomicInteger;
20import java.util.function.Function;
21import java.util.stream.IntStream;
22
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.util.BoundedThreadPool;
32import org.onlab.util.KryoNamespace;
33import org.onosproject.cfg.ComponentConfigService;
34import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.NodeId;
36import org.onosproject.core.CoreService;
37import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
41import org.osgi.service.component.ComponentContext;
42import org.slf4j.Logger;
43
44import com.google.common.collect.ImmutableList;
45import com.google.common.collect.ImmutableSet;
46import com.google.common.collect.Lists;
47import com.google.common.collect.Sets;
48import com.google.common.util.concurrent.MoreExecutors;
49
50/**
51 * Application for measuring cluster messaging performance.
52 */
53@Component(immediate = true, enabled = true)
54@Service(value = MessagingPerfApp.class)
55public class MessagingPerfApp {
56 private final Logger log = getLogger(getClass());
57
58 @Reference(cardinality = MANDATORY_UNARY)
59 protected ClusterService clusterService;
60
61 @Reference(cardinality = MANDATORY_UNARY)
62 protected ClusterCommunicationService communicationService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected CoreService coreService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected ComponentConfigService configService;
69
70 private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
71 new MessageSubject("net-perf-unicast-message");
72
73 private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
74 new MessageSubject("net-perf-rr-message");
75
76 private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
77 private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
78
79 @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
80 label = "Number of sender threads")
81 protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
82
83 @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
84 label = "Number of receiver threads")
85 protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
86
87 @Property(name = "serializationOn", boolValue = true,
88 label = "Turn serialization on/off")
89 private boolean serializationOn = true;
90
91 @Property(name = "receiveOnIOLoopThread", boolValue = false,
92 label = "Set this to true to handle message on IO thread")
93 private boolean receiveOnIOLoopThread = false;
94
95 protected int reportIntervalSeconds = 1;
96
97 private Executor messageReceivingExecutor;
98
99 private ExecutorService messageSendingExecutor =
100 BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
101 groupedThreads("onos/messaging-perf-test", "sender-%d"));
102
103 private final ScheduledExecutorService reporter =
104 Executors.newSingleThreadScheduledExecutor(
105 groupedThreads("onos/net-perf-test", "reporter"));
106
107 private AtomicInteger received = new AtomicInteger(0);
108 private AtomicInteger sent = new AtomicInteger(0);
109 private AtomicInteger attempted = new AtomicInteger(0);
110 private AtomicInteger completed = new AtomicInteger(0);
111
112 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoNamespace.newBuilder()
116 .register(KryoNamespaces.BASIC)
117 .register(KryoNamespaces.MISC)
118 .register(byte[].class)
119 .register(Data.class)
120 .build();
121 }
122 };
123
124 private final Data data = new Data().withStringField("test")
125 .withListField(Lists.newArrayList("1", "2", "3"))
126 .withSetField(Sets.newHashSet("1", "2", "3"));
127 private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
128 .withListField(Lists.newArrayList("1", "2", "3"))
129 .withSetField(Sets.newHashSet("1", "2", "3")));
130
131 private Function<Data, byte[]> encoder;
132 private Function<byte[], Data> decoder;
133
134 @Activate
135 public void activate(ComponentContext context) {
136 configService.registerProperties(getClass());
137 setupCodecs();
138 messageReceivingExecutor = receiveOnIOLoopThread
139 ? MoreExecutors.directExecutor()
140 : Executors.newFixedThreadPool(
141 totalReceiverThreads,
142 groupedThreads("onos/net-perf-test", "receiver-%d"));
143 registerMessageHandlers();
144 startTest();
145 reporter.scheduleWithFixedDelay(this::reportPerformance,
146 reportIntervalSeconds,
147 reportIntervalSeconds,
148 TimeUnit.SECONDS);
149 logConfig("Started");
150 }
151
152 @Deactivate
153 public void deactivate(ComponentContext context) {
154 configService.unregisterProperties(getClass(), false);
155 stopTest();
156 reporter.shutdown();
157 unregisterMessageHandlers();
158 log.info("Stopped.");
159 }
160
161 @Modified
162 public void modified(ComponentContext context) {
163 if (context == null) {
164 totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
165 totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
166 serializationOn = true;
167 receiveOnIOLoopThread = false;
168 return;
169 }
170
171 Dictionary properties = context.getProperties();
172
173 int newTotalSenderThreads = totalSenderThreads;
174 int newTotalReceiverThreads = totalReceiverThreads;
175 boolean newSerializationOn = serializationOn;
176 boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
177 try {
178 String s = get(properties, "totalSenderThreads");
179 newTotalSenderThreads = isNullOrEmpty(s)
180 ? totalSenderThreads : Integer.parseInt(s.trim());
181
182 s = get(properties, "totalReceiverThreads");
183 newTotalReceiverThreads = isNullOrEmpty(s)
184 ? totalReceiverThreads : Integer.parseInt(s.trim());
185
186 s = get(properties, "serializationOn");
187 newSerializationOn = isNullOrEmpty(s)
188 ? serializationOn : Boolean.parseBoolean(s.trim());
189
190 s = get(properties, "receiveOnIOLoopThread");
191 newReceiveOnIOLoopThread = isNullOrEmpty(s)
192 ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
193
194 } catch (NumberFormatException | ClassCastException e) {
195 return;
196 }
197
198 boolean modified = newTotalSenderThreads != totalSenderThreads ||
199 newTotalReceiverThreads != totalReceiverThreads ||
200 newSerializationOn != serializationOn ||
201 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
202
203 // If nothing has changed, simply return.
204 if (!modified) {
205 return;
206 }
207
208 totalSenderThreads = newTotalSenderThreads;
209 totalReceiverThreads = newTotalReceiverThreads;
210 serializationOn = newSerializationOn;
211 if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
212 ((ExecutorService) messageReceivingExecutor).shutdown();
213 }
214 receiveOnIOLoopThread = newReceiveOnIOLoopThread;
215
216 // restart test.
217
218 stopTest();
219 unregisterMessageHandlers();
220 setupCodecs();
221 messageSendingExecutor =
222 BoundedThreadPool.newFixedThreadPool(
223 totalSenderThreads,
224 groupedThreads("onos/net-perf-test", "sender-%d"));
225 messageReceivingExecutor = receiveOnIOLoopThread
226 ? MoreExecutors.directExecutor()
227 : Executors.newFixedThreadPool(
228 totalReceiverThreads,
229 groupedThreads("onos/net-perf-test", "receiver-%d"));
230
231 registerMessageHandlers();
232 startTest();
233
234 logConfig("Reconfigured");
235 }
236
237
238 private void logConfig(String prefix) {
239 log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
240 + " serializationOn = {}, receiveOnIOLoopThread = {}",
241 prefix,
242 totalSenderThreads,
243 totalReceiverThreads,
244 serializationOn,
245 receiveOnIOLoopThread);
246 }
247
248 private void setupCodecs() {
249 encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
250 decoder = serializationOn ? SERIALIZER::decode : b -> data;
251 }
252
253 private void registerMessageHandlers() {
254 communicationService.<Data>addSubscriber(
255 TEST_UNICAST_MESSAGE_TOPIC,
256 decoder,
257 d -> { received.incrementAndGet(); },
258 messageReceivingExecutor);
259
260 communicationService.<Data, Data>addSubscriber(
261 TEST_REQUEST_REPLY_TOPIC,
262 decoder,
263 Function.identity(),
264 encoder,
265 messageReceivingExecutor);
266 }
267
268 private void unregisterMessageHandlers() {
269 communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
270 communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
271 }
272
273 private void startTest() {
274 IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
275 }
276
277 private void stopTest() {
278 messageSendingExecutor.shutdown();
279 }
280
281 private void requestReply() {
282 try {
283 attempted.incrementAndGet();
284 CompletableFuture<Data> response =
285 communicationService.<Data, Data>sendAndReceive(
286 data,
287 TEST_REQUEST_REPLY_TOPIC,
288 encoder,
289 decoder,
290 randomPeer());
291 response.whenComplete((result, error) -> {
292 if (Objects.equals(data, result)) {
293 completed.incrementAndGet();
294 }
295 messageSendingExecutor.submit(this::requestReply);
296 });
297 } catch (Exception e) {
298 e.printStackTrace();
299 }
300 }
301
302 private void unicast() {
303 try {
304 sent.incrementAndGet();
305 communicationService.<Data>unicast(
306 data,
307 TEST_UNICAST_MESSAGE_TOPIC,
308 encoder,
309 randomPeer());
310 } catch (Exception e) {
311 e.printStackTrace();
312 }
313 messageSendingExecutor.submit(this::unicast);
314 }
315
316 private void broadcast() {
317 try {
318 sent.incrementAndGet();
319 communicationService.<Data>broadcast(
320 data,
321 TEST_UNICAST_MESSAGE_TOPIC,
322 encoder);
323 } catch (Exception e) {
324 e.printStackTrace();
325 }
326 messageSendingExecutor.submit(this::broadcast);
327 }
328
329 private NodeId randomPeer() {
330 return clusterService.getNodes()
331 .stream()
332 .filter(node -> clusterService.getLocalNode().equals(node))
333 .findAny()
334 .get()
335 .id();
336 }
337
338 private void reportPerformance() {
339 log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
340 }
341
342 private static class Data {
343 private String stringField;
344 private List<String> listField;
345 private Set<String> setField;
346
347 public Data withStringField(String value) {
348 stringField = value;
349 return this;
350 }
351
352 public Data withListField(List<String> value) {
353 listField = ImmutableList.copyOf(value);
354 return this;
355 }
356
357 public Data withSetField(Set<String> value) {
358 setField = ImmutableSet.copyOf(value);
359 return this;
360 }
361
362 @Override
363 public int hashCode() {
364 return Objects.hash(stringField, listField, setField);
365 }
366
367 @Override
368 public boolean equals(Object other) {
369 if (other instanceof Data) {
370 Data that = (Data) other;
371 return Objects.equals(this.stringField, that.stringField) &&
372 Objects.equals(this.listField, that.listField) &&
373 Objects.equals(this.setField, that.setField);
374 }
375 return false;
376 }
377 }
378}