blob: de9e9f217025398e579701fcf4885964d5f67e1d [file] [log] [blame]
Brian O'Connora468e902015-03-18 16:43:49 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Multimap;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang.math.RandomUtils;
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.packet.MacAddress;
33import org.onlab.util.Counter;
34import org.onosproject.cfg.ComponentConfigService;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.Device;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.intent.Intent;
50import org.onosproject.net.intent.IntentEvent;
51import org.onosproject.net.intent.IntentListener;
52import org.onosproject.net.intent.IntentService;
53import org.onosproject.net.intent.Key;
54import org.onosproject.net.intent.PartitionService;
55import org.onosproject.net.intent.PointToPointIntent;
56import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57import org.onosproject.store.cluster.messaging.ClusterMessage;
58import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
59import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.osgi.service.component.ComponentContext;
61import org.slf4j.Logger;
62
63import java.util.ArrayList;
64import java.util.Collections;
65import java.util.Dictionary;
66import java.util.List;
67import java.util.Map;
68import java.util.Set;
69import java.util.Timer;
70import java.util.TimerTask;
71import java.util.concurrent.ExecutorService;
72import java.util.concurrent.Executors;
73import java.util.concurrent.TimeUnit;
74import java.util.stream.Collectors;
75
76import static com.google.common.base.Preconditions.checkState;
77import static com.google.common.base.Strings.isNullOrEmpty;
78import static java.lang.String.format;
79import static java.lang.System.currentTimeMillis;
80import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
81import static org.onlab.util.Tools.*;
82import static org.onosproject.net.intent.IntentEvent.Type.*;
83import static org.slf4j.LoggerFactory.getLogger;
84
85/**
86 * Application to test sustained intent throughput.
87 */
88@Component(immediate = true)
89@Service(value = IntentPerfInstaller.class)
90public class IntentPerfInstaller {
91
92 private final Logger log = getLogger(getClass());
93
94 private static final int DEFAULT_NUM_WORKERS = 1;
95
96 private static final int DEFAULT_NUM_KEYS = 40000;
97 private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
98
99 private static final int DEFAULT_NUM_NEIGHBORS = 0;
100
101 private static final int START_DELAY = 5_000; // ms
Thomas Vachuska95aadff2015-03-26 11:45:41 -0700102 private static final int REPORT_PERIOD = 1_000; //ms
Brian O'Connora468e902015-03-18 16:43:49 -0700103
104 private static final String START = "start";
105 private static final String STOP = "stop";
106 private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
107
108 //FIXME add path length
109
110 @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
111 label = "Number of keys (i.e. unique intents) to generate per instance")
112 private int numKeys = DEFAULT_NUM_KEYS;
113
114 //TODO implement numWorkers property
115// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
116// label = "Number of installer threads per instance")
117// private int numWokers = DEFAULT_NUM_WORKERS;
118
119 @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
120 label = "Goal for cycle period (in ms)")
121 private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
122
123 @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
124 label = "Number of neighbors to generate intents for")
125 private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
126
127 @Reference(cardinality = MANDATORY_UNARY)
128 protected CoreService coreService;
129
130 @Reference(cardinality = MANDATORY_UNARY)
131 protected IntentService intentService;
132
133 @Reference(cardinality = MANDATORY_UNARY)
134 protected ClusterService clusterService;
135
136 @Reference(cardinality = MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = MANDATORY_UNARY)
140 protected MastershipService mastershipService;
141
142 @Reference(cardinality = MANDATORY_UNARY)
143 protected PartitionService partitionService;
144
145 @Reference(cardinality = MANDATORY_UNARY)
146 protected ComponentConfigService configService;
147
148 @Reference(cardinality = MANDATORY_UNARY)
149 protected IntentPerfCollector sampleCollector;
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected ClusterCommunicationService communicationService;
153
154 private ExecutorService messageHandlingExecutor;
155
156 private ExecutorService workers;
157 private ApplicationId appId;
158 private Listener listener;
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700159 private boolean stopped = true;
Brian O'Connora468e902015-03-18 16:43:49 -0700160
161 private Timer reportTimer;
162
163 // FIXME this variable isn't shared properly between multiple worker threads
164 private int lastKey = 0;
165
166 private IntentPerfUi perfUi;
167 private NodeId nodeId;
168 private TimerTask reporterTask;
169
170 @Activate
171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
173
174 nodeId = clusterService.getLocalNode().id();
175 appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
176
177 // TODO: replace with shared timer
178 reportTimer = new Timer("onos-intent-perf-reporter");
179 workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
180
181 // disable flow backups for testing
182 configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
183 "backupEnabled", "false");
184
185 // TODO: replace with shared executor
186 messageHandlingExecutor = Executors.newSingleThreadExecutor(
187 groupedThreads("onos/perf", "command-handler"));
188
189 communicationService.addSubscriber(CONTROL, new InternalControl(),
190 messageHandlingExecutor);
191
192 listener = new Listener();
193 intentService.addListener(listener);
194
195 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
196 modify(context);
197 }
198
199 @Deactivate
200 public void deactivate() {
201 stopTestRun();
202
203 configService.unregisterProperties(getClass(), false);
204 messageHandlingExecutor.shutdown();
205 communicationService.removeSubscriber(CONTROL);
206
207 if (listener != null) {
208 reportTimer.cancel();
209 intentService.removeListener(listener);
210 listener = null;
211 reportTimer = null;
212 }
213 }
214
215 @Modified
216 public void modify(ComponentContext context) {
217 if (context == null) {
218 logConfig("Reconfigured");
219 return;
220 }
221
222 Dictionary<?, ?> properties = context.getProperties();
223 int newNumKeys, newCyclePeriod, newNumNeighbors;
224 try {
225 String s = get(properties, "numKeys");
226 newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
227
228 s = get(properties, "cyclePeriod");
229 newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
230
231 s = get(properties, "numNeighbors");
232 newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
233
234 } catch (NumberFormatException | ClassCastException e) {
235 log.warn("Malformed configuration detected; using defaults", e);
236 newNumKeys = DEFAULT_NUM_KEYS;
237 newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
238 newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
239 }
240
241 if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
242 numKeys = newNumKeys;
243 cyclePeriod = newCyclePeriod;
244 numNeighbors = newNumNeighbors;
245 logConfig("Reconfigured");
246 }
247 }
248
249 public void start() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700250 if (stopped) {
251 stopped = false;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700252 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700253 startTestRun();
254 }
Brian O'Connora468e902015-03-18 16:43:49 -0700255 }
256
257 public void stop() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700258 if (!stopped) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700259 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700260 stopTestRun();
261 }
Brian O'Connora468e902015-03-18 16:43:49 -0700262 }
263
264 private void logConfig(String prefix) {
265 log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
266 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
267 }
268
269 private void startTestRun() {
270 sampleCollector.clearSamples();
271
272 // adjust numNeighbors and generate list of neighbors
273 numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
274
275 // Schedule reporter task on report period boundary
276 reporterTask = new ReporterTask();
277 reportTimer.scheduleAtFixedRate(reporterTask,
278 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
279 REPORT_PERIOD);
280
281 // Submit workers
282 stopped = false;
283 for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
284 workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
285 }
286 log.info("Started test run");
287 }
288
289 private void stopTestRun() {
Brian O'Connora468e902015-03-18 16:43:49 -0700290 if (reporterTask != null) {
291 reporterTask.cancel();
292 reporterTask = null;
293 }
294
295 try {
296 workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
297 } catch (InterruptedException e) {
298 log.warn("Failed to stop worker", e);
299 }
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700300
301 sampleCollector.recordSample(0, 0);
302 sampleCollector.recordSample(0, 0);
303 stopped = true;
304
Brian O'Connora468e902015-03-18 16:43:49 -0700305 log.info("Stopped test run");
306 }
307
308 private List<NodeId> getNeighbors() {
309 List<NodeId> nodes = clusterService.getNodes().stream()
310 .map(ControllerNode::id)
311 .collect(Collectors.toCollection(ArrayList::new));
312 // sort neighbors by id
313 Collections.sort(nodes, (node1, node2) ->
314 node1.toString().compareTo(node2.toString()));
315 // rotate the local node to index 0
316 Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
317 log.debug("neighbors (raw): {}", nodes); //TODO remove
318 // generate the sub-list that will contain local node and selected neighbors
319 nodes = nodes.subList(0, numNeighbors + 1);
320 log.debug("neighbors: {}", nodes); //TODO remove
321 return nodes;
322 }
323
324 private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
325 // choose a random device for which this node is master
326 List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
327 Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
328
329 //FIXME we currently ignore the path length and always use the same device
330 TrafficSelector selector = DefaultTrafficSelector.builder()
331 .matchEthDst(MacAddress.valueOf(mac)).build();
332 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
333 ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
334 ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
335
336 return PointToPointIntent.builder()
337 .appId(appId)
338 .key(key)
339 .selector(selector)
340 .treatment(treatment)
341 .ingressPoint(ingress)
342 .egressPoint(egress)
343 .build();
344 }
345
346 /**
347 * Creates a specified number of intents for testing purposes.
348 *
349 * @param numberOfKeys number of intents
350 * @param pathLength path depth
351 * @param firstKey first key to attempt
352 * @return set of intents
353 */
354 private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
355 List<NodeId> neighbors = getNeighbors();
356
357 Multimap<NodeId, Device> devices = ArrayListMultimap.create();
358 deviceService.getAvailableDevices()
359 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
360
361 // ensure that we have at least one device per neighbor
362 neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
363 "There are no devices for {}", node));
364
365 // TODO pull this outside so that createIntent can use it
366 // prefix based on node id for keys generated on this instance
367 long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
368
369 int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
370 Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
371
372 for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
373 Key key = Key.of(keyPrefix + k, appId);
374
375 NodeId leader = partitionService.getLeader(key);
376 if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
377 // Bail if we are not sending to this node or we have enough for this node
378 continue;
379 }
380 intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
381
382 // Bump up the counter and remember this as the last key used.
383 count++;
384 lastKey = k;
385 if (count % 1000 == 0) {
386 log.info("Building intents... {} (attempt: {})", count, lastKey);
387 }
388 }
389 checkState(intents.values().size() == numberOfKeys,
390 "Generated wrong number of intents");
391 log.info("Created {} intents", numberOfKeys);
392 intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
393
394 return Sets.newHashSet(intents.values());
395 }
396
397 // Submits intent operations.
398 final class Submitter implements Runnable {
399
400 private long lastDuration;
401 private int lastCount;
402
403 private Set<Intent> intents = Sets.newHashSet();
404 private Set<Intent> submitted = Sets.newHashSet();
405 private Set<Intent> withdrawn = Sets.newHashSet();
406
407 private Submitter(Set<Intent> intents) {
408 this.intents = intents;
409 lastCount = numKeys / 4;
410 lastDuration = 1_000; // 1 second
411 }
412
413 @Override
414 public void run() {
415 prime();
416 while (!stopped) {
417 try {
418 cycle();
419 } catch (Exception e) {
420 log.warn("Exception during cycle", e);
421 }
422 }
423 clear();
424 }
425
426 private Iterable<Intent> subset(Set<Intent> intents) {
427 List<Intent> subset = Lists.newArrayList(intents);
428 Collections.shuffle(subset);
429 return subset.subList(0, lastCount);
430 }
431
432 // Submits the specified intent.
433 private void submit(Intent intent) {
434 intentService.submit(intent);
435 submitted.add(intent);
436 withdrawn.remove(intent); //TODO could check result here...
437 }
438
439 // Withdraws the specified intent.
440 private void withdraw(Intent intent) {
441 intentService.withdraw(intent);
442 withdrawn.add(intent);
443 submitted.remove(intent); //TODO could check result here...
444 }
445
446 // Primes the cycle.
447 private void prime() {
448 int i = 0;
449 withdrawn.addAll(intents);
450 for (Intent intent : intents) {
451 submit(intent);
452 // only submit half of the intents to start
453 if (i++ >= intents.size() / 2) {
454 break;
455 }
456 }
457 }
458
459 private void clear() {
460 submitted.forEach(this::withdraw);
461 }
462
463 // Runs a single operation cycle.
464 private void cycle() {
465 //TODO consider running without rate adjustment
466 adjustRates();
467
468 long start = currentTimeMillis();
469 subset(submitted).forEach(this::withdraw);
470 subset(withdrawn).forEach(this::submit);
471 long delta = currentTimeMillis() - start;
472
473 if (delta > cyclePeriod * 3 || delta < 0) {
474 log.warn("Cycle took {} ms", delta);
475 }
476
477 int difference = cyclePeriod - (int) delta;
478 if (difference > 0) {
479 delay(difference);
480 }
481
482 lastDuration = delta;
483 }
484
485 int cycleCount = 0;
486
487 private void adjustRates() {
488
489 int addDelta = Math.max(1000 - cycleCount, 10);
490 double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
491
492 //FIXME need to iron out the rate adjustment
493 //FIXME we should taper the adjustments over time
494 //FIXME don't just use the lastDuration, take an average
495 if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
496 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
497 lastDuration <= cyclePeriod) {
498 lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
499 } else {
500 lastCount *= multRatio;
501 }
502 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
503 lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
504 }
505
506 }
507 }
508
509 // Event listener to monitor throughput.
510 final class Listener implements IntentListener {
511
512 private final Counter runningTotal = new Counter();
513 private volatile Map<IntentEvent.Type, Counter> counters;
514
515 private volatile double processedThroughput = 0;
516 private volatile double requestThroughput = 0;
517
518 public Listener() {
519 counters = initCounters();
520 }
521
522 private Map<IntentEvent.Type, Counter> initCounters() {
523 Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
524 for (IntentEvent.Type type : IntentEvent.Type.values()) {
525 map.put(type, new Counter());
526 }
527 return map;
528 }
529
530 public double processedThroughput() {
531 return processedThroughput;
532 }
533
534 public double requestThroughput() {
535 return requestThroughput;
536 }
537
538 @Override
539 public void event(IntentEvent event) {
540 if (event.subject().appId().equals(appId)) {
541 counters.get(event.type()).add(1);
542 }
543 }
544
545 public void report() {
546 Map<IntentEvent.Type, Counter> reportCounters = counters;
547 counters = initCounters();
548
549 // update running total and latest throughput
550 Counter installed = reportCounters.get(INSTALLED);
551 Counter withdrawn = reportCounters.get(WITHDRAWN);
552 processedThroughput = installed.throughput() + withdrawn.throughput();
553 runningTotal.add(installed.total() + withdrawn.total());
554
555 Counter installReq = reportCounters.get(INSTALL_REQ);
556 Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
557 requestThroughput = installReq.throughput() + withdrawReq.throughput();
558
559 // build the string to report
560 StringBuilder stringBuilder = new StringBuilder();
561 for (IntentEvent.Type type : IntentEvent.Type.values()) {
562 Counter counter = reportCounters.get(type);
563 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
564 }
565 log.info("Throughput: OVERALL={}; CURRENT={}; {}",
566 format("%.2f", runningTotal.throughput()),
567 format("%.2f", processedThroughput),
568 stringBuilder);
569
570 sampleCollector.recordSample(runningTotal.throughput(),
571 processedThroughput);
572 }
573 }
574
575 private class InternalControl implements ClusterMessageHandler {
576 @Override
577 public void handle(ClusterMessage message) {
578 String cmd = new String(message.payload());
579 log.info("Received command {}", cmd);
580 if (cmd.equals(START)) {
581 startTestRun();
582 } else {
583 stopTestRun();
584 }
585 }
586 }
587
588 private class ReporterTask extends TimerTask {
589 @Override
590 public void run() {
591 //adjustRates(); // FIXME we currently adjust rates in the cycle thread
592 listener.report();
593 }
594 }
595
596}