David Jencks | bae4484 | 2014-06-21 20:15:24 +0000 | [diff] [blame^] | 1 | /* |
| 2 | * Copyright (c) OSGi Alliance (2014). All Rights Reserved. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.osgi.util.promise; |
| 18 | |
| 19 | import java.lang.reflect.InvocationTargetException; |
| 20 | import java.util.NoSuchElementException; |
| 21 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 22 | import java.util.concurrent.CountDownLatch; |
| 23 | import org.osgi.util.function.Function; |
| 24 | import org.osgi.util.function.Predicate; |
| 25 | |
| 26 | /** |
| 27 | * Promise implementation. |
| 28 | * |
| 29 | * <p> |
| 30 | * This class is not used directly by clients. Clients should use |
| 31 | * {@link Deferred} to create a resolvable {@link Promise}. |
| 32 | * |
| 33 | * @param <T> The result type associated with the Promise. |
| 34 | * |
| 35 | * @ThreadSafe |
| 36 | * @author $Id: d8b44a36f3eb797316b213118192fac213fa0c59 $ |
| 37 | */ |
| 38 | final class PromiseImpl<T> implements Promise<T> { |
| 39 | /** |
| 40 | * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no |
| 41 | * additional synchronization is required to write to or read from the |
| 42 | * queue. |
| 43 | */ |
| 44 | private final ConcurrentLinkedQueue<Runnable> callbacks; |
| 45 | /** |
| 46 | * A CountDownLatch to manage the resolved state of this Promise. |
| 47 | * |
| 48 | * <p> |
| 49 | * This object is used as the synchronizing object to provide a critical |
| 50 | * section in {@link #resolve(Object, Throwable)} so that only a single |
| 51 | * thread can write the resolved state variables and open the latch. |
| 52 | * |
| 53 | * <p> |
| 54 | * The resolved state variables, {@link #value} and {@link #fail}, must only |
| 55 | * be written when the latch is closed (getCount() != 0) and must only be |
| 56 | * read when the latch is open (getCount() == 0). The latch state must |
| 57 | * always be checked before writing or reading since the resolved state |
| 58 | * variables' memory consistency is guarded by the latch. |
| 59 | */ |
| 60 | private final CountDownLatch resolved; |
| 61 | /** |
| 62 | * The value of this Promise if successfully resolved. |
| 63 | * |
| 64 | * @GuardedBy("resolved") |
| 65 | * @see #resolved |
| 66 | */ |
| 67 | private T value; |
| 68 | /** |
| 69 | * The failure of this Promise if resolved with a failure or {@code null} if |
| 70 | * successfully resolved. |
| 71 | * |
| 72 | * @GuardedBy("resolved") |
| 73 | * @see #resolved |
| 74 | */ |
| 75 | private Throwable fail; |
| 76 | |
/**
 * Create an unresolved Promise.
 *
 * <p>
 * The latch starts closed (count of 1) and the callback queue starts
 * empty.
 */
PromiseImpl() {
    this.resolved = new CountDownLatch(1);
    this.callbacks = new ConcurrentLinkedQueue<Runnable>();
}
| 84 | |
/**
 * Create a Promise that is already resolved.
 *
 * <p>
 * The latch is created open (count of 0), so the Promise reports itself as
 * done immediately.
 *
 * @param v The value of this resolved Promise.
 * @param f The failure of this resolved Promise.
 */
PromiseImpl(T v, Throwable f) {
    this.callbacks = new ConcurrentLinkedQueue<Runnable>();
    // the resolved state is assigned before the open latch is created
    this.value = v;
    this.fail = f;
    this.resolved = new CountDownLatch(0);
}
| 97 | |
| 98 | /** |
| 99 | * Resolve this Promise. |
| 100 | * |
| 101 | * @param v The value of this Promise. |
| 102 | * @param f The failure of this Promise. |
| 103 | */ |
| 104 | void resolve(T v, Throwable f) { |
| 105 | // critical section: only one resolver at a time |
| 106 | synchronized (resolved) { |
| 107 | if (resolved.getCount() == 0) { |
| 108 | throw new IllegalStateException("Already resolved"); |
| 109 | } |
| 110 | /* |
| 111 | * The resolved state variables must be set before opening the |
| 112 | * latch. This safely publishes them to be read by other threads |
| 113 | * that must verify the latch is open before reading. |
| 114 | */ |
| 115 | value = v; |
| 116 | fail = f; |
| 117 | resolved.countDown(); |
| 118 | } |
| 119 | notifyCallbacks(); // call any registered callbacks |
| 120 | } |
| 121 | |
| 122 | /** |
| 123 | * Call any registered callbacks if this Promise is resolved. |
| 124 | */ |
| 125 | private void notifyCallbacks() { |
| 126 | if (resolved.getCount() != 0) { |
| 127 | return; // return if not resolved |
| 128 | } |
| 129 | |
| 130 | /* |
| 131 | * Note: multiple threads can be in this method removing callbacks from |
| 132 | * the queue and calling them, so the order in which callbacks are |
| 133 | * called cannot be specified. |
| 134 | */ |
| 135 | for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) { |
| 136 | try { |
| 137 | callback.run(); |
| 138 | } catch (Throwable t) { |
| 139 | Logger.logCallbackException(t); |
| 140 | } |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | /** |
| 145 | * {@inheritDoc} |
| 146 | */ |
| 147 | public boolean isDone() { |
| 148 | return resolved.getCount() == 0; |
| 149 | } |
| 150 | |
| 151 | /** |
| 152 | * {@inheritDoc} |
| 153 | */ |
| 154 | public T getValue() throws InvocationTargetException, InterruptedException { |
| 155 | resolved.await(); |
| 156 | if (fail == null) { |
| 157 | return value; |
| 158 | } |
| 159 | throw new InvocationTargetException(fail); |
| 160 | } |
| 161 | |
| 162 | /** |
| 163 | * {@inheritDoc} |
| 164 | */ |
| 165 | public Throwable getFailure() throws InterruptedException { |
| 166 | resolved.await(); |
| 167 | return fail; |
| 168 | } |
| 169 | |
| 170 | /** |
| 171 | * {@inheritDoc} |
| 172 | */ |
| 173 | public Promise<T> onResolve(Runnable callback) { |
| 174 | callbacks.offer(callback); |
| 175 | notifyCallbacks(); // call any registered callbacks |
| 176 | return this; |
| 177 | } |
| 178 | |
| 179 | /** |
| 180 | * {@inheritDoc} |
| 181 | */ |
| 182 | public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) { |
| 183 | PromiseImpl<R> chained = new PromiseImpl<R>(); |
| 184 | onResolve(new Then<R>(chained, success, failure)); |
| 185 | return chained; |
| 186 | } |
| 187 | |
| 188 | /** |
| 189 | * {@inheritDoc} |
| 190 | */ |
| 191 | public <R> Promise<R> then(Success<? super T, ? extends R> success) { |
| 192 | return then(success, null); |
| 193 | } |
| 194 | |
| 195 | /** |
| 196 | * A callback used to chain promises for the {@link #then(Success, Failure)} |
| 197 | * method. |
| 198 | * |
| 199 | * @Immutable |
| 200 | */ |
| 201 | private final class Then<R> implements Runnable { |
| 202 | private final PromiseImpl<R> chained; |
| 203 | private final Success<T, ? extends R> success; |
| 204 | private final Failure failure; |
| 205 | |
| 206 | @SuppressWarnings("unchecked") |
| 207 | Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) { |
| 208 | this.chained = chained; |
| 209 | this.success = (Success<T, ? extends R>) success; |
| 210 | this.failure = failure; |
| 211 | } |
| 212 | |
| 213 | public void run() { |
| 214 | Throwable f; |
| 215 | final boolean interrupted = Thread.interrupted(); |
| 216 | try { |
| 217 | f = getFailure(); |
| 218 | } catch (Throwable e) { |
| 219 | f = e; // propagate new exception |
| 220 | } finally { |
| 221 | if (interrupted) { // restore interrupt status |
| 222 | Thread.currentThread().interrupt(); |
| 223 | } |
| 224 | } |
| 225 | if (f != null) { |
| 226 | if (failure != null) { |
| 227 | try { |
| 228 | failure.fail(PromiseImpl.this); |
| 229 | } catch (Throwable e) { |
| 230 | f = e; // propagate new exception |
| 231 | } |
| 232 | } |
| 233 | // fail chained |
| 234 | chained.resolve(null, f); |
| 235 | return; |
| 236 | } |
| 237 | Promise<? extends R> returned = null; |
| 238 | if (success != null) { |
| 239 | try { |
| 240 | returned = success.call(PromiseImpl.this); |
| 241 | } catch (Throwable e) { |
| 242 | chained.resolve(null, e); |
| 243 | return; |
| 244 | } |
| 245 | } |
| 246 | if (returned == null) { |
| 247 | // resolve chained with null value |
| 248 | chained.resolve(null, null); |
| 249 | } else { |
| 250 | // resolve chained when returned promise is resolved |
| 251 | returned.onResolve(new Chain<R>(chained, returned)); |
| 252 | } |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | /** |
| 257 | * A callback used to resolve the chained Promise when the Promise promise |
| 258 | * is resolved. |
| 259 | * |
| 260 | * @Immutable |
| 261 | */ |
| 262 | private final static class Chain<R> implements Runnable { |
| 263 | private final PromiseImpl<R> chained; |
| 264 | private final Promise<? extends R> promise; |
| 265 | private final Throwable failure; |
| 266 | |
| 267 | Chain(PromiseImpl<R> chained, Promise<? extends R> promise) { |
| 268 | this.chained = chained; |
| 269 | this.promise = promise; |
| 270 | this.failure = null; |
| 271 | } |
| 272 | |
| 273 | Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) { |
| 274 | this.chained = chained; |
| 275 | this.promise = promise; |
| 276 | this.failure = failure; |
| 277 | } |
| 278 | |
| 279 | public void run() { |
| 280 | R value = null; |
| 281 | Throwable f; |
| 282 | final boolean interrupted = Thread.interrupted(); |
| 283 | try { |
| 284 | f = promise.getFailure(); |
| 285 | if (f == null) { |
| 286 | value = promise.getValue(); |
| 287 | } else if (failure != null) { |
| 288 | f = failure; |
| 289 | } |
| 290 | } catch (Throwable e) { |
| 291 | f = e; // propagate new exception |
| 292 | } finally { |
| 293 | if (interrupted) { // restore interrupt status |
| 294 | Thread.currentThread().interrupt(); |
| 295 | } |
| 296 | } |
| 297 | chained.resolve(value, f); |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | /** |
| 302 | * Resolve this Promise with the specified Promise. |
| 303 | * |
| 304 | * <p> |
| 305 | * If the specified Promise is successfully resolved, this Promise is |
| 306 | * resolved with the value of the specified Promise. If the specified |
| 307 | * Promise is resolved with a failure, this Promise is resolved with the |
| 308 | * failure of the specified Promise. |
| 309 | * |
| 310 | * @param with A Promise whose value or failure will be used to resolve this |
| 311 | * Promise. Must not be {@code null}. |
| 312 | * @return A Promise that is resolved only when this Promise is resolved by |
| 313 | * the specified Promise. The returned Promise will be successfully |
| 314 | * resolved, with the value {@code null}, if this Promise was |
| 315 | * resolved by the specified Promise. The returned Promise will be |
| 316 | * resolved with a failure of {@link IllegalStateException} if this |
| 317 | * Promise was already resolved when the specified Promise was |
| 318 | * resolved. |
| 319 | */ |
| 320 | Promise<Void> resolveWith(Promise<? extends T> with) { |
| 321 | PromiseImpl<Void> chained = new PromiseImpl<Void>(); |
| 322 | ResolveWith resolveWith = new ResolveWith(chained); |
| 323 | with.then(resolveWith, resolveWith); |
| 324 | return chained; |
| 325 | } |
| 326 | |
| 327 | /** |
| 328 | * A callback used to resolve this Promise with another Promise for the |
| 329 | * {@link PromiseImpl#resolveWith(Promise)} method. |
| 330 | * |
| 331 | * @Immutable |
| 332 | */ |
| 333 | private final class ResolveWith implements Success<T, Void>, Failure { |
| 334 | private final PromiseImpl<Void> chained; |
| 335 | |
| 336 | ResolveWith(PromiseImpl<Void> chained) { |
| 337 | this.chained = chained; |
| 338 | } |
| 339 | |
| 340 | public Promise<Void> call(Promise<T> with) throws Exception { |
| 341 | try { |
| 342 | resolve(with.getValue(), null); |
| 343 | } catch (Throwable e) { |
| 344 | chained.resolve(null, e); |
| 345 | return null; |
| 346 | } |
| 347 | chained.resolve(null, null); |
| 348 | return null; |
| 349 | } |
| 350 | |
| 351 | public void fail(Promise<?> with) throws Exception { |
| 352 | try { |
| 353 | resolve(null, with.getFailure()); |
| 354 | } catch (Throwable e) { |
| 355 | chained.resolve(null, e); |
| 356 | return; |
| 357 | } |
| 358 | chained.resolve(null, null); |
| 359 | } |
| 360 | } |
| 361 | |
| 362 | /** |
| 363 | * {@inheritDoc} |
| 364 | */ |
| 365 | public Promise<T> filter(Predicate<? super T> predicate) { |
| 366 | return then(new Filter<T>(predicate)); |
| 367 | } |
| 368 | |
| 369 | /** |
| 370 | * A callback used by the {@link PromiseImpl#filter(Predicate)} method. |
| 371 | * |
| 372 | * @Immutable |
| 373 | */ |
| 374 | private static final class Filter<T> implements Success<T, T> { |
| 375 | private final Predicate<? super T> predicate; |
| 376 | |
| 377 | Filter(Predicate<? super T> predicate) { |
| 378 | this.predicate = requireNonNull(predicate); |
| 379 | } |
| 380 | |
| 381 | public Promise<T> call(Promise<T> resolved) throws Exception { |
| 382 | if (predicate.test(resolved.getValue())) { |
| 383 | return resolved; |
| 384 | } |
| 385 | throw new NoSuchElementException(); |
| 386 | } |
| 387 | } |
| 388 | |
| 389 | /** |
| 390 | * {@inheritDoc} |
| 391 | */ |
| 392 | public <R> Promise<R> map(Function<? super T, ? extends R> mapper) { |
| 393 | return then(new Map<T, R>(mapper)); |
| 394 | } |
| 395 | |
| 396 | /** |
| 397 | * A callback used by the {@link PromiseImpl#map(Function)} method. |
| 398 | * |
| 399 | * @Immutable |
| 400 | */ |
| 401 | private static final class Map<T, R> implements Success<T, R> { |
| 402 | private final Function<? super T, ? extends R> mapper; |
| 403 | |
| 404 | Map(Function<? super T, ? extends R> mapper) { |
| 405 | this.mapper = requireNonNull(mapper); |
| 406 | } |
| 407 | |
| 408 | public Promise<R> call(Promise<T> resolved) throws Exception { |
| 409 | return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null); |
| 410 | } |
| 411 | } |
| 412 | |
| 413 | /** |
| 414 | * {@inheritDoc} |
| 415 | */ |
| 416 | public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) { |
| 417 | return then(new FlatMap<T, R>(mapper)); |
| 418 | } |
| 419 | |
| 420 | /** |
| 421 | * A callback used by the {@link PromiseImpl#flatMap(Function)} method. |
| 422 | * |
| 423 | * @Immutable |
| 424 | */ |
| 425 | private static final class FlatMap<T, R> implements Success<T, R> { |
| 426 | private final Function<? super T, Promise<? extends R>> mapper; |
| 427 | |
| 428 | FlatMap(Function<? super T, Promise<? extends R>> mapper) { |
| 429 | this.mapper = requireNonNull(mapper); |
| 430 | } |
| 431 | |
| 432 | @SuppressWarnings("unchecked") |
| 433 | public Promise<R> call(Promise<T> resolved) throws Exception { |
| 434 | return (Promise<R>) mapper.apply(resolved.getValue()); |
| 435 | } |
| 436 | } |
| 437 | |
| 438 | /** |
| 439 | * {@inheritDoc} |
| 440 | */ |
| 441 | public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) { |
| 442 | PromiseImpl<T> chained = new PromiseImpl<T>(); |
| 443 | Recover<T> recover = new Recover<T>(chained, recovery); |
| 444 | then(recover, recover); |
| 445 | return chained; |
| 446 | } |
| 447 | |
| 448 | /** |
| 449 | * A callback used by the {@link PromiseImpl#recover(Function)} method. |
| 450 | * |
| 451 | * @Immutable |
| 452 | */ |
| 453 | private static final class Recover<T> implements Success<T, Void>, Failure { |
| 454 | private final PromiseImpl<T> chained; |
| 455 | private final Function<Promise<?>, ? extends T> recovery; |
| 456 | |
| 457 | Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) { |
| 458 | this.chained = chained; |
| 459 | this.recovery = requireNonNull(recovery); |
| 460 | } |
| 461 | |
| 462 | public Promise<Void> call(Promise<T> resolved) throws Exception { |
| 463 | T value; |
| 464 | try { |
| 465 | value = resolved.getValue(); |
| 466 | } catch (Throwable e) { |
| 467 | chained.resolve(null, e); |
| 468 | return null; |
| 469 | } |
| 470 | chained.resolve(value, null); |
| 471 | return null; |
| 472 | } |
| 473 | |
| 474 | public void fail(Promise<?> resolved) throws Exception { |
| 475 | T recovered; |
| 476 | Throwable failure; |
| 477 | try { |
| 478 | recovered = recovery.apply(resolved); |
| 479 | failure = resolved.getFailure(); |
| 480 | } catch (Throwable e) { |
| 481 | chained.resolve(null, e); |
| 482 | return; |
| 483 | } |
| 484 | if (recovered == null) { |
| 485 | chained.resolve(null, failure); |
| 486 | } else { |
| 487 | chained.resolve(recovered, null); |
| 488 | } |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | /** |
| 493 | * {@inheritDoc} |
| 494 | */ |
| 495 | public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) { |
| 496 | PromiseImpl<T> chained = new PromiseImpl<T>(); |
| 497 | RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery); |
| 498 | then(recoverWith, recoverWith); |
| 499 | return chained; |
| 500 | } |
| 501 | |
| 502 | /** |
| 503 | * A callback used by the {@link PromiseImpl#recoverWith(Function)} method. |
| 504 | * |
| 505 | * @Immutable |
| 506 | */ |
| 507 | private static final class RecoverWith<T> implements Success<T, Void>, Failure { |
| 508 | private final PromiseImpl<T> chained; |
| 509 | private final Function<Promise<?>, Promise<? extends T>> recovery; |
| 510 | |
| 511 | RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) { |
| 512 | this.chained = chained; |
| 513 | this.recovery = requireNonNull(recovery); |
| 514 | } |
| 515 | |
| 516 | public Promise<Void> call(Promise<T> resolved) throws Exception { |
| 517 | T value; |
| 518 | try { |
| 519 | value = resolved.getValue(); |
| 520 | } catch (Throwable e) { |
| 521 | chained.resolve(null, e); |
| 522 | return null; |
| 523 | } |
| 524 | chained.resolve(value, null); |
| 525 | return null; |
| 526 | } |
| 527 | |
| 528 | public void fail(Promise<?> resolved) throws Exception { |
| 529 | Promise<? extends T> recovered; |
| 530 | Throwable failure; |
| 531 | try { |
| 532 | recovered = recovery.apply(resolved); |
| 533 | failure = resolved.getFailure(); |
| 534 | } catch (Throwable e) { |
| 535 | chained.resolve(null, e); |
| 536 | return; |
| 537 | } |
| 538 | if (recovered == null) { |
| 539 | chained.resolve(null, failure); |
| 540 | } else { |
| 541 | recovered.onResolve(new Chain<T>(chained, recovered)); |
| 542 | } |
| 543 | } |
| 544 | } |
| 545 | |
| 546 | /** |
| 547 | * {@inheritDoc} |
| 548 | */ |
| 549 | public Promise<T> fallbackTo(Promise<? extends T> fallback) { |
| 550 | PromiseImpl<T> chained = new PromiseImpl<T>(); |
| 551 | FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback); |
| 552 | then(fallbackTo, fallbackTo); |
| 553 | return chained; |
| 554 | } |
| 555 | |
| 556 | /** |
| 557 | * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method. |
| 558 | * |
| 559 | * @Immutable |
| 560 | */ |
| 561 | private static final class FallbackTo<T> implements Success<T, Void>, Failure { |
| 562 | private final PromiseImpl<T> chained; |
| 563 | private final Promise<? extends T> fallback; |
| 564 | |
| 565 | FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) { |
| 566 | this.chained = chained; |
| 567 | this.fallback = requireNonNull(fallback); |
| 568 | } |
| 569 | |
| 570 | public Promise<Void> call(Promise<T> resolved) throws Exception { |
| 571 | T value; |
| 572 | try { |
| 573 | value = resolved.getValue(); |
| 574 | } catch (Throwable e) { |
| 575 | chained.resolve(null, e); |
| 576 | return null; |
| 577 | } |
| 578 | chained.resolve(value, null); |
| 579 | return null; |
| 580 | } |
| 581 | |
| 582 | public void fail(Promise<?> resolved) throws Exception { |
| 583 | Throwable failure; |
| 584 | try { |
| 585 | failure = resolved.getFailure(); |
| 586 | } catch (Throwable e) { |
| 587 | chained.resolve(null, e); |
| 588 | return; |
| 589 | } |
| 590 | fallback.onResolve(new Chain<T>(chained, fallback, failure)); |
| 591 | } |
| 592 | } |
| 593 | |
| 594 | static <V> V requireNonNull(V value) { |
| 595 | if (value != null) { |
| 596 | return value; |
| 597 | } |
| 598 | throw new NullPointerException(); |
| 599 | } |
| 600 | |
| 601 | /** |
| 602 | * Use the lazy initialization holder class idiom to delay creating a Logger |
| 603 | * until we actually need it. |
| 604 | */ |
| 605 | private static final class Logger { |
| 606 | private final static java.util.logging.Logger LOGGER; |
| 607 | static { |
| 608 | LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName()); |
| 609 | } |
| 610 | |
| 611 | static void logCallbackException(Throwable t) { |
| 612 | LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t); |
| 613 | } |
| 614 | } |
| 615 | } |