blob: 1b014720c6c27e93e4ef7b124fb76ed4dcba7b8a [file] [log] [blame]
David Jencksbae44842014-06-21 20:15:24 +00001/*
2 * Copyright (c) OSGi Alliance (2014). All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.osgi.util.promise;
18
19import java.lang.reflect.InvocationTargetException;
20import java.util.NoSuchElementException;
21import java.util.concurrent.ConcurrentLinkedQueue;
22import java.util.concurrent.CountDownLatch;
23import org.osgi.util.function.Function;
24import org.osgi.util.function.Predicate;
25
26/**
27 * Promise implementation.
28 *
29 * <p>
30 * This class is not used directly by clients. Clients should use
31 * {@link Deferred} to create a resolvable {@link Promise}.
32 *
33 * @param <T> The result type associated with the Promise.
34 *
35 * @ThreadSafe
36 * @author $Id: d8b44a36f3eb797316b213118192fac213fa0c59 $
37 */
38final class PromiseImpl<T> implements Promise<T> {
39 /**
40 * A ConcurrentLinkedQueue to hold the callbacks for this Promise, so no
41 * additional synchronization is required to write to or read from the
42 * queue.
43 */
44 private final ConcurrentLinkedQueue<Runnable> callbacks;
45 /**
46 * A CountDownLatch to manage the resolved state of this Promise.
47 *
48 * <p>
49 * This object is used as the synchronizing object to provide a critical
50 * section in {@link #resolve(Object, Throwable)} so that only a single
51 * thread can write the resolved state variables and open the latch.
52 *
53 * <p>
54 * The resolved state variables, {@link #value} and {@link #fail}, must only
55 * be written when the latch is closed (getCount() != 0) and must only be
56 * read when the latch is open (getCount() == 0). The latch state must
57 * always be checked before writing or reading since the resolved state
58 * variables' memory consistency is guarded by the latch.
59 */
60 private final CountDownLatch resolved;
61 /**
62 * The value of this Promise if successfully resolved.
63 *
64 * @GuardedBy("resolved")
65 * @see #resolved
66 */
67 private T value;
68 /**
69 * The failure of this Promise if resolved with a failure or {@code null} if
70 * successfully resolved.
71 *
72 * @GuardedBy("resolved")
73 * @see #resolved
74 */
75 private Throwable fail;
76
77 /**
78 * Initialize this Promise.
79 */
80 PromiseImpl() {
81 callbacks = new ConcurrentLinkedQueue<Runnable>();
82 resolved = new CountDownLatch(1);
83 }
84
85 /**
86 * Initialize and resolve this Promise.
87 *
88 * @param v The value of this resolved Promise.
89 * @param f The failure of this resolved Promise.
90 */
91 PromiseImpl(T v, Throwable f) {
92 value = v;
93 fail = f;
94 callbacks = new ConcurrentLinkedQueue<Runnable>();
95 resolved = new CountDownLatch(0);
96 }
97
98 /**
99 * Resolve this Promise.
100 *
101 * @param v The value of this Promise.
102 * @param f The failure of this Promise.
103 */
104 void resolve(T v, Throwable f) {
105 // critical section: only one resolver at a time
106 synchronized (resolved) {
107 if (resolved.getCount() == 0) {
108 throw new IllegalStateException("Already resolved");
109 }
110 /*
111 * The resolved state variables must be set before opening the
112 * latch. This safely publishes them to be read by other threads
113 * that must verify the latch is open before reading.
114 */
115 value = v;
116 fail = f;
117 resolved.countDown();
118 }
119 notifyCallbacks(); // call any registered callbacks
120 }
121
122 /**
123 * Call any registered callbacks if this Promise is resolved.
124 */
125 private void notifyCallbacks() {
126 if (resolved.getCount() != 0) {
127 return; // return if not resolved
128 }
129
130 /*
131 * Note: multiple threads can be in this method removing callbacks from
132 * the queue and calling them, so the order in which callbacks are
133 * called cannot be specified.
134 */
135 for (Runnable callback = callbacks.poll(); callback != null; callback = callbacks.poll()) {
136 try {
137 callback.run();
138 } catch (Throwable t) {
139 Logger.logCallbackException(t);
140 }
141 }
142 }
143
144 /**
145 * {@inheritDoc}
146 */
147 public boolean isDone() {
148 return resolved.getCount() == 0;
149 }
150
151 /**
152 * {@inheritDoc}
153 */
154 public T getValue() throws InvocationTargetException, InterruptedException {
155 resolved.await();
156 if (fail == null) {
157 return value;
158 }
159 throw new InvocationTargetException(fail);
160 }
161
162 /**
163 * {@inheritDoc}
164 */
165 public Throwable getFailure() throws InterruptedException {
166 resolved.await();
167 return fail;
168 }
169
170 /**
171 * {@inheritDoc}
172 */
173 public Promise<T> onResolve(Runnable callback) {
174 callbacks.offer(callback);
175 notifyCallbacks(); // call any registered callbacks
176 return this;
177 }
178
179 /**
180 * {@inheritDoc}
181 */
182 public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
183 PromiseImpl<R> chained = new PromiseImpl<R>();
184 onResolve(new Then<R>(chained, success, failure));
185 return chained;
186 }
187
188 /**
189 * {@inheritDoc}
190 */
191 public <R> Promise<R> then(Success<? super T, ? extends R> success) {
192 return then(success, null);
193 }
194
195 /**
196 * A callback used to chain promises for the {@link #then(Success, Failure)}
197 * method.
198 *
199 * @Immutable
200 */
201 private final class Then<R> implements Runnable {
202 private final PromiseImpl<R> chained;
203 private final Success<T, ? extends R> success;
204 private final Failure failure;
205
206 @SuppressWarnings("unchecked")
207 Then(PromiseImpl<R> chained, Success<? super T, ? extends R> success, Failure failure) {
208 this.chained = chained;
209 this.success = (Success<T, ? extends R>) success;
210 this.failure = failure;
211 }
212
213 public void run() {
214 Throwable f;
215 final boolean interrupted = Thread.interrupted();
216 try {
217 f = getFailure();
218 } catch (Throwable e) {
219 f = e; // propagate new exception
220 } finally {
221 if (interrupted) { // restore interrupt status
222 Thread.currentThread().interrupt();
223 }
224 }
225 if (f != null) {
226 if (failure != null) {
227 try {
228 failure.fail(PromiseImpl.this);
229 } catch (Throwable e) {
230 f = e; // propagate new exception
231 }
232 }
233 // fail chained
234 chained.resolve(null, f);
235 return;
236 }
237 Promise<? extends R> returned = null;
238 if (success != null) {
239 try {
240 returned = success.call(PromiseImpl.this);
241 } catch (Throwable e) {
242 chained.resolve(null, e);
243 return;
244 }
245 }
246 if (returned == null) {
247 // resolve chained with null value
248 chained.resolve(null, null);
249 } else {
250 // resolve chained when returned promise is resolved
251 returned.onResolve(new Chain<R>(chained, returned));
252 }
253 }
254 }
255
256 /**
257 * A callback used to resolve the chained Promise when the Promise promise
258 * is resolved.
259 *
260 * @Immutable
261 */
262 private final static class Chain<R> implements Runnable {
263 private final PromiseImpl<R> chained;
264 private final Promise<? extends R> promise;
265 private final Throwable failure;
266
267 Chain(PromiseImpl<R> chained, Promise<? extends R> promise) {
268 this.chained = chained;
269 this.promise = promise;
270 this.failure = null;
271 }
272
273 Chain(PromiseImpl<R> chained, Promise<? extends R> promise, Throwable failure) {
274 this.chained = chained;
275 this.promise = promise;
276 this.failure = failure;
277 }
278
279 public void run() {
280 R value = null;
281 Throwable f;
282 final boolean interrupted = Thread.interrupted();
283 try {
284 f = promise.getFailure();
285 if (f == null) {
286 value = promise.getValue();
287 } else if (failure != null) {
288 f = failure;
289 }
290 } catch (Throwable e) {
291 f = e; // propagate new exception
292 } finally {
293 if (interrupted) { // restore interrupt status
294 Thread.currentThread().interrupt();
295 }
296 }
297 chained.resolve(value, f);
298 }
299 }
300
301 /**
302 * Resolve this Promise with the specified Promise.
303 *
304 * <p>
305 * If the specified Promise is successfully resolved, this Promise is
306 * resolved with the value of the specified Promise. If the specified
307 * Promise is resolved with a failure, this Promise is resolved with the
308 * failure of the specified Promise.
309 *
310 * @param with A Promise whose value or failure will be used to resolve this
311 * Promise. Must not be {@code null}.
312 * @return A Promise that is resolved only when this Promise is resolved by
313 * the specified Promise. The returned Promise will be successfully
314 * resolved, with the value {@code null}, if this Promise was
315 * resolved by the specified Promise. The returned Promise will be
316 * resolved with a failure of {@link IllegalStateException} if this
317 * Promise was already resolved when the specified Promise was
318 * resolved.
319 */
320 Promise<Void> resolveWith(Promise<? extends T> with) {
321 PromiseImpl<Void> chained = new PromiseImpl<Void>();
322 ResolveWith resolveWith = new ResolveWith(chained);
323 with.then(resolveWith, resolveWith);
324 return chained;
325 }
326
327 /**
328 * A callback used to resolve this Promise with another Promise for the
329 * {@link PromiseImpl#resolveWith(Promise)} method.
330 *
331 * @Immutable
332 */
333 private final class ResolveWith implements Success<T, Void>, Failure {
334 private final PromiseImpl<Void> chained;
335
336 ResolveWith(PromiseImpl<Void> chained) {
337 this.chained = chained;
338 }
339
340 public Promise<Void> call(Promise<T> with) throws Exception {
341 try {
342 resolve(with.getValue(), null);
343 } catch (Throwable e) {
344 chained.resolve(null, e);
345 return null;
346 }
347 chained.resolve(null, null);
348 return null;
349 }
350
351 public void fail(Promise<?> with) throws Exception {
352 try {
353 resolve(null, with.getFailure());
354 } catch (Throwable e) {
355 chained.resolve(null, e);
356 return;
357 }
358 chained.resolve(null, null);
359 }
360 }
361
362 /**
363 * {@inheritDoc}
364 */
365 public Promise<T> filter(Predicate<? super T> predicate) {
366 return then(new Filter<T>(predicate));
367 }
368
369 /**
370 * A callback used by the {@link PromiseImpl#filter(Predicate)} method.
371 *
372 * @Immutable
373 */
374 private static final class Filter<T> implements Success<T, T> {
375 private final Predicate<? super T> predicate;
376
377 Filter(Predicate<? super T> predicate) {
378 this.predicate = requireNonNull(predicate);
379 }
380
381 public Promise<T> call(Promise<T> resolved) throws Exception {
382 if (predicate.test(resolved.getValue())) {
383 return resolved;
384 }
385 throw new NoSuchElementException();
386 }
387 }
388
389 /**
390 * {@inheritDoc}
391 */
392 public <R> Promise<R> map(Function<? super T, ? extends R> mapper) {
393 return then(new Map<T, R>(mapper));
394 }
395
396 /**
397 * A callback used by the {@link PromiseImpl#map(Function)} method.
398 *
399 * @Immutable
400 */
401 private static final class Map<T, R> implements Success<T, R> {
402 private final Function<? super T, ? extends R> mapper;
403
404 Map(Function<? super T, ? extends R> mapper) {
405 this.mapper = requireNonNull(mapper);
406 }
407
408 public Promise<R> call(Promise<T> resolved) throws Exception {
409 return new PromiseImpl<R>(mapper.apply(resolved.getValue()), null);
410 }
411 }
412
413 /**
414 * {@inheritDoc}
415 */
416 public <R> Promise<R> flatMap(Function<? super T, Promise<? extends R>> mapper) {
417 return then(new FlatMap<T, R>(mapper));
418 }
419
420 /**
421 * A callback used by the {@link PromiseImpl#flatMap(Function)} method.
422 *
423 * @Immutable
424 */
425 private static final class FlatMap<T, R> implements Success<T, R> {
426 private final Function<? super T, Promise<? extends R>> mapper;
427
428 FlatMap(Function<? super T, Promise<? extends R>> mapper) {
429 this.mapper = requireNonNull(mapper);
430 }
431
432 @SuppressWarnings("unchecked")
433 public Promise<R> call(Promise<T> resolved) throws Exception {
434 return (Promise<R>) mapper.apply(resolved.getValue());
435 }
436 }
437
438 /**
439 * {@inheritDoc}
440 */
441 public Promise<T> recover(Function<Promise<?>, ? extends T> recovery) {
442 PromiseImpl<T> chained = new PromiseImpl<T>();
443 Recover<T> recover = new Recover<T>(chained, recovery);
444 then(recover, recover);
445 return chained;
446 }
447
448 /**
449 * A callback used by the {@link PromiseImpl#recover(Function)} method.
450 *
451 * @Immutable
452 */
453 private static final class Recover<T> implements Success<T, Void>, Failure {
454 private final PromiseImpl<T> chained;
455 private final Function<Promise<?>, ? extends T> recovery;
456
457 Recover(PromiseImpl<T> chained, Function<Promise<?>, ? extends T> recovery) {
458 this.chained = chained;
459 this.recovery = requireNonNull(recovery);
460 }
461
462 public Promise<Void> call(Promise<T> resolved) throws Exception {
463 T value;
464 try {
465 value = resolved.getValue();
466 } catch (Throwable e) {
467 chained.resolve(null, e);
468 return null;
469 }
470 chained.resolve(value, null);
471 return null;
472 }
473
474 public void fail(Promise<?> resolved) throws Exception {
475 T recovered;
476 Throwable failure;
477 try {
478 recovered = recovery.apply(resolved);
479 failure = resolved.getFailure();
480 } catch (Throwable e) {
481 chained.resolve(null, e);
482 return;
483 }
484 if (recovered == null) {
485 chained.resolve(null, failure);
486 } else {
487 chained.resolve(recovered, null);
488 }
489 }
490 }
491
492 /**
493 * {@inheritDoc}
494 */
495 public Promise<T> recoverWith(Function<Promise<?>, Promise<? extends T>> recovery) {
496 PromiseImpl<T> chained = new PromiseImpl<T>();
497 RecoverWith<T> recoverWith = new RecoverWith<T>(chained, recovery);
498 then(recoverWith, recoverWith);
499 return chained;
500 }
501
502 /**
503 * A callback used by the {@link PromiseImpl#recoverWith(Function)} method.
504 *
505 * @Immutable
506 */
507 private static final class RecoverWith<T> implements Success<T, Void>, Failure {
508 private final PromiseImpl<T> chained;
509 private final Function<Promise<?>, Promise<? extends T>> recovery;
510
511 RecoverWith(PromiseImpl<T> chained, Function<Promise<?>, Promise<? extends T>> recovery) {
512 this.chained = chained;
513 this.recovery = requireNonNull(recovery);
514 }
515
516 public Promise<Void> call(Promise<T> resolved) throws Exception {
517 T value;
518 try {
519 value = resolved.getValue();
520 } catch (Throwable e) {
521 chained.resolve(null, e);
522 return null;
523 }
524 chained.resolve(value, null);
525 return null;
526 }
527
528 public void fail(Promise<?> resolved) throws Exception {
529 Promise<? extends T> recovered;
530 Throwable failure;
531 try {
532 recovered = recovery.apply(resolved);
533 failure = resolved.getFailure();
534 } catch (Throwable e) {
535 chained.resolve(null, e);
536 return;
537 }
538 if (recovered == null) {
539 chained.resolve(null, failure);
540 } else {
541 recovered.onResolve(new Chain<T>(chained, recovered));
542 }
543 }
544 }
545
546 /**
547 * {@inheritDoc}
548 */
549 public Promise<T> fallbackTo(Promise<? extends T> fallback) {
550 PromiseImpl<T> chained = new PromiseImpl<T>();
551 FallbackTo<T> fallbackTo = new FallbackTo<T>(chained, fallback);
552 then(fallbackTo, fallbackTo);
553 return chained;
554 }
555
556 /**
557 * A callback used by the {@link PromiseImpl#fallbackTo(Promise)} method.
558 *
559 * @Immutable
560 */
561 private static final class FallbackTo<T> implements Success<T, Void>, Failure {
562 private final PromiseImpl<T> chained;
563 private final Promise<? extends T> fallback;
564
565 FallbackTo(PromiseImpl<T> chained, Promise<? extends T> fallback) {
566 this.chained = chained;
567 this.fallback = requireNonNull(fallback);
568 }
569
570 public Promise<Void> call(Promise<T> resolved) throws Exception {
571 T value;
572 try {
573 value = resolved.getValue();
574 } catch (Throwable e) {
575 chained.resolve(null, e);
576 return null;
577 }
578 chained.resolve(value, null);
579 return null;
580 }
581
582 public void fail(Promise<?> resolved) throws Exception {
583 Throwable failure;
584 try {
585 failure = resolved.getFailure();
586 } catch (Throwable e) {
587 chained.resolve(null, e);
588 return;
589 }
590 fallback.onResolve(new Chain<T>(chained, fallback, failure));
591 }
592 }
593
594 static <V> V requireNonNull(V value) {
595 if (value != null) {
596 return value;
597 }
598 throw new NullPointerException();
599 }
600
601 /**
602 * Use the lazy initialization holder class idiom to delay creating a Logger
603 * until we actually need it.
604 */
605 private static final class Logger {
606 private final static java.util.logging.Logger LOGGER;
607 static {
608 LOGGER = java.util.logging.Logger.getLogger(PromiseImpl.class.getName());
609 }
610
611 static void logCallbackException(Throwable t) {
612 LOGGER.log(java.util.logging.Level.WARNING, "Exception from Promise callback", t);
613 }
614 }
615}