blob: 0b509f45ae2f067651fe0e971747f4cfd3f1e1de [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.intentperf;
17
18import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Multimap;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang.math.RandomUtils;
Brian O'Connora468e902015-03-18 16:43:49 -070024import org.onlab.packet.MacAddress;
25import org.onlab.util.Counter;
26import org.onosproject.cfg.ComponentConfigService;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.Device;
Ray Milkeya2cf3a12018-02-15 16:13:56 -080035import org.onosproject.net.FilteredConnectPoint;
Brian O'Connora468e902015-03-18 16:43:49 -070036import org.onosproject.net.PortNumber;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentListener;
45import org.onosproject.net.intent.IntentService;
46import org.onosproject.net.intent.Key;
Brian O'Connora468e902015-03-18 16:43:49 -070047import org.onosproject.net.intent.PointToPointIntent;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070048import org.onosproject.net.intent.WorkPartitionService;
Brian O'Connora468e902015-03-18 16:43:49 -070049import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connora468e902015-03-18 16:43:49 -070050import org.onosproject.store.cluster.messaging.MessageSubject;
51import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052import org.osgi.service.component.annotations.Activate;
53import org.osgi.service.component.annotations.Component;
54import org.osgi.service.component.annotations.Deactivate;
55import org.osgi.service.component.annotations.Modified;
56import org.osgi.service.component.annotations.Reference;
Brian O'Connora468e902015-03-18 16:43:49 -070057import org.slf4j.Logger;
58
59import java.util.ArrayList;
60import java.util.Collections;
61import java.util.Dictionary;
62import java.util.List;
63import java.util.Map;
64import java.util.Set;
65import java.util.Timer;
66import java.util.TimerTask;
67import java.util.concurrent.ExecutorService;
68import java.util.concurrent.Executors;
69import java.util.concurrent.TimeUnit;
Sho SHIMIZUc032c832016-01-13 13:02:05 -080070import java.util.function.Consumer;
Brian O'Connora468e902015-03-18 16:43:49 -070071import java.util.stream.Collectors;
72
73import static com.google.common.base.Preconditions.checkState;
74import static com.google.common.base.Strings.isNullOrEmpty;
75import static java.lang.String.format;
76import static java.lang.System.currentTimeMillis;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070077import static org.onlab.util.Tools.delay;
78import static org.onlab.util.Tools.get;
79import static org.onlab.util.Tools.groupedThreads;
Ray Milkey88dd7e22018-10-24 10:04:03 -070080import static org.onosproject.intentperf.OsgiPropertyConstants.CYCLE_PERIOD;
81import static org.onosproject.intentperf.OsgiPropertyConstants.CYCLE_PERIOD_DEFAULT;
82import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_KEYS;
83import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_KEYS_DEFAULT;
84import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_NEIGHBORS;
85import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_NEIGHBORS_DEFAULT;
86import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_WORKERS;
87import static org.onosproject.intentperf.OsgiPropertyConstants.NUM_WORKERS_DEFAULT;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088import static org.onosproject.net.intent.IntentEvent.Type.INSTALLED;
89import static org.onosproject.net.intent.IntentEvent.Type.INSTALL_REQ;
90import static org.onosproject.net.intent.IntentEvent.Type.WITHDRAWN;
91import static org.onosproject.net.intent.IntentEvent.Type.WITHDRAW_REQ;
92import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Brian O'Connora468e902015-03-18 16:43:49 -070093import static org.slf4j.LoggerFactory.getLogger;
94
/**
 * Application to test sustained intent throughput.
 */
Ray Milkey88dd7e22018-10-24 10:04:03 -070098@Component(
99 immediate = true,
100 service = IntentPerfInstaller.class,
101 property = {
102 NUM_KEYS + ":Integer=" + NUM_KEYS_DEFAULT,
103 NUM_WORKERS + ":Integer=" + NUM_WORKERS_DEFAULT,
104 CYCLE_PERIOD + ":Integer=" + CYCLE_PERIOD_DEFAULT,
105 NUM_NEIGHBORS + ":Integer=" + NUM_NEIGHBORS_DEFAULT,
106 }
107)
Brian O'Connora468e902015-03-18 16:43:49 -0700108public class IntentPerfInstaller {
109
110 private final Logger log = getLogger(getClass());
111
Brian O'Connora468e902015-03-18 16:43:49 -0700112 private static final int START_DELAY = 5_000; // ms
Thomas Vachuska95aadff2015-03-26 11:45:41 -0700113 private static final int REPORT_PERIOD = 1_000; //ms
Brian O'Connora468e902015-03-18 16:43:49 -0700114
115 private static final String START = "start";
116 private static final String STOP = "stop";
117 private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
118
119 //FIXME add path length
120
Ray Milkey88dd7e22018-10-24 10:04:03 -0700121 /** Number of keys (i.e. unique intents) to generate per instance. */
122 private int numKeys = NUM_KEYS_DEFAULT;
Brian O'Connora468e902015-03-18 16:43:49 -0700123
Ray Milkey88dd7e22018-10-24 10:04:03 -0700124 /** Number of installer threads per instance. */
125 private int numWorkers = NUM_WORKERS_DEFAULT;
Brian O'Connora468e902015-03-18 16:43:49 -0700126
Ray Milkey88dd7e22018-10-24 10:04:03 -0700127 /** Goal for cycle period (in ms). */
128 private int cyclePeriod = CYCLE_PERIOD_DEFAULT;
Brian O'Connora468e902015-03-18 16:43:49 -0700129
Ray Milkey88dd7e22018-10-24 10:04:03 -0700130 /** Number of neighbors to generate intents for. */
131 private int numNeighbors = NUM_NEIGHBORS_DEFAULT;
Brian O'Connora468e902015-03-18 16:43:49 -0700132
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700133 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700134 protected CoreService coreService;
135
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700136 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700137 protected IntentService intentService;
138
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700139 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700140 protected ClusterService clusterService;
141
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700143 protected DeviceService deviceService;
144
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700145 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700146 protected MastershipService mastershipService;
147
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700148 @Reference(cardinality = MANDATORY)
Madan Jampani3b8101a2016-09-15 13:22:01 -0700149 protected WorkPartitionService partitionService;
Brian O'Connora468e902015-03-18 16:43:49 -0700150
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700151 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700152 protected ComponentConfigService configService;
153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700155 protected IntentPerfCollector sampleCollector;
156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = MANDATORY)
Brian O'Connora468e902015-03-18 16:43:49 -0700158 protected ClusterCommunicationService communicationService;
159
160 private ExecutorService messageHandlingExecutor;
161
162 private ExecutorService workers;
163 private ApplicationId appId;
164 private Listener listener;
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700165 private boolean stopped = true;
Brian O'Connora468e902015-03-18 16:43:49 -0700166
167 private Timer reportTimer;
168
169 // FIXME this variable isn't shared properly between multiple worker threads
170 private int lastKey = 0;
171
172 private IntentPerfUi perfUi;
173 private NodeId nodeId;
174 private TimerTask reporterTask;
175
176 @Activate
177 public void activate(ComponentContext context) {
178 configService.registerProperties(getClass());
179
180 nodeId = clusterService.getLocalNode().id();
181 appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
182
183 // TODO: replace with shared timer
184 reportTimer = new Timer("onos-intent-perf-reporter");
Ray Milkey88dd7e22018-10-24 10:04:03 -0700185 workers = Executors.newFixedThreadPool(numWorkers, groupedThreads("onos/intent-perf", "worker-%d"));
Brian O'Connora468e902015-03-18 16:43:49 -0700186
Brian O'Connora468e902015-03-18 16:43:49 -0700187 // TODO: replace with shared executor
188 messageHandlingExecutor = Executors.newSingleThreadExecutor(
189 groupedThreads("onos/perf", "command-handler"));
190
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800191 communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
Brian O'Connora468e902015-03-18 16:43:49 -0700192 messageHandlingExecutor);
193
194 listener = new Listener();
195 intentService.addListener(listener);
196
197 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
198 modify(context);
199 }
200
201 @Deactivate
202 public void deactivate() {
203 stopTestRun();
204
205 configService.unregisterProperties(getClass(), false);
206 messageHandlingExecutor.shutdown();
207 communicationService.removeSubscriber(CONTROL);
208
209 if (listener != null) {
210 reportTimer.cancel();
211 intentService.removeListener(listener);
212 listener = null;
213 reportTimer = null;
214 }
215 }
216
217 @Modified
218 public void modify(ComponentContext context) {
219 if (context == null) {
220 logConfig("Reconfigured");
221 return;
222 }
223
224 Dictionary<?, ?> properties = context.getProperties();
Ray Milkey88dd7e22018-10-24 10:04:03 -0700225 int newNumKeys, newCyclePeriod, newNumNeighbors, newNumWorkers;
Brian O'Connora468e902015-03-18 16:43:49 -0700226 try {
Ray Milkey88dd7e22018-10-24 10:04:03 -0700227 String s = get(properties, NUM_KEYS);
Brian O'Connora468e902015-03-18 16:43:49 -0700228 newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
229
Ray Milkey88dd7e22018-10-24 10:04:03 -0700230 s = get(properties, CYCLE_PERIOD);
Brian O'Connora468e902015-03-18 16:43:49 -0700231 newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
232
Ray Milkey88dd7e22018-10-24 10:04:03 -0700233 s = get(properties, NUM_NEIGHBORS);
Brian O'Connora468e902015-03-18 16:43:49 -0700234 newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
235
Ray Milkey88dd7e22018-10-24 10:04:03 -0700236 s = get(properties, NUM_WORKERS);
237 newNumWorkers = isNullOrEmpty(s) ? numWorkers : Integer.parseInt(s.trim());
238
Brian O'Connora468e902015-03-18 16:43:49 -0700239 } catch (NumberFormatException | ClassCastException e) {
240 log.warn("Malformed configuration detected; using defaults", e);
Ray Milkey88dd7e22018-10-24 10:04:03 -0700241 newNumKeys = NUM_KEYS_DEFAULT;
242 newCyclePeriod = CYCLE_PERIOD_DEFAULT;
243 newNumNeighbors = NUM_NEIGHBORS_DEFAULT;
244 newNumWorkers = NUM_WORKERS_DEFAULT;
Brian O'Connora468e902015-03-18 16:43:49 -0700245 }
246
247 if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
248 numKeys = newNumKeys;
249 cyclePeriod = newCyclePeriod;
250 numNeighbors = newNumNeighbors;
Ray Milkey88dd7e22018-10-24 10:04:03 -0700251 numWorkers = newNumWorkers;
Brian O'Connora468e902015-03-18 16:43:49 -0700252 logConfig("Reconfigured");
253 }
254 }
255
256 public void start() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700257 if (stopped) {
258 stopped = false;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700259 communicationService.broadcast(START, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700260 startTestRun();
261 }
Brian O'Connora468e902015-03-18 16:43:49 -0700262 }
263
264 public void stop() {
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700265 if (!stopped) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700266 communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700267 stopTestRun();
268 }
Brian O'Connora468e902015-03-18 16:43:49 -0700269 }
270
271 private void logConfig(String prefix) {
272 log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
273 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
274 }
275
276 private void startTestRun() {
277 sampleCollector.clearSamples();
278
279 // adjust numNeighbors and generate list of neighbors
280 numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
281
282 // Schedule reporter task on report period boundary
283 reporterTask = new ReporterTask();
284 reportTimer.scheduleAtFixedRate(reporterTask,
285 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
286 REPORT_PERIOD);
287
288 // Submit workers
289 stopped = false;
Ray Milkey88dd7e22018-10-24 10:04:03 -0700290 for (int i = 0; i < numWorkers; i++) {
Brian O'Connora468e902015-03-18 16:43:49 -0700291 workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
292 }
293 log.info("Started test run");
294 }
295
296 private void stopTestRun() {
Brian O'Connora468e902015-03-18 16:43:49 -0700297 if (reporterTask != null) {
298 reporterTask.cancel();
299 reporterTask = null;
300 }
301
302 try {
Ray Milkey3717e602018-02-01 13:49:47 -0800303 workers.awaitTermination(5L * cyclePeriod, TimeUnit.MILLISECONDS);
Brian O'Connora468e902015-03-18 16:43:49 -0700304 } catch (InterruptedException e) {
305 log.warn("Failed to stop worker", e);
306 }
Thomas Vachuskab967ebf2015-03-28 15:19:30 -0700307
308 sampleCollector.recordSample(0, 0);
309 sampleCollector.recordSample(0, 0);
310 stopped = true;
311
Brian O'Connora468e902015-03-18 16:43:49 -0700312 log.info("Stopped test run");
313 }
314
315 private List<NodeId> getNeighbors() {
316 List<NodeId> nodes = clusterService.getNodes().stream()
317 .map(ControllerNode::id)
318 .collect(Collectors.toCollection(ArrayList::new));
319 // sort neighbors by id
320 Collections.sort(nodes, (node1, node2) ->
321 node1.toString().compareTo(node2.toString()));
322 // rotate the local node to index 0
323 Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
324 log.debug("neighbors (raw): {}", nodes); //TODO remove
325 // generate the sub-list that will contain local node and selected neighbors
326 nodes = nodes.subList(0, numNeighbors + 1);
327 log.debug("neighbors: {}", nodes); //TODO remove
328 return nodes;
329 }
330
331 private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
332 // choose a random device for which this node is master
333 List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
334 Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
335
336 //FIXME we currently ignore the path length and always use the same device
337 TrafficSelector selector = DefaultTrafficSelector.builder()
338 .matchEthDst(MacAddress.valueOf(mac)).build();
339 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
340 ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
341 ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
342
343 return PointToPointIntent.builder()
344 .appId(appId)
345 .key(key)
346 .selector(selector)
347 .treatment(treatment)
Ray Milkeya2cf3a12018-02-15 16:13:56 -0800348 .filteredIngressPoint(new FilteredConnectPoint(ingress))
349 .filteredEgressPoint(new FilteredConnectPoint(egress))
Brian O'Connora468e902015-03-18 16:43:49 -0700350 .build();
351 }
352
353 /**
354 * Creates a specified number of intents for testing purposes.
355 *
356 * @param numberOfKeys number of intents
357 * @param pathLength path depth
358 * @param firstKey first key to attempt
359 * @return set of intents
360 */
361 private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
362 List<NodeId> neighbors = getNeighbors();
363
364 Multimap<NodeId, Device> devices = ArrayListMultimap.create();
365 deviceService.getAvailableDevices()
366 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
367
368 // ensure that we have at least one device per neighbor
Jon Hallcbd1b392017-01-18 20:15:44 -0800369 neighbors.forEach(node -> checkState(!devices.get(node).isEmpty(),
Brian O'Connora468e902015-03-18 16:43:49 -0700370 "There are no devices for {}", node));
371
372 // TODO pull this outside so that createIntent can use it
373 // prefix based on node id for keys generated on this instance
374 long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
375
376 int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
377 Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
378
379 for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
380 Key key = Key.of(keyPrefix + k, appId);
381
Madan Jampani3b8101a2016-09-15 13:22:01 -0700382 NodeId leader = partitionService.getLeader(key, Key::hash);
Brian O'Connora468e902015-03-18 16:43:49 -0700383 if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
384 // Bail if we are not sending to this node or we have enough for this node
385 continue;
386 }
387 intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
388
389 // Bump up the counter and remember this as the last key used.
390 count++;
391 lastKey = k;
392 if (count % 1000 == 0) {
393 log.info("Building intents... {} (attempt: {})", count, lastKey);
394 }
395 }
396 checkState(intents.values().size() == numberOfKeys,
397 "Generated wrong number of intents");
398 log.info("Created {} intents", numberOfKeys);
399 intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
400
401 return Sets.newHashSet(intents.values());
402 }
403
Yi Tseng356d1252017-05-25 16:01:58 -0700404 final Set<Intent> submitted = Sets.newConcurrentHashSet();
405 final Set<Intent> withdrawn = Sets.newConcurrentHashSet();
406
Brian O'Connora468e902015-03-18 16:43:49 -0700407 // Submits intent operations.
408 final class Submitter implements Runnable {
409
410 private long lastDuration;
411 private int lastCount;
412
413 private Set<Intent> intents = Sets.newHashSet();
Yi Tseng356d1252017-05-25 16:01:58 -0700414
Brian O'Connora468e902015-03-18 16:43:49 -0700415
416 private Submitter(Set<Intent> intents) {
417 this.intents = intents;
418 lastCount = numKeys / 4;
419 lastDuration = 1_000; // 1 second
420 }
421
422 @Override
423 public void run() {
424 prime();
425 while (!stopped) {
426 try {
427 cycle();
428 } catch (Exception e) {
429 log.warn("Exception during cycle", e);
430 }
431 }
432 clear();
433 }
434
435 private Iterable<Intent> subset(Set<Intent> intents) {
436 List<Intent> subset = Lists.newArrayList(intents);
437 Collections.shuffle(subset);
Yi Tseng356d1252017-05-25 16:01:58 -0700438 return subset.subList(0, Math.min(subset.size(), lastCount));
Brian O'Connora468e902015-03-18 16:43:49 -0700439 }
440
441 // Submits the specified intent.
442 private void submit(Intent intent) {
443 intentService.submit(intent);
Brian O'Connora468e902015-03-18 16:43:49 -0700444 withdrawn.remove(intent); //TODO could check result here...
445 }
446
447 // Withdraws the specified intent.
448 private void withdraw(Intent intent) {
449 intentService.withdraw(intent);
Brian O'Connora468e902015-03-18 16:43:49 -0700450 submitted.remove(intent); //TODO could check result here...
451 }
452
453 // Primes the cycle.
454 private void prime() {
455 int i = 0;
456 withdrawn.addAll(intents);
457 for (Intent intent : intents) {
458 submit(intent);
459 // only submit half of the intents to start
460 if (i++ >= intents.size() / 2) {
461 break;
462 }
463 }
464 }
465
466 private void clear() {
467 submitted.forEach(this::withdraw);
468 }
469
470 // Runs a single operation cycle.
471 private void cycle() {
472 //TODO consider running without rate adjustment
473 adjustRates();
474
475 long start = currentTimeMillis();
476 subset(submitted).forEach(this::withdraw);
477 subset(withdrawn).forEach(this::submit);
478 long delta = currentTimeMillis() - start;
479
480 if (delta > cyclePeriod * 3 || delta < 0) {
481 log.warn("Cycle took {} ms", delta);
482 }
483
484 int difference = cyclePeriod - (int) delta;
485 if (difference > 0) {
486 delay(difference);
487 }
488
489 lastDuration = delta;
490 }
491
492 int cycleCount = 0;
493
494 private void adjustRates() {
495
496 int addDelta = Math.max(1000 - cycleCount, 10);
497 double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
498
499 //FIXME need to iron out the rate adjustment
500 //FIXME we should taper the adjustments over time
501 //FIXME don't just use the lastDuration, take an average
502 if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
503 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
504 lastDuration <= cyclePeriod) {
505 lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
506 } else {
507 lastCount *= multRatio;
508 }
509 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
510 lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
511 }
512
513 }
514 }
515
516 // Event listener to monitor throughput.
517 final class Listener implements IntentListener {
518
519 private final Counter runningTotal = new Counter();
520 private volatile Map<IntentEvent.Type, Counter> counters;
521
522 private volatile double processedThroughput = 0;
523 private volatile double requestThroughput = 0;
524
525 public Listener() {
526 counters = initCounters();
527 }
528
529 private Map<IntentEvent.Type, Counter> initCounters() {
530 Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
531 for (IntentEvent.Type type : IntentEvent.Type.values()) {
532 map.put(type, new Counter());
533 }
534 return map;
535 }
536
537 public double processedThroughput() {
538 return processedThroughput;
539 }
540
541 public double requestThroughput() {
542 return requestThroughput;
543 }
544
545 @Override
546 public void event(IntentEvent event) {
547 if (event.subject().appId().equals(appId)) {
Yi Tseng356d1252017-05-25 16:01:58 -0700548 if (event.type() == INSTALLED) {
549 submitted.add(event.subject());
550 }
551 if (event.type() == WITHDRAWN) {
552 withdrawn.add(event.subject());
553 }
Brian O'Connora468e902015-03-18 16:43:49 -0700554 counters.get(event.type()).add(1);
555 }
556 }
557
558 public void report() {
559 Map<IntentEvent.Type, Counter> reportCounters = counters;
560 counters = initCounters();
561
562 // update running total and latest throughput
563 Counter installed = reportCounters.get(INSTALLED);
564 Counter withdrawn = reportCounters.get(WITHDRAWN);
565 processedThroughput = installed.throughput() + withdrawn.throughput();
566 runningTotal.add(installed.total() + withdrawn.total());
567
568 Counter installReq = reportCounters.get(INSTALL_REQ);
569 Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
570 requestThroughput = installReq.throughput() + withdrawReq.throughput();
571
572 // build the string to report
573 StringBuilder stringBuilder = new StringBuilder();
574 for (IntentEvent.Type type : IntentEvent.Type.values()) {
575 Counter counter = reportCounters.get(type);
576 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
577 }
578 log.info("Throughput: OVERALL={}; CURRENT={}; {}",
579 format("%.2f", runningTotal.throughput()),
580 format("%.2f", processedThroughput),
581 stringBuilder);
582
583 sampleCollector.recordSample(runningTotal.throughput(),
584 processedThroughput);
585 }
586 }
587
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800588 private class InternalControl implements Consumer<String> {
Brian O'Connora468e902015-03-18 16:43:49 -0700589 @Override
Sho SHIMIZUc032c832016-01-13 13:02:05 -0800590 public void accept(String cmd) {
Brian O'Connora468e902015-03-18 16:43:49 -0700591 log.info("Received command {}", cmd);
592 if (cmd.equals(START)) {
593 startTestRun();
594 } else {
595 stopTestRun();
596 }
597 }
598 }
599
600 private class ReporterTask extends TimerTask {
601 @Override
602 public void run() {
603 //adjustRates(); // FIXME we currently adjust rates in the cycle thread
604 listener.report();
605 }
606 }
607
608}