blob: d12133cefc9f713e11ec92dfd111f1f0d1c6e466 [file] [log] [blame]
Thomas Vachuskab6451472015-03-31 16:03:56 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuskab6451472015-03-31 16:03:56 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.demo;
17
Ray Milkeyd13a37b2015-06-12 11:55:17 -070018import java.util.Collection;
19import java.util.Collections;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.LinkedList;
23import java.util.List;
24import java.util.Objects;
25import java.util.Optional;
26import java.util.Random;
27import java.util.Set;
28import java.util.concurrent.Callable;
29import java.util.concurrent.CountDownLatch;
30import java.util.concurrent.ExecutionException;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.Future;
34import java.util.concurrent.TimeUnit;
35import java.util.concurrent.TimeoutException;
36
Thomas Vachuskab6451472015-03-31 16:03:56 -070037import org.apache.commons.lang.math.RandomUtils;
38import org.apache.felix.scr.annotations.Activate;
39import org.apache.felix.scr.annotations.Component;
40import org.apache.felix.scr.annotations.Deactivate;
41import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.packet.MacAddress;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.ControllerNode;
47import org.onosproject.cluster.NodeId;
48import org.onosproject.core.ApplicationId;
49import org.onosproject.core.CoreService;
50import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.Device;
52import org.onosproject.net.Host;
53import org.onosproject.net.HostId;
54import org.onosproject.net.MastershipRole;
55import org.onosproject.net.PortNumber;
56import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.flow.DefaultFlowRule;
58import org.onosproject.net.flow.DefaultTrafficSelector;
59import org.onosproject.net.flow.DefaultTrafficTreatment;
Ray Milkeyd13a37b2015-06-12 11:55:17 -070060import org.onosproject.net.flow.FlowRule;
Thomas Vachuskab6451472015-03-31 16:03:56 -070061import org.onosproject.net.flow.FlowRuleOperations;
62import org.onosproject.net.flow.FlowRuleOperationsContext;
63import org.onosproject.net.flow.FlowRuleService;
64import org.onosproject.net.flow.TrafficSelector;
65import org.onosproject.net.flow.TrafficTreatment;
66import org.onosproject.net.host.HostService;
67import org.onosproject.net.intent.Constraint;
68import org.onosproject.net.intent.HostToHostIntent;
69import org.onosproject.net.intent.Intent;
70import org.onosproject.net.intent.IntentService;
71import org.slf4j.Logger;
72
Ray Milkeyd13a37b2015-06-12 11:55:17 -070073import com.fasterxml.jackson.databind.JsonNode;
74import com.fasterxml.jackson.databind.ObjectMapper;
75import com.fasterxml.jackson.databind.node.ObjectNode;
76import com.google.common.base.Predicate;
77import com.google.common.base.Stopwatch;
78import com.google.common.collect.FluentIterable;
79import com.google.common.collect.Lists;
80import com.google.common.collect.Sets;
81import com.google.common.util.concurrent.ThreadFactoryBuilder;
Thomas Vachuskab6451472015-03-31 16:03:56 -070082
83import static org.slf4j.LoggerFactory.getLogger;
84
85/**
86 * Application to set up demos.
87 */
88@Component(immediate = true)
89@Service
Jonathan Hartd9df7bd2015-11-10 17:10:25 -080090public class DemoInstaller implements DemoApi {
Thomas Vachuskab6451472015-03-31 16:03:56 -070091
92 private final Logger log = getLogger(getClass());
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected CoreService coreService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected IntentService intentService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected HostService hostService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected MastershipService mastershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected DeviceService deviceService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected FlowRuleService flowService;
114
115 private ExecutorService worker;
116
117 private ExecutorService installWorker;
118
119 private ApplicationId appId;
120
121 private final Set<Intent> existingIntents = new HashSet<>();
122 private RandomInstaller randomInstaller;
123
124 private ObjectMapper mapper = new ObjectMapper();
125
126
127
128 @Activate
129 public void activate() {
130 String nodeId = clusterService.getLocalNode().ip().toString();
131 appId = coreService.registerApplication("org.onosproject.demo.installer."
132 + nodeId);
133 worker = Executors.newFixedThreadPool(1,
134 new ThreadFactoryBuilder()
135 .setNameFormat("demo-app-worker")
136 .build());
137 log.info("Started with Application ID {}", appId.id());
138 }
139
140 @Deactivate
141 public void deactivate() {
142 shutdownAndAwaitTermination(worker);
143 if (installWorker != null && !installWorker.isShutdown()) {
144 shutdownAndAwaitTermination(installWorker);
145 }
146 log.info("Stopped");
147 }
148
149 @Override
150 public JsonNode flowTest(Optional<JsonNode> params) {
151 int flowsPerDevice = 1000;
152 int neighbours = 0;
153 boolean remove = true;
154 if (params.isPresent()) {
155 flowsPerDevice = params.get().get("flowsPerDevice").asInt();
156 neighbours = params.get().get("neighbours").asInt();
157 remove = params.get().get("remove").asBoolean();
158 }
159
160 Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours, remove));
161
162 try {
163 return future.get(10, TimeUnit.SECONDS);
164 } catch (InterruptedException | ExecutionException | TimeoutException e) {
165 ObjectNode node = mapper.createObjectNode();
166 node.put("Error", e.getMessage());
167 return node;
168 }
169 }
170
171 @Override
172 public void setup(InstallType type, Optional<JsonNode> runParams) {
173 switch (type) {
174 case MESH:
175 log.debug("Installing mesh intents");
176 worker.execute(new MeshInstaller());
177 break;
178 case RANDOM:
179 //check that we do not have a random installer running
180 if (installWorker == null || installWorker.isShutdown()) {
181 installWorker = Executors.newFixedThreadPool(1,
182 new ThreadFactoryBuilder()
183 .setNameFormat("random-worker")
184 .build());
185 log.debug("Installing random sequence of intents");
186 randomInstaller = new RandomInstaller(runParams);
187 installWorker.execute(randomInstaller);
188 } else {
189 log.warn("Random installer is already running");
190 }
191 break;
192 default:
193 throw new IllegalArgumentException("What is it you want exactly?");
194 }
195 }
196
197 @Override
198 public void tearDown() {
199 worker.submit(new UnInstaller());
200 }
201
202
203 /**
204 * Simply installs a mesh of intents from all the hosts existing in the network.
205 */
206 private class MeshInstaller implements Runnable {
207
208 @Override
209 public void run() {
210 TrafficSelector selector = DefaultTrafficSelector.emptySelector();
211 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
212 List<Constraint> constraint = Lists.newArrayList();
213 List<Host> hosts = Lists.newArrayList(hostService.getHosts());
214 while (!hosts.isEmpty()) {
215 Host src = hosts.remove(0);
216 for (Host dst : hosts) {
217 HostToHostIntent intent = HostToHostIntent.builder()
218 .appId(appId)
219 .one(src.id())
220 .two(dst.id())
221 .selector(selector)
222 .treatment(treatment)
223 .constraints(constraint)
224 .build();
225 existingIntents.add(intent);
226 intentService.submit(intent);
227 }
228 }
229 }
230 }
231
232 /**
233 * Randomly installs and withdraws intents.
234 */
235 private class RandomInstaller implements Runnable {
236
237 private final boolean isLocal;
238 private final Set<Host> hosts;
239
240 private final Random random = new Random(System.currentTimeMillis());
241
242 private Set<HostPair> uninstalledOrWithdrawn;
243 private Set<HostPair> installed;
244
245 private CountDownLatch latch;
246
247 //used to wait on a batch to be processed.
248 private static final int ITERATIONMAX = 50000000;
249
250
251 public RandomInstaller(Optional<JsonNode> runParams) {
252 /*
253 Check if we have params and honour them. Otherwise
254 set defaults to processing only local stuff and
255 all local hosts.
256 */
257 if (runParams.isPresent()) {
258 JsonNode node = runParams.get();
259 isLocal = node.get("local").asBoolean();
260 hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
261 constructHostIds(node.get("hosts").elements());
262 } else {
263 isLocal = true;
264 hosts = Sets.newHashSet(hostService.getHosts());
265 }
266
267 //construct list of intents.
268 installed = Sets.newHashSet();
269 if (isLocal) {
270 uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
271 } else {
272 uninstalledOrWithdrawn = buildPairs(hosts);
273 }
274
275 }
276
277 private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
278 Set<Host> hostIds = Sets.newHashSet();
279 JsonNode n;
280 while (elements.hasNext()) {
281 n = elements.next();
282 hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
283 }
284 return hostIds;
285 }
286
287 @Override
288 public void run() {
289 if (!installWorker.isShutdown()) {
290 randomize();
291 latch = new CountDownLatch(1);
292 try {
293 trackIntents();
294 } catch (InterruptedException e) {
295 shutdown();
296 }
297 }
298
299 }
300
301
302 /**
303 * Check whether the previously submitted batch is in progress
304 * and if yes submit the next one. If things hang, wait for at
305 * most 5 seconds and bail.
306 * @throws InterruptedException if the thread go interupted
307 */
308 private void trackIntents() throws InterruptedException {
309 //FIXME
310 // TODO generate keys for each set of intents to allow manager to throttle
311 // TODO may also look into the store to see how many operations are pending
312
313 //if everything is good proceed.
314 if (!installWorker.isShutdown()) {
315 installWorker.execute(this);
316 }
317
318 }
319
320 public void shutdown() {
321 log.warn("Shutting down random installer!");
322 cleanUp();
323 }
324
325
326 /**
327 * Shuffle the uninstalled and installed list (separately) and select
328 * a random number of them and install or uninstall them respectively.
329 */
330 private void randomize() {
331 List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
332 Collections.shuffle(hostList);
333 List<HostPair> toInstall = hostList.subList(0,
334 random.nextInt(hostList.size() - 1));
335 List<HostPair> toRemove;
336 if (!installed.isEmpty()) {
337 hostList = new LinkedList<>(installed);
338 Collections.shuffle(hostList);
339 toRemove = hostList.subList(0,
340 random.nextInt(hostList.size() - 1));
341 uninstallIntents(toRemove);
342 }
343 installIntents(toInstall);
344
345 }
346
347 private void installIntents(List<HostPair> toInstall) {
348 for (HostPair pair : toInstall) {
349 installed.add(pair);
350 uninstalledOrWithdrawn.remove(pair);
351 intentService.submit(pair.h2hIntent());
352 }
353 }
354
355 private void uninstallIntents(Collection<HostPair> toRemove) {
356 for (HostPair pair : toRemove) {
357 installed.remove(pair);
358 uninstalledOrWithdrawn.add(pair);
359 intentService.withdraw(pair.h2hIntent());
360 }
361 }
362
363 /**
364 * Take everything and remove it all.
365 */
366 private void cleanUp() {
367 List<HostPair> allPairs = Lists.newArrayList(installed);
368 allPairs.addAll(uninstalledOrWithdrawn);
369 for (HostPair pair : allPairs) {
370 intentService.withdraw(pair.h2hIntent());
371 }
372 }
373
374
375 private Set<HostPair> buildPairs(Set<Host> hosts) {
376 Set<HostPair> pairs = Sets.newHashSet();
377 Iterator<Host> it = Sets.newHashSet(hosts).iterator();
378 while (it.hasNext()) {
379 Host src = it.next();
380 it.remove();
381 for (Host dst : hosts) {
382 pairs.add(new HostPair(src, dst));
383 }
384 }
385 return pairs;
386 }
387
388 private Set<Host> pruneHostsByMasterShip() {
389 return FluentIterable.from(hosts)
390 .filter(hasLocalMaster())
391 .toSet();
392
393 }
394
395 private Predicate<? super Host> hasLocalMaster() {
Sho SHIMIZU74626412015-09-11 11:46:27 -0700396 return host -> mastershipService.getLocalRole(
397 host.location().deviceId()).equals(MastershipRole.MASTER);
Thomas Vachuskab6451472015-03-31 16:03:56 -0700398 }
399
400
401 /**
402 * Simple class representing a pair of hosts and precomputes the associated
403 * h2h intent.
404 */
405 private class HostPair {
406
407 private final Host src;
408 private final Host dst;
409
410 private final TrafficSelector selector = DefaultTrafficSelector.emptySelector();
411 private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
412 private final List<Constraint> constraint = Lists.newArrayList();
413 private final HostToHostIntent intent;
414
415 public HostPair(Host src, Host dst) {
416 this.src = src;
417 this.dst = dst;
418 this.intent = HostToHostIntent.builder()
419 .appId(appId)
420 .one(src.id())
421 .two(dst.id())
422 .selector(selector)
423 .treatment(treatment)
424 .constraints(constraint)
425 .build();
426 }
427
428 public HostToHostIntent h2hIntent() {
429 return intent;
430 }
431
432 @Override
433 public boolean equals(Object o) {
434 if (this == o) {
435 return true;
436 }
437 if (o == null || getClass() != o.getClass()) {
438 return false;
439 }
440
441 HostPair hostPair = (HostPair) o;
442
443 return Objects.equals(src, hostPair.src) &&
444 Objects.equals(dst, hostPair.dst);
445
446 }
447
448 @Override
449 public int hashCode() {
450 return Objects.hash(src, dst);
451 }
452
453
454 }
455
456 }
457
458 /**
459 * Remove anything that is running and clear it all out.
460 */
461 private class UnInstaller implements Runnable {
462 @Override
463 public void run() {
464 if (!existingIntents.isEmpty()) {
465 clearExistingIntents();
466 }
467
468 if (installWorker != null && !installWorker.isShutdown()) {
469 shutdownAndAwaitTermination(installWorker);
470 randomInstaller.shutdown();
471 }
472 }
473
474 private void clearExistingIntents() {
475 for (Intent i : existingIntents) {
476 intentService.withdraw(i);
477 }
478 existingIntents.clear();
479 }
480 }
481
482 /**
483 * Shutdown a pool cleanly if possible.
484 *
485 * @param pool an executorService
486 */
487 private void shutdownAndAwaitTermination(ExecutorService pool) {
488 pool.shutdown(); // Disable new tasks from being submitted
489 try {
490 // Wait a while for existing tasks to terminate
491 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
492 pool.shutdownNow(); // Cancel currently executing tasks
493 // Wait a while for tasks to respond to being cancelled
494 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
495 log.error("Pool did not terminate");
496 }
497 }
498 } catch (Exception ie) {
499 // (Re-)Cancel if current thread also interrupted
500 pool.shutdownNow();
501 // Preserve interrupt status
502 Thread.currentThread().interrupt();
503 }
504 }
505
506 private class FlowTest implements Callable<JsonNode> {
507 private final int flowPerDevice;
508 private final int neighbours;
509 private final boolean remove;
510 private FlowRuleOperations.Builder adds;
511 private FlowRuleOperations.Builder removes;
512
513 public FlowTest(int flowsPerDevice, int neighbours, boolean remove) {
514 this.flowPerDevice = flowsPerDevice;
515 this.neighbours = neighbours;
516 this.remove = remove;
517 prepareInstallation();
518 }
519
520 private void prepareInstallation() {
521 Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
522 instances.remove(clusterService.getLocalNode());
523 Set<NodeId> acceptableNodes = Sets.newHashSet();
524 if (neighbours >= instances.size()) {
525 instances.forEach(instance -> acceptableNodes.add(instance.id()));
526 } else {
527 Iterator<ControllerNode> nodes = instances.iterator();
528 for (int i = neighbours; i > 0; i--) {
529 acceptableNodes.add(nodes.next().id());
530 }
531 }
532 acceptableNodes.add(clusterService.getLocalNode().id());
533
534 Set<Device> devices = Sets.newHashSet();
535 for (Device dev : deviceService.getDevices()) {
536 if (acceptableNodes.contains(
537 mastershipService.getMasterFor(dev.id()))) {
538 devices.add(dev);
539 }
540 }
541
542 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
543 .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
544 TrafficSelector.Builder sbuilder;
545 FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
546 FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
547
548 for (Device d : devices) {
549 for (int i = 0; i < this.flowPerDevice; i++) {
550 sbuilder = DefaultTrafficSelector.builder();
551
552 sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
553 .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
554
555
556 int randomPriority = RandomUtils.nextInt();
Ray Milkeyd13a37b2015-06-12 11:55:17 -0700557 FlowRule f = DefaultFlowRule.builder()
558 .forDevice(d.id())
559 .withSelector(sbuilder.build())
560 .withTreatment(treatment)
561 .withPriority(randomPriority)
562 .fromApp(appId)
563 .makeTemporary(10)
564 .build();
Thomas Vachuskab6451472015-03-31 16:03:56 -0700565 rules.add(f);
566 remove.remove(f);
567
568 }
569 }
570
571 this.adds = rules;
572 this.removes = remove;
573 }
574
575 @Override
576 public JsonNode call() throws Exception {
577 ObjectNode node = mapper.createObjectNode();
578 CountDownLatch latch = new CountDownLatch(1);
579 flowService.apply(adds.build(new FlowRuleOperationsContext() {
580
581 private final Stopwatch timer = Stopwatch.createStarted();
582
583 @Override
584 public void onSuccess(FlowRuleOperations ops) {
585
586 long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
587 node.put("elapsed", elapsed);
588
589
590 latch.countDown();
591 }
592 }));
593
594 latch.await(10, TimeUnit.SECONDS);
595 if (this.remove) {
596 flowService.apply(removes.build());
597 }
598 return node;
599 }
600 }
601}
602
603