blob: 1bad7a78b155130343c741f814e05c126c830b8c [file] [log] [blame]
Thomas Vachuskab6451472015-03-31 16:03:56 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.demo;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import com.google.common.base.Predicate;
22import com.google.common.base.Stopwatch;
23import com.google.common.collect.FluentIterable;
24import com.google.common.collect.Lists;
25import com.google.common.collect.Sets;
26import com.google.common.util.concurrent.ThreadFactoryBuilder;
27import org.apache.commons.lang.math.RandomUtils;
28import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.packet.MacAddress;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.Device;
42import org.onosproject.net.Host;
43import org.onosproject.net.HostId;
44import org.onosproject.net.MastershipRole;
45import org.onosproject.net.PortNumber;
46import org.onosproject.net.device.DeviceService;
47import org.onosproject.net.flow.DefaultFlowRule;
48import org.onosproject.net.flow.DefaultTrafficSelector;
49import org.onosproject.net.flow.DefaultTrafficTreatment;
50import org.onosproject.net.flow.FlowRuleOperations;
51import org.onosproject.net.flow.FlowRuleOperationsContext;
52import org.onosproject.net.flow.FlowRuleService;
53import org.onosproject.net.flow.TrafficSelector;
54import org.onosproject.net.flow.TrafficTreatment;
55import org.onosproject.net.host.HostService;
56import org.onosproject.net.intent.Constraint;
57import org.onosproject.net.intent.HostToHostIntent;
58import org.onosproject.net.intent.Intent;
59import org.onosproject.net.intent.IntentService;
60import org.slf4j.Logger;
61
62import java.util.Collection;
63import java.util.Collections;
64import java.util.HashSet;
65import java.util.Iterator;
66import java.util.LinkedList;
67import java.util.List;
68import java.util.Objects;
69import java.util.Optional;
70import java.util.Random;
71import java.util.Set;
72import java.util.concurrent.Callable;
73import java.util.concurrent.CountDownLatch;
74import java.util.concurrent.ExecutionException;
75import java.util.concurrent.ExecutorService;
76import java.util.concurrent.Executors;
77import java.util.concurrent.Future;
78import java.util.concurrent.TimeUnit;
79import java.util.concurrent.TimeoutException;
80
81import static org.slf4j.LoggerFactory.getLogger;
82
83/**
84 * Application to set up demos.
85 */
86@Component(immediate = true)
87@Service
88public class DemoInstaller implements DemoAPI {
89
90 private final Logger log = getLogger(getClass());
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected IntentService intentService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected HostService hostService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected MastershipService mastershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected DeviceService deviceService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected FlowRuleService flowService;
112
113 private ExecutorService worker;
114
115 private ExecutorService installWorker;
116
117 private ApplicationId appId;
118
119 private final Set<Intent> existingIntents = new HashSet<>();
120 private RandomInstaller randomInstaller;
121
122 private ObjectMapper mapper = new ObjectMapper();
123
124
125
126 @Activate
127 public void activate() {
128 String nodeId = clusterService.getLocalNode().ip().toString();
129 appId = coreService.registerApplication("org.onosproject.demo.installer."
130 + nodeId);
131 worker = Executors.newFixedThreadPool(1,
132 new ThreadFactoryBuilder()
133 .setNameFormat("demo-app-worker")
134 .build());
135 log.info("Started with Application ID {}", appId.id());
136 }
137
138 @Deactivate
139 public void deactivate() {
140 shutdownAndAwaitTermination(worker);
141 if (installWorker != null && !installWorker.isShutdown()) {
142 shutdownAndAwaitTermination(installWorker);
143 }
144 log.info("Stopped");
145 }
146
147 @Override
148 public JsonNode flowTest(Optional<JsonNode> params) {
149 int flowsPerDevice = 1000;
150 int neighbours = 0;
151 boolean remove = true;
152 if (params.isPresent()) {
153 flowsPerDevice = params.get().get("flowsPerDevice").asInt();
154 neighbours = params.get().get("neighbours").asInt();
155 remove = params.get().get("remove").asBoolean();
156 }
157
158 Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours, remove));
159
160 try {
161 return future.get(10, TimeUnit.SECONDS);
162 } catch (InterruptedException | ExecutionException | TimeoutException e) {
163 ObjectNode node = mapper.createObjectNode();
164 node.put("Error", e.getMessage());
165 return node;
166 }
167 }
168
169 @Override
170 public void setup(InstallType type, Optional<JsonNode> runParams) {
171 switch (type) {
172 case MESH:
173 log.debug("Installing mesh intents");
174 worker.execute(new MeshInstaller());
175 break;
176 case RANDOM:
177 //check that we do not have a random installer running
178 if (installWorker == null || installWorker.isShutdown()) {
179 installWorker = Executors.newFixedThreadPool(1,
180 new ThreadFactoryBuilder()
181 .setNameFormat("random-worker")
182 .build());
183 log.debug("Installing random sequence of intents");
184 randomInstaller = new RandomInstaller(runParams);
185 installWorker.execute(randomInstaller);
186 } else {
187 log.warn("Random installer is already running");
188 }
189 break;
190 default:
191 throw new IllegalArgumentException("What is it you want exactly?");
192 }
193 }
194
195 @Override
196 public void tearDown() {
197 worker.submit(new UnInstaller());
198 }
199
200
201 /**
202 * Simply installs a mesh of intents from all the hosts existing in the network.
203 */
204 private class MeshInstaller implements Runnable {
205
206 @Override
207 public void run() {
208 TrafficSelector selector = DefaultTrafficSelector.emptySelector();
209 TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
210 List<Constraint> constraint = Lists.newArrayList();
211 List<Host> hosts = Lists.newArrayList(hostService.getHosts());
212 while (!hosts.isEmpty()) {
213 Host src = hosts.remove(0);
214 for (Host dst : hosts) {
215 HostToHostIntent intent = HostToHostIntent.builder()
216 .appId(appId)
217 .one(src.id())
218 .two(dst.id())
219 .selector(selector)
220 .treatment(treatment)
221 .constraints(constraint)
222 .build();
223 existingIntents.add(intent);
224 intentService.submit(intent);
225 }
226 }
227 }
228 }
229
230 /**
231 * Randomly installs and withdraws intents.
232 */
233 private class RandomInstaller implements Runnable {
234
235 private final boolean isLocal;
236 private final Set<Host> hosts;
237
238 private final Random random = new Random(System.currentTimeMillis());
239
240 private Set<HostPair> uninstalledOrWithdrawn;
241 private Set<HostPair> installed;
242
243 private CountDownLatch latch;
244
245 //used to wait on a batch to be processed.
246 private static final int ITERATIONMAX = 50000000;
247
248
249 public RandomInstaller(Optional<JsonNode> runParams) {
250 /*
251 Check if we have params and honour them. Otherwise
252 set defaults to processing only local stuff and
253 all local hosts.
254 */
255 if (runParams.isPresent()) {
256 JsonNode node = runParams.get();
257 isLocal = node.get("local").asBoolean();
258 hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
259 constructHostIds(node.get("hosts").elements());
260 } else {
261 isLocal = true;
262 hosts = Sets.newHashSet(hostService.getHosts());
263 }
264
265 //construct list of intents.
266 installed = Sets.newHashSet();
267 if (isLocal) {
268 uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
269 } else {
270 uninstalledOrWithdrawn = buildPairs(hosts);
271 }
272
273 }
274
275 private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
276 Set<Host> hostIds = Sets.newHashSet();
277 JsonNode n;
278 while (elements.hasNext()) {
279 n = elements.next();
280 hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
281 }
282 return hostIds;
283 }
284
285 @Override
286 public void run() {
287 if (!installWorker.isShutdown()) {
288 randomize();
289 latch = new CountDownLatch(1);
290 try {
291 trackIntents();
292 } catch (InterruptedException e) {
293 shutdown();
294 }
295 }
296
297 }
298
299
300 /**
301 * Check whether the previously submitted batch is in progress
302 * and if yes submit the next one. If things hang, wait for at
303 * most 5 seconds and bail.
304 * @throws InterruptedException if the thread go interupted
305 */
306 private void trackIntents() throws InterruptedException {
307 //FIXME
308 // TODO generate keys for each set of intents to allow manager to throttle
309 // TODO may also look into the store to see how many operations are pending
310
311 //if everything is good proceed.
312 if (!installWorker.isShutdown()) {
313 installWorker.execute(this);
314 }
315
316 }
317
318 public void shutdown() {
319 log.warn("Shutting down random installer!");
320 cleanUp();
321 }
322
323
324 /**
325 * Shuffle the uninstalled and installed list (separately) and select
326 * a random number of them and install or uninstall them respectively.
327 */
328 private void randomize() {
329 List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
330 Collections.shuffle(hostList);
331 List<HostPair> toInstall = hostList.subList(0,
332 random.nextInt(hostList.size() - 1));
333 List<HostPair> toRemove;
334 if (!installed.isEmpty()) {
335 hostList = new LinkedList<>(installed);
336 Collections.shuffle(hostList);
337 toRemove = hostList.subList(0,
338 random.nextInt(hostList.size() - 1));
339 uninstallIntents(toRemove);
340 }
341 installIntents(toInstall);
342
343 }
344
345 private void installIntents(List<HostPair> toInstall) {
346 for (HostPair pair : toInstall) {
347 installed.add(pair);
348 uninstalledOrWithdrawn.remove(pair);
349 intentService.submit(pair.h2hIntent());
350 }
351 }
352
353 private void uninstallIntents(Collection<HostPair> toRemove) {
354 for (HostPair pair : toRemove) {
355 installed.remove(pair);
356 uninstalledOrWithdrawn.add(pair);
357 intentService.withdraw(pair.h2hIntent());
358 }
359 }
360
361 /**
362 * Take everything and remove it all.
363 */
364 private void cleanUp() {
365 List<HostPair> allPairs = Lists.newArrayList(installed);
366 allPairs.addAll(uninstalledOrWithdrawn);
367 for (HostPair pair : allPairs) {
368 intentService.withdraw(pair.h2hIntent());
369 }
370 }
371
372
373 private Set<HostPair> buildPairs(Set<Host> hosts) {
374 Set<HostPair> pairs = Sets.newHashSet();
375 Iterator<Host> it = Sets.newHashSet(hosts).iterator();
376 while (it.hasNext()) {
377 Host src = it.next();
378 it.remove();
379 for (Host dst : hosts) {
380 pairs.add(new HostPair(src, dst));
381 }
382 }
383 return pairs;
384 }
385
386 private Set<Host> pruneHostsByMasterShip() {
387 return FluentIterable.from(hosts)
388 .filter(hasLocalMaster())
389 .toSet();
390
391 }
392
393 private Predicate<? super Host> hasLocalMaster() {
394 return new Predicate<Host>() {
395 @Override
396 public boolean apply(Host host) {
397 return mastershipService.getLocalRole(
398 host.location().deviceId()).equals(MastershipRole.MASTER);
399 }
400 };
401 }
402
403
404 /**
405 * Simple class representing a pair of hosts and precomputes the associated
406 * h2h intent.
407 */
408 private class HostPair {
409
410 private final Host src;
411 private final Host dst;
412
413 private final TrafficSelector selector = DefaultTrafficSelector.emptySelector();
414 private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
415 private final List<Constraint> constraint = Lists.newArrayList();
416 private final HostToHostIntent intent;
417
418 public HostPair(Host src, Host dst) {
419 this.src = src;
420 this.dst = dst;
421 this.intent = HostToHostIntent.builder()
422 .appId(appId)
423 .one(src.id())
424 .two(dst.id())
425 .selector(selector)
426 .treatment(treatment)
427 .constraints(constraint)
428 .build();
429 }
430
431 public HostToHostIntent h2hIntent() {
432 return intent;
433 }
434
435 @Override
436 public boolean equals(Object o) {
437 if (this == o) {
438 return true;
439 }
440 if (o == null || getClass() != o.getClass()) {
441 return false;
442 }
443
444 HostPair hostPair = (HostPair) o;
445
446 return Objects.equals(src, hostPair.src) &&
447 Objects.equals(dst, hostPair.dst);
448
449 }
450
451 @Override
452 public int hashCode() {
453 return Objects.hash(src, dst);
454 }
455
456
457 }
458
459 }
460
461 /**
462 * Remove anything that is running and clear it all out.
463 */
464 private class UnInstaller implements Runnable {
465 @Override
466 public void run() {
467 if (!existingIntents.isEmpty()) {
468 clearExistingIntents();
469 }
470
471 if (installWorker != null && !installWorker.isShutdown()) {
472 shutdownAndAwaitTermination(installWorker);
473 randomInstaller.shutdown();
474 }
475 }
476
477 private void clearExistingIntents() {
478 for (Intent i : existingIntents) {
479 intentService.withdraw(i);
480 }
481 existingIntents.clear();
482 }
483 }
484
485 /**
486 * Shutdown a pool cleanly if possible.
487 *
488 * @param pool an executorService
489 */
490 private void shutdownAndAwaitTermination(ExecutorService pool) {
491 pool.shutdown(); // Disable new tasks from being submitted
492 try {
493 // Wait a while for existing tasks to terminate
494 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
495 pool.shutdownNow(); // Cancel currently executing tasks
496 // Wait a while for tasks to respond to being cancelled
497 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
498 log.error("Pool did not terminate");
499 }
500 }
501 } catch (Exception ie) {
502 // (Re-)Cancel if current thread also interrupted
503 pool.shutdownNow();
504 // Preserve interrupt status
505 Thread.currentThread().interrupt();
506 }
507 }
508
509 private class FlowTest implements Callable<JsonNode> {
510 private final int flowPerDevice;
511 private final int neighbours;
512 private final boolean remove;
513 private FlowRuleOperations.Builder adds;
514 private FlowRuleOperations.Builder removes;
515
516 public FlowTest(int flowsPerDevice, int neighbours, boolean remove) {
517 this.flowPerDevice = flowsPerDevice;
518 this.neighbours = neighbours;
519 this.remove = remove;
520 prepareInstallation();
521 }
522
523 private void prepareInstallation() {
524 Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
525 instances.remove(clusterService.getLocalNode());
526 Set<NodeId> acceptableNodes = Sets.newHashSet();
527 if (neighbours >= instances.size()) {
528 instances.forEach(instance -> acceptableNodes.add(instance.id()));
529 } else {
530 Iterator<ControllerNode> nodes = instances.iterator();
531 for (int i = neighbours; i > 0; i--) {
532 acceptableNodes.add(nodes.next().id());
533 }
534 }
535 acceptableNodes.add(clusterService.getLocalNode().id());
536
537 Set<Device> devices = Sets.newHashSet();
538 for (Device dev : deviceService.getDevices()) {
539 if (acceptableNodes.contains(
540 mastershipService.getMasterFor(dev.id()))) {
541 devices.add(dev);
542 }
543 }
544
545 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
546 .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
547 TrafficSelector.Builder sbuilder;
548 FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
549 FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
550
551 for (Device d : devices) {
552 for (int i = 0; i < this.flowPerDevice; i++) {
553 sbuilder = DefaultTrafficSelector.builder();
554
555 sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
556 .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
557
558
559 int randomPriority = RandomUtils.nextInt();
560 DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
561 randomPriority, appId, 10, false);
562 rules.add(f);
563 remove.remove(f);
564
565 }
566 }
567
568 this.adds = rules;
569 this.removes = remove;
570 }
571
572 @Override
573 public JsonNode call() throws Exception {
574 ObjectNode node = mapper.createObjectNode();
575 CountDownLatch latch = new CountDownLatch(1);
576 flowService.apply(adds.build(new FlowRuleOperationsContext() {
577
578 private final Stopwatch timer = Stopwatch.createStarted();
579
580 @Override
581 public void onSuccess(FlowRuleOperations ops) {
582
583 long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
584 node.put("elapsed", elapsed);
585
586
587 latch.countDown();
588 }
589 }));
590
591 latch.await(10, TimeUnit.SECONDS);
592 if (this.remove) {
593 flowService.apply(removes.build());
594 }
595 return node;
596 }
597 }
598}
599
600