blob: ff675c1f8ae40fc3d3455b55d5e65e9b3a25fca2 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.intentperf;
17
Brian O'Connor87ba7a72015-03-11 14:40:09 -070018import com.google.common.collect.ArrayListMultimap;
Brian O'Connor6ccba962015-02-17 18:16:02 -080019import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070021import com.google.common.collect.Multimap;
Brian O'Connor6ccba962015-02-17 18:16:02 -080022import com.google.common.collect.Sets;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070023import org.apache.commons.lang.math.RandomUtils;
Brian O'Connor6ccba962015-02-17 18:16:02 -080024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070027import org.apache.felix.scr.annotations.Modified;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070028import org.apache.felix.scr.annotations.Property;
Brian O'Connor6ccba962015-02-17 18:16:02 -080029import org.apache.felix.scr.annotations.Reference;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070030import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Brian O'Connorb9a91c12015-03-10 11:19:50 -070032import org.onlab.packet.MacAddress;
Brian O'Connor6ccba962015-02-17 18:16:02 -080033import org.onlab.util.Counter;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070034import org.onosproject.cfg.ComponentConfigService;
Brian O'Connor6ccba962015-02-17 18:16:02 -080035import org.onosproject.cluster.ClusterService;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070036import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
Brian O'Connor6ccba962015-02-17 18:16:02 -080038import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070040import org.onosproject.mastership.MastershipService;
Brian O'Connor6ccba962015-02-17 18:16:02 -080041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.Device;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.intent.Intent;
50import org.onosproject.net.intent.IntentEvent;
51import org.onosproject.net.intent.IntentListener;
52import org.onosproject.net.intent.IntentService;
53import org.onosproject.net.intent.Key;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070054import org.onosproject.net.intent.PartitionService;
Brian O'Connor6ccba962015-02-17 18:16:02 -080055import org.onosproject.net.intent.PointToPointIntent;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070056import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57import org.onosproject.store.cluster.messaging.ClusterMessage;
58import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
59import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.osgi.service.component.ComponentContext;
Brian O'Connor6ccba962015-02-17 18:16:02 -080061import org.slf4j.Logger;
62
Brian O'Connor87ba7a72015-03-11 14:40:09 -070063import java.util.ArrayList;
Brian O'Connor6ccba962015-02-17 18:16:02 -080064import java.util.Collections;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070065import java.util.Dictionary;
Brian O'Connor6ccba962015-02-17 18:16:02 -080066import java.util.List;
67import java.util.Map;
68import java.util.Set;
69import java.util.Timer;
70import java.util.TimerTask;
71import java.util.concurrent.ExecutorService;
72import java.util.concurrent.Executors;
73import java.util.concurrent.TimeUnit;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070074import java.util.stream.Collectors;
Brian O'Connor6ccba962015-02-17 18:16:02 -080075
Brian O'Connorbcfeadb2015-02-19 21:50:01 -080076import static com.google.common.base.Preconditions.checkState;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070077import static com.google.common.base.Strings.isNullOrEmpty;
Thomas Vachuska0249b532015-02-20 16:46:18 -080078import static java.lang.String.format;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080079import static java.lang.System.currentTimeMillis;
Brian O'Connorbcfeadb2015-02-19 21:50:01 -080080import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070081import static org.onlab.util.Tools.*;
Brian O'Connor36ef71a2015-02-24 12:05:01 -080082import static org.onosproject.net.intent.IntentEvent.Type.*;
Brian O'Connor6ccba962015-02-17 18:16:02 -080083import static org.slf4j.LoggerFactory.getLogger;
84
85/**
Brian O'Connor36ef71a2015-02-24 12:05:01 -080086 * Application to test sustained intent throughput.
Brian O'Connor6ccba962015-02-17 18:16:02 -080087 */
88@Component(immediate = true)
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070089@Service(value = IntentPerfInstaller.class)
Brian O'Connor6ccba962015-02-17 18:16:02 -080090public class IntentPerfInstaller {
91
92 private final Logger log = getLogger(getClass());
93
Brian O'Connor87ba7a72015-03-11 14:40:09 -070094 private static final int DEFAULT_NUM_WORKERS = 1;
95
Thomas Vachuskae63e3d42015-03-17 12:52:50 -070096 private static final int DEFAULT_NUM_KEYS = 40000;
97 private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
Brian O'Connor87ba7a72015-03-11 14:40:09 -070098
99 private static final int DEFAULT_NUM_NEIGHBORS = 0;
100
101 private static final int START_DELAY = 5_000; // ms
102 private static final int REPORT_PERIOD = 5_000; //ms
103
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700104 private static final String START = "start";
105 private static final String STOP = "stop";
106 private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
107
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700108 //FIXME add path length
109
110 @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
Thomas Vachuska7648d662015-03-16 11:58:30 -0700111 label = "Number of keys (i.e. unique intents) to generate per instance")
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700112 private int numKeys = DEFAULT_NUM_KEYS;
113
114 //TODO implement numWorkers property
115// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
116// label = "Number of installer threads per instance")
117// private int numWokers = DEFAULT_NUM_WORKERS;
118
119 @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
Thomas Vachuska7648d662015-03-16 11:58:30 -0700120 label = "Goal for cycle period (in ms)")
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700121 private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
122
123 @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
Thomas Vachuska7648d662015-03-16 11:58:30 -0700124 label = "Number of neighbors to generate intents for")
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700125 private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
126
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800127 @Reference(cardinality = MANDATORY_UNARY)
Brian O'Connor6ccba962015-02-17 18:16:02 -0800128 protected CoreService coreService;
129
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800130 @Reference(cardinality = MANDATORY_UNARY)
Brian O'Connor6ccba962015-02-17 18:16:02 -0800131 protected IntentService intentService;
132
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800133 @Reference(cardinality = MANDATORY_UNARY)
Brian O'Connor6ccba962015-02-17 18:16:02 -0800134 protected ClusterService clusterService;
135
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800136 @Reference(cardinality = MANDATORY_UNARY)
Brian O'Connor6ccba962015-02-17 18:16:02 -0800137 protected DeviceService deviceService;
138
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700139 @Reference(cardinality = MANDATORY_UNARY)
140 protected MastershipService mastershipService;
141
142 @Reference(cardinality = MANDATORY_UNARY)
143 protected PartitionService partitionService;
144
145 @Reference(cardinality = MANDATORY_UNARY)
146 protected ComponentConfigService configService;
147
Thomas Vachuska7648d662015-03-16 11:58:30 -0700148 @Reference(cardinality = MANDATORY_UNARY)
149 protected IntentPerfCollector sampleCollector;
150
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected ClusterCommunicationService communicationService;
153
154 private ExecutorService messageHandlingExecutor;
155
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800156 private ExecutorService workers;
Brian O'Connor6ccba962015-02-17 18:16:02 -0800157 private ApplicationId appId;
158 private Listener listener;
Brian O'Connor6ccba962015-02-17 18:16:02 -0800159 private boolean stopped;
160
Brian O'Connor6ccba962015-02-17 18:16:02 -0800161 private Timer reportTimer;
162
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800163 // FIXME this variable isn't shared properly between multiple worker threads
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800164 private int lastKey = 0;
Brian O'Connor6ccba962015-02-17 18:16:02 -0800165
Thomas Vachuska7648d662015-03-16 11:58:30 -0700166 private IntentPerfUi perfUi;
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700167 private NodeId nodeId;
168 private TimerTask reporterTask;
Thomas Vachuska7648d662015-03-16 11:58:30 -0700169
Brian O'Connor6ccba962015-02-17 18:16:02 -0800170 @Activate
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700173
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700174 nodeId = clusterService.getLocalNode().id();
175 appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
Brian O'Connor6ccba962015-02-17 18:16:02 -0800176
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700177 // TODO: replace with shared timer
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800178 reportTimer = new Timer("onos-intent-perf-reporter");
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700179 workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
180
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700181 // disable flow backups for testing
182 log.info("flow props: {}",
183 configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
184 configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
185 "backupEnabled", "false");
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800186
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700187 // TODO: replace with shared executor
188 messageHandlingExecutor = Executors.newSingleThreadExecutor(
189 groupedThreads("onos/perf", "command-handler"));
190
191 communicationService.addSubscriber(CONTROL, new InternalControl(),
192 messageHandlingExecutor);
193
194 listener = new Listener();
195 intentService.addListener(listener);
196
197 // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
198 modify(context);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800199 }
200
201 @Deactivate
202 public void deactivate() {
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700203 stopTestRun();
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700204
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700205 configService.unregisterProperties(getClass(), false);
206 messageHandlingExecutor.shutdown();
207 communicationService.removeSubscriber(CONTROL);
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700208
Brian O'Connor6ccba962015-02-17 18:16:02 -0800209 if (listener != null) {
210 reportTimer.cancel();
211 intentService.removeListener(listener);
212 listener = null;
213 reportTimer = null;
214 }
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700215 }
216
217 @Modified
218 public void modify(ComponentContext context) {
219 if (context == null) {
220 logConfig("Reconfigured");
221 return;
222 }
223
224 Dictionary<?, ?> properties = context.getProperties();
225 int newNumKeys, newCyclePeriod, newNumNeighbors;
Brian O'Connor6ccba962015-02-17 18:16:02 -0800226 try {
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700227 String s = get(properties, "numKeys");
228 newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
229
230 s = get(properties, "cyclePeriod");
231 newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
232
233 s = get(properties, "numNeighbors");
234 newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
235
236 } catch (NumberFormatException | ClassCastException e) {
237 log.warn("Malformed configuration detected; using defaults", e);
238 newNumKeys = DEFAULT_NUM_KEYS;
239 newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
240 newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
241 }
242
243 if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
244 numKeys = newNumKeys;
245 cyclePeriod = newCyclePeriod;
246 numNeighbors = newNumNeighbors;
247 logConfig("Reconfigured");
248 }
249 }
250
251 public void start() {
252 communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
253 startTestRun();
254 }
255
256 public void stop() {
257 communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
258 stopTestRun();
259 }
260
261 private void logConfig(String prefix) {
262 log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
263 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
264 }
265
266 private void startTestRun() {
267 sampleCollector.clearSamples();
268
269 // adjust numNeighbors and generate list of neighbors
270 numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
271
272 // Schedule reporter task on report period boundary
273 reporterTask = new ReporterTask();
274 reportTimer.scheduleAtFixedRate(reporterTask,
275 REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
276 REPORT_PERIOD);
277
278 // Submit workers
279 stopped = false;
280 for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
281 workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
282 }
283 log.info("Started test run");
284 }
285
286 private void stopTestRun() {
287 stopped = true;
288 if (reporterTask != null) {
289 reporterTask.cancel();
290 reporterTask = null;
291 }
292
293 try {
294 workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800295 } catch (InterruptedException e) {
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800296 log.warn("Failed to stop worker", e);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800297 }
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700298 log.info("Stopped test run");
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700299 }
300
301 private List<NodeId> getNeighbors() {
302 List<NodeId> nodes = clusterService.getNodes().stream()
303 .map(ControllerNode::id)
304 .collect(Collectors.toCollection(ArrayList::new));
305 // sort neighbors by id
306 Collections.sort(nodes, (node1, node2) ->
307 node1.toString().compareTo(node2.toString()));
308 // rotate the local node to index 0
309 Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700310 log.debug("neighbors (raw): {}", nodes); //TODO remove
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700311 // generate the sub-list that will contain local node and selected neighbors
312 nodes = nodes.subList(0, numNeighbors + 1);
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700313 log.debug("neighbors: {}", nodes); //TODO remove
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700314 return nodes;
315 }
316
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700317 private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
318 // choose a random device for which this node is master
319 List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
320 Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
321
322 //FIXME we currently ignore the path length and always use the same device
323 TrafficSelector selector = DefaultTrafficSelector.builder()
324 .matchEthDst(MacAddress.valueOf(mac)).build();
325 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
326 ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
327 ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
328
329 return new PointToPointIntent(appId, key,
330 selector, treatment,
331 ingress, egress,
332 Collections.emptyList(),
333 Intent.DEFAULT_INTENT_PRIORITY);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800334 }
335
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800336 /**
337 * Creates a specified number of intents for testing purposes.
338 *
339 * @param numberOfKeys number of intents
340 * @param pathLength path depth
341 * @param firstKey first key to attempt
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700342 * @return set of intents
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800343 */
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700344 private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700345 List<NodeId> neighbors = getNeighbors();
Brian O'Connor6ccba962015-02-17 18:16:02 -0800346
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700347 Multimap<NodeId, Device> devices = ArrayListMultimap.create();
Thomas Vachuska7648d662015-03-16 11:58:30 -0700348 deviceService.getAvailableDevices()
349 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700350
351 // ensure that we have at least one device per neighbor
Thomas Vachuska7648d662015-03-16 11:58:30 -0700352 neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
353 "There are no devices for {}", node));
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700354
355 // TODO pull this outside so that createIntent can use it
356 // prefix based on node id for keys generated on this instance
357 long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
358
359 int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
360 Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
361
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800362 for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700363 Key key = Key.of(keyPrefix + k, appId);
364
365 NodeId leader = partitionService.getLeader(key);
366 if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
367 // Bail if we are not sending to this node or we have enough for this node
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800368 continue;
369 }
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700370 intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800371
372 // Bump up the counter and remember this as the last key used.
373 count++;
374 lastKey = k;
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700375 if (count % 1000 == 0) {
376 log.info("Building intents... {} (attempt: {})", count, lastKey);
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800377 }
Brian O'Connor6ccba962015-02-17 18:16:02 -0800378 }
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700379 checkState(intents.values().size() == numberOfKeys,
380 "Generated wrong number of intents");
Brian O'Connorbcfeadb2015-02-19 21:50:01 -0800381 log.info("Created {} intents", numberOfKeys);
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700382 intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
383
384 return Sets.newHashSet(intents.values());
Brian O'Connor6ccba962015-02-17 18:16:02 -0800385 }
386
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800387 // Submits intent operations.
388 final class Submitter implements Runnable {
389
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800390 private long lastDuration;
391 private int lastCount;
392
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800393 private Set<Intent> intents = Sets.newHashSet();
394 private Set<Intent> submitted = Sets.newHashSet();
395 private Set<Intent> withdrawn = Sets.newHashSet();
396
397 private Submitter(Set<Intent> intents) {
398 this.intents = intents;
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700399 lastCount = numKeys / 4;
400 lastDuration = 1_000; // 1 second
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800401 }
402
403 @Override
404 public void run() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800405 prime();
406 while (!stopped) {
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700407 try {
408 cycle();
409 } catch (Exception e) {
410 log.warn("Exception during cycle", e);
411 }
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800412 }
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700413 clear();
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800414 }
415
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800416 private Iterable<Intent> subset(Set<Intent> intents) {
417 List<Intent> subset = Lists.newArrayList(intents);
418 Collections.shuffle(subset);
419 return subset.subList(0, lastCount);
420 }
421
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800422 // Submits the specified intent.
423 private void submit(Intent intent) {
424 intentService.submit(intent);
425 submitted.add(intent);
426 withdrawn.remove(intent); //TODO could check result here...
427 }
428
429 // Withdraws the specified intent.
430 private void withdraw(Intent intent) {
431 intentService.withdraw(intent);
432 withdrawn.add(intent);
433 submitted.remove(intent); //TODO could check result here...
434 }
435
436 // Primes the cycle.
437 private void prime() {
438 int i = 0;
439 withdrawn.addAll(intents);
440 for (Intent intent : intents) {
441 submit(intent);
442 // only submit half of the intents to start
443 if (i++ >= intents.size() / 2) {
444 break;
445 }
446 }
447 }
448
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700449 private void clear() {
450 submitted.forEach(this::withdraw);
451 }
452
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800453 // Runs a single operation cycle.
454 private void cycle() {
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800455 //TODO consider running without rate adjustment
456 adjustRates();
457
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800458 long start = currentTimeMillis();
459 subset(submitted).forEach(this::withdraw);
460 subset(withdrawn).forEach(this::submit);
461 long delta = currentTimeMillis() - start;
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800462
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700463 if (delta > cyclePeriod * 3 || delta < 0) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800464 log.warn("Cycle took {} ms", delta);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800465 }
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800466
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700467 int difference = cyclePeriod - (int) delta;
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800468 if (difference > 0) {
469 delay(difference);
470 }
471
472 lastDuration = delta;
473 }
474
475 int cycleCount = 0;
Thomas Vachuska7648d662015-03-16 11:58:30 -0700476
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800477 private void adjustRates() {
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700478
479 int addDelta = Math.max(1000 - cycleCount, 10);
480 double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
481
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800482 //FIXME need to iron out the rate adjustment
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700483 //FIXME we should taper the adjustments over time
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700484 //FIXME don't just use the lastDuration, take an average
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800485 if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
Brian O'Connor375573b2015-03-06 01:09:43 -0800486 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700487 lastDuration <= cyclePeriod) {
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700488 lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800489 } else {
Brian O'Connor4964d3d2015-03-12 20:38:10 -0700490 lastCount *= multRatio;
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800491 }
492 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
493 lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
494 }
495
Brian O'Connor6ccba962015-02-17 18:16:02 -0800496 }
497 }
498
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800499 // Event listener to monitor throughput.
500 final class Listener implements IntentListener {
Brian O'Connor6ccba962015-02-17 18:16:02 -0800501
Thomas Vachuska0249b532015-02-20 16:46:18 -0800502 private final Counter runningTotal = new Counter();
Brian O'Connor87ba7a72015-03-11 14:40:09 -0700503 private volatile Map<IntentEvent.Type, Counter> counters;
Brian O'Connor6ccba962015-02-17 18:16:02 -0800504
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800505 private volatile double processedThroughput = 0;
506 private volatile double requestThroughput = 0;
507
Brian O'Connor6ccba962015-02-17 18:16:02 -0800508 public Listener() {
509 counters = initCounters();
Brian O'Connor6ccba962015-02-17 18:16:02 -0800510 }
511
512 private Map<IntentEvent.Type, Counter> initCounters() {
513 Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
514 for (IntentEvent.Type type : IntentEvent.Type.values()) {
515 map.put(type, new Counter());
516 }
517 return map;
518 }
519
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800520 public double processedThroughput() {
521 return processedThroughput;
522 }
523
524 public double requestThroughput() {
525 return requestThroughput;
526 }
527
Brian O'Connor6ccba962015-02-17 18:16:02 -0800528 @Override
529 public void event(IntentEvent event) {
530 if (event.subject().appId().equals(appId)) {
531 counters.get(event.type()).add(1);
532 }
533 }
534
535 public void report() {
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800536 Map<IntentEvent.Type, Counter> reportCounters = counters;
537 counters = initCounters();
538
539 // update running total and latest throughput
540 Counter installed = reportCounters.get(INSTALLED);
541 Counter withdrawn = reportCounters.get(WITHDRAWN);
542 processedThroughput = installed.throughput() + withdrawn.throughput();
Thomas Vachuska0249b532015-02-20 16:46:18 -0800543 runningTotal.add(installed.total() + withdrawn.total());
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800544
545 Counter installReq = reportCounters.get(INSTALL_REQ);
546 Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
547 requestThroughput = installReq.throughput() + withdrawReq.throughput();
548
549 // build the string to report
550 StringBuilder stringBuilder = new StringBuilder();
Brian O'Connor6ccba962015-02-17 18:16:02 -0800551 for (IntentEvent.Type type : IntentEvent.Type.values()) {
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800552 Counter counter = reportCounters.get(type);
553 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
Brian O'Connor6ccba962015-02-17 18:16:02 -0800554 }
Thomas Vachuska0249b532015-02-20 16:46:18 -0800555 log.info("Throughput: OVERALL={}; CURRENT={}; {}",
556 format("%.2f", runningTotal.throughput()),
Brian O'Connor36ef71a2015-02-24 12:05:01 -0800557 format("%.2f", processedThroughput),
558 stringBuilder);
Thomas Vachuska7648d662015-03-16 11:58:30 -0700559
560 sampleCollector.recordSample(runningTotal.throughput(),
561 processedThroughput);
Brian O'Connor6ccba962015-02-17 18:16:02 -0800562 }
563 }
Thomas Vachuska7648d662015-03-16 11:58:30 -0700564
Thomas Vachuskae63e3d42015-03-17 12:52:50 -0700565 private class InternalControl implements ClusterMessageHandler {
566 @Override
567 public void handle(ClusterMessage message) {
568 String cmd = new String(message.payload());
569 log.info("Received command {}", cmd);
570 if (cmd.equals(START)) {
571 startTestRun();
572 } else {
573 stopTestRun();
574 }
575 }
576 }
577
578 private class ReporterTask extends TimerTask {
579 @Override
580 public void run() {
581 //adjustRates(); // FIXME we currently adjust rates in the cycle thread
582 listener.report();
583 }
584 }
585
Brian O'Connor6ccba962015-02-17 18:16:02 -0800586}