blob: 03d4f63826dec4086829c83efeff1167f58149d4 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampani6cc224b2015-04-20 16:56:00 -070016package org.onosproject.messagingperf;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
20import static org.onlab.util.Tools.get;
21import static org.onlab.util.Tools.groupedThreads;
22import static org.slf4j.LoggerFactory.getLogger;
23
24import java.util.Dictionary;
25import java.util.List;
26import java.util.Objects;
27import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.Executor;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
32import java.util.concurrent.ScheduledExecutorService;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.atomic.AtomicInteger;
35import java.util.function.Function;
36import java.util.stream.IntStream;
37
38import org.apache.felix.scr.annotations.Activate;
39import org.apache.felix.scr.annotations.Component;
40import org.apache.felix.scr.annotations.Deactivate;
41import org.apache.felix.scr.annotations.Modified;
42import org.apache.felix.scr.annotations.Property;
43import org.apache.felix.scr.annotations.Reference;
44import org.apache.felix.scr.annotations.ReferenceCardinality;
45import org.apache.felix.scr.annotations.Service;
46import org.onlab.util.BoundedThreadPool;
47import org.onlab.util.KryoNamespace;
48import org.onosproject.cfg.ComponentConfigService;
49import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.core.CoreService;
52import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
53import org.onosproject.store.cluster.messaging.MessageSubject;
54import org.onosproject.store.serializers.KryoNamespaces;
55import org.onosproject.store.serializers.KryoSerializer;
56import org.osgi.service.component.ComponentContext;
57import org.slf4j.Logger;
58
59import com.google.common.collect.ImmutableList;
60import com.google.common.collect.ImmutableSet;
61import com.google.common.collect.Lists;
62import com.google.common.collect.Sets;
63import com.google.common.util.concurrent.MoreExecutors;
64
65/**
66 * Application for measuring cluster messaging performance.
67 */
68@Component(immediate = true, enabled = true)
69@Service(value = MessagingPerfApp.class)
70public class MessagingPerfApp {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = MANDATORY_UNARY)
74 protected ClusterService clusterService;
75
76 @Reference(cardinality = MANDATORY_UNARY)
77 protected ClusterCommunicationService communicationService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected CoreService coreService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected ComponentConfigService configService;
84
85 private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
86 new MessageSubject("net-perf-unicast-message");
87
88 private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
89 new MessageSubject("net-perf-rr-message");
90
91 private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
92 private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
93
94 @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
95 label = "Number of sender threads")
96 protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
97
98 @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
99 label = "Number of receiver threads")
100 protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
101
102 @Property(name = "serializationOn", boolValue = true,
103 label = "Turn serialization on/off")
104 private boolean serializationOn = true;
105
106 @Property(name = "receiveOnIOLoopThread", boolValue = false,
107 label = "Set this to true to handle message on IO thread")
108 private boolean receiveOnIOLoopThread = false;
109
110 protected int reportIntervalSeconds = 1;
111
112 private Executor messageReceivingExecutor;
113
114 private ExecutorService messageSendingExecutor =
115 BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
116 groupedThreads("onos/messaging-perf-test", "sender-%d"));
117
118 private final ScheduledExecutorService reporter =
119 Executors.newSingleThreadScheduledExecutor(
120 groupedThreads("onos/net-perf-test", "reporter"));
121
122 private AtomicInteger received = new AtomicInteger(0);
123 private AtomicInteger sent = new AtomicInteger(0);
124 private AtomicInteger attempted = new AtomicInteger(0);
125 private AtomicInteger completed = new AtomicInteger(0);
126
127 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
128 @Override
129 protected void setupKryoPool() {
130 serializerPool = KryoNamespace.newBuilder()
131 .register(KryoNamespaces.BASIC)
132 .register(KryoNamespaces.MISC)
Madan Jampani6cc224b2015-04-20 16:56:00 -0700133 .register(Data.class)
Charles Chaneefdedf2016-05-23 16:45:45 -0700134 .build("MessagingPerfApp");
Madan Jampani6cc224b2015-04-20 16:56:00 -0700135 }
136 };
137
138 private final Data data = new Data().withStringField("test")
139 .withListField(Lists.newArrayList("1", "2", "3"))
140 .withSetField(Sets.newHashSet("1", "2", "3"));
141 private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
142 .withListField(Lists.newArrayList("1", "2", "3"))
143 .withSetField(Sets.newHashSet("1", "2", "3")));
144
145 private Function<Data, byte[]> encoder;
146 private Function<byte[], Data> decoder;
147
148 @Activate
149 public void activate(ComponentContext context) {
150 configService.registerProperties(getClass());
151 setupCodecs();
152 messageReceivingExecutor = receiveOnIOLoopThread
153 ? MoreExecutors.directExecutor()
154 : Executors.newFixedThreadPool(
155 totalReceiverThreads,
156 groupedThreads("onos/net-perf-test", "receiver-%d"));
157 registerMessageHandlers();
158 startTest();
159 reporter.scheduleWithFixedDelay(this::reportPerformance,
160 reportIntervalSeconds,
161 reportIntervalSeconds,
162 TimeUnit.SECONDS);
163 logConfig("Started");
164 }
165
166 @Deactivate
167 public void deactivate(ComponentContext context) {
168 configService.unregisterProperties(getClass(), false);
169 stopTest();
170 reporter.shutdown();
171 unregisterMessageHandlers();
172 log.info("Stopped.");
173 }
174
175 @Modified
176 public void modified(ComponentContext context) {
177 if (context == null) {
178 totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
179 totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
180 serializationOn = true;
181 receiveOnIOLoopThread = false;
182 return;
183 }
184
185 Dictionary properties = context.getProperties();
186
187 int newTotalSenderThreads = totalSenderThreads;
188 int newTotalReceiverThreads = totalReceiverThreads;
189 boolean newSerializationOn = serializationOn;
190 boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
191 try {
192 String s = get(properties, "totalSenderThreads");
193 newTotalSenderThreads = isNullOrEmpty(s)
194 ? totalSenderThreads : Integer.parseInt(s.trim());
195
196 s = get(properties, "totalReceiverThreads");
197 newTotalReceiverThreads = isNullOrEmpty(s)
198 ? totalReceiverThreads : Integer.parseInt(s.trim());
199
200 s = get(properties, "serializationOn");
201 newSerializationOn = isNullOrEmpty(s)
202 ? serializationOn : Boolean.parseBoolean(s.trim());
203
204 s = get(properties, "receiveOnIOLoopThread");
205 newReceiveOnIOLoopThread = isNullOrEmpty(s)
206 ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
207
208 } catch (NumberFormatException | ClassCastException e) {
209 return;
210 }
211
212 boolean modified = newTotalSenderThreads != totalSenderThreads ||
213 newTotalReceiverThreads != totalReceiverThreads ||
214 newSerializationOn != serializationOn ||
215 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
216
217 // If nothing has changed, simply return.
218 if (!modified) {
219 return;
220 }
221
222 totalSenderThreads = newTotalSenderThreads;
223 totalReceiverThreads = newTotalReceiverThreads;
224 serializationOn = newSerializationOn;
225 if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
226 ((ExecutorService) messageReceivingExecutor).shutdown();
227 }
228 receiveOnIOLoopThread = newReceiveOnIOLoopThread;
229
230 // restart test.
231
232 stopTest();
233 unregisterMessageHandlers();
234 setupCodecs();
235 messageSendingExecutor =
236 BoundedThreadPool.newFixedThreadPool(
237 totalSenderThreads,
238 groupedThreads("onos/net-perf-test", "sender-%d"));
239 messageReceivingExecutor = receiveOnIOLoopThread
240 ? MoreExecutors.directExecutor()
241 : Executors.newFixedThreadPool(
242 totalReceiverThreads,
243 groupedThreads("onos/net-perf-test", "receiver-%d"));
244
245 registerMessageHandlers();
246 startTest();
247
248 logConfig("Reconfigured");
249 }
250
251
252 private void logConfig(String prefix) {
253 log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
254 + " serializationOn = {}, receiveOnIOLoopThread = {}",
255 prefix,
256 totalSenderThreads,
257 totalReceiverThreads,
258 serializationOn,
259 receiveOnIOLoopThread);
260 }
261
262 private void setupCodecs() {
263 encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
264 decoder = serializationOn ? SERIALIZER::decode : b -> data;
265 }
266
267 private void registerMessageHandlers() {
268 communicationService.<Data>addSubscriber(
269 TEST_UNICAST_MESSAGE_TOPIC,
270 decoder,
Ray Milkey88cc3432017-03-30 17:19:08 -0700271 d -> {
272 received.incrementAndGet();
273 },
Madan Jampani6cc224b2015-04-20 16:56:00 -0700274 messageReceivingExecutor);
275
276 communicationService.<Data, Data>addSubscriber(
277 TEST_REQUEST_REPLY_TOPIC,
278 decoder,
279 Function.identity(),
280 encoder,
281 messageReceivingExecutor);
282 }
283
284 private void unregisterMessageHandlers() {
285 communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
286 communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
287 }
288
289 private void startTest() {
290 IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
291 }
292
293 private void stopTest() {
294 messageSendingExecutor.shutdown();
295 }
296
297 private void requestReply() {
298 try {
299 attempted.incrementAndGet();
300 CompletableFuture<Data> response =
301 communicationService.<Data, Data>sendAndReceive(
302 data,
303 TEST_REQUEST_REPLY_TOPIC,
304 encoder,
305 decoder,
306 randomPeer());
307 response.whenComplete((result, error) -> {
308 if (Objects.equals(data, result)) {
309 completed.incrementAndGet();
310 }
311 messageSendingExecutor.submit(this::requestReply);
312 });
313 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800314 log.info("requestReply()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700315 }
316 }
317
318 private void unicast() {
319 try {
320 sent.incrementAndGet();
321 communicationService.<Data>unicast(
322 data,
323 TEST_UNICAST_MESSAGE_TOPIC,
324 encoder,
325 randomPeer());
326 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800327 log.info("unicast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700328 }
329 messageSendingExecutor.submit(this::unicast);
330 }
331
332 private void broadcast() {
333 try {
334 sent.incrementAndGet();
335 communicationService.<Data>broadcast(
336 data,
337 TEST_UNICAST_MESSAGE_TOPIC,
338 encoder);
339 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800340 log.info("broadcast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700341 }
342 messageSendingExecutor.submit(this::broadcast);
343 }
344
345 private NodeId randomPeer() {
346 return clusterService.getNodes()
347 .stream()
348 .filter(node -> clusterService.getLocalNode().equals(node))
349 .findAny()
350 .get()
351 .id();
352 }
353
354 private void reportPerformance() {
355 log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
356 }
357
358 private static class Data {
359 private String stringField;
360 private List<String> listField;
361 private Set<String> setField;
362
363 public Data withStringField(String value) {
364 stringField = value;
365 return this;
366 }
367
368 public Data withListField(List<String> value) {
369 listField = ImmutableList.copyOf(value);
370 return this;
371 }
372
373 public Data withSetField(Set<String> value) {
374 setField = ImmutableSet.copyOf(value);
375 return this;
376 }
377
378 @Override
379 public int hashCode() {
380 return Objects.hash(stringField, listField, setField);
381 }
382
383 @Override
384 public boolean equals(Object other) {
385 if (other instanceof Data) {
386 Data that = (Data) other;
387 return Objects.equals(this.stringField, that.stringField) &&
388 Objects.equals(this.listField, that.listField) &&
389 Objects.equals(this.setField, that.setField);
390 }
391 return false;
392 }
393 }
394}