blob: 86d98376109da4ec44d67c88c80677d78eb0da33 [file] [log] [blame]
alshabibfd23d312014-11-11 18:14:47 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.demo;
alshabibfd23d312014-11-11 18:14:47 -080017
alshabib486349d2014-11-25 18:09:25 -050018import com.fasterxml.jackson.databind.JsonNode;
alshabib3a0e9f52015-02-08 14:51:16 -080019import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
alshabib486349d2014-11-25 18:09:25 -050021import com.google.common.base.Predicate;
alshabib3a0e9f52015-02-08 14:51:16 -080022import com.google.common.base.Stopwatch;
alshabib486349d2014-11-25 18:09:25 -050023import com.google.common.collect.FluentIterable;
alshabibfd23d312014-11-11 18:14:47 -080024import com.google.common.collect.Lists;
alshabib486349d2014-11-25 18:09:25 -050025import com.google.common.collect.Sets;
alshabibfd23d312014-11-11 18:14:47 -080026import com.google.common.util.concurrent.ThreadFactoryBuilder;
alshabib3a0e9f52015-02-08 14:51:16 -080027import org.apache.commons.lang.math.RandomUtils;
alshabibfd23d312014-11-11 18:14:47 -080028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
alshabib3a0e9f52015-02-08 14:51:16 -080034import org.onlab.packet.MacAddress;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.cluster.ClusterService;
alshabib3a0e9f52015-02-08 14:51:16 -080036import org.onosproject.cluster.ControllerNode;
37import org.onosproject.cluster.NodeId;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
40import org.onosproject.mastership.MastershipService;
alshabib3a0e9f52015-02-08 14:51:16 -080041import org.onosproject.net.Device;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.Host;
43import org.onosproject.net.HostId;
44import org.onosproject.net.MastershipRole;
alshabib3a0e9f52015-02-08 14:51:16 -080045import org.onosproject.net.PortNumber;
46import org.onosproject.net.device.DeviceService;
47import org.onosproject.net.flow.DefaultFlowRule;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.flow.DefaultTrafficSelector;
49import org.onosproject.net.flow.DefaultTrafficTreatment;
alshabib3a0e9f52015-02-08 14:51:16 -080050import org.onosproject.net.flow.FlowRuleOperations;
51import org.onosproject.net.flow.FlowRuleOperationsContext;
52import org.onosproject.net.flow.FlowRuleService;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.flow.TrafficSelector;
54import org.onosproject.net.flow.TrafficTreatment;
55import org.onosproject.net.host.HostService;
56import org.onosproject.net.intent.Constraint;
57import org.onosproject.net.intent.HostToHostIntent;
58import org.onosproject.net.intent.Intent;
59import org.onosproject.net.intent.IntentBatchService;
Brian O'Connorabafb502014-12-02 22:26:20 -080060import org.onosproject.net.intent.IntentService;
alshabibfd23d312014-11-11 18:14:47 -080061import org.slf4j.Logger;
62
alshabib486349d2014-11-25 18:09:25 -050063import java.util.Collection;
64import java.util.Collections;
alshabibfd23d312014-11-11 18:14:47 -080065import java.util.HashSet;
alshabib486349d2014-11-25 18:09:25 -050066import java.util.Iterator;
67import java.util.LinkedList;
alshabibfd23d312014-11-11 18:14:47 -080068import java.util.List;
alshabib486349d2014-11-25 18:09:25 -050069import java.util.Objects;
70import java.util.Optional;
71import java.util.Random;
alshabibfd23d312014-11-11 18:14:47 -080072import java.util.Set;
alshabib3a0e9f52015-02-08 14:51:16 -080073import java.util.concurrent.Callable;
alshabib486349d2014-11-25 18:09:25 -050074import java.util.concurrent.CountDownLatch;
alshabib3a0e9f52015-02-08 14:51:16 -080075import java.util.concurrent.ExecutionException;
alshabibfd23d312014-11-11 18:14:47 -080076import java.util.concurrent.ExecutorService;
77import java.util.concurrent.Executors;
alshabib3a0e9f52015-02-08 14:51:16 -080078import java.util.concurrent.Future;
alshabib486349d2014-11-25 18:09:25 -050079import java.util.concurrent.TimeUnit;
alshabib3a0e9f52015-02-08 14:51:16 -080080import java.util.concurrent.TimeoutException;
alshabibfd23d312014-11-11 18:14:47 -080081
82import static org.slf4j.LoggerFactory.getLogger;
83
84/**
85 * Application to set up demos.
86 */
87@Component(immediate = true)
88@Service
89public class DemoInstaller implements DemoAPI {
90
91 private final Logger log = getLogger(getClass());
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected CoreService coreService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected IntentService intentService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected HostService hostService;
101
alshabib486349d2014-11-25 18:09:25 -0500102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected MastershipService mastershipService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected IntentBatchService intentBatchService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterService clusterService;
110
alshabib3a0e9f52015-02-08 14:51:16 -0800111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected DeviceService deviceService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected FlowRuleService flowService;
116
alshabibfd23d312014-11-11 18:14:47 -0800117 private ExecutorService worker;
118
alshabib3a0e9f52015-02-08 14:51:16 -0800119 private ExecutorService installWorker;
alshabib486349d2014-11-25 18:09:25 -0500120
alshabibfd23d312014-11-11 18:14:47 -0800121 private ApplicationId appId;
122
123 private final Set<Intent> existingIntents = new HashSet<>();
alshabib486349d2014-11-25 18:09:25 -0500124 private RandomInstaller randomInstaller;
alshabibfd23d312014-11-11 18:14:47 -0800125
alshabib3a0e9f52015-02-08 14:51:16 -0800126 private ObjectMapper mapper = new ObjectMapper();
127
alshabibfd23d312014-11-11 18:14:47 -0800128
129
130 @Activate
131 public void activate() {
alshabib486349d2014-11-25 18:09:25 -0500132 String nodeId = clusterService.getLocalNode().ip().toString();
Brian O'Connorabafb502014-12-02 22:26:20 -0800133 appId = coreService.registerApplication("org.onosproject.demo.installer."
alshabib486349d2014-11-25 18:09:25 -0500134 + nodeId);
alshabibfd23d312014-11-11 18:14:47 -0800135 worker = Executors.newFixedThreadPool(1,
136 new ThreadFactoryBuilder()
137 .setNameFormat("demo-app-worker")
138 .build());
139 log.info("Started with Application ID {}", appId.id());
140 }
141
142 @Deactivate
143 public void deactivate() {
alshabib486349d2014-11-25 18:09:25 -0500144 shutdownAndAwaitTermination(worker);
alshabib3a0e9f52015-02-08 14:51:16 -0800145 if (installWorker != null && !installWorker.isShutdown()) {
146 shutdownAndAwaitTermination(installWorker);
alshabib486349d2014-11-25 18:09:25 -0500147 }
alshabibfd23d312014-11-11 18:14:47 -0800148 log.info("Stopped");
149 }
150
151 @Override
alshabib3a0e9f52015-02-08 14:51:16 -0800152 public JsonNode flowTest(Optional<JsonNode> params) {
153 int flowsPerDevice = 1000;
154 int neighbours = 0;
155 if (params.isPresent()) {
156 flowsPerDevice = params.get().get("flowsPerDevice").asInt();
157 neighbours = params.get().get("neighbours").asInt();
158 }
159
160 Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours));
161
162 try {
163 return future.get(10, TimeUnit.SECONDS);
164 } catch (InterruptedException | ExecutionException | TimeoutException e) {
165 ObjectNode node = mapper.createObjectNode();
166 node.put("Error", e.getMessage());
167 return node;
168 }
169 }
170
171 @Override
alshabib486349d2014-11-25 18:09:25 -0500172 public void setup(InstallType type, Optional<JsonNode> runParams) {
alshabibfd23d312014-11-11 18:14:47 -0800173 switch (type) {
174 case MESH:
175 log.debug("Installing mesh intents");
176 worker.execute(new MeshInstaller());
177 break;
178 case RANDOM:
alshabib486349d2014-11-25 18:09:25 -0500179 //check that we do not have a random installer running
alshabib3a0e9f52015-02-08 14:51:16 -0800180 if (installWorker == null || installWorker.isShutdown()) {
181 installWorker = Executors.newFixedThreadPool(1,
alshabib486349d2014-11-25 18:09:25 -0500182 new ThreadFactoryBuilder()
183 .setNameFormat("random-worker")
184 .build());
185 log.debug("Installing random sequence of intents");
186 randomInstaller = new RandomInstaller(runParams);
alshabib3a0e9f52015-02-08 14:51:16 -0800187 installWorker.execute(randomInstaller);
alshabib486349d2014-11-25 18:09:25 -0500188 } else {
189 log.warn("Random installer is already running");
190 }
191 break;
alshabibfd23d312014-11-11 18:14:47 -0800192 default:
193 throw new IllegalArgumentException("What is it you want exactly?");
194 }
195 }
196
197 @Override
198 public void tearDown() {
199 worker.submit(new UnInstaller());
200 }
201
202
alshabib486349d2014-11-25 18:09:25 -0500203 /**
204 * Simply installs a mesh of intents from all the hosts existing in the network.
205 */
alshabibfd23d312014-11-11 18:14:47 -0800206 private class MeshInstaller implements Runnable {
207
208 @Override
209 public void run() {
210 TrafficSelector selector = DefaultTrafficSelector.builder().build();
211 TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
alshabib19678cc2014-11-12 11:06:08 -0800212 List<Constraint> constraint = Lists.newArrayList();
alshabibfd23d312014-11-11 18:14:47 -0800213 List<Host> hosts = Lists.newArrayList(hostService.getHosts());
214 while (!hosts.isEmpty()) {
215 Host src = hosts.remove(0);
216 for (Host dst : hosts) {
217 HostToHostIntent intent = new HostToHostIntent(appId, src.id(), dst.id(),
218 selector, treatment,
alshabib19678cc2014-11-12 11:06:08 -0800219 constraint);
alshabibfd23d312014-11-11 18:14:47 -0800220 existingIntents.add(intent);
221 intentService.submit(intent);
222 }
223 }
224 }
225 }
226
alshabib486349d2014-11-25 18:09:25 -0500227 /**
228 * Randomly installs and withdraws intents.
229 */
230 private class RandomInstaller implements Runnable {
alshabibfd23d312014-11-11 18:14:47 -0800231
alshabib486349d2014-11-25 18:09:25 -0500232 private final boolean isLocal;
233 private final Set<Host> hosts;
234
235 private final Random random = new Random(System.currentTimeMillis());
236
237 private Set<HostPair> uninstalledOrWithdrawn;
238 private Set<HostPair> installed;
239
240 private CountDownLatch latch;
241
242 //used to wait on a batch to be processed.
243 private static final int ITERATIONMAX = 50000000;
244
245
246 public RandomInstaller(Optional<JsonNode> runParams) {
247 /*
248 Check if we have params and honour them. Otherwise
249 set defaults to processing only local stuff and
250 all local hosts.
251 */
252 if (runParams.isPresent()) {
253 JsonNode node = runParams.get();
254 isLocal = node.get("local").asBoolean();
255 hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
256 constructHostIds(node.get("hosts").elements());
257 } else {
258 isLocal = true;
259 hosts = Sets.newHashSet(hostService.getHosts());
260 }
261
262 //construct list of intents.
263 installed = Sets.newHashSet();
264 if (isLocal) {
265 uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
266 } else {
267 uninstalledOrWithdrawn = buildPairs(hosts);
268 }
269
270 }
271
272 private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
273 Set<Host> hostIds = Sets.newHashSet();
274 JsonNode n;
275 while (elements.hasNext()) {
276 n = elements.next();
277 hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
278 }
279 return hostIds;
280 }
281
282 @Override
283 public void run() {
alshabib3a0e9f52015-02-08 14:51:16 -0800284 if (!installWorker.isShutdown()) {
alshabib486349d2014-11-25 18:09:25 -0500285 randomize();
286 latch = new CountDownLatch(1);
287 try {
288 trackIntents();
289 } catch (InterruptedException e) {
290 shutdown();
291 }
292 }
293
294 }
295
296
297 /**
298 * Check whether the previously submitted batch is in progress
299 * and if yes submit the next one. If things hang, wait for at
300 * most 5 seconds and bail.
301 * @throws InterruptedException if the thread go interupted
302 */
303 private void trackIntents() throws InterruptedException {
Sho SHIMIZU4931ee52015-02-03 21:09:28 -0800304 //FIXME
305 // TODO generate keys for each set of intents to allow manager to throttle
306 // TODO may also look into the store to see how many operations are pending
307
Brian O'Connor03406a42015-02-03 17:28:57 -0800308 //if everything is good proceed.
alshabib3a0e9f52015-02-08 14:51:16 -0800309 if (!installWorker.isShutdown()) {
310 installWorker.execute(this);
alshabib486349d2014-11-25 18:09:25 -0500311 }
312
313 }
314
315 public void shutdown() {
316 log.warn("Shutting down random installer!");
317 cleanUp();
318 }
319
320
321 /**
322 * Shuffle the uninstalled and installed list (separately) and select
323 * a random number of them and install or uninstall them respectively.
324 */
325 private void randomize() {
326 List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
327 Collections.shuffle(hostList);
328 List<HostPair> toInstall = hostList.subList(0,
329 random.nextInt(hostList.size() - 1));
330 List<HostPair> toRemove;
331 if (!installed.isEmpty()) {
332 hostList = new LinkedList<>(installed);
333 Collections.shuffle(hostList);
334 toRemove = hostList.subList(0,
335 random.nextInt(hostList.size() - 1));
336 uninstallIntents(toRemove);
337 }
338 installIntents(toInstall);
339
340 }
341
342 private void installIntents(List<HostPair> toInstall) {
alshabib486349d2014-11-25 18:09:25 -0500343 for (HostPair pair : toInstall) {
344 installed.add(pair);
345 uninstalledOrWithdrawn.remove(pair);
Brian O'Connor03406a42015-02-03 17:28:57 -0800346 intentService.submit(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500347 }
alshabib486349d2014-11-25 18:09:25 -0500348 }
349
350 private void uninstallIntents(Collection<HostPair> toRemove) {
alshabib486349d2014-11-25 18:09:25 -0500351 for (HostPair pair : toRemove) {
352 installed.remove(pair);
353 uninstalledOrWithdrawn.add(pair);
Brian O'Connor03406a42015-02-03 17:28:57 -0800354 intentService.withdraw(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500355 }
alshabib486349d2014-11-25 18:09:25 -0500356 }
357
358 /**
359 * Take everything and remove it all.
360 */
361 private void cleanUp() {
362 List<HostPair> allPairs = Lists.newArrayList(installed);
363 allPairs.addAll(uninstalledOrWithdrawn);
alshabib486349d2014-11-25 18:09:25 -0500364 for (HostPair pair : allPairs) {
Brian O'Connor03406a42015-02-03 17:28:57 -0800365 intentService.withdraw(pair.h2hIntent());
alshabib486349d2014-11-25 18:09:25 -0500366 }
alshabib486349d2014-11-25 18:09:25 -0500367 }
368
369
370 private Set<HostPair> buildPairs(Set<Host> hosts) {
371 Set<HostPair> pairs = Sets.newHashSet();
372 Iterator<Host> it = Sets.newHashSet(hosts).iterator();
373 while (it.hasNext()) {
374 Host src = it.next();
375 it.remove();
376 for (Host dst : hosts) {
377 pairs.add(new HostPair(src, dst));
378 }
379 }
380 return pairs;
381 }
382
383 private Set<Host> pruneHostsByMasterShip() {
384 return FluentIterable.from(hosts)
385 .filter(hasLocalMaster())
386 .toSet();
387
388 }
389
390 private Predicate<? super Host> hasLocalMaster() {
391 return new Predicate<Host>() {
392 @Override
393 public boolean apply(Host host) {
394 return mastershipService.getLocalRole(
395 host.location().deviceId()).equals(MastershipRole.MASTER);
396 }
397 };
398 }
399
400
401 /**
402 * Simple class representing a pair of hosts and precomputes the associated
403 * h2h intent.
404 */
405 private class HostPair {
406
407 private final Host src;
408 private final Host dst;
409
410 private final TrafficSelector selector = DefaultTrafficSelector.builder().build();
411 private final TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
412 private final List<Constraint> constraint = Lists.newArrayList();
413 private final HostToHostIntent intent;
414
415 public HostPair(Host src, Host dst) {
416 this.src = src;
417 this.dst = dst;
418 this.intent = new HostToHostIntent(appId, src.id(), dst.id(),
419 selector, treatment, constraint);
420 }
421
422 public HostToHostIntent h2hIntent() {
423 return intent;
424 }
425
426 @Override
427 public boolean equals(Object o) {
428 if (this == o) {
429 return true;
430 }
431 if (o == null || getClass() != o.getClass()) {
432 return false;
433 }
434
435 HostPair hostPair = (HostPair) o;
436
437 return Objects.equals(src, hostPair.src) &&
438 Objects.equals(dst, hostPair.dst);
439
440 }
441
442 @Override
443 public int hashCode() {
444 return Objects.hash(src, dst);
445 }
446
447
448 }
449
450 }
451
452 /**
453 * Remove anything that is running and clear it all out.
454 */
alshabibfd23d312014-11-11 18:14:47 -0800455 private class UnInstaller implements Runnable {
456 @Override
457 public void run() {
alshabib486349d2014-11-25 18:09:25 -0500458 if (!existingIntents.isEmpty()) {
459 clearExistingIntents();
460 }
461
alshabib3a0e9f52015-02-08 14:51:16 -0800462 if (installWorker != null && !installWorker.isShutdown()) {
463 shutdownAndAwaitTermination(installWorker);
alshabib486349d2014-11-25 18:09:25 -0500464 randomInstaller.shutdown();
465 }
466 }
467
468 private void clearExistingIntents() {
alshabibfd23d312014-11-11 18:14:47 -0800469 for (Intent i : existingIntents) {
470 intentService.withdraw(i);
471 }
alshabib486349d2014-11-25 18:09:25 -0500472 existingIntents.clear();
alshabibfd23d312014-11-11 18:14:47 -0800473 }
474 }
alshabib486349d2014-11-25 18:09:25 -0500475
476 /**
477 * Shutdown a pool cleanly if possible.
478 *
479 * @param pool an executorService
480 */
481 private void shutdownAndAwaitTermination(ExecutorService pool) {
482 pool.shutdown(); // Disable new tasks from being submitted
483 try {
484 // Wait a while for existing tasks to terminate
485 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
486 pool.shutdownNow(); // Cancel currently executing tasks
487 // Wait a while for tasks to respond to being cancelled
488 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
489 log.error("Pool did not terminate");
490 }
491 }
492 } catch (Exception ie) {
493 // (Re-)Cancel if current thread also interrupted
494 pool.shutdownNow();
495 // Preserve interrupt status
496 Thread.currentThread().interrupt();
497 }
498 }
499
alshabib3a0e9f52015-02-08 14:51:16 -0800500 private class FlowTest implements Callable<JsonNode> {
501 private final int flowPerDevice;
502 private final int neighbours;
503 private FlowRuleOperations.Builder adds;
504 private FlowRuleOperations.Builder removes;
505
506 public FlowTest(int flowsPerDevice, int neighbours) {
507 this.flowPerDevice = flowsPerDevice;
508 this.neighbours = neighbours;
509 prepareInstallation();
510 }
511
512 private void prepareInstallation() {
513 Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
514 instances.remove(clusterService.getLocalNode());
515 Set<NodeId> acceptableNodes = Sets.newHashSet();
516 if (neighbours >= instances.size()) {
517 instances.forEach(instance -> acceptableNodes.add(instance.id()));
518 } else {
519 Iterator<ControllerNode> nodes = instances.iterator();
520 for (int i = neighbours; i > 0; i--) {
521 acceptableNodes.add(nodes.next().id());
522 }
523 }
524 acceptableNodes.add(clusterService.getLocalNode().id());
525
526 Set<Device> devices = Sets.newHashSet();
527 for (Device dev : deviceService.getDevices()) {
528 if (acceptableNodes.contains(
529 mastershipService.getMasterFor(dev.id()))) {
530 devices.add(dev);
531 }
532 }
533
534 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
535 .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
536 TrafficSelector.Builder sbuilder;
537 FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
538 FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
539
540 for (Device d : devices) {
541 for (int i = 0; i < this.flowPerDevice; i++) {
542 sbuilder = DefaultTrafficSelector.builder();
543
544 sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
545 .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
546
547
548 int randomPriority = RandomUtils.nextInt();
549 DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
550 randomPriority, appId, 10, false);
551 rules.add(f);
552 remove.remove(f);
553
554 }
555 }
556
557 this.adds = rules;
558 this.removes = remove;
559 }
560
561 @Override
562 public JsonNode call() throws Exception {
563 ObjectNode node = mapper.createObjectNode();
564 CountDownLatch latch = new CountDownLatch(1);
565 flowService.apply(adds.build(new FlowRuleOperationsContext() {
566
567 private final Stopwatch timer = Stopwatch.createStarted();
568
569 @Override
570 public void onSuccess(FlowRuleOperations ops) {
571
572 long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
573 node.put("elapsed", elapsed);
574
575
576 latch.countDown();
577 }
578 }));
579
580 latch.await(10, TimeUnit.SECONDS);
581 flowService.apply(removes.build());
582 return node;
583 }
584 }
alshabibfd23d312014-11-11 18:14:47 -0800585}
586
587