/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.intentperf;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
84
85/**
86 * Application to test sustained intent throughput.
87 */
88@Component(immediate = true)
89@Service(value = IntentPerfInstaller.class)
90public class IntentPerfInstaller {
91
92 private final Logger log = getLogger(getClass());
93
94 private static final int DEFAULT_NUM_WORKERS = 1;
95
96 private static final int DEFAULT_NUM_KEYS = 40000;
97 private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
98
99 private static final int DEFAULT_NUM_NEIGHBORS = 0;
100
101 private static final int START_DELAY = 5_000; // ms
Thomas Vachuska95aadff2015-03-26 11:45:41 -0700102 private static final int REPORT_PERIOD = 1_000; //ms
Brian O'Connora468e902015-03-18 16:43:49 -0700103
104 private static final String START = "start";
105 private static final String STOP = "stop";
106 private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
107
108 //FIXME add path length
109
110 @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
111 label = "Number of keys (i.e. unique intents) to generate per instance")
112 private int numKeys = DEFAULT_NUM_KEYS;
113
114 //TODO implement numWorkers property
115// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
116// label = "Number of installer threads per instance")
117// private int numWokers = DEFAULT_NUM_WORKERS;
118
119 @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
120 label = "Goal for cycle period (in ms)")
121 private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
122
123 @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
124 label = "Number of neighbors to generate intents for")
125 private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
126
127 @Reference(cardinality = MANDATORY_UNARY)
128 protected CoreService coreService;
129
130 @Reference(cardinality = MANDATORY_UNARY)
131 protected IntentService intentService;
132
133 @Reference(cardinality = MANDATORY_UNARY)
134 protected ClusterService clusterService;
135
136 @Reference(cardinality = MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = MANDATORY_UNARY)
140 protected MastershipService mastershipService;
141
142 @Reference(cardinality = MANDATORY_UNARY)
143 protected PartitionService partitionService;
144
145 @Reference(cardinality = MANDATORY_UNARY)
146 protected ComponentConfigService configService;
147
148 @Reference(cardinality = MANDATORY_UNARY)
149 protected IntentPerfCollector sampleCollector;
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected ClusterCommunicationService communicationService;
153
154 private ExecutorService messageHandlingExecutor;
155
156 private ExecutorService workers;
157 private ApplicationId appId;
158 private Listener listener;
159 private boolean stopped;
160
161 private Timer reportTimer;
162
163 // FIXME this variable isn't shared properly between multiple worker threads
164 private int lastKey = 0;
165
166 private IntentPerfUi perfUi;
167 private NodeId nodeId;
168 private TimerTask reporterTask;
169
170 @Activate
171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
173
174 nodeId = clusterService.getLocalNode().id();
175 appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
176
177 // TODO: replace with shared timer
178 reportTimer = new Timer("onos-intent-perf-reporter");
179 workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
180
181 // disable flow backups for testing
182 configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
183 "backupEnabled", "false");
184
185 // TODO: replace with shared executor
186 messageHandlingExecutor = Executors.newSingleThreadExecutor(
187 groupedThreads("onos/perf", "command-handler"));
188
189 communicationService.addSubscriber(CONTROL, new InternalControl(),
190 messageHandlingExecutor);
191
192 listener = new Listener();
193 intentService.addListener(listener);
194
195 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
196 modify(context);
197 }
198
199 @Deactivate
200 public void deactivate() {
201 stopTestRun();
202
203 configService.unregisterProperties(getClass(), false);
204 messageHandlingExecutor.shutdown();
205 communicationService.removeSubscriber(CONTROL);
206
207 if (listener != null) {
208 reportTimer.cancel();
209 intentService.removeListener(listener);
210 listener = null;
211 reportTimer = null;
212 }
213 }
214
215 @Modified
216 public void modify(ComponentContext context) {
217 if (context == null) {
218 logConfig("Reconfigured");
219 return;
220 }
221
222 Dictionary<?, ?> properties = context.getProperties();
223 int newNumKeys, newCyclePeriod, newNumNeighbors;
224 try {
225 String s = get(properties, "numKeys");
226 newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
227
228 s = get(properties, "cyclePeriod");
229 newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
230
231 s = get(properties, "numNeighbors");
232 newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
233
234 } catch (NumberFormatException | ClassCastException e) {
235 log.warn("Malformed configuration detected; using defaults", e);
236 newNumKeys = DEFAULT_NUM_KEYS;
237 newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
238 newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
239 }
240
241 if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
242 numKeys = newNumKeys;
243 cyclePeriod = newCyclePeriod;
244 numNeighbors = newNumNeighbors;
245 logConfig("Reconfigured");
246 }
247 }
248
249 public void start() {
250 communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
251 startTestRun();
252 }
253
254 public void stop() {
255 communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
256 stopTestRun();
257 }
258
259 private void logConfig(String prefix) {
260 log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
261 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
262 }
263
264 private void startTestRun() {
265 sampleCollector.clearSamples();
266
267 // adjust numNeighbors and generate list of neighbors
268 numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
269
270 // Schedule reporter task on report period boundary
271 reporterTask = new ReporterTask();
272 reportTimer.scheduleAtFixedRate(reporterTask,
273 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
274 REPORT_PERIOD);
275
276 // Submit workers
277 stopped = false;
278 for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
279 workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
280 }
281 log.info("Started test run");
282 }
283
284 private void stopTestRun() {
285 stopped = true;
286 if (reporterTask != null) {
287 reporterTask.cancel();
288 reporterTask = null;
289 }
290
291 try {
292 workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
293 } catch (InterruptedException e) {
294 log.warn("Failed to stop worker", e);
295 }
296 log.info("Stopped test run");
297 }
298
299 private List<NodeId> getNeighbors() {
300 List<NodeId> nodes = clusterService.getNodes().stream()
301 .map(ControllerNode::id)
302 .collect(Collectors.toCollection(ArrayList::new));
303 // sort neighbors by id
304 Collections.sort(nodes, (node1, node2) ->
305 node1.toString().compareTo(node2.toString()));
306 // rotate the local node to index 0
307 Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
308 log.debug("neighbors (raw): {}", nodes); //TODO remove
309 // generate the sub-list that will contain local node and selected neighbors
310 nodes = nodes.subList(0, numNeighbors + 1);
311 log.debug("neighbors: {}", nodes); //TODO remove
312 return nodes;
313 }
314
315 private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
316 // choose a random device for which this node is master
317 List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
318 Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
319
320 //FIXME we currently ignore the path length and always use the same device
321 TrafficSelector selector = DefaultTrafficSelector.builder()
322 .matchEthDst(MacAddress.valueOf(mac)).build();
323 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
324 ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
325 ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
326
327 return PointToPointIntent.builder()
328 .appId(appId)
329 .key(key)
330 .selector(selector)
331 .treatment(treatment)
332 .ingressPoint(ingress)
333 .egressPoint(egress)
334 .build();
335 }
336
337 /**
338 * Creates a specified number of intents for testing purposes.
339 *
340 * @param numberOfKeys number of intents
341 * @param pathLength path depth
342 * @param firstKey first key to attempt
343 * @return set of intents
344 */
345 private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
346 List<NodeId> neighbors = getNeighbors();
347
348 Multimap<NodeId, Device> devices = ArrayListMultimap.create();
349 deviceService.getAvailableDevices()
350 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
351
352 // ensure that we have at least one device per neighbor
353 neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
354 "There are no devices for {}", node));
355
356 // TODO pull this outside so that createIntent can use it
357 // prefix based on node id for keys generated on this instance
358 long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
359
360 int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
361 Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
362
363 for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
364 Key key = Key.of(keyPrefix + k, appId);
365
366 NodeId leader = partitionService.getLeader(key);
367 if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
368 // Bail if we are not sending to this node or we have enough for this node
369 continue;
370 }
371 intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
372
373 // Bump up the counter and remember this as the last key used.
374 count++;
375 lastKey = k;
376 if (count % 1000 == 0) {
377 log.info("Building intents... {} (attempt: {})", count, lastKey);
378 }
379 }
380 checkState(intents.values().size() == numberOfKeys,
381 "Generated wrong number of intents");
382 log.info("Created {} intents", numberOfKeys);
383 intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
384
385 return Sets.newHashSet(intents.values());
386 }
387
388 // Submits intent operations.
389 final class Submitter implements Runnable {
390
391 private long lastDuration;
392 private int lastCount;
393
394 private Set<Intent> intents = Sets.newHashSet();
395 private Set<Intent> submitted = Sets.newHashSet();
396 private Set<Intent> withdrawn = Sets.newHashSet();
397
398 private Submitter(Set<Intent> intents) {
399 this.intents = intents;
400 lastCount = numKeys / 4;
401 lastDuration = 1_000; // 1 second
402 }
403
404 @Override
405 public void run() {
406 prime();
407 while (!stopped) {
408 try {
409 cycle();
410 } catch (Exception e) {
411 log.warn("Exception during cycle", e);
412 }
413 }
414 clear();
415 }
416
417 private Iterable<Intent> subset(Set<Intent> intents) {
418 List<Intent> subset = Lists.newArrayList(intents);
419 Collections.shuffle(subset);
420 return subset.subList(0, lastCount);
421 }
422
423 // Submits the specified intent.
424 private void submit(Intent intent) {
425 intentService.submit(intent);
426 submitted.add(intent);
427 withdrawn.remove(intent); //TODO could check result here...
428 }
429
430 // Withdraws the specified intent.
431 private void withdraw(Intent intent) {
432 intentService.withdraw(intent);
433 withdrawn.add(intent);
434 submitted.remove(intent); //TODO could check result here...
435 }
436
437 // Primes the cycle.
438 private void prime() {
439 int i = 0;
440 withdrawn.addAll(intents);
441 for (Intent intent : intents) {
442 submit(intent);
443 // only submit half of the intents to start
444 if (i++ >= intents.size() / 2) {
445 break;
446 }
447 }
448 }
449
450 private void clear() {
451 submitted.forEach(this::withdraw);
452 }
453
454 // Runs a single operation cycle.
455 private void cycle() {
456 //TODO consider running without rate adjustment
457 adjustRates();
458
459 long start = currentTimeMillis();
460 subset(submitted).forEach(this::withdraw);
461 subset(withdrawn).forEach(this::submit);
462 long delta = currentTimeMillis() - start;
463
464 if (delta > cyclePeriod * 3 || delta < 0) {
465 log.warn("Cycle took {} ms", delta);
466 }
467
468 int difference = cyclePeriod - (int) delta;
469 if (difference > 0) {
470 delay(difference);
471 }
472
473 lastDuration = delta;
474 }
475
476 int cycleCount = 0;
477
478 private void adjustRates() {
479
480 int addDelta = Math.max(1000 - cycleCount, 10);
481 double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
482
483 //FIXME need to iron out the rate adjustment
484 //FIXME we should taper the adjustments over time
485 //FIXME don't just use the lastDuration, take an average
486 if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
487 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
488 lastDuration <= cyclePeriod) {
489 lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
490 } else {
491 lastCount *= multRatio;
492 }
493 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
494 lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
495 }
496
497 }
498 }
499
500 // Event listener to monitor throughput.
501 final class Listener implements IntentListener {
502
503 private final Counter runningTotal = new Counter();
504 private volatile Map<IntentEvent.Type, Counter> counters;
505
506 private volatile double processedThroughput = 0;
507 private volatile double requestThroughput = 0;
508
509 public Listener() {
510 counters = initCounters();
511 }
512
513 private Map<IntentEvent.Type, Counter> initCounters() {
514 Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
515 for (IntentEvent.Type type : IntentEvent.Type.values()) {
516 map.put(type, new Counter());
517 }
518 return map;
519 }
520
521 public double processedThroughput() {
522 return processedThroughput;
523 }
524
525 public double requestThroughput() {
526 return requestThroughput;
527 }
528
529 @Override
530 public void event(IntentEvent event) {
531 if (event.subject().appId().equals(appId)) {
532 counters.get(event.type()).add(1);
533 }
534 }
535
536 public void report() {
537 Map<IntentEvent.Type, Counter> reportCounters = counters;
538 counters = initCounters();
539
540 // update running total and latest throughput
541 Counter installed = reportCounters.get(INSTALLED);
542 Counter withdrawn = reportCounters.get(WITHDRAWN);
543 processedThroughput = installed.throughput() + withdrawn.throughput();
544 runningTotal.add(installed.total() + withdrawn.total());
545
546 Counter installReq = reportCounters.get(INSTALL_REQ);
547 Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
548 requestThroughput = installReq.throughput() + withdrawReq.throughput();
549
550 // build the string to report
551 StringBuilder stringBuilder = new StringBuilder();
552 for (IntentEvent.Type type : IntentEvent.Type.values()) {
553 Counter counter = reportCounters.get(type);
554 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
555 }
556 log.info("Throughput: OVERALL={}; CURRENT={}; {}",
557 format("%.2f", runningTotal.throughput()),
558 format("%.2f", processedThroughput),
559 stringBuilder);
560
561 sampleCollector.recordSample(runningTotal.throughput(),
562 processedThroughput);
563 }
564 }
565
566 private class InternalControl implements ClusterMessageHandler {
567 @Override
568 public void handle(ClusterMessage message) {
569 String cmd = new String(message.payload());
570 log.info("Received command {}", cmd);
571 if (cmd.equals(START)) {
572 startTestRun();
573 } else {
574 stopTestRun();
575 }
576 }
577 }
578
579 private class ReporterTask extends TimerTask {
580 @Override
581 public void run() {
582 //adjustRates(); // FIXME we currently adjust rates in the cycle thread
583 listener.report();
584 }
585 }
586
587}