blob: 27d1ca96b856301831fc83697c83572547b8ba36 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.demo;
17
Ray Milkeyd13a37b2015-06-12 11:55:17 -070018import java.util.Collection;
19import java.util.Collections;
20import java.util.HashSet;
21import java.util.Iterator;
22import java.util.LinkedList;
23import java.util.List;
24import java.util.Objects;
25import java.util.Optional;
26import java.util.Random;
27import java.util.Set;
28import java.util.concurrent.Callable;
29import java.util.concurrent.CountDownLatch;
30import java.util.concurrent.ExecutionException;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.Future;
34import java.util.concurrent.TimeUnit;
35import java.util.concurrent.TimeoutException;
36
Thomas Vachuskab6451472015-03-31 16:03:56 -070037import org.apache.commons.lang.math.RandomUtils;
38import org.apache.felix.scr.annotations.Activate;
39import org.apache.felix.scr.annotations.Component;
40import org.apache.felix.scr.annotations.Deactivate;
41import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.packet.MacAddress;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.ControllerNode;
47import org.onosproject.cluster.NodeId;
48import org.onosproject.core.ApplicationId;
49import org.onosproject.core.CoreService;
50import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.Device;
52import org.onosproject.net.Host;
53import org.onosproject.net.HostId;
54import org.onosproject.net.MastershipRole;
55import org.onosproject.net.PortNumber;
56import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.flow.DefaultFlowRule;
58import org.onosproject.net.flow.DefaultTrafficSelector;
59import org.onosproject.net.flow.DefaultTrafficTreatment;
Ray Milkeyd13a37b2015-06-12 11:55:17 -070060import org.onosproject.net.flow.FlowRule;
Thomas Vachuskab6451472015-03-31 16:03:56 -070061import org.onosproject.net.flow.FlowRuleOperations;
62import org.onosproject.net.flow.FlowRuleOperationsContext;
63import org.onosproject.net.flow.FlowRuleService;
64import org.onosproject.net.flow.TrafficSelector;
65import org.onosproject.net.flow.TrafficTreatment;
66import org.onosproject.net.host.HostService;
67import org.onosproject.net.intent.Constraint;
68import org.onosproject.net.intent.HostToHostIntent;
69import org.onosproject.net.intent.Intent;
70import org.onosproject.net.intent.IntentService;
71import org.slf4j.Logger;
72
Ray Milkeyd13a37b2015-06-12 11:55:17 -070073import com.fasterxml.jackson.databind.JsonNode;
74import com.fasterxml.jackson.databind.ObjectMapper;
75import com.fasterxml.jackson.databind.node.ObjectNode;
76import com.google.common.base.Predicate;
77import com.google.common.base.Stopwatch;
78import com.google.common.collect.FluentIterable;
79import com.google.common.collect.Lists;
80import com.google.common.collect.Sets;
81import com.google.common.util.concurrent.ThreadFactoryBuilder;
Thomas Vachuskab6451472015-03-31 16:03:56 -070082
83import static org.slf4j.LoggerFactory.getLogger;
84
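/**
 * Application to set up demos: installs host-to-host intents either as a
 * full mesh or in random batches, and runs a flow rule installation test.
 */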
88@Component(immediate = true)
89@Service
90public class DemoInstaller implements DemoAPI {
91
92 private final Logger log = getLogger(getClass());
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected CoreService coreService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected IntentService intentService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected HostService hostService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected MastershipService mastershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected DeviceService deviceService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected FlowRuleService flowService;
114
115 private ExecutorService worker;
116
117 private ExecutorService installWorker;
118
119 private ApplicationId appId;
120
121 private final Set<Intent> existingIntents = new HashSet<>();
122 private RandomInstaller randomInstaller;
123
124 private ObjectMapper mapper = new ObjectMapper();
125
126
127
128 @Activate
129 public void activate() {
130 String nodeId = clusterService.getLocalNode().ip().toString();
131 appId = coreService.registerApplication("org.onosproject.demo.installer."
132 + nodeId);
133 worker = Executors.newFixedThreadPool(1,
134 new ThreadFactoryBuilder()
135 .setNameFormat("demo-app-worker")
136 .build());
137 log.info("Started with Application ID {}", appId.id());
138 }
139
140 @Deactivate
141 public void deactivate() {
142 shutdownAndAwaitTermination(worker);
143 if (installWorker != null && !installWorker.isShutdown()) {
144 shutdownAndAwaitTermination(installWorker);
145 }
146 log.info("Stopped");
147 }
148
149 @Override
150 public JsonNode flowTest(Optional<JsonNode> params) {
151 int flowsPerDevice = 1000;
152 int neighbours = 0;
153 boolean remove = true;
154 if (params.isPresent()) {
155 flowsPerDevice = params.get().get("flowsPerDevice").asInt();
156 neighbours = params.get().get("neighbours").asInt();
157 remove = params.get().get("remove").asBoolean();
158 }
159
160 Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours, remove));
161
162 try {
163 return future.get(10, TimeUnit.SECONDS);
164 } catch (InterruptedException | ExecutionException | TimeoutException e) {
165 ObjectNode node = mapper.createObjectNode();
166 node.put("Error", e.getMessage());
167 return node;
168 }
169 }
170
171 @Override
172 public void setup(InstallType type, Optional<JsonNode> runParams) {
173 switch (type) {
174 case MESH:
175 log.debug("Installing mesh intents");
176 worker.execute(new MeshInstaller());
177 break;
178 case RANDOM:
179 //check that we do not have a random installer running
180 if (installWorker == null || installWorker.isShutdown()) {
181 installWorker = Executors.newFixedThreadPool(1,
182 new ThreadFactoryBuilder()
183 .setNameFormat("random-worker")
184 .build());
185 log.debug("Installing random sequence of intents");
186 randomInstaller = new RandomInstaller(runParams);
187 installWorker.execute(randomInstaller);
188 } else {
189 log.warn("Random installer is already running");
190 }
191 break;
192 default:
193 throw new IllegalArgumentException("What is it you want exactly?");
194 }
195 }
196
197 @Override
198 public void tearDown() {
199 worker.submit(new UnInstaller());
200 }
201
202
203 /**
204 * Simply installs a mesh of intents from all the hosts existing in the network.
205 */
206 private class MeshInstaller implements Runnable {
207
208 @Override
209 public void run() {
210 TrafficSelector selector = DefaultTrafficSelector.emptySelector();
211 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
212 List<Constraint> constraint = Lists.newArrayList();
213 List<Host> hosts = Lists.newArrayList(hostService.getHosts());
214 while (!hosts.isEmpty()) {
215 Host src = hosts.remove(0);
216 for (Host dst : hosts) {
217 HostToHostIntent intent = HostToHostIntent.builder()
218 .appId(appId)
219 .one(src.id())
220 .two(dst.id())
221 .selector(selector)
222 .treatment(treatment)
223 .constraints(constraint)
224 .build();
225 existingIntents.add(intent);
226 intentService.submit(intent);
227 }
228 }
229 }
230 }
231
232 /**
233 * Randomly installs and withdraws intents.
234 */
235 private class RandomInstaller implements Runnable {
236
237 private final boolean isLocal;
238 private final Set<Host> hosts;
239
240 private final Random random = new Random(System.currentTimeMillis());
241
242 private Set<HostPair> uninstalledOrWithdrawn;
243 private Set<HostPair> installed;
244
245 private CountDownLatch latch;
246
247 //used to wait on a batch to be processed.
248 private static final int ITERATIONMAX = 50000000;
249
250
251 public RandomInstaller(Optional<JsonNode> runParams) {
252 /*
253 Check if we have params and honour them. Otherwise
254 set defaults to processing only local stuff and
255 all local hosts.
256 */
257 if (runParams.isPresent()) {
258 JsonNode node = runParams.get();
259 isLocal = node.get("local").asBoolean();
260 hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
261 constructHostIds(node.get("hosts").elements());
262 } else {
263 isLocal = true;
264 hosts = Sets.newHashSet(hostService.getHosts());
265 }
266
267 //construct list of intents.
268 installed = Sets.newHashSet();
269 if (isLocal) {
270 uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
271 } else {
272 uninstalledOrWithdrawn = buildPairs(hosts);
273 }
274
275 }
276
277 private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
278 Set<Host> hostIds = Sets.newHashSet();
279 JsonNode n;
280 while (elements.hasNext()) {
281 n = elements.next();
282 hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
283 }
284 return hostIds;
285 }
286
287 @Override
288 public void run() {
289 if (!installWorker.isShutdown()) {
290 randomize();
291 latch = new CountDownLatch(1);
292 try {
293 trackIntents();
294 } catch (InterruptedException e) {
295 shutdown();
296 }
297 }
298
299 }
300
301
302 /**
303 * Check whether the previously submitted batch is in progress
304 * and if yes submit the next one. If things hang, wait for at
305 * most 5 seconds and bail.
306 * @throws InterruptedException if the thread go interupted
307 */
308 private void trackIntents() throws InterruptedException {
309 //FIXME
310 // TODO generate keys for each set of intents to allow manager to throttle
311 // TODO may also look into the store to see how many operations are pending
312
313 //if everything is good proceed.
314 if (!installWorker.isShutdown()) {
315 installWorker.execute(this);
316 }
317
318 }
319
320 public void shutdown() {
321 log.warn("Shutting down random installer!");
322 cleanUp();
323 }
324
325
326 /**
327 * Shuffle the uninstalled and installed list (separately) and select
328 * a random number of them and install or uninstall them respectively.
329 */
330 private void randomize() {
331 List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
332 Collections.shuffle(hostList);
333 List<HostPair> toInstall = hostList.subList(0,
334 random.nextInt(hostList.size() - 1));
335 List<HostPair> toRemove;
336 if (!installed.isEmpty()) {
337 hostList = new LinkedList<>(installed);
338 Collections.shuffle(hostList);
339 toRemove = hostList.subList(0,
340 random.nextInt(hostList.size() - 1));
341 uninstallIntents(toRemove);
342 }
343 installIntents(toInstall);
344
345 }
346
347 private void installIntents(List<HostPair> toInstall) {
348 for (HostPair pair : toInstall) {
349 installed.add(pair);
350 uninstalledOrWithdrawn.remove(pair);
351 intentService.submit(pair.h2hIntent());
352 }
353 }
354
355 private void uninstallIntents(Collection<HostPair> toRemove) {
356 for (HostPair pair : toRemove) {
357 installed.remove(pair);
358 uninstalledOrWithdrawn.add(pair);
359 intentService.withdraw(pair.h2hIntent());
360 }
361 }
362
363 /**
364 * Take everything and remove it all.
365 */
366 private void cleanUp() {
367 List<HostPair> allPairs = Lists.newArrayList(installed);
368 allPairs.addAll(uninstalledOrWithdrawn);
369 for (HostPair pair : allPairs) {
370 intentService.withdraw(pair.h2hIntent());
371 }
372 }
373
374
375 private Set<HostPair> buildPairs(Set<Host> hosts) {
376 Set<HostPair> pairs = Sets.newHashSet();
377 Iterator<Host> it = Sets.newHashSet(hosts).iterator();
378 while (it.hasNext()) {
379 Host src = it.next();
380 it.remove();
381 for (Host dst : hosts) {
382 pairs.add(new HostPair(src, dst));
383 }
384 }
385 return pairs;
386 }
387
388 private Set<Host> pruneHostsByMasterShip() {
389 return FluentIterable.from(hosts)
390 .filter(hasLocalMaster())
391 .toSet();
392
393 }
394
395 private Predicate<? super Host> hasLocalMaster() {
396 return new Predicate<Host>() {
397 @Override
398 public boolean apply(Host host) {
399 return mastershipService.getLocalRole(
400 host.location().deviceId()).equals(MastershipRole.MASTER);
401 }
402 };
403 }
404
405
406 /**
407 * Simple class representing a pair of hosts and precomputes the associated
408 * h2h intent.
409 */
410 private class HostPair {
411
412 private final Host src;
413 private final Host dst;
414
415 private final TrafficSelector selector = DefaultTrafficSelector.emptySelector();
416 private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
417 private final List<Constraint> constraint = Lists.newArrayList();
418 private final HostToHostIntent intent;
419
420 public HostPair(Host src, Host dst) {
421 this.src = src;
422 this.dst = dst;
423 this.intent = HostToHostIntent.builder()
424 .appId(appId)
425 .one(src.id())
426 .two(dst.id())
427 .selector(selector)
428 .treatment(treatment)
429 .constraints(constraint)
430 .build();
431 }
432
433 public HostToHostIntent h2hIntent() {
434 return intent;
435 }
436
437 @Override
438 public boolean equals(Object o) {
439 if (this == o) {
440 return true;
441 }
442 if (o == null || getClass() != o.getClass()) {
443 return false;
444 }
445
446 HostPair hostPair = (HostPair) o;
447
448 return Objects.equals(src, hostPair.src) &&
449 Objects.equals(dst, hostPair.dst);
450
451 }
452
453 @Override
454 public int hashCode() {
455 return Objects.hash(src, dst);
456 }
457
458
459 }
460
461 }
462
463 /**
464 * Remove anything that is running and clear it all out.
465 */
466 private class UnInstaller implements Runnable {
467 @Override
468 public void run() {
469 if (!existingIntents.isEmpty()) {
470 clearExistingIntents();
471 }
472
473 if (installWorker != null && !installWorker.isShutdown()) {
474 shutdownAndAwaitTermination(installWorker);
475 randomInstaller.shutdown();
476 }
477 }
478
479 private void clearExistingIntents() {
480 for (Intent i : existingIntents) {
481 intentService.withdraw(i);
482 }
483 existingIntents.clear();
484 }
485 }
486
487 /**
488 * Shutdown a pool cleanly if possible.
489 *
490 * @param pool an executorService
491 */
492 private void shutdownAndAwaitTermination(ExecutorService pool) {
493 pool.shutdown(); // Disable new tasks from being submitted
494 try {
495 // Wait a while for existing tasks to terminate
496 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
497 pool.shutdownNow(); // Cancel currently executing tasks
498 // Wait a while for tasks to respond to being cancelled
499 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
500 log.error("Pool did not terminate");
501 }
502 }
503 } catch (Exception ie) {
504 // (Re-)Cancel if current thread also interrupted
505 pool.shutdownNow();
506 // Preserve interrupt status
507 Thread.currentThread().interrupt();
508 }
509 }
510
511 private class FlowTest implements Callable<JsonNode> {
512 private final int flowPerDevice;
513 private final int neighbours;
514 private final boolean remove;
515 private FlowRuleOperations.Builder adds;
516 private FlowRuleOperations.Builder removes;
517
518 public FlowTest(int flowsPerDevice, int neighbours, boolean remove) {
519 this.flowPerDevice = flowsPerDevice;
520 this.neighbours = neighbours;
521 this.remove = remove;
522 prepareInstallation();
523 }
524
525 private void prepareInstallation() {
526 Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
527 instances.remove(clusterService.getLocalNode());
528 Set<NodeId> acceptableNodes = Sets.newHashSet();
529 if (neighbours >= instances.size()) {
530 instances.forEach(instance -> acceptableNodes.add(instance.id()));
531 } else {
532 Iterator<ControllerNode> nodes = instances.iterator();
533 for (int i = neighbours; i > 0; i--) {
534 acceptableNodes.add(nodes.next().id());
535 }
536 }
537 acceptableNodes.add(clusterService.getLocalNode().id());
538
539 Set<Device> devices = Sets.newHashSet();
540 for (Device dev : deviceService.getDevices()) {
541 if (acceptableNodes.contains(
542 mastershipService.getMasterFor(dev.id()))) {
543 devices.add(dev);
544 }
545 }
546
547 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
548 .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
549 TrafficSelector.Builder sbuilder;
550 FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
551 FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
552
553 for (Device d : devices) {
554 for (int i = 0; i < this.flowPerDevice; i++) {
555 sbuilder = DefaultTrafficSelector.builder();
556
557 sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
558 .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
559
560
561 int randomPriority = RandomUtils.nextInt();
Ray Milkeyd13a37b2015-06-12 11:55:17 -0700562 FlowRule f = DefaultFlowRule.builder()
563 .forDevice(d.id())
564 .withSelector(sbuilder.build())
565 .withTreatment(treatment)
566 .withPriority(randomPriority)
567 .fromApp(appId)
568 .makeTemporary(10)
569 .build();
Thomas Vachuskab6451472015-03-31 16:03:56 -0700570 rules.add(f);
571 remove.remove(f);
572
573 }
574 }
575
576 this.adds = rules;
577 this.removes = remove;
578 }
579
580 @Override
581 public JsonNode call() throws Exception {
582 ObjectNode node = mapper.createObjectNode();
583 CountDownLatch latch = new CountDownLatch(1);
584 flowService.apply(adds.build(new FlowRuleOperationsContext() {
585
586 private final Stopwatch timer = Stopwatch.createStarted();
587
588 @Override
589 public void onSuccess(FlowRuleOperations ops) {
590
591 long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
592 node.put("elapsed", elapsed);
593
594
595 latch.countDown();
596 }
597 }));
598
599 latch.await(10, TimeUnit.SECONDS);
600 if (this.remove) {
601 flowService.apply(removes.build());
602 }
603 return node;
604 }
605 }
606}
607
608