blob: c1b251314baa532f7170cee0f7a3ef7db81f315e [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampani6cc224b2015-04-20 16:56:00 -070016package org.onosproject.messagingperf;
17
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.MoreExecutors;
23import org.onlab.util.BoundedThreadPool;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.CoreService;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
30import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.Serializer;
33import org.osgi.service.component.ComponentContext;
34import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Modified;
38import org.osgi.service.component.annotations.Reference;
39import org.slf4j.Logger;
40
Madan Jampani6cc224b2015-04-20 16:56:00 -070041import java.util.Dictionary;
42import java.util.List;
43import java.util.Objects;
44import java.util.Set;
45import java.util.concurrent.CompletableFuture;
46import java.util.concurrent.Executor;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.TimeUnit;
51import java.util.concurrent.atomic.AtomicInteger;
52import java.util.function.Function;
53import java.util.stream.IntStream;
54
Ray Milkeya18e2a62017-05-30 15:36:09 -070055import static com.google.common.base.Strings.isNullOrEmpty;
Ray Milkeya18e2a62017-05-30 15:36:09 -070056import static org.onlab.util.Tools.get;
57import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070058import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Ray Milkeya18e2a62017-05-30 15:36:09 -070059import static org.slf4j.LoggerFactory.getLogger;
60
Madan Jampani6cc224b2015-04-20 16:56:00 -070061/**
62 * Application for measuring cluster messaging performance.
63 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064@Component(immediate = true, service = MessagingPerfApp.class)
Madan Jampani6cc224b2015-04-20 16:56:00 -070065public class MessagingPerfApp {
66 private final Logger log = getLogger(getClass());
67
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070069 protected ClusterService clusterService;
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070072 protected ClusterCommunicationService communicationService;
73
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070075 protected CoreService coreService;
76
Ray Milkeyd84f89b2018-08-17 14:54:17 -070077 @Reference(cardinality = MANDATORY)
Madan Jampani6cc224b2015-04-20 16:56:00 -070078 protected ComponentConfigService configService;
79
80 private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
81 new MessageSubject("net-perf-unicast-message");
82
83 private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
84 new MessageSubject("net-perf-rr-message");
85
86 private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
87 private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
88
Ray Milkeyd84f89b2018-08-17 14:54:17 -070089 //@Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
90 // label = "Number of sender threads")
Madan Jampani6cc224b2015-04-20 16:56:00 -070091 protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
92
Ray Milkeyd84f89b2018-08-17 14:54:17 -070093 //@Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
94 // label = "Number of receiver threads")
Madan Jampani6cc224b2015-04-20 16:56:00 -070095 protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
96
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097 //@Property(name = "serializationOn", boolValue = true,
98 // label = "Turn serialization on/off")
Madan Jampani6cc224b2015-04-20 16:56:00 -070099 private boolean serializationOn = true;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 //@Property(name = "receiveOnIOLoopThread", boolValue = false,
102 // label = "Set this to true to handle message on IO thread")
Madan Jampani6cc224b2015-04-20 16:56:00 -0700103 private boolean receiveOnIOLoopThread = false;
104
105 protected int reportIntervalSeconds = 1;
106
107 private Executor messageReceivingExecutor;
108
109 private ExecutorService messageSendingExecutor =
110 BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
111 groupedThreads("onos/messaging-perf-test", "sender-%d"));
112
113 private final ScheduledExecutorService reporter =
114 Executors.newSingleThreadScheduledExecutor(
115 groupedThreads("onos/net-perf-test", "reporter"));
116
117 private AtomicInteger received = new AtomicInteger(0);
118 private AtomicInteger sent = new AtomicInteger(0);
119 private AtomicInteger attempted = new AtomicInteger(0);
120 private AtomicInteger completed = new AtomicInteger(0);
121
Ray Milkeya18e2a62017-05-30 15:36:09 -0700122 private static final Serializer SERIALIZER = Serializer
123 .using(
124 KryoNamespace.newBuilder()
Madan Jampani6cc224b2015-04-20 16:56:00 -0700125 .register(KryoNamespaces.BASIC)
126 .register(KryoNamespaces.MISC)
Madan Jampani6cc224b2015-04-20 16:56:00 -0700127 .register(Data.class)
Ray Milkeya18e2a62017-05-30 15:36:09 -0700128 .build("MessagingPerfApp"));
Madan Jampani6cc224b2015-04-20 16:56:00 -0700129
130 private final Data data = new Data().withStringField("test")
131 .withListField(Lists.newArrayList("1", "2", "3"))
132 .withSetField(Sets.newHashSet("1", "2", "3"));
133 private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
134 .withListField(Lists.newArrayList("1", "2", "3"))
135 .withSetField(Sets.newHashSet("1", "2", "3")));
136
137 private Function<Data, byte[]> encoder;
138 private Function<byte[], Data> decoder;
139
140 @Activate
141 public void activate(ComponentContext context) {
142 configService.registerProperties(getClass());
143 setupCodecs();
144 messageReceivingExecutor = receiveOnIOLoopThread
145 ? MoreExecutors.directExecutor()
146 : Executors.newFixedThreadPool(
147 totalReceiverThreads,
148 groupedThreads("onos/net-perf-test", "receiver-%d"));
149 registerMessageHandlers();
150 startTest();
151 reporter.scheduleWithFixedDelay(this::reportPerformance,
152 reportIntervalSeconds,
153 reportIntervalSeconds,
154 TimeUnit.SECONDS);
155 logConfig("Started");
156 }
157
158 @Deactivate
159 public void deactivate(ComponentContext context) {
160 configService.unregisterProperties(getClass(), false);
161 stopTest();
162 reporter.shutdown();
163 unregisterMessageHandlers();
164 log.info("Stopped.");
165 }
166
167 @Modified
168 public void modified(ComponentContext context) {
169 if (context == null) {
170 totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
171 totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
172 serializationOn = true;
173 receiveOnIOLoopThread = false;
174 return;
175 }
176
177 Dictionary properties = context.getProperties();
178
179 int newTotalSenderThreads = totalSenderThreads;
180 int newTotalReceiverThreads = totalReceiverThreads;
181 boolean newSerializationOn = serializationOn;
182 boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
183 try {
184 String s = get(properties, "totalSenderThreads");
185 newTotalSenderThreads = isNullOrEmpty(s)
186 ? totalSenderThreads : Integer.parseInt(s.trim());
187
188 s = get(properties, "totalReceiverThreads");
189 newTotalReceiverThreads = isNullOrEmpty(s)
190 ? totalReceiverThreads : Integer.parseInt(s.trim());
191
192 s = get(properties, "serializationOn");
193 newSerializationOn = isNullOrEmpty(s)
194 ? serializationOn : Boolean.parseBoolean(s.trim());
195
196 s = get(properties, "receiveOnIOLoopThread");
197 newReceiveOnIOLoopThread = isNullOrEmpty(s)
198 ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
199
200 } catch (NumberFormatException | ClassCastException e) {
201 return;
202 }
203
204 boolean modified = newTotalSenderThreads != totalSenderThreads ||
205 newTotalReceiverThreads != totalReceiverThreads ||
206 newSerializationOn != serializationOn ||
207 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
208
209 // If nothing has changed, simply return.
210 if (!modified) {
211 return;
212 }
213
214 totalSenderThreads = newTotalSenderThreads;
215 totalReceiverThreads = newTotalReceiverThreads;
216 serializationOn = newSerializationOn;
217 if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
218 ((ExecutorService) messageReceivingExecutor).shutdown();
219 }
220 receiveOnIOLoopThread = newReceiveOnIOLoopThread;
221
222 // restart test.
223
224 stopTest();
225 unregisterMessageHandlers();
226 setupCodecs();
227 messageSendingExecutor =
228 BoundedThreadPool.newFixedThreadPool(
229 totalSenderThreads,
230 groupedThreads("onos/net-perf-test", "sender-%d"));
231 messageReceivingExecutor = receiveOnIOLoopThread
232 ? MoreExecutors.directExecutor()
233 : Executors.newFixedThreadPool(
234 totalReceiverThreads,
235 groupedThreads("onos/net-perf-test", "receiver-%d"));
236
237 registerMessageHandlers();
238 startTest();
239
240 logConfig("Reconfigured");
241 }
242
243
244 private void logConfig(String prefix) {
245 log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
246 + " serializationOn = {}, receiveOnIOLoopThread = {}",
247 prefix,
248 totalSenderThreads,
249 totalReceiverThreads,
250 serializationOn,
251 receiveOnIOLoopThread);
252 }
253
254 private void setupCodecs() {
255 encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
256 decoder = serializationOn ? SERIALIZER::decode : b -> data;
257 }
258
259 private void registerMessageHandlers() {
260 communicationService.<Data>addSubscriber(
261 TEST_UNICAST_MESSAGE_TOPIC,
262 decoder,
Ray Milkey88cc3432017-03-30 17:19:08 -0700263 d -> {
264 received.incrementAndGet();
265 },
Madan Jampani6cc224b2015-04-20 16:56:00 -0700266 messageReceivingExecutor);
267
268 communicationService.<Data, Data>addSubscriber(
269 TEST_REQUEST_REPLY_TOPIC,
270 decoder,
271 Function.identity(),
272 encoder,
273 messageReceivingExecutor);
274 }
275
276 private void unregisterMessageHandlers() {
277 communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
278 communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
279 }
280
281 private void startTest() {
282 IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
283 }
284
285 private void stopTest() {
286 messageSendingExecutor.shutdown();
287 }
288
289 private void requestReply() {
290 try {
291 attempted.incrementAndGet();
292 CompletableFuture<Data> response =
293 communicationService.<Data, Data>sendAndReceive(
294 data,
295 TEST_REQUEST_REPLY_TOPIC,
296 encoder,
297 decoder,
298 randomPeer());
299 response.whenComplete((result, error) -> {
300 if (Objects.equals(data, result)) {
301 completed.incrementAndGet();
302 }
303 messageSendingExecutor.submit(this::requestReply);
304 });
305 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800306 log.info("requestReply()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700307 }
308 }
309
310 private void unicast() {
311 try {
312 sent.incrementAndGet();
313 communicationService.<Data>unicast(
314 data,
315 TEST_UNICAST_MESSAGE_TOPIC,
316 encoder,
317 randomPeer());
318 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800319 log.info("unicast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700320 }
321 messageSendingExecutor.submit(this::unicast);
322 }
323
324 private void broadcast() {
325 try {
326 sent.incrementAndGet();
327 communicationService.<Data>broadcast(
328 data,
329 TEST_UNICAST_MESSAGE_TOPIC,
330 encoder);
331 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800332 log.info("broadcast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700333 }
334 messageSendingExecutor.submit(this::broadcast);
335 }
336
337 private NodeId randomPeer() {
338 return clusterService.getNodes()
339 .stream()
340 .filter(node -> clusterService.getLocalNode().equals(node))
341 .findAny()
342 .get()
343 .id();
344 }
345
346 private void reportPerformance() {
347 log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
348 }
349
350 private static class Data {
351 private String stringField;
352 private List<String> listField;
353 private Set<String> setField;
354
355 public Data withStringField(String value) {
356 stringField = value;
357 return this;
358 }
359
360 public Data withListField(List<String> value) {
361 listField = ImmutableList.copyOf(value);
362 return this;
363 }
364
365 public Data withSetField(Set<String> value) {
366 setField = ImmutableSet.copyOf(value);
367 return this;
368 }
369
370 @Override
371 public int hashCode() {
372 return Objects.hash(stringField, listField, setField);
373 }
374
375 @Override
376 public boolean equals(Object other) {
377 if (other instanceof Data) {
378 Data that = (Data) other;
379 return Objects.equals(this.stringField, that.stringField) &&
380 Objects.equals(this.listField, that.listField) &&
381 Objects.equals(this.setField, that.setField);
382 }
383 return false;
384 }
385 }
386}