/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.messagingperf;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;

import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.messagingperf.OsgiPropertyConstants.RECEIVER_THREAD_POOL_SIZE;
import static org.onosproject.messagingperf.OsgiPropertyConstants.RECEIVER_THREAD_POOL_SIZE_DEFAULT;
import static org.onosproject.messagingperf.OsgiPropertyConstants.RECEIVE_ON_IO_LOOP_THREAD;
import static org.onosproject.messagingperf.OsgiPropertyConstants.RECEIVE_ON_IO_LOOP_THREAD_DEFAULT;
import static org.onosproject.messagingperf.OsgiPropertyConstants.SENDER_THREAD_POOL_SIZE;
import static org.onosproject.messagingperf.OsgiPropertyConstants.SENDER_THREAD_POOL_SIZE_DEFAULT;
import static org.onosproject.messagingperf.OsgiPropertyConstants.SERIALIZATION_ON;
import static org.onosproject.messagingperf.OsgiPropertyConstants.SERIALIZATION_ON_DEFAULT;
import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
import static org.slf4j.LoggerFactory.getLogger;
68
Madan Jampani6cc224b2015-04-20 16:56:00 -070069/**
70 * Application for measuring cluster messaging performance.
71 */
Ray Milkey88dd7e22018-10-24 10:04:03 -070072@Component(
73 immediate = true,
74 service = MessagingPerfApp.class,
75 property = {
76 SENDER_THREAD_POOL_SIZE + ":Integer=" + SENDER_THREAD_POOL_SIZE_DEFAULT,
77 RECEIVER_THREAD_POOL_SIZE + ":Integer=" + RECEIVER_THREAD_POOL_SIZE_DEFAULT,
78 SERIALIZATION_ON + ":Boolean=" + SERIALIZATION_ON_DEFAULT,
79 RECEIVE_ON_IO_LOOP_THREAD + ":Boolean=" + RECEIVE_ON_IO_LOOP_THREAD_DEFAULT
80 }
81)
Madan Jampani6cc224b2015-04-20 16:56:00 -070082public class MessagingPerfApp {
83 private final Logger log = getLogger(getClass());
84
Ray Milkeyd84f89b2018-08-17 14:54:17 -070085 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070086 protected ClusterService clusterService;
87
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070089 protected ClusterCommunicationService communicationService;
90
Ray Milkeyd84f89b2018-08-17 14:54:17 -070091 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070092 protected CoreService coreService;
93
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070095 protected ComponentConfigService configService;
96
97 private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
98 new MessageSubject("net-perf-unicast-message");
99
100 private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
101 new MessageSubject("net-perf-rr-message");
102
Ray Milkey88dd7e22018-10-24 10:04:03 -0700103 /** Number of sender threads. */
104 private int totalSenderThreads = SENDER_THREAD_POOL_SIZE_DEFAULT;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700105
Ray Milkey88dd7e22018-10-24 10:04:03 -0700106 /** Number of receiver threads. */
107 private int totalReceiverThreads = RECEIVER_THREAD_POOL_SIZE_DEFAULT;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700108
Ray Milkey88dd7e22018-10-24 10:04:03 -0700109 /** Turn serialization on/off. */
110 private boolean serializationOn = SERIALIZATION_ON_DEFAULT;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700111
Ray Milkey88dd7e22018-10-24 10:04:03 -0700112 /** Set this to true to handle message on IO thread. */
113 private boolean receiveOnIOLoopThread = RECEIVE_ON_IO_LOOP_THREAD_DEFAULT;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700114
Ray Milkey88dd7e22018-10-24 10:04:03 -0700115 private int reportIntervalSeconds = 1;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700116
117 private Executor messageReceivingExecutor;
118
119 private ExecutorService messageSendingExecutor =
120 BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
121 groupedThreads("onos/messaging-perf-test", "sender-%d"));
122
123 private final ScheduledExecutorService reporter =
124 Executors.newSingleThreadScheduledExecutor(
125 groupedThreads("onos/net-perf-test", "reporter"));
126
127 private AtomicInteger received = new AtomicInteger(0);
128 private AtomicInteger sent = new AtomicInteger(0);
129 private AtomicInteger attempted = new AtomicInteger(0);
130 private AtomicInteger completed = new AtomicInteger(0);
131
Ray Milkeya18e2a62017-05-30 15:36:09 -0700132 private static final Serializer SERIALIZER = Serializer
133 .using(
134 KryoNamespace.newBuilder()
Madan Jampani6cc224b2015-04-20 16:56:00 -0700135 .register(KryoNamespaces.BASIC)
136 .register(KryoNamespaces.MISC)
Madan Jampani6cc224b2015-04-20 16:56:00 -0700137 .register(Data.class)
Ray Milkeya18e2a62017-05-30 15:36:09 -0700138 .build("MessagingPerfApp"));
Madan Jampani6cc224b2015-04-20 16:56:00 -0700139
140 private final Data data = new Data().withStringField("test")
141 .withListField(Lists.newArrayList("1", "2", "3"))
142 .withSetField(Sets.newHashSet("1", "2", "3"));
143 private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
144 .withListField(Lists.newArrayList("1", "2", "3"))
145 .withSetField(Sets.newHashSet("1", "2", "3")));
146
147 private Function<Data, byte[]> encoder;
148 private Function<byte[], Data> decoder;
149
150 @Activate
151 public void activate(ComponentContext context) {
152 configService.registerProperties(getClass());
153 setupCodecs();
154 messageReceivingExecutor = receiveOnIOLoopThread
155 ? MoreExecutors.directExecutor()
156 : Executors.newFixedThreadPool(
157 totalReceiverThreads,
158 groupedThreads("onos/net-perf-test", "receiver-%d"));
159 registerMessageHandlers();
160 startTest();
161 reporter.scheduleWithFixedDelay(this::reportPerformance,
162 reportIntervalSeconds,
163 reportIntervalSeconds,
164 TimeUnit.SECONDS);
165 logConfig("Started");
166 }
167
168 @Deactivate
169 public void deactivate(ComponentContext context) {
170 configService.unregisterProperties(getClass(), false);
171 stopTest();
172 reporter.shutdown();
173 unregisterMessageHandlers();
174 log.info("Stopped.");
175 }
176
177 @Modified
178 public void modified(ComponentContext context) {
179 if (context == null) {
Ray Milkey88dd7e22018-10-24 10:04:03 -0700180 totalSenderThreads = SENDER_THREAD_POOL_SIZE_DEFAULT;
181 totalReceiverThreads = RECEIVER_THREAD_POOL_SIZE_DEFAULT;
Madan Jampani6cc224b2015-04-20 16:56:00 -0700182 serializationOn = true;
183 receiveOnIOLoopThread = false;
184 return;
185 }
186
187 Dictionary properties = context.getProperties();
188
189 int newTotalSenderThreads = totalSenderThreads;
190 int newTotalReceiverThreads = totalReceiverThreads;
191 boolean newSerializationOn = serializationOn;
192 boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
193 try {
Ray Milkey88dd7e22018-10-24 10:04:03 -0700194 String s = get(properties, SENDER_THREAD_POOL_SIZE);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700195 newTotalSenderThreads = isNullOrEmpty(s)
196 ? totalSenderThreads : Integer.parseInt(s.trim());
197
Ray Milkey88dd7e22018-10-24 10:04:03 -0700198 s = get(properties, RECEIVER_THREAD_POOL_SIZE);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700199 newTotalReceiverThreads = isNullOrEmpty(s)
200 ? totalReceiverThreads : Integer.parseInt(s.trim());
201
Ray Milkey88dd7e22018-10-24 10:04:03 -0700202 s = get(properties, SERIALIZATION_ON);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700203 newSerializationOn = isNullOrEmpty(s)
204 ? serializationOn : Boolean.parseBoolean(s.trim());
205
Ray Milkey88dd7e22018-10-24 10:04:03 -0700206 s = get(properties, RECEIVE_ON_IO_LOOP_THREAD);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700207 newReceiveOnIOLoopThread = isNullOrEmpty(s)
208 ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
209
210 } catch (NumberFormatException | ClassCastException e) {
211 return;
212 }
213
214 boolean modified = newTotalSenderThreads != totalSenderThreads ||
215 newTotalReceiverThreads != totalReceiverThreads ||
216 newSerializationOn != serializationOn ||
217 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
218
219 // If nothing has changed, simply return.
220 if (!modified) {
221 return;
222 }
223
224 totalSenderThreads = newTotalSenderThreads;
225 totalReceiverThreads = newTotalReceiverThreads;
226 serializationOn = newSerializationOn;
227 if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
228 ((ExecutorService) messageReceivingExecutor).shutdown();
229 }
230 receiveOnIOLoopThread = newReceiveOnIOLoopThread;
231
232 // restart test.
233
234 stopTest();
235 unregisterMessageHandlers();
236 setupCodecs();
237 messageSendingExecutor =
238 BoundedThreadPool.newFixedThreadPool(
239 totalSenderThreads,
240 groupedThreads("onos/net-perf-test", "sender-%d"));
241 messageReceivingExecutor = receiveOnIOLoopThread
242 ? MoreExecutors.directExecutor()
243 : Executors.newFixedThreadPool(
244 totalReceiverThreads,
245 groupedThreads("onos/net-perf-test", "receiver-%d"));
246
247 registerMessageHandlers();
248 startTest();
249
250 logConfig("Reconfigured");
251 }
252
253
254 private void logConfig(String prefix) {
255 log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
256 + " serializationOn = {}, receiveOnIOLoopThread = {}",
257 prefix,
258 totalSenderThreads,
259 totalReceiverThreads,
260 serializationOn,
261 receiveOnIOLoopThread);
262 }
263
264 private void setupCodecs() {
265 encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
266 decoder = serializationOn ? SERIALIZER::decode : b -> data;
267 }
268
269 private void registerMessageHandlers() {
270 communicationService.<Data>addSubscriber(
271 TEST_UNICAST_MESSAGE_TOPIC,
272 decoder,
Ray Milkey88cc3432017-03-30 17:19:08 -0700273 d -> {
274 received.incrementAndGet();
275 },
Madan Jampani6cc224b2015-04-20 16:56:00 -0700276 messageReceivingExecutor);
277
278 communicationService.<Data, Data>addSubscriber(
279 TEST_REQUEST_REPLY_TOPIC,
280 decoder,
281 Function.identity(),
282 encoder,
283 messageReceivingExecutor);
284 }
285
286 private void unregisterMessageHandlers() {
287 communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
288 communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
289 }
290
291 private void startTest() {
292 IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
293 }
294
295 private void stopTest() {
296 messageSendingExecutor.shutdown();
297 }
298
299 private void requestReply() {
300 try {
301 attempted.incrementAndGet();
302 CompletableFuture<Data> response =
303 communicationService.<Data, Data>sendAndReceive(
304 data,
305 TEST_REQUEST_REPLY_TOPIC,
306 encoder,
307 decoder,
308 randomPeer());
309 response.whenComplete((result, error) -> {
310 if (Objects.equals(data, result)) {
311 completed.incrementAndGet();
312 }
313 messageSendingExecutor.submit(this::requestReply);
314 });
315 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800316 log.info("requestReply()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700317 }
318 }
319
320 private void unicast() {
321 try {
322 sent.incrementAndGet();
323 communicationService.<Data>unicast(
324 data,
325 TEST_UNICAST_MESSAGE_TOPIC,
326 encoder,
327 randomPeer());
328 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800329 log.info("unicast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700330 }
331 messageSendingExecutor.submit(this::unicast);
332 }
333
334 private void broadcast() {
335 try {
336 sent.incrementAndGet();
337 communicationService.<Data>broadcast(
338 data,
339 TEST_UNICAST_MESSAGE_TOPIC,
340 encoder);
341 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800342 log.info("broadcast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700343 }
344 messageSendingExecutor.submit(this::broadcast);
345 }
346
347 private NodeId randomPeer() {
348 return clusterService.getNodes()
349 .stream()
350 .filter(node -> clusterService.getLocalNode().equals(node))
351 .findAny()
352 .get()
353 .id();
354 }
355
356 private void reportPerformance() {
357 log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
358 }
359
360 private static class Data {
361 private String stringField;
362 private List<String> listField;
363 private Set<String> setField;
364
365 public Data withStringField(String value) {
366 stringField = value;
367 return this;
368 }
369
370 public Data withListField(List<String> value) {
371 listField = ImmutableList.copyOf(value);
372 return this;
373 }
374
375 public Data withSetField(Set<String> value) {
376 setField = ImmutableSet.copyOf(value);
377 return this;
378 }
379
380 @Override
381 public int hashCode() {
382 return Objects.hash(stringField, listField, setField);
383 }
384
385 @Override
386 public boolean equals(Object other) {
387 if (other instanceof Data) {
388 Data that = (Data) other;
389 return Objects.equals(this.stringField, that.stringField) &&
390 Objects.equals(this.listField, that.listField) &&
391 Objects.equals(this.setField, that.setField);
392 }
393 return false;
394 }
395 }
396}