blob: de739ccd70e72283af0dffc6ecd2cf153f33660d [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.messagingperf;

import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

Madan Jampani6cc224b2015-04-20 16:56:00 -070065/**
66 * Application for measuring cluster messaging performance.
67 */
68@Component(immediate = true, enabled = true)
69@Service(value = MessagingPerfApp.class)
70public class MessagingPerfApp {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = MANDATORY_UNARY)
74 protected ClusterService clusterService;
75
76 @Reference(cardinality = MANDATORY_UNARY)
77 protected ClusterCommunicationService communicationService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected CoreService coreService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected ComponentConfigService configService;
84
85 private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
86 new MessageSubject("net-perf-unicast-message");
87
88 private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
89 new MessageSubject("net-perf-rr-message");
90
91 private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
92 private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
93
94 @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
95 label = "Number of sender threads")
96 protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
97
98 @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
99 label = "Number of receiver threads")
100 protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
101
102 @Property(name = "serializationOn", boolValue = true,
103 label = "Turn serialization on/off")
104 private boolean serializationOn = true;
105
106 @Property(name = "receiveOnIOLoopThread", boolValue = false,
107 label = "Set this to true to handle message on IO thread")
108 private boolean receiveOnIOLoopThread = false;
109
110 protected int reportIntervalSeconds = 1;
111
112 private Executor messageReceivingExecutor;
113
114 private ExecutorService messageSendingExecutor =
115 BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
116 groupedThreads("onos/messaging-perf-test", "sender-%d"));
117
118 private final ScheduledExecutorService reporter =
119 Executors.newSingleThreadScheduledExecutor(
120 groupedThreads("onos/net-perf-test", "reporter"));
121
122 private AtomicInteger received = new AtomicInteger(0);
123 private AtomicInteger sent = new AtomicInteger(0);
124 private AtomicInteger attempted = new AtomicInteger(0);
125 private AtomicInteger completed = new AtomicInteger(0);
126
Ray Milkeya18e2a62017-05-30 15:36:09 -0700127 private static final Serializer SERIALIZER = Serializer
128 .using(
129 KryoNamespace.newBuilder()
Madan Jampani6cc224b2015-04-20 16:56:00 -0700130 .register(KryoNamespaces.BASIC)
131 .register(KryoNamespaces.MISC)
Madan Jampani6cc224b2015-04-20 16:56:00 -0700132 .register(Data.class)
Ray Milkeya18e2a62017-05-30 15:36:09 -0700133 .build("MessagingPerfApp"));
Madan Jampani6cc224b2015-04-20 16:56:00 -0700134
135 private final Data data = new Data().withStringField("test")
136 .withListField(Lists.newArrayList("1", "2", "3"))
137 .withSetField(Sets.newHashSet("1", "2", "3"));
138 private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
139 .withListField(Lists.newArrayList("1", "2", "3"))
140 .withSetField(Sets.newHashSet("1", "2", "3")));
141
142 private Function<Data, byte[]> encoder;
143 private Function<byte[], Data> decoder;
144
145 @Activate
146 public void activate(ComponentContext context) {
147 configService.registerProperties(getClass());
148 setupCodecs();
149 messageReceivingExecutor = receiveOnIOLoopThread
150 ? MoreExecutors.directExecutor()
151 : Executors.newFixedThreadPool(
152 totalReceiverThreads,
153 groupedThreads("onos/net-perf-test", "receiver-%d"));
154 registerMessageHandlers();
155 startTest();
156 reporter.scheduleWithFixedDelay(this::reportPerformance,
157 reportIntervalSeconds,
158 reportIntervalSeconds,
159 TimeUnit.SECONDS);
160 logConfig("Started");
161 }
162
163 @Deactivate
164 public void deactivate(ComponentContext context) {
165 configService.unregisterProperties(getClass(), false);
166 stopTest();
167 reporter.shutdown();
168 unregisterMessageHandlers();
169 log.info("Stopped.");
170 }
171
172 @Modified
173 public void modified(ComponentContext context) {
174 if (context == null) {
175 totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
176 totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
177 serializationOn = true;
178 receiveOnIOLoopThread = false;
179 return;
180 }
181
182 Dictionary properties = context.getProperties();
183
184 int newTotalSenderThreads = totalSenderThreads;
185 int newTotalReceiverThreads = totalReceiverThreads;
186 boolean newSerializationOn = serializationOn;
187 boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
188 try {
189 String s = get(properties, "totalSenderThreads");
190 newTotalSenderThreads = isNullOrEmpty(s)
191 ? totalSenderThreads : Integer.parseInt(s.trim());
192
193 s = get(properties, "totalReceiverThreads");
194 newTotalReceiverThreads = isNullOrEmpty(s)
195 ? totalReceiverThreads : Integer.parseInt(s.trim());
196
197 s = get(properties, "serializationOn");
198 newSerializationOn = isNullOrEmpty(s)
199 ? serializationOn : Boolean.parseBoolean(s.trim());
200
201 s = get(properties, "receiveOnIOLoopThread");
202 newReceiveOnIOLoopThread = isNullOrEmpty(s)
203 ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
204
205 } catch (NumberFormatException | ClassCastException e) {
206 return;
207 }
208
209 boolean modified = newTotalSenderThreads != totalSenderThreads ||
210 newTotalReceiverThreads != totalReceiverThreads ||
211 newSerializationOn != serializationOn ||
212 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
213
214 // If nothing has changed, simply return.
215 if (!modified) {
216 return;
217 }
218
219 totalSenderThreads = newTotalSenderThreads;
220 totalReceiverThreads = newTotalReceiverThreads;
221 serializationOn = newSerializationOn;
222 if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
223 ((ExecutorService) messageReceivingExecutor).shutdown();
224 }
225 receiveOnIOLoopThread = newReceiveOnIOLoopThread;
226
227 // restart test.
228
229 stopTest();
230 unregisterMessageHandlers();
231 setupCodecs();
232 messageSendingExecutor =
233 BoundedThreadPool.newFixedThreadPool(
234 totalSenderThreads,
235 groupedThreads("onos/net-perf-test", "sender-%d"));
236 messageReceivingExecutor = receiveOnIOLoopThread
237 ? MoreExecutors.directExecutor()
238 : Executors.newFixedThreadPool(
239 totalReceiverThreads,
240 groupedThreads("onos/net-perf-test", "receiver-%d"));
241
242 registerMessageHandlers();
243 startTest();
244
245 logConfig("Reconfigured");
246 }
247
248
249 private void logConfig(String prefix) {
250 log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
251 + " serializationOn = {}, receiveOnIOLoopThread = {}",
252 prefix,
253 totalSenderThreads,
254 totalReceiverThreads,
255 serializationOn,
256 receiveOnIOLoopThread);
257 }
258
259 private void setupCodecs() {
260 encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
261 decoder = serializationOn ? SERIALIZER::decode : b -> data;
262 }
263
264 private void registerMessageHandlers() {
265 communicationService.<Data>addSubscriber(
266 TEST_UNICAST_MESSAGE_TOPIC,
267 decoder,
Ray Milkey88cc3432017-03-30 17:19:08 -0700268 d -> {
269 received.incrementAndGet();
270 },
Madan Jampani6cc224b2015-04-20 16:56:00 -0700271 messageReceivingExecutor);
272
273 communicationService.<Data, Data>addSubscriber(
274 TEST_REQUEST_REPLY_TOPIC,
275 decoder,
276 Function.identity(),
277 encoder,
278 messageReceivingExecutor);
279 }
280
281 private void unregisterMessageHandlers() {
282 communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
283 communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
284 }
285
286 private void startTest() {
287 IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
288 }
289
290 private void stopTest() {
291 messageSendingExecutor.shutdown();
292 }
293
294 private void requestReply() {
295 try {
296 attempted.incrementAndGet();
297 CompletableFuture<Data> response =
298 communicationService.<Data, Data>sendAndReceive(
299 data,
300 TEST_REQUEST_REPLY_TOPIC,
301 encoder,
302 decoder,
303 randomPeer());
304 response.whenComplete((result, error) -> {
305 if (Objects.equals(data, result)) {
306 completed.incrementAndGet();
307 }
308 messageSendingExecutor.submit(this::requestReply);
309 });
310 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800311 log.info("requestReply()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700312 }
313 }
314
315 private void unicast() {
316 try {
317 sent.incrementAndGet();
318 communicationService.<Data>unicast(
319 data,
320 TEST_UNICAST_MESSAGE_TOPIC,
321 encoder,
322 randomPeer());
323 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800324 log.info("unicast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700325 }
326 messageSendingExecutor.submit(this::unicast);
327 }
328
329 private void broadcast() {
330 try {
331 sent.incrementAndGet();
332 communicationService.<Data>broadcast(
333 data,
334 TEST_UNICAST_MESSAGE_TOPIC,
335 encoder);
336 } catch (Exception e) {
Ray Milkey4fd3ceb2015-12-10 14:43:08 -0800337 log.info("broadcast()", e);
Madan Jampani6cc224b2015-04-20 16:56:00 -0700338 }
339 messageSendingExecutor.submit(this::broadcast);
340 }
341
342 private NodeId randomPeer() {
343 return clusterService.getNodes()
344 .stream()
345 .filter(node -> clusterService.getLocalNode().equals(node))
346 .findAny()
347 .get()
348 .id();
349 }
350
351 private void reportPerformance() {
352 log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
353 }
354
355 private static class Data {
356 private String stringField;
357 private List<String> listField;
358 private Set<String> setField;
359
360 public Data withStringField(String value) {
361 stringField = value;
362 return this;
363 }
364
365 public Data withListField(List<String> value) {
366 listField = ImmutableList.copyOf(value);
367 return this;
368 }
369
370 public Data withSetField(Set<String> value) {
371 setField = ImmutableSet.copyOf(value);
372 return this;
373 }
374
375 @Override
376 public int hashCode() {
377 return Objects.hash(stringField, listField, setField);
378 }
379
380 @Override
381 public boolean equals(Object other) {
382 if (other instanceof Data) {
383 Data that = (Data) other;
384 return Objects.equals(this.stringField, that.stringField) &&
385 Objects.equals(this.listField, that.listField) &&
386 Objects.equals(this.setField, that.setField);
387 }
388 return false;
389 }
390 }
391}