blob: 9748907697b799d2708a3e67c73a56542a1cd82b [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.flowperf;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
20import static org.onlab.util.Tools.get;
21import static org.slf4j.LoggerFactory.getLogger;
22
23import java.util.Dictionary;
24import java.util.Iterator;
25import java.util.List;
26import java.util.concurrent.CountDownLatch;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.atomic.AtomicInteger;
30import java.util.concurrent.atomic.AtomicLong;
31
32import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Modified;
36import org.apache.felix.scr.annotations.Property;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.packet.MacAddress;
41import org.onlab.util.Tools;
42import org.onosproject.cfg.ComponentConfigService;
43import org.onosproject.core.ApplicationId;
44import org.onosproject.core.CoreService;
45import org.onosproject.net.Device;
46import org.onosproject.net.PortNumber;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.net.flow.DefaultFlowRule;
49import org.onosproject.net.flow.DefaultTrafficSelector;
50import org.onosproject.net.flow.DefaultTrafficTreatment;
51import org.onosproject.net.flow.FlowRule;
52import org.onosproject.net.flow.FlowRuleEvent;
53import org.onosproject.net.flow.FlowRuleListener;
54import org.onosproject.net.flow.FlowRuleService;
55import org.onosproject.net.flow.TrafficSelector;
56import org.onosproject.net.flow.TrafficTreatment;
57import org.onosproject.net.flow.instructions.Instructions;
58import org.osgi.service.component.ComponentContext;
59import org.slf4j.Logger;
60
61import com.google.common.collect.Iterables;
62import com.google.common.collect.Lists;
63
64/**
65 * Application for measuring flow installation performance.
66 * <p>
67 * This application installs a bunch of flows, validates that all those flows have
68 * been successfully added and immediately proceeds to remove all the added flows.
69 */
70@Component(immediate = true, enabled = true)
71@Service(value = FlowPerfApp.class)
72public class FlowPerfApp {
73 private final Logger log = getLogger(getClass());
74
75 @Reference(cardinality = MANDATORY_UNARY)
76 protected DeviceService deviceService;
77
78 @Reference(cardinality = MANDATORY_UNARY)
79 protected FlowRuleService flowRuleService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected CoreService coreService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ComponentConfigService configService;
86
87 protected ApplicationId appId;
88
89 private static final int DEFAULT_BATCH_SIZE = 200;
90 private static final int DEFAULT_TOTAL_THREADS = 1;
91 private static final int DEFAULT_TOTAL_FLOWS = 100000;
92 private AtomicInteger pendingBatchCount;
93 private CountDownLatch installationLatch;
94 private CountDownLatch uninstallationLatch;
95 private Iterator<Device> devices;
96 private AtomicLong macIndex;
97
98 List<FlowRule> addedRules = Lists.newArrayList();
99
100 @Property(name = "totalFlows", intValue = DEFAULT_TOTAL_FLOWS,
101 label = "Total number of flows")
102 protected int totalFlows = DEFAULT_TOTAL_FLOWS;
103
104 @Property(name = "batchSize", intValue = DEFAULT_BATCH_SIZE,
105 label = "Number of flows per batch")
106 protected int batchSize = DEFAULT_BATCH_SIZE;
107
108 @Property(name = "totalThreads", intValue = DEFAULT_TOTAL_THREADS,
109 label = "Number of installer threads")
110 protected int totalThreads = DEFAULT_TOTAL_THREADS;
111
112 private ExecutorService installer;
113 private ExecutorService testRunner =
114 Executors.newSingleThreadExecutor(Tools.groupedThreads("app/flow-perf-test-runner", ""));
115
116 @Activate
117 public void activate(ComponentContext context) {
118 appId = coreService.registerApplication("org.onosproject.flowperf");
119 configService.registerProperties(getClass());
120 installer = Executors.newFixedThreadPool(totalThreads, Tools.groupedThreads("app/flow-perf-worker", "%d"));
121 testRunner.submit(this::runTest);
122 log.info("Started");
123 }
124
125 @Deactivate
126 public void deactivate(ComponentContext context) {
127 installer.shutdown();
128 testRunner.shutdown();
129 configService.unregisterProperties(getClass(), false);
130 log.info("Stopped.");
131 }
132
133 private void runTest() {
134 pendingBatchCount = new AtomicInteger(totalFlows / batchSize);
135 installationLatch = new CountDownLatch(totalFlows);
136 List<Device> deviceList = Lists.newArrayList();
137 deviceService.getAvailableDevices().forEach(deviceList::add);
138 devices = Iterables.cycle(deviceList).iterator();
139 log.info("Starting installation. Total flows: {}, Total threads: {}, "
140 + "Batch Size: {}", totalFlows, totalThreads, batchSize);
141
142 macIndex = new AtomicLong(0);
143 FlowRuleListener addMonitor = event -> {
144 if (event.type() == FlowRuleEvent.Type.RULE_ADDED) {
145 installationLatch.countDown();
146 }
147 };
148
149 flowRuleService.addListener(addMonitor);
150 long addStartTime = System.currentTimeMillis();
151 for (int i = 0; i < totalThreads; ++i) {
152 installer.submit(() -> {
153 while (pendingBatchCount.getAndDecrement() > 0) {
154 List<FlowRule> batch = nextBatch(batchSize);
155 addedRules.addAll(batch);
156 flowRuleService.applyFlowRules(batch.toArray(new FlowRule[]{}));
157 }
158 });
159 }
160
161 // Wait till all the flows are in ADDED state.
162 try {
163 installationLatch.await();
164 } catch (InterruptedException e) {
165 Thread.interrupted();
166 }
167 log.info("Time to install {} flows: {} ms", totalFlows, System.currentTimeMillis() - addStartTime);
168 flowRuleService.removeListener(addMonitor);
169
170
171 uninstallationLatch = new CountDownLatch(totalFlows);
172 FlowRuleListener removeListener = event -> {
173 if (event.type() == FlowRuleEvent.Type.RULE_REMOVED) {
174 uninstallationLatch.countDown();
175 }
176 };
177 AtomicInteger currentIndex = new AtomicInteger(0);
178 long removeStartTime = System.currentTimeMillis();
179 flowRuleService.addListener(removeListener);
180 // Uninstallation runs on a single thread.
181 installer.submit(() -> {
182 while (currentIndex.get() < addedRules.size()) {
183 List<FlowRule> removeBatch = addedRules.subList(currentIndex.get(),
184 Math.min(currentIndex.get() + batchSize, addedRules.size()));
185 currentIndex.addAndGet(removeBatch.size());
186 flowRuleService.removeFlowRules(removeBatch.toArray(new FlowRule[]{}));
187 }
188 });
189 try {
190 uninstallationLatch.await();
191 } catch (InterruptedException e) {
192 Thread.interrupted();
193 }
194 log.info("Time to uninstall {} flows: {} ms", totalFlows, System.currentTimeMillis() - removeStartTime);
195 flowRuleService.removeListener(removeListener);
196 }
197
198 private List<FlowRule> nextBatch(int size) {
199 List<FlowRule> rules = Lists.newArrayList();
200 for (int i = 0; i < size; ++i) {
201 Device device = devices.next();
202 long srcMac = macIndex.incrementAndGet();
203 long dstMac = srcMac + 1;
204 TrafficSelector selector = DefaultTrafficSelector.builder()
205 .matchEthSrc(MacAddress.valueOf(srcMac))
206 .matchEthDst(MacAddress.valueOf(dstMac))
207 .matchInPort(PortNumber.portNumber(2))
208 .build();
209 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
210 .add(Instructions.createOutput(PortNumber.portNumber(3))).build();
211 FlowRule rule = new DefaultFlowRule(device.id(),
212 selector,
213 treatment,
214 100,
215 appId,
216 50000,
217 true,
218 null);
219 rules.add(rule);
220 }
221 return rules;
222 }
223
224 @Modified
225 public void modified(ComponentContext context) {
226 if (context == null) {
227 totalFlows = DEFAULT_TOTAL_FLOWS;
228 batchSize = DEFAULT_BATCH_SIZE;
229 totalThreads = DEFAULT_TOTAL_THREADS;
230 return;
231 }
232
233 Dictionary properties = context.getProperties();
234
235 int newTotalFlows = totalFlows;
236 int newBatchSize = batchSize;
237 int newTotalThreads = totalThreads;
238 try {
239 String s = get(properties, "batchSize");
240 newTotalFlows = isNullOrEmpty(s)
241 ? totalFlows : Integer.parseInt(s.trim());
242
243 s = get(properties, "batchSize");
244 newBatchSize = isNullOrEmpty(s)
245 ? batchSize : Integer.parseInt(s.trim());
246
247 s = get(properties, "totalThreads");
248 newTotalThreads = isNullOrEmpty(s)
249 ? totalThreads : Integer.parseInt(s.trim());
250
251 } catch (NumberFormatException | ClassCastException e) {
252 return;
253 }
254
255 boolean modified = newTotalFlows != totalFlows || newTotalThreads != totalThreads ||
256 newBatchSize != batchSize;
257
258 // If nothing has changed, simply return.
259 if (!modified) {
260 return;
261 }
262
263 totalFlows = newTotalFlows;
264 batchSize = newBatchSize;
265 if (totalThreads != newTotalThreads) {
266 totalThreads = newTotalThreads;
267 installer.shutdown();
268 installer = Executors.newFixedThreadPool(totalThreads, Tools.groupedThreads("flow-perf-worker", "%d"));
269 }
270 }
271}