blob: 8323229105887f07cb43c2e630d40e46e0be046b [file] [log] [blame]
alshabibfd23d312014-11-11 18:14:47 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.demo;
alshabibfd23d312014-11-11 18:14:47 -080017
alshabib486349d2014-11-25 18:09:25 -050018import com.fasterxml.jackson.databind.JsonNode;
alshabib3a0e9f52015-02-08 14:51:16 -080019import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
alshabib486349d2014-11-25 18:09:25 -050021import com.google.common.base.Predicate;
alshabib3a0e9f52015-02-08 14:51:16 -080022import com.google.common.base.Stopwatch;
alshabib486349d2014-11-25 18:09:25 -050023import com.google.common.collect.FluentIterable;
alshabibfd23d312014-11-11 18:14:47 -080024import com.google.common.collect.Lists;
alshabib486349d2014-11-25 18:09:25 -050025import com.google.common.collect.Sets;
alshabibfd23d312014-11-11 18:14:47 -080026import com.google.common.util.concurrent.ThreadFactoryBuilder;
alshabib3a0e9f52015-02-08 14:51:16 -080027import org.apache.commons.lang.math.RandomUtils;
alshabibfd23d312014-11-11 18:14:47 -080028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
alshabib3a0e9f52015-02-08 14:51:16 -080034import org.onlab.packet.MacAddress;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.cluster.ClusterService;
alshabib3a0e9f52015-02-08 14:51:16 -080036import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.mastership.MastershipService;
alshabib3a0e9f52015-02-08 14:51:16 -080041import org.onosproject.net.Device;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.Host;
43import org.onosproject.net.HostId;
44import org.onosproject.net.MastershipRole;
alshabib3a0e9f52015-02-08 14:51:16 -080045import org.onosproject.net.PortNumber;
46import org.onosproject.net.device.DeviceService;
47import org.onosproject.net.flow.DefaultFlowRule;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.flow.DefaultTrafficSelector;
49import org.onosproject.net.flow.DefaultTrafficTreatment;
alshabib3a0e9f52015-02-08 14:51:16 -080050import org.onosproject.net.flow.FlowRuleOperations;
51import org.onosproject.net.flow.FlowRuleOperationsContext;
52import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.flow.TrafficSelector;
54import org.onosproject.net.flow.TrafficTreatment;
55import org.onosproject.net.host.HostService;
56import org.onosproject.net.intent.Constraint;
57import org.onosproject.net.intent.HostToHostIntent;
58import org.onosproject.net.intent.Intent;
Brian O'Connorabafb502014-12-02 22:26:20 -080059import org.onosproject.net.intent.IntentService;
alshabibfd23d312014-11-11 18:14:47 -080060import org.slf4j.Logger;
61
alshabib486349d2014-11-25 18:09:25 -050062import java.util.Collection;
63import java.util.Collections;
alshabibfd23d312014-11-11 18:14:47 -080064import java.util.HashSet;
alshabib486349d2014-11-25 18:09:25 -050065import java.util.Iterator;
66import java.util.LinkedList;
alshabibfd23d312014-11-11 18:14:47 -080067import java.util.List;
alshabib486349d2014-11-25 18:09:25 -050068import java.util.Objects;
69import java.util.Optional;
70import java.util.Random;
alshabibfd23d312014-11-11 18:14:47 -080071import java.util.Set;
alshabib3a0e9f52015-02-08 14:51:16 -080072import java.util.concurrent.Callable;
alshabib486349d2014-11-25 18:09:25 -050073import java.util.concurrent.CountDownLatch;
alshabib3a0e9f52015-02-08 14:51:16 -080074import java.util.concurrent.ExecutionException;
alshabibfd23d312014-11-11 18:14:47 -080075import java.util.concurrent.ExecutorService;
76import java.util.concurrent.Executors;
alshabib3a0e9f52015-02-08 14:51:16 -080077import java.util.concurrent.Future;
alshabib486349d2014-11-25 18:09:25 -050078import java.util.concurrent.TimeUnit;
alshabib3a0e9f52015-02-08 14:51:16 -080079import java.util.concurrent.TimeoutException;
alshabibfd23d312014-11-11 18:14:47 -080080
81import static org.slf4j.LoggerFactory.getLogger;
82
83/**
84 * Application to set up demos.
85 */
86@Component(immediate = true)
87@Service
88public class DemoInstaller implements DemoAPI {
89
90 private final Logger log = getLogger(getClass());
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected IntentService intentService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected HostService hostService;
100
alshabib486349d2014-11-25 18:09:25 -0500101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected MastershipService mastershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib486349d2014-11-25 18:09:25 -0500105 protected ClusterService clusterService;
106
alshabib3a0e9f52015-02-08 14:51:16 -0800107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected DeviceService deviceService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected FlowRuleService flowService;
112
alshabibfd23d312014-11-11 18:14:47 -0800113 private ExecutorService worker;
114
alshabib3a0e9f52015-02-08 14:51:16 -0800115 private ExecutorService installWorker;
alshabib486349d2014-11-25 18:09:25 -0500116
alshabibfd23d312014-11-11 18:14:47 -0800117 private ApplicationId appId;
118
119 private final Set<Intent> existingIntents = new HashSet<>();
alshabib486349d2014-11-25 18:09:25 -0500120 private RandomInstaller randomInstaller;
alshabibfd23d312014-11-11 18:14:47 -0800121
alshabib3a0e9f52015-02-08 14:51:16 -0800122 private ObjectMapper mapper = new ObjectMapper();
123
alshabibfd23d312014-11-11 18:14:47 -0800124
125
126 @Activate
127 public void activate() {
alshabib486349d2014-11-25 18:09:25 -0500128 String nodeId = clusterService.getLocalNode().ip().toString();
Brian O'Connorabafb502014-12-02 22:26:20 -0800129 appId = coreService.registerApplication("org.onosproject.demo.installer."
alshabib486349d2014-11-25 18:09:25 -0500130 + nodeId);
alshabibfd23d312014-11-11 18:14:47 -0800131 worker = Executors.newFixedThreadPool(1,
132 new ThreadFactoryBuilder()
133 .setNameFormat("demo-app-worker")
134 .build());
135 log.info("Started with Application ID {}", appId.id());
136 }
137
138 @Deactivate
139 public void deactivate() {
alshabib486349d2014-11-25 18:09:25 -0500140 shutdownAndAwaitTermination(worker);
alshabib3a0e9f52015-02-08 14:51:16 -0800141 if (installWorker != null && !installWorker.isShutdown()) {
142 shutdownAndAwaitTermination(installWorker);
alshabib486349d2014-11-25 18:09:25 -0500143 }
alshabibfd23d312014-11-11 18:14:47 -0800144 log.info("Stopped");
145 }
146
147 @Override
alshabib3a0e9f52015-02-08 14:51:16 -0800148 public JsonNode flowTest(Optional<JsonNode> params) {
149 int flowsPerDevice = 1000;
150 int neighbours = 0;
151 if (params.isPresent()) {
152 flowsPerDevice = params.get().get("flowsPerDevice").asInt();
153 neighbours = params.get().get("neighbours").asInt();
154 }
155
156 Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours));
157
158 try {
159 return future.get(10, TimeUnit.SECONDS);
160 } catch (InterruptedException | ExecutionException | TimeoutException e) {
161 ObjectNode node = mapper.createObjectNode();
162 node.put("Error", e.getMessage());
163 return node;
164 }
165 }
166
167 @Override
alshabib486349d2014-11-25 18:09:25 -0500168 public void setup(InstallType type, Optional<JsonNode> runParams) {
alshabibfd23d312014-11-11 18:14:47 -0800169 switch (type) {
170 case MESH:
171 log.debug("Installing mesh intents");
172 worker.execute(new MeshInstaller());
173 break;
174 case RANDOM:
alshabib486349d2014-11-25 18:09:25 -0500175 //check that we do not have a random installer running
alshabib3a0e9f52015-02-08 14:51:16 -0800176 if (installWorker == null || installWorker.isShutdown()) {
177 installWorker = Executors.newFixedThreadPool(1,
alshabib486349d2014-11-25 18:09:25 -0500178 new ThreadFactoryBuilder()
179 .setNameFormat("random-worker")
180 .build());
181 log.debug("Installing random sequence of intents");
182 randomInstaller = new RandomInstaller(runParams);
alshabib3a0e9f52015-02-08 14:51:16 -0800183 installWorker.execute(randomInstaller);
alshabib486349d2014-11-25 18:09:25 -0500184 } else {
185 log.warn("Random installer is already running");
186 }
187 break;
alshabibfd23d312014-11-11 18:14:47 -0800188 default:
189 throw new IllegalArgumentException("What is it you want exactly?");
190 }
191 }
192
193 @Override
194 public void tearDown() {
195 worker.submit(new UnInstaller());
196 }
197
198
alshabib486349d2014-11-25 18:09:25 -0500199 /**
200 * Simply installs a mesh of intents from all the hosts existing in the network.
201 */
alshabibfd23d312014-11-11 18:14:47 -0800202 private class MeshInstaller implements Runnable {
203
204 @Override
205 public void run() {
206 TrafficSelector selector = DefaultTrafficSelector.builder().build();
207 TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
alshabib19678cc2014-11-12 11:06:08 -0800208 List<Constraint> constraint = Lists.newArrayList();
alshabibfd23d312014-11-11 18:14:47 -0800209 List<Host> hosts = Lists.newArrayList(hostService.getHosts());
210 while (!hosts.isEmpty()) {
211 Host src = hosts.remove(0);
212 for (Host dst : hosts) {
213 HostToHostIntent intent = new HostToHostIntent(appId, src.id(), dst.id(),
214 selector, treatment,
alshabib19678cc2014-11-12 11:06:08 -0800215 constraint);
alshabibfd23d312014-11-11 18:14:47 -0800216 existingIntents.add(intent);
217 intentService.submit(intent);
218 }
219 }
220 }
221 }
222
alshabib486349d2014-11-25 18:09:25 -0500223 /**
224 * Randomly installs and withdraws intents.
225 */
226 private class RandomInstaller implements Runnable {
alshabibfd23d312014-11-11 18:14:47 -0800227
alshabib486349d2014-11-25 18:09:25 -0500228 private final boolean isLocal;
229 private final Set<Host> hosts;
230
231 private final Random random = new Random(System.currentTimeMillis());
232
233 private Set<HostPair> uninstalledOrWithdrawn;
234 private Set<HostPair> installed;
235
236 private CountDownLatch latch;
237
238 //used to wait on a batch to be processed.
239 private static final int ITERATIONMAX = 50000000;
240
241
242 public RandomInstaller(Optional<JsonNode> runParams) {
243 /*
244 Check if we have params and honour them. Otherwise
245 set defaults to processing only local stuff and
246 all local hosts.
247 */
248 if (runParams.isPresent()) {
249 JsonNode node = runParams.get();
250 isLocal = node.get("local").asBoolean();
251 hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
252 constructHostIds(node.get("hosts").elements());
253 } else {
254 isLocal = true;
255 hosts = Sets.newHashSet(hostService.getHosts());
256 }
257
258 //construct list of intents.
259 installed = Sets.newHashSet();
260 if (isLocal) {
261 uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
262 } else {
263 uninstalledOrWithdrawn = buildPairs(hosts);
264 }
265
266 }
267
268 private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
269 Set<Host> hostIds = Sets.newHashSet();
270 JsonNode n;
271 while (elements.hasNext()) {
272 n = elements.next();
273 hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
274 }
275 return hostIds;
276 }
277
278 @Override
279 public void run() {
alshabib3a0e9f52015-02-08 14:51:16 -0800280 if (!installWorker.isShutdown()) {
alshabib486349d2014-11-25 18:09:25 -0500281 randomize();
282 latch = new CountDownLatch(1);
283 try {
284 trackIntents();
285 } catch (InterruptedException e) {
286 shutdown();
287 }
288 }
289
290 }
291
292
293 /**
294 * Check whether the previously submitted batch is in progress
295 * and if yes submit the next one. If things hang, wait for at
296 * most 5 seconds and bail.
297 * @throws InterruptedException if the thread go interupted
298 */
299 private void trackIntents() throws InterruptedException {
Sho SHIMIZU4931ee52015-02-03 21:09:28 -0800300 //FIXME
301 // TODO generate keys for each set of intents to allow manager to throttle
302 // TODO may also look into the store to see how many operations are pending
303
Brian O'Connor03406a42015-02-03 17:28:57 -0800304 //if everything is good proceed.
alshabib3a0e9f52015-02-08 14:51:16 -0800305 if (!installWorker.isShutdown()) {
306 installWorker.execute(this);
alshabib486349d2014-11-25 18:09:25 -0500307 }
308
309 }
310
311 public void shutdown() {
312 log.warn("Shutting down random installer!");
313 cleanUp();
314 }
315
316
317 /**
318 * Shuffle the uninstalled and installed list (separately) and select
319 * a random number of them and install or uninstall them respectively.
320 */
321 private void randomize() {
322 List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
323 Collections.shuffle(hostList);
324 List<HostPair> toInstall = hostList.subList(0,
325 random.nextInt(hostList.size() - 1));
326 List<HostPair> toRemove;
327 if (!installed.isEmpty()) {
328 hostList = new LinkedList<>(installed);
329 Collections.shuffle(hostList);
330 toRemove = hostList.subList(0,
331 random.nextInt(hostList.size() - 1));
332 uninstallIntents(toRemove);
333 }
334 installIntents(toInstall);
335
336 }
337
338 private void installIntents(List<HostPair> toInstall) {
alshabib486349d2014-11-25 18:09:25 -0500339 for (HostPair pair : toInstall) {
340 installed.add(pair);
341 uninstalledOrWithdrawn.remove(pair);
Brian O'Connor03406a42015-02-03 17:28:57 -0800342 intentService.submit(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500343 }
alshabib486349d2014-11-25 18:09:25 -0500344 }
345
346 private void uninstallIntents(Collection<HostPair> toRemove) {
alshabib486349d2014-11-25 18:09:25 -0500347 for (HostPair pair : toRemove) {
348 installed.remove(pair);
349 uninstalledOrWithdrawn.add(pair);
Brian O'Connor03406a42015-02-03 17:28:57 -0800350 intentService.withdraw(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500351 }
alshabib486349d2014-11-25 18:09:25 -0500352 }
353
354 /**
355 * Take everything and remove it all.
356 */
357 private void cleanUp() {
358 List<HostPair> allPairs = Lists.newArrayList(installed);
359 allPairs.addAll(uninstalledOrWithdrawn);
alshabib486349d2014-11-25 18:09:25 -0500360 for (HostPair pair : allPairs) {
Brian O'Connor03406a42015-02-03 17:28:57 -0800361 intentService.withdraw(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500362 }
alshabib486349d2014-11-25 18:09:25 -0500363 }
364
365
366 private Set<HostPair> buildPairs(Set<Host> hosts) {
367 Set<HostPair> pairs = Sets.newHashSet();
368 Iterator<Host> it = Sets.newHashSet(hosts).iterator();
369 while (it.hasNext()) {
370 Host src = it.next();
371 it.remove();
372 for (Host dst : hosts) {
373 pairs.add(new HostPair(src, dst));
374 }
375 }
376 return pairs;
377 }
378
379 private Set<Host> pruneHostsByMasterShip() {
380 return FluentIterable.from(hosts)
381 .filter(hasLocalMaster())
382 .toSet();
383
384 }
385
386 private Predicate<? super Host> hasLocalMaster() {
387 return new Predicate<Host>() {
388 @Override
389 public boolean apply(Host host) {
390 return mastershipService.getLocalRole(
391 host.location().deviceId()).equals(MastershipRole.MASTER);
392 }
393 };
394 }
395
396
397 /**
398 * Simple class representing a pair of hosts and precomputes the associated
399 * h2h intent.
400 */
401 private class HostPair {
402
403 private final Host src;
404 private final Host dst;
405
406 private final TrafficSelector selector = DefaultTrafficSelector.builder().build();
407 private final TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
408 private final List<Constraint> constraint = Lists.newArrayList();
409 private final HostToHostIntent intent;
410
411 public HostPair(Host src, Host dst) {
412 this.src = src;
413 this.dst = dst;
414 this.intent = new HostToHostIntent(appId, src.id(), dst.id(),
415 selector, treatment, constraint);
416 }
417
418 public HostToHostIntent h2hIntent() {
419 return intent;
420 }
421
422 @Override
423 public boolean equals(Object o) {
424 if (this == o) {
425 return true;
426 }
427 if (o == null || getClass() != o.getClass()) {
428 return false;
429 }
430
431 HostPair hostPair = (HostPair) o;
432
433 return Objects.equals(src, hostPair.src) &&
434 Objects.equals(dst, hostPair.dst);
435
436 }
437
438 @Override
439 public int hashCode() {
440 return Objects.hash(src, dst);
441 }
442
443
444 }
445
446 }
447
448 /**
449 * Remove anything that is running and clear it all out.
450 */
alshabibfd23d312014-11-11 18:14:47 -0800451 private class UnInstaller implements Runnable {
452 @Override
453 public void run() {
alshabib486349d2014-11-25 18:09:25 -0500454 if (!existingIntents.isEmpty()) {
455 clearExistingIntents();
456 }
457
alshabib3a0e9f52015-02-08 14:51:16 -0800458 if (installWorker != null && !installWorker.isShutdown()) {
459 shutdownAndAwaitTermination(installWorker);
alshabib486349d2014-11-25 18:09:25 -0500460 randomInstaller.shutdown();
461 }
462 }
463
464 private void clearExistingIntents() {
alshabibfd23d312014-11-11 18:14:47 -0800465 for (Intent i : existingIntents) {
466 intentService.withdraw(i);
467 }
alshabib486349d2014-11-25 18:09:25 -0500468 existingIntents.clear();
alshabibfd23d312014-11-11 18:14:47 -0800469 }
470 }
alshabib486349d2014-11-25 18:09:25 -0500471
472 /**
473 * Shutdown a pool cleanly if possible.
474 *
475 * @param pool an executorService
476 */
477 private void shutdownAndAwaitTermination(ExecutorService pool) {
478 pool.shutdown(); // Disable new tasks from being submitted
479 try {
480 // Wait a while for existing tasks to terminate
481 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
482 pool.shutdownNow(); // Cancel currently executing tasks
483 // Wait a while for tasks to respond to being cancelled
484 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
485 log.error("Pool did not terminate");
486 }
487 }
488 } catch (Exception ie) {
489 // (Re-)Cancel if current thread also interrupted
490 pool.shutdownNow();
491 // Preserve interrupt status
492 Thread.currentThread().interrupt();
493 }
494 }
495
alshabib3a0e9f52015-02-08 14:51:16 -0800496 private class FlowTest implements Callable<JsonNode> {
497 private final int flowPerDevice;
498 private final int neighbours;
499 private FlowRuleOperations.Builder adds;
500 private FlowRuleOperations.Builder removes;
501
502 public FlowTest(int flowsPerDevice, int neighbours) {
503 this.flowPerDevice = flowsPerDevice;
504 this.neighbours = neighbours;
505 prepareInstallation();
506 }
507
508 private void prepareInstallation() {
509 Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
510 instances.remove(clusterService.getLocalNode());
511 Set<NodeId> acceptableNodes = Sets.newHashSet();
512 if (neighbours >= instances.size()) {
513 instances.forEach(instance -> acceptableNodes.add(instance.id()));
514 } else {
515 Iterator<ControllerNode> nodes = instances.iterator();
516 for (int i = neighbours; i > 0; i--) {
517 acceptableNodes.add(nodes.next().id());
518 }
519 }
520 acceptableNodes.add(clusterService.getLocalNode().id());
521
522 Set<Device> devices = Sets.newHashSet();
523 for (Device dev : deviceService.getDevices()) {
524 if (acceptableNodes.contains(
525 mastershipService.getMasterFor(dev.id()))) {
526 devices.add(dev);
527 }
528 }
529
530 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
531 .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
532 TrafficSelector.Builder sbuilder;
533 FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
534 FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
535
536 for (Device d : devices) {
537 for (int i = 0; i < this.flowPerDevice; i++) {
538 sbuilder = DefaultTrafficSelector.builder();
539
540 sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
541 .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
542
543
544 int randomPriority = RandomUtils.nextInt();
545 DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
546 randomPriority, appId, 10, false);
547 rules.add(f);
548 remove.remove(f);
549
550 }
551 }
552
553 this.adds = rules;
554 this.removes = remove;
555 }
556
557 @Override
558 public JsonNode call() throws Exception {
559 ObjectNode node = mapper.createObjectNode();
560 CountDownLatch latch = new CountDownLatch(1);
561 flowService.apply(adds.build(new FlowRuleOperationsContext() {
562
563 private final Stopwatch timer = Stopwatch.createStarted();
564
565 @Override
566 public void onSuccess(FlowRuleOperations ops) {
567
568 long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
569 node.put("elapsed", elapsed);
570
571
572 latch.countDown();
573 }
574 }));
575
576 latch.await(10, TimeUnit.SECONDS);
577 flowService.apply(removes.build());
578 return node;
579 }
580 }
alshabibfd23d312014-11-11 18:14:47 -0800581}
582
583