blob: 2a8c14a750379ee3b97aa8575467f454c6ed3956 [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
83
84/**
85 * Application to test sustained intent throughput.
86 */
87@Component(immediate = true)
88@Service(value = IntentPerfInstaller.class)
89public class IntentPerfInstaller {
90
91 private final Logger log = getLogger(getClass());
92
93 private static final int DEFAULT_NUM_WORKERS = 1;
94
95 private static final int DEFAULT_NUM_KEYS = 40000;
96 private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
97
98 private static final int DEFAULT_NUM_NEIGHBORS = 0;
99
100 private static final int START_DELAY = 5_000; // ms
Thomas Vachuska95aadff2015-03-26 11:45:41 -0700101 private static final int REPORT_PERIOD = 1_000; //ms
Brian O'Connora468e902015-03-18 16:43:49 -0700102
103 private static final String START = "start";
104 private static final String STOP = "stop";
105 private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
106
107 //FIXME add path length
108
109 @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
110 label = "Number of keys (i.e. unique intents) to generate per instance")
111 private int numKeys = DEFAULT_NUM_KEYS;
112
113 //TODO implement numWorkers property
114// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
115// label = "Number of installer threads per instance")
116// private int numWokers = DEFAULT_NUM_WORKERS;
117
118 @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
119 label = "Goal for cycle period (in ms)")
120 private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
121
122 @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
123 label = "Number of neighbors to generate intents for")
124 private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
125
126 @Reference(cardinality = MANDATORY_UNARY)
127 protected CoreService coreService;
128
129 @Reference(cardinality = MANDATORY_UNARY)
130 protected IntentService intentService;
131
132 @Reference(cardinality = MANDATORY_UNARY)
133 protected ClusterService clusterService;
134
135 @Reference(cardinality = MANDATORY_UNARY)
136 protected DeviceService deviceService;
137
138 @Reference(cardinality = MANDATORY_UNARY)
139 protected MastershipService mastershipService;
140
141 @Reference(cardinality = MANDATORY_UNARY)
142 protected PartitionService partitionService;
143
144 @Reference(cardinality = MANDATORY_UNARY)
145 protected ComponentConfigService configService;
146
147 @Reference(cardinality = MANDATORY_UNARY)
148 protected IntentPerfCollector sampleCollector;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected ClusterCommunicationService communicationService;
152
153 private ExecutorService messageHandlingExecutor;
154
155 private ExecutorService workers;
156 private ApplicationId appId;
157 private Listener listener;
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700158 private boolean stopped = true;
Brian O'Connora468e902015-03-18 16:43:49 -0700159
160 private Timer reportTimer;
161
162 // FIXME this variable isn't shared properly between multiple worker threads
163 private int lastKey = 0;
164
165 private IntentPerfUi perfUi;
166 private NodeId nodeId;
167 private TimerTask reporterTask;
168
169 @Activate
170 public void activate(ComponentContext context) {
171 configService.registerProperties(getClass());
172
173 nodeId = clusterService.getLocalNode().id();
174 appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
175
176 // TODO: replace with shared timer
177 reportTimer = new Timer("onos-intent-perf-reporter");
178 workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
179
180 // disable flow backups for testing
suibin zhangb3eded02015-09-23 11:49:26 -0700181 configService.setProperty("org.onosproject.store.flow.impl.NewDistributedFlowRuleStore",
182 "backupEnabled", "true");
Brian O'Connora468e902015-03-18 16:43:49 -0700183
184 // TODO: replace with shared executor
185 messageHandlingExecutor = Executors.newSingleThreadExecutor(
186 groupedThreads("onos/perf", "command-handler"));
187
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800188 communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
Brian O'Connora468e902015-03-18 16:43:49 -0700189 messageHandlingExecutor);
190
191 listener = new Listener();
192 intentService.addListener(listener);
193
194 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
195 modify(context);
196 }
197
198 @Deactivate
199 public void deactivate() {
200 stopTestRun();
201
202 configService.unregisterProperties(getClass(), false);
203 messageHandlingExecutor.shutdown();
204 communicationService.removeSubscriber(CONTROL);
205
206 if (listener != null) {
207 reportTimer.cancel();
208 intentService.removeListener(listener);
209 listener = null;
210 reportTimer = null;
211 }
212 }
213
214 @Modified
215 public void modify(ComponentContext context) {
216 if (context == null) {
217 logConfig("Reconfigured");
218 return;
219 }
220
221 Dictionary<?, ?> properties = context.getProperties();
222 int newNumKeys, newCyclePeriod, newNumNeighbors;
223 try {
224 String s = get(properties, "numKeys");
225 newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
226
227 s = get(properties, "cyclePeriod");
228 newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
229
230 s = get(properties, "numNeighbors");
231 newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
232
233 } catch (NumberFormatException | ClassCastException e) {
234 log.warn("Malformed configuration detected; using defaults", e);
235 newNumKeys = DEFAULT_NUM_KEYS;
236 newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
237 newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
238 }
239
240 if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
241 numKeys = newNumKeys;
242 cyclePeriod = newCyclePeriod;
243 numNeighbors = newNumNeighbors;
244 logConfig("Reconfigured");
245 }
246 }
247
248 public void start() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700249 if (stopped) {
250 stopped = false;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700251 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700252 startTestRun();
253 }
Brian O'Connora468e902015-03-18 16:43:49 -0700254 }
255
256 public void stop() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700257 if (!stopped) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700258 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700259 stopTestRun();
260 }
Brian O'Connora468e902015-03-18 16:43:49 -0700261 }
262
263 private void logConfig(String prefix) {
264 log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
265 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
266 }
267
268 private void startTestRun() {
269 sampleCollector.clearSamples();
270
271 // adjust numNeighbors and generate list of neighbors
272 numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
273
274 // Schedule reporter task on report period boundary
275 reporterTask = new ReporterTask();
276 reportTimer.scheduleAtFixedRate(reporterTask,
277 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
278 REPORT_PERIOD);
279
280 // Submit workers
281 stopped = false;
282 for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
283 workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
284 }
285 log.info("Started test run");
286 }
287
288 private void stopTestRun() {
Brian O'Connora468e902015-03-18 16:43:49 -0700289 if (reporterTask != null) {
290 reporterTask.cancel();
291 reporterTask = null;
292 }
293
294 try {
295 workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
296 } catch (InterruptedException e) {
297 log.warn("Failed to stop worker", e);
298 }
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700299
300 sampleCollector.recordSample(0, 0);
301 sampleCollector.recordSample(0, 0);
302 stopped = true;
303
Brian O'Connora468e902015-03-18 16:43:49 -0700304 log.info("Stopped test run");
305 }
306
307 private List<NodeId> getNeighbors() {
308 List<NodeId> nodes = clusterService.getNodes().stream()
309 .map(ControllerNode::id)
310 .collect(Collectors.toCollection(ArrayList::new));
311 // sort neighbors by id
312 Collections.sort(nodes, (node1, node2) ->
313 node1.toString().compareTo(node2.toString()));
314 // rotate the local node to index 0
315 Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
316 log.debug("neighbors (raw): {}", nodes); //TODO remove
317 // generate the sub-list that will contain local node and selected neighbors
318 nodes = nodes.subList(0, numNeighbors + 1);
319 log.debug("neighbors: {}", nodes); //TODO remove
320 return nodes;
321 }
322
323 private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
324 // choose a random device for which this node is master
325 List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
326 Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
327
328 //FIXME we currently ignore the path length and always use the same device
329 TrafficSelector selector = DefaultTrafficSelector.builder()
330 .matchEthDst(MacAddress.valueOf(mac)).build();
331 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
332 ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
333 ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
334
335 return PointToPointIntent.builder()
336 .appId(appId)
337 .key(key)
338 .selector(selector)
339 .treatment(treatment)
340 .ingressPoint(ingress)
341 .egressPoint(egress)
342 .build();
343 }
344
345 /**
346 * Creates a specified number of intents for testing purposes.
347 *
348 * @param numberOfKeys number of intents
349 * @param pathLength path depth
350 * @param firstKey first key to attempt
351 * @return set of intents
352 */
353 private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
354 List<NodeId> neighbors = getNeighbors();
355
356 Multimap<NodeId, Device> devices = ArrayListMultimap.create();
357 deviceService.getAvailableDevices()
358 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
359
360 // ensure that we have at least one device per neighbor
361 neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
362 "There are no devices for {}", node));
363
364 // TODO pull this outside so that createIntent can use it
365 // prefix based on node id for keys generated on this instance
366 long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
367
368 int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
369 Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
370
371 for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
372 Key key = Key.of(keyPrefix + k, appId);
373
374 NodeId leader = partitionService.getLeader(key);
375 if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
376 // Bail if we are not sending to this node or we have enough for this node
377 continue;
378 }
379 intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
380
381 // Bump up the counter and remember this as the last key used.
382 count++;
383 lastKey = k;
384 if (count % 1000 == 0) {
385 log.info("Building intents... {} (attempt: {})", count, lastKey);
386 }
387 }
388 checkState(intents.values().size() == numberOfKeys,
389 "Generated wrong number of intents");
390 log.info("Created {} intents", numberOfKeys);
391 intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
392
393 return Sets.newHashSet(intents.values());
394 }
395
396 // Submits intent operations.
397 final class Submitter implements Runnable {
398
399 private long lastDuration;
400 private int lastCount;
401
402 private Set<Intent> intents = Sets.newHashSet();
403 private Set<Intent> submitted = Sets.newHashSet();
404 private Set<Intent> withdrawn = Sets.newHashSet();
405
406 private Submitter(Set<Intent> intents) {
407 this.intents = intents;
408 lastCount = numKeys / 4;
409 lastDuration = 1_000; // 1 second
410 }
411
412 @Override
413 public void run() {
414 prime();
415 while (!stopped) {
416 try {
417 cycle();
418 } catch (Exception e) {
419 log.warn("Exception during cycle", e);
420 }
421 }
422 clear();
423 }
424
425 private Iterable<Intent> subset(Set<Intent> intents) {
426 List<Intent> subset = Lists.newArrayList(intents);
427 Collections.shuffle(subset);
428 return subset.subList(0, lastCount);
429 }
430
431 // Submits the specified intent.
432 private void submit(Intent intent) {
433 intentService.submit(intent);
434 submitted.add(intent);
435 withdrawn.remove(intent); //TODO could check result here...
436 }
437
438 // Withdraws the specified intent.
439 private void withdraw(Intent intent) {
440 intentService.withdraw(intent);
441 withdrawn.add(intent);
442 submitted.remove(intent); //TODO could check result here...
443 }
444
445 // Primes the cycle.
446 private void prime() {
447 int i = 0;
448 withdrawn.addAll(intents);
449 for (Intent intent : intents) {
450 submit(intent);
451 // only submit half of the intents to start
452 if (i++ >= intents.size() / 2) {
453 break;
454 }
455 }
456 }
457
458 private void clear() {
459 submitted.forEach(this::withdraw);
460 }
461
462 // Runs a single operation cycle.
463 private void cycle() {
464 //TODO consider running without rate adjustment
465 adjustRates();
466
467 long start = currentTimeMillis();
468 subset(submitted).forEach(this::withdraw);
469 subset(withdrawn).forEach(this::submit);
470 long delta = currentTimeMillis() - start;
471
472 if (delta > cyclePeriod * 3 || delta < 0) {
473 log.warn("Cycle took {} ms", delta);
474 }
475
476 int difference = cyclePeriod - (int) delta;
477 if (difference > 0) {
478 delay(difference);
479 }
480
481 lastDuration = delta;
482 }
483
484 int cycleCount = 0;
485
486 private void adjustRates() {
487
488 int addDelta = Math.max(1000 - cycleCount, 10);
489 double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
490
491 //FIXME need to iron out the rate adjustment
492 //FIXME we should taper the adjustments over time
493 //FIXME don't just use the lastDuration, take an average
494 if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
495 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
496 lastDuration <= cyclePeriod) {
497 lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
498 } else {
499 lastCount *= multRatio;
500 }
501 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
502 lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
503 }
504
505 }
506 }
507
508 // Event listener to monitor throughput.
509 final class Listener implements IntentListener {
510
511 private final Counter runningTotal = new Counter();
512 private volatile Map<IntentEvent.Type, Counter> counters;
513
514 private volatile double processedThroughput = 0;
515 private volatile double requestThroughput = 0;
516
517 public Listener() {
518 counters = initCounters();
519 }
520
521 private Map<IntentEvent.Type, Counter> initCounters() {
522 Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
523 for (IntentEvent.Type type : IntentEvent.Type.values()) {
524 map.put(type, new Counter());
525 }
526 return map;
527 }
528
529 public double processedThroughput() {
530 return processedThroughput;
531 }
532
533 public double requestThroughput() {
534 return requestThroughput;
535 }
536
537 @Override
538 public void event(IntentEvent event) {
539 if (event.subject().appId().equals(appId)) {
540 counters.get(event.type()).add(1);
541 }
542 }
543
544 public void report() {
545 Map<IntentEvent.Type, Counter> reportCounters = counters;
546 counters = initCounters();
547
548 // update running total and latest throughput
549 Counter installed = reportCounters.get(INSTALLED);
550 Counter withdrawn = reportCounters.get(WITHDRAWN);
551 processedThroughput = installed.throughput() + withdrawn.throughput();
552 runningTotal.add(installed.total() + withdrawn.total());
553
554 Counter installReq = reportCounters.get(INSTALL_REQ);
555 Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
556 requestThroughput = installReq.throughput() + withdrawReq.throughput();
557
558 // build the string to report
559 StringBuilder stringBuilder = new StringBuilder();
560 for (IntentEvent.Type type : IntentEvent.Type.values()) {
561 Counter counter = reportCounters.get(type);
562 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
563 }
564 log.info("Throughput: OVERALL={}; CURRENT={}; {}",
565 format("%.2f", runningTotal.throughput()),
566 format("%.2f", processedThroughput),
567 stringBuilder);
568
569 sampleCollector.recordSample(runningTotal.throughput(),
570 processedThroughput);
571 }
572 }
573
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800574 private class InternalControl implements Consumer<String> {
Brian O'Connora468e902015-03-18 16:43:49 -0700575 @Override
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800576 public void accept(String cmd) {
Brian O'Connora468e902015-03-18 16:43:49 -0700577 log.info("Received command {}", cmd);
578 if (cmd.equals(START)) {
579 startTestRun();
580 } else {
581 stopTestRun();
582 }
583 }
584 }
585
586 private class ReporterTask extends TimerTask {
587 @Override
588 public void run() {
589 //adjustRates(); // FIXME we currently adjust rates in the cycle thread
590 listener.report();
591 }
592 }
593
594}