001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026import static java.lang.Thread.currentThread;
027import static java.util.Arrays.asList;
028
029import com.google.common.annotations.Beta;
030import com.google.common.base.Function;
031import com.google.common.base.Preconditions;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035
036import java.lang.reflect.Constructor;
037import java.lang.reflect.InvocationTargetException;
038import java.lang.reflect.UndeclaredThrowableException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CancellationException;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.Executor;
046import java.util.concurrent.Future;
047import java.util.concurrent.LinkedBlockingQueue;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.TimeoutException;
050import java.util.concurrent.atomic.AtomicInteger;
051
052import javax.annotation.Nullable;
053
054/**
055 * Static utility methods pertaining to the {@link Future} interface.
056 *
057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
058 * Guava User Guide article on <a href=
059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
060 * {@code ListenableFuture}</a>.
061 *
062 * @author Kevin Bourrillion
063 * @author Nishant Thakkar
064 * @author Sven Mawson
065 * @since 1.0
066 */
067@Beta
068public final class Futures {
069  private Futures() {}
070
071  /**
072   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
073   * and a {@link Function} that maps from {@link Exception} instances into the
074   * appropriate checked type.
075   *
076   * <p>The given mapping function will be applied to an
077   * {@link InterruptedException}, a {@link CancellationException}, or an
078   * {@link ExecutionException} with the actual cause of the exception.
079   * See {@link Future#get()} for details on the exceptions thrown.
080   *
081   * @since 9.0 (source-compatible since 1.0)
082   */
083  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
084      ListenableFuture<V> future, Function<Exception, X> mapper) {
085    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
086  }
087
088  /**
089   * Creates a {@code ListenableFuture} which has its value set immediately upon
090   * construction. The getters just return the value. This {@code Future} can't
091   * be canceled or timed out and its {@code isDone()} method always returns
092   * {@code true}.
093   */
094  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
095    SettableFuture<V> future = SettableFuture.create();
096    future.set(value);
097    return future;
098  }
099
100  /**
101   * Returns a {@code CheckedFuture} which has its value set immediately upon
102   * construction.
103   *
104   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
105   * method always returns {@code true}. Calling {@code get()} or {@code
106   * checkedGet()} will immediately return the provided value.
107   */
108  public static <V, X extends Exception> CheckedFuture<V, X>
109      immediateCheckedFuture(@Nullable V value) {
110    SettableFuture<V> future = SettableFuture.create();
111    future.set(value);
112    return Futures.makeChecked(future, new Function<Exception, X>() {
113      @Override
114      public X apply(Exception e) {
115        throw new AssertionError("impossible");
116      }
117    });
118  }
119
120  /**
121   * Returns a {@code ListenableFuture} which has an exception set immediately
122   * upon construction.
123   *
124   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
125   * method always returns {@code true}. Calling {@code get()} will immediately
126   * throw the provided {@code Throwable} wrapped in an {@code
127   * ExecutionException}.
128   *
129   * @throws Error if the throwable is an {@link Error}.
130   */
131  public static <V> ListenableFuture<V> immediateFailedFuture(
132      Throwable throwable) {
133    checkNotNull(throwable);
134    SettableFuture<V> future = SettableFuture.create();
135    future.setException(throwable);
136    return future;
137  }
138
139  /**
140   * Returns a {@code CheckedFuture} which has an exception set immediately upon
141   * construction.
142   *
143   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
144   * method always returns {@code true}. Calling {@code get()} will immediately
145   * throw the provided {@code Throwable} wrapped in an {@code
146   * ExecutionException}, and calling {@code checkedGet()} will throw the
147   * provided exception itself.
148   *
149   * @throws Error if the throwable is an {@link Error}.
150   */
151  public static <V, X extends Exception> CheckedFuture<V, X>
152      immediateFailedCheckedFuture(final X exception) {
153    checkNotNull(exception);
154    return makeChecked(Futures.<V>immediateFailedFuture(exception),
155        new Function<Exception, X>() {
156          @Override
157          public X apply(Exception e) {
158            return exception;
159          }
160        });
161  }
162
163  /**
164   * Returns a new {@code ListenableFuture} whose result is asynchronously
165   * derived from the result of the given {@code Future}. More precisely, the
166   * returned {@code Future} takes its result from a {@code Future} produced by
167   * applying the given {@code AsyncFunction} to the result of the original
168   * {@code Future}. Example:
169   *
170   * <pre>   {@code
171   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
172   *   AsyncFunction<RowKey, QueryResult> queryFunction =
173   *       new AsyncFunction<RowKey, QueryResult>() {
174   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
175   *           return dataService.read(rowKey);
176   *         }
177   *       };
178   *   ListenableFuture<QueryResult> queryFuture =
179   *       transform(rowKeyFuture, queryFunction);
180   * }</pre>
181   *
182   * Note: If the derived {@code Future} is slow or heavyweight to create
183   * (whether the {@code Future} itself is slow or heavyweight to complete is
184   * irrelevant), consider {@linkplain #transform(ListenableFuture,
185   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
186   * executor, {@code transform} will use {@link
187   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
188   * caveats for heavier operations. For example, the call to {@code
189   * function.apply} may run on an unpredictable or undesirable thread:
190   *
191   * <ul>
192   * <li>If the input {@code Future} is done at the time {@code transform} is
193   * called, {@code transform} will call {@code function.apply} inline.
194   * <li>If the input {@code Future} is not yet done, {@code transform} will
195   * schedule {@code function.apply} to be run by the thread that completes the
196   * input {@code Future}, which may be an internal system thread such as an
197   * RPC network thread.
198   * </ul>
199   *
200   * Also note that, regardless of which thread executes {@code
201   * function.apply}, all other registered but unexecuted listeners are
202   * prevented from running during its execution, even if those listeners are
203   * to run in other executors.
204   *
205   * <p>The returned {@code Future} attempts to keep its cancellation state in
206   * sync with that of the input future and that of the future returned by the
207   * function. That is, if the returned {@code Future} is cancelled, it will
208   * attempt to cancel the other two, and if either of the other two is
209   * cancelled, the returned {@code Future} will receive a callback in which it
210   * will attempt to cancel itself.
211   *
212   * @param input The future to transform
213   * @param function A function to transform the result of the input future
214   *     to the result of the output future
215   * @return A future that holds result of the function (if the input succeeded)
216   *     or the original input's failure (if not)
217   * @since 11.0
218   */
219  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
220      AsyncFunction<? super I, ? extends O> function) {
221    return transform(input, function, MoreExecutors.sameThreadExecutor());
222  }
223
224  /**
225   * Returns a new {@code ListenableFuture} whose result is asynchronously
226   * derived from the result of the given {@code Future}. More precisely, the
227   * returned {@code Future} takes its result from a {@code Future} produced by
228   * applying the given {@code AsyncFunction} to the result of the original
229   * {@code Future}. Example:
230   *
231   * <pre>   {@code
232   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
233   *   AsyncFunction<RowKey, QueryResult> queryFunction =
234   *       new AsyncFunction<RowKey, QueryResult>() {
235   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
236   *           return dataService.read(rowKey);
237   *         }
238   *       };
239   *   ListenableFuture<QueryResult> queryFuture =
240   *       transform(rowKeyFuture, queryFunction, executor);
241   * }</pre>
242   *
243   * <p>The returned {@code Future} attempts to keep its cancellation state in
244   * sync with that of the input future and that of the future returned by the
245   * chain function. That is, if the returned {@code Future} is cancelled, it
246   * will attempt to cancel the other two, and if either of the other two is
247   * cancelled, the returned {@code Future} will receive a callback in which it
248   * will attempt to cancel itself.
249   *
250   * <p>When the execution of {@code function.apply} is fast and lightweight
251   * (though the {@code Future} it returns need not meet these criteria),
252   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
253   * the executor} or explicitly specifying {@code sameThreadExecutor}.
254   * However, be aware of the caveats documented in the link above.
255   *
256   * @param input The future to transform
257   * @param function A function to transform the result of the input future
258   *     to the result of the output future
259   * @param executor Executor to run the function in.
260   * @return A future that holds result of the function (if the input succeeded)
261   *     or the original input's failure (if not)
262   * @since 11.0
263   */
264  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
265      AsyncFunction<? super I, ? extends O> function,
266      Executor executor) {
267    ChainingListenableFuture<I, O> output =
268        new ChainingListenableFuture<I, O>(function, input);
269    input.addListener(output, executor);
270    return output;
271  }
272
273  /**
274   * Returns a new {@code ListenableFuture} whose result is the product of
275   * applying the given {@code Function} to the result of the given {@code
276   * Future}. Example:
277   *
278   * <pre>   {@code
279   *   ListenableFuture<QueryResult> queryFuture = ...;
280   *   Function<QueryResult, List<Row>> rowsFunction =
281   *       new Function<QueryResult, List<Row>>() {
282   *         public List<Row> apply(QueryResult queryResult) {
283   *           return queryResult.getRows();
284   *         }
285   *       };
286   *   ListenableFuture<List<Row>> rowsFuture =
287   *       transform(queryFuture, rowsFunction);
288   * }</pre>
289   *
290   * Note: If the transformation is slow or heavyweight, consider {@linkplain
291   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
292   * If you do not supply an executor, {@code transform} will use {@link
293   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
294   * caveats for heavier operations.  For example, the call to {@code
295   * function.apply} may run on an unpredictable or undesirable thread:
296   *
297   * <ul>
298   * <li>If the input {@code Future} is done at the time {@code transform} is
299   * called, {@code transform} will call {@code function.apply} inline.
300   * <li>If the input {@code Future} is not yet done, {@code transform} will
301   * schedule {@code function.apply} to be run by the thread that completes the
302   * input {@code Future}, which may be an internal system thread such as an
303   * RPC network thread.
304   * </ul>
305   *
306   * Also note that, regardless of which thread executes {@code
307   * function.apply}, all other registered but unexecuted listeners are
308   * prevented from running during its execution, even if those listeners are
309   * to run in other executors.
310   *
311   * <p>The returned {@code Future} attempts to keep its cancellation state in
312   * sync with that of the input future. That is, if the returned {@code Future}
313   * is cancelled, it will attempt to cancel the input, and if the input is
314   * cancelled, the returned {@code Future} will receive a callback in which it
315   * will attempt to cancel itself.
316   *
317   * <p>An example use of this method is to convert a serializable object
318   * returned from an RPC into a POJO.
319   *
320   * @param input The future to transform
321   * @param function A Function to transform the results of the provided future
322   *     to the results of the returned future.  This will be run in the thread
323   *     that notifies input it is complete.
324   * @return A future that holds result of the transformation.
325   * @since 9.0 (in 1.0 as {@code compose})
326   */
327  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
328      final Function<? super I, ? extends O> function) {
329    return transform(input, function, MoreExecutors.sameThreadExecutor());
330  }
331
332  /**
333   * Returns a new {@code ListenableFuture} whose result is the product of
334   * applying the given {@code Function} to the result of the given {@code
335   * Future}. Example:
336   *
337   * <pre>   {@code
338   *   ListenableFuture<QueryResult> queryFuture = ...;
339   *   Function<QueryResult, List<Row>> rowsFunction =
340   *       new Function<QueryResult, List<Row>>() {
341   *         public List<Row> apply(QueryResult queryResult) {
342   *           return queryResult.getRows();
343   *         }
344   *       };
345   *   ListenableFuture<List<Row>> rowsFuture =
346   *       transform(queryFuture, rowsFunction, executor);
347   * }</pre>
348   *
349   * <p>The returned {@code Future} attempts to keep its cancellation state in
350   * sync with that of the input future. That is, if the returned {@code Future}
351   * is cancelled, it will attempt to cancel the input, and if the input is
352   * cancelled, the returned {@code Future} will receive a callback in which it
353   * will attempt to cancel itself.
354   *
355   * <p>An example use of this method is to convert a serializable object
356   * returned from an RPC into a POJO.
357   *
358   * <p>When the transformation is fast and lightweight, consider {@linkplain
359   * #transform(ListenableFuture, Function) omitting the executor} or
360   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
361   * caveats documented in the link above.
362   *
363   * @param input The future to transform
364   * @param function A Function to transform the results of the provided future
365   *     to the results of the returned future.
366   * @param executor Executor to run the function in.
367   * @return A future that holds result of the transformation.
368   * @since 9.0 (in 2.0 as {@code compose})
369   */
370  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
371      final Function<? super I, ? extends O> function, Executor executor) {
372    checkNotNull(function);
373    AsyncFunction<I, O> wrapperFunction
374        = new AsyncFunction<I, O>() {
375            @Override public ListenableFuture<O> apply(I input) {
376              O output = function.apply(input);
377              return immediateFuture(output);
378            }
379        };
380    return transform(input, wrapperFunction, executor);
381  }
382
383  /**
384   * Like {@link #transform(ListenableFuture, Function)} except that the
385   * transformation {@code function} is invoked on each call to
386   * {@link Future#get() get()} on the returned future.
387   *
388   * <p>The returned {@code Future} reflects the input's cancellation
389   * state directly, and any attempt to cancel the returned Future is likewise
390   * passed through to the input Future.
391   *
392   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
393   * only apply the timeout to the execution of the underlying {@code Future},
394   * <em>not</em> to the execution of the transformation function.
395   *
396   * <p>The primary audience of this method is callers of {@code transform}
397   * who don't have a {@code ListenableFuture} available and
398   * do not mind repeated, lazy function evaluation.
399   *
400   * @param input The future to transform
401   * @param function A Function to transform the results of the provided future
402   *     to the results of the returned future.
403   * @return A future that returns the result of the transformation.
404   * @since 10.0
405   */
406  @Beta
407  public static <I, O> Future<O> lazyTransform(final Future<I> input,
408      final Function<? super I, ? extends O> function) {
409    checkNotNull(input);
410    checkNotNull(function);
411    return new Future<O>() {
412
413      @Override
414      public boolean cancel(boolean mayInterruptIfRunning) {
415        return input.cancel(mayInterruptIfRunning);
416      }
417
418      @Override
419      public boolean isCancelled() {
420        return input.isCancelled();
421      }
422
423      @Override
424      public boolean isDone() {
425        return input.isDone();
426      }
427
428      @Override
429      public O get() throws InterruptedException, ExecutionException {
430        return applyTransformation(input.get());
431      }
432
433      @Override
434      public O get(long timeout, TimeUnit unit)
435          throws InterruptedException, ExecutionException, TimeoutException {
436        return applyTransformation(input.get(timeout, unit));
437      }
438
439      private O applyTransformation(I input) throws ExecutionException {
440        try {
441          return function.apply(input);
442        } catch (Throwable t) {
443          throw new ExecutionException(t);
444        }
445      }
446    };
447  }
448
449  /**
450   * An implementation of {@code ListenableFuture} that also implements
451   * {@code Runnable} so that it can be used to nest ListenableFutures.
452   * Once the passed-in {@code ListenableFuture} is complete, it calls the
453   * passed-in {@code Function} to generate the result.
454   *
455   * <p>If the function throws any checked exceptions, they should be wrapped
456   * in a {@code UndeclaredThrowableException} so that this class can get
457   * access to the cause.
458   */
459  private static class ChainingListenableFuture<I, O>
460      extends AbstractFuture<O> implements Runnable {
461
462    private AsyncFunction<? super I, ? extends O> function;
463    private ListenableFuture<? extends I> inputFuture;
464    private volatile ListenableFuture<? extends O> outputFuture;
465    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
466        new LinkedBlockingQueue<Boolean>(1);
467    private final CountDownLatch outputCreated = new CountDownLatch(1);
468
469    private ChainingListenableFuture(
470        AsyncFunction<? super I, ? extends O> function,
471        ListenableFuture<? extends I> inputFuture) {
472      this.function = checkNotNull(function);
473      this.inputFuture = checkNotNull(inputFuture);
474    }
475
476    @Override
477    public boolean cancel(boolean mayInterruptIfRunning) {
478      /*
479       * Our additional cancellation work needs to occur even if
480       * !mayInterruptIfRunning, so we can't move it into interruptTask().
481       */
482      if (super.cancel(mayInterruptIfRunning)) {
483        // This should never block since only one thread is allowed to cancel
484        // this Future.
485        putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
486        cancel(inputFuture, mayInterruptIfRunning);
487        cancel(outputFuture, mayInterruptIfRunning);
488        return true;
489      }
490      return false;
491    }
492
493    private void cancel(@Nullable Future<?> future,
494        boolean mayInterruptIfRunning) {
495      if (future != null) {
496        future.cancel(mayInterruptIfRunning);
497      }
498    }
499
500    @Override
501    public void run() {
502      try {
503        I sourceResult;
504        try {
505          sourceResult = getUninterruptibly(inputFuture);
506        } catch (CancellationException e) {
507          // Cancel this future and return.
508          // At this point, inputFuture is cancelled and outputFuture doesn't
509          // exist, so the value of mayInterruptIfRunning is irrelevant.
510          cancel(false);
511          return;
512        } catch (ExecutionException e) {
513          // Set the cause of the exception as this future's exception
514          setException(e.getCause());
515          return;
516        }
517
518        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
519            function.apply(sourceResult);
520        if (isCancelled()) {
521          // Handles the case where cancel was called while the function was
522          // being applied.
523          // There is a gap in cancel(boolean) between calling sync.cancel()
524          // and storing the value of mayInterruptIfRunning, so this thread
525          // needs to block, waiting for that value.
526          outputFuture.cancel(
527              takeUninterruptibly(mayInterruptIfRunningChannel));
528          this.outputFuture = null;
529          return;
530        }
531        outputFuture.addListener(new Runnable() {
532            @Override
533            public void run() {
534              try {
535                // Here it would have been nice to have had an
536                // UninterruptibleListenableFuture, but we don't want to start a
537                // combinatorial explosion of interfaces, so we have to make do.
538                set(getUninterruptibly(outputFuture));
539              } catch (CancellationException e) {
540                // Cancel this future and return.
541                // At this point, inputFuture and outputFuture are done, so the
542                // value of mayInterruptIfRunning is irrelevant.
543                cancel(false);
544                return;
545              } catch (ExecutionException e) {
546                // Set the cause of the exception as this future's exception
547                setException(e.getCause());
548              } finally {
549                // Don't pin inputs beyond completion
550                ChainingListenableFuture.this.outputFuture = null;
551              }
552            }
553          }, MoreExecutors.sameThreadExecutor());
554      } catch (UndeclaredThrowableException e) {
555        // Set the cause of the exception as this future's exception
556        setException(e.getCause());
557      } catch (Exception e) {
558        // This exception is irrelevant in this thread, but useful for the
559        // client
560        setException(e);
561      } catch (Error e) {
562        // Propagate errors up ASAP - our superclass will rethrow the error
563        setException(e);
564      } finally {
565        // Don't pin inputs beyond completion
566        function = null;
567        inputFuture = null;
568        // Allow our get routines to examine outputFuture now.
569        outputCreated.countDown();
570      }
571    }
572  }
573
574  /**
575   * Returns a new {@code ListenableFuture} whose result is the product of
576   * calling {@code get()} on the {@code Future} nested within the given {@code
577   * Future}, effectively chaining the futures one after the other.  Example:
578   *
579   * <pre>   {@code
580   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
581   *   ListenableFuture<String> dereferenced = dereference(nested);
582   * }</pre>
583   *
584   * <p>This call has the same cancellation and execution semantics as {@link
585   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
586   * Future} attempts to keep its cancellation state in sync with both the
587   * input {@code Future} and the nested {@code Future}.  The transformation
588   * is very lightweight and therefore takes place in the thread that called
589   * {@code dereference}.
590   *
591   * @param nested The nested future to transform.
592   * @return A future that holds result of the inner future.
593   * @since 13.0
594   */
595  @Beta
596  @SuppressWarnings({"rawtypes", "unchecked"})
597  public static <V> ListenableFuture<V> dereference(
598      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
599    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
600  }
601
602  /**
   * Helper {@code AsyncFunction} for {@link #dereference}.
604   */
605  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
606      new AsyncFunction<ListenableFuture<Object>, Object>() {
607        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
608          return input;
609        }
610      };
611
612  /**
613   * Creates a new {@code ListenableFuture} whose value is a list containing the
614   * values of all its input futures, if all succeed. If any input fails, the
615   * returned future fails.
616   *
617   * <p>The list of results is in the same order as the input list.
618   *
619   * <p>Canceling this future does not cancel any of the component futures;
620   * however, if any of the provided futures fails or is canceled, this one is,
621   * too.
622   *
623   * @param futures futures to combine
624   * @return a future that provides a list of the results of the component
625   *         futures
626   * @since 10.0
627   */
628  @Beta
629  public static <V> ListenableFuture<List<V>> allAsList(
630      ListenableFuture<? extends V>... futures) {
631    return new ListFuture<V>(ImmutableList.copyOf(futures), true,
632        MoreExecutors.sameThreadExecutor());
633  }
634
635  /**
636   * Creates a new {@code ListenableFuture} whose value is a list containing the
637   * values of all its input futures, if all succeed. If any input fails, the
638   * returned future fails.
639   *
640   * <p>The list of results is in the same order as the input list.
641   *
642   * <p>Canceling this future does not cancel any of the component futures;
643   * however, if any of the provided futures fails or is canceled, this one is,
644   * too.
645   *
646   * @param futures futures to combine
647   * @return a future that provides a list of the results of the component
648   *         futures
649   * @since 10.0
650   */
651  @Beta
652  public static <V> ListenableFuture<List<V>> allAsList(
653      Iterable<? extends ListenableFuture<? extends V>> futures) {
654    return new ListFuture<V>(ImmutableList.copyOf(futures), true,
655        MoreExecutors.sameThreadExecutor());
656  }
657
658  /**
659   * Creates a new {@code ListenableFuture} whose value is a list containing the
660   * values of all its successful input futures. The list of results is in the
661   * same order as the input list, and if any of the provided futures fails or
662   * is canceled, its corresponding position will contain {@code null} (which is
663   * indistinguishable from the future having a successful value of
664   * {@code null}).
665   *
666   * @param futures futures to combine
667   * @return a future that provides a list of the results of the component
668   *         futures
669   * @since 10.0
670   */
671  @Beta
672  public static <V> ListenableFuture<List<V>> successfulAsList(
673      ListenableFuture<? extends V>... futures) {
674    return new ListFuture<V>(ImmutableList.copyOf(futures), false,
675        MoreExecutors.sameThreadExecutor());
676  }
677
678  /**
679   * Creates a new {@code ListenableFuture} whose value is a list containing the
680   * values of all its successful input futures. The list of results is in the
681   * same order as the input list, and if any of the provided futures fails or
682   * is canceled, its corresponding position will contain {@code null} (which is
683   * indistinguishable from the future having a successful value of
684   * {@code null}).
685   *
686   * @param futures futures to combine
687   * @return a future that provides a list of the results of the component
688   *         futures
689   * @since 10.0
690   */
691  @Beta
692  public static <V> ListenableFuture<List<V>> successfulAsList(
693      Iterable<? extends ListenableFuture<? extends V>> futures) {
694    return new ListFuture<V>(ImmutableList.copyOf(futures), false,
695        MoreExecutors.sameThreadExecutor());
696  }
697
698  /**
699   * Registers separate success and failure callbacks to be run when the {@code
700   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
701   * complete} or, if the computation is already complete, immediately.
702   *
703   * <p>There is no guaranteed ordering of execution of callbacks, but any
704   * callback added through this method is guaranteed to be called once the
705   * computation is complete.
706   *
707   * Example: <pre> {@code
708   * ListenableFuture<QueryResult> future = ...;
709   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
711   *       public void onSuccess(QueryResult result) {
712   *         storeInCache(result);
713   *       }
714   *       public void onFailure(Throwable t) {
715   *         reportError(t);
716   *       }
717   *     });}</pre>
718   *
719   * Note: If the callback is slow or heavyweight, consider {@linkplain
720   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
721   * executor}. If you do not supply an executor, {@code addCallback} will use
722   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
723   * some caveats for heavier operations. For example, the callback may run on
724   * an unpredictable or undesirable thread:
725   *
726   * <ul>
727   * <li>If the input {@code Future} is done at the time {@code addCallback} is
728   * called, {@code addCallback} will execute the callback inline.
729   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
730   * schedule the callback to be run by the thread that completes the input
731   * {@code Future}, which may be an internal system thread such as an RPC
732   * network thread.
733   * </ul>
734   *
735   * Also note that, regardless of which thread executes the callback, all
736   * other registered but unexecuted listeners are prevented from running
737   * during its execution, even if those listeners are to run in other
738   * executors.
739   *
740   * <p>For a more general interface to attach a completion listener to a
741   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
742   *
   * @param future The future to attach the callback to.
744   * @param callback The callback to invoke when {@code future} is completed.
745   * @since 10.0
746   */
747  public static <V> void addCallback(ListenableFuture<V> future,
748      FutureCallback<? super V> callback) {
749    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
750  }
751
752  /**
753   * Registers separate success and failure callbacks to be run when the {@code
754   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
755   * complete} or, if the computation is already complete, immediately.
756   *
757   * <p>The callback is run in {@code executor}.
758   * There is no guaranteed ordering of execution of callbacks, but any
759   * callback added through this method is guaranteed to be called once the
760   * computation is complete.
761   *
762   * Example: <pre> {@code
763   * ListenableFuture<QueryResult> future = ...;
764   * Executor e = ...
765   * addCallback(future, e,
766   *     new FutureCallback<QueryResult> {
767   *       public void onSuccess(QueryResult result) {
768   *         storeInCache(result);
769   *       }
770   *       public void onFailure(Throwable t) {
771   *         reportError(t);
772   *       }
773   *     });}</pre>
774   *
775   * When the callback is fast and lightweight, consider {@linkplain
776   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
777   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
778   * caveats documented in the link above.
779   *
780   * <p>For a more general interface to attach a completion listener to a
781   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
782   *
783   * @param future The future attach the callback to.
784   * @param callback The callback to invoke when {@code future} is completed.
785   * @param executor The executor to run {@code callback} when the future
786   *    completes.
787   * @since 10.0
788   */
789  public static <V> void addCallback(final ListenableFuture<V> future,
790      final FutureCallback<? super V> callback, Executor executor) {
791    Preconditions.checkNotNull(callback);
792    Runnable callbackListener = new Runnable() {
793      @Override
794      public void run() {
795        try {
796          // TODO(user): (Before Guava release), validate that this
797          // is the thing for IE.
798          V value = getUninterruptibly(future);
799          callback.onSuccess(value);
800        } catch (ExecutionException e) {
801          callback.onFailure(e.getCause());
802        } catch (RuntimeException e) {
803          callback.onFailure(e);
804        } catch (Error e) {
805          callback.onFailure(e);
806        }
807      }
808    };
809    future.addListener(callbackListener, executor);
810  }
811
812  /**
813   * Returns the result of {@link Future#get()}, converting most exceptions to a
814   * new instance of the given checked exception type. This reduces boilerplate
815   * for a common use of {@code Future} in which it is unnecessary to
816   * programmatically distinguish between exception types or to extract other
817   * information from the exception instance.
818   *
819   * <p>Exceptions from {@code Future.get} are treated as follows:
820   * <ul>
821   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
822   *     {@code X} if the cause is a checked exception, an {@link
823   *     UncheckedExecutionException} if the cause is a {@code
824   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
825   *     {@code Error}.
826   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
827   *     restoring the interrupt).
828   * <li>Any {@link CancellationException} is propagated untouched, as is any
829   *     other {@link RuntimeException} (though {@code get} implementations are
830   *     discouraged from throwing such exceptions).
831   * </ul>
832   *
833   * The overall principle is to continue to treat every checked exception as a
834   * checked exception, every unchecked exception as an unchecked exception, and
835   * every error as an error. In addition, the cause of any {@code
836   * ExecutionException} is wrapped in order to ensure that the new stack trace
837   * matches that of the current thread.
838   *
839   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
840   * public constructor that accepts zero or more arguments, all of type {@code
841   * String} or {@code Throwable} (preferring constructors with at least one
842   * {@code String}) and calling the constructor via reflection. If the
843   * exception did not already have a cause, one is set by calling {@link
844   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
845   * {@code IllegalArgumentException} is thrown.
846   *
847   * @throws X if {@code get} throws any checked exception except for an {@code
848   *         ExecutionException} whose cause is not itself a checked exception
849   * @throws UncheckedExecutionException if {@code get} throws an {@code
850   *         ExecutionException} with a {@code RuntimeException} as its cause
851   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
852   *         with an {@code Error} as its cause
853   * @throws CancellationException if {@code get} throws a {@code
854   *         CancellationException}
855   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
856   *         RuntimeException} or does not have a suitable constructor
857   * @since 10.0
858   */
859  @Beta
860  public static <V, X extends Exception> V get(
861      Future<V> future, Class<X> exceptionClass) throws X {
862    checkNotNull(future);
863    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
864        "Futures.get exception type (%s) must not be a RuntimeException",
865        exceptionClass);
866    try {
867      return future.get();
868    } catch (InterruptedException e) {
869      currentThread().interrupt();
870      throw newWithCause(exceptionClass, e);
871    } catch (ExecutionException e) {
872      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
873      throw new AssertionError();
874    }
875  }
876
877  /**
878   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
879   * exceptions to a new instance of the given checked exception type. This
880   * reduces boilerplate for a common use of {@code Future} in which it is
881   * unnecessary to programmatically distinguish between exception types or to
882   * extract other information from the exception instance.
883   *
884   * <p>Exceptions from {@code Future.get} are treated as follows:
885   * <ul>
886   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
887   *     {@code X} if the cause is a checked exception, an {@link
888   *     UncheckedExecutionException} if the cause is a {@code
889   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
890   *     {@code Error}.
891   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
892   *     restoring the interrupt).
893   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
894   * <li>Any {@link CancellationException} is propagated untouched, as is any
895   *     other {@link RuntimeException} (though {@code get} implementations are
896   *     discouraged from throwing such exceptions).
897   * </ul>
898   *
899   * The overall principle is to continue to treat every checked exception as a
900   * checked exception, every unchecked exception as an unchecked exception, and
901   * every error as an error. In addition, the cause of any {@code
902   * ExecutionException} is wrapped in order to ensure that the new stack trace
903   * matches that of the current thread.
904   *
905   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
906   * public constructor that accepts zero or more arguments, all of type {@code
907   * String} or {@code Throwable} (preferring constructors with at least one
908   * {@code String}) and calling the constructor via reflection. If the
909   * exception did not already have a cause, one is set by calling {@link
910   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
911   * {@code IllegalArgumentException} is thrown.
912   *
913   * @throws X if {@code get} throws any checked exception except for an {@code
914   *         ExecutionException} whose cause is not itself a checked exception
915   * @throws UncheckedExecutionException if {@code get} throws an {@code
916   *         ExecutionException} with a {@code RuntimeException} as its cause
917   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
918   *         with an {@code Error} as its cause
919   * @throws CancellationException if {@code get} throws a {@code
920   *         CancellationException}
921   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
922   *         RuntimeException} or does not have a suitable constructor
923   * @since 10.0
924   */
925  @Beta
926  public static <V, X extends Exception> V get(
927      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
928      throws X {
929    checkNotNull(future);
930    checkNotNull(unit);
931    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
932        "Futures.get exception type (%s) must not be a RuntimeException",
933        exceptionClass);
934    try {
935      return future.get(timeout, unit);
936    } catch (InterruptedException e) {
937      currentThread().interrupt();
938      throw newWithCause(exceptionClass, e);
939    } catch (TimeoutException e) {
940      throw newWithCause(exceptionClass, e);
941    } catch (ExecutionException e) {
942      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
943      throw new AssertionError();
944    }
945  }
946
947  private static <X extends Exception> void wrapAndThrowExceptionOrError(
948      Throwable cause, Class<X> exceptionClass) throws X {
949    if (cause instanceof Error) {
950      throw new ExecutionError((Error) cause);
951    }
952    if (cause instanceof RuntimeException) {
953      throw new UncheckedExecutionException(cause);
954    }
955    throw newWithCause(exceptionClass, cause);
956  }
957
958  /**
959   * Returns the result of calling {@link Future#get()} uninterruptibly on a
960   * task known not to throw a checked exception. This makes {@code Future} more
961   * suitable for lightweight, fast-running tasks that, barring bugs in the
962   * code, will not fail. This gives it exception-handling behavior similar to
963   * that of {@code ForkJoinTask.join}.
964   *
965   * <p>Exceptions from {@code Future.get} are treated as follows:
966   * <ul>
967   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
968   *     {@link UncheckedExecutionException} (if the cause is an {@code
969   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
970   *     Error}).
971   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
972   *     call. The interrupt is restored before {@code getUnchecked} returns.
973   * <li>Any {@link CancellationException} is propagated untouched. So is any
974   *     other {@link RuntimeException} ({@code get} implementations are
975   *     discouraged from throwing such exceptions).
976   * </ul>
977   *
978   * The overall principle is to eliminate all checked exceptions: to loop to
979   * avoid {@code InterruptedException}, to pass through {@code
980   * CancellationException}, and to wrap any exception from the underlying
981   * computation in an {@code UncheckedExecutionException} or {@code
982   * ExecutionError}.
983   *
984   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
985   * {@link Uninterruptibles#getUninterruptibly(Future)}.
986   *
987   * @throws UncheckedExecutionException if {@code get} throws an {@code
988   *         ExecutionException} with an {@code Exception} as its cause
989   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
990   *         with an {@code Error} as its cause
991   * @throws CancellationException if {@code get} throws a {@code
992   *         CancellationException}
993   * @since 10.0
994   */
995  @Beta
996  public static <V> V getUnchecked(Future<V> future) {
997    checkNotNull(future);
998    try {
999      return getUninterruptibly(future);
1000    } catch (ExecutionException e) {
1001      wrapAndThrowUnchecked(e.getCause());
1002      throw new AssertionError();
1003    }
1004  }
1005
1006  private static void wrapAndThrowUnchecked(Throwable cause) {
1007    if (cause instanceof Error) {
1008      throw new ExecutionError((Error) cause);
1009    }
1010    /*
1011     * It's a non-Error, non-Exception Throwable. From my survey of such
1012     * classes, I believe that most users intended to extend Exception, so we'll
1013     * treat it like an Exception.
1014     */
1015    throw new UncheckedExecutionException(cause);
1016  }
1017
1018  /*
1019   * TODO(user): FutureChecker interface for these to be static methods on? If
1020   * so, refer to it in the (static-method) Futures.get documentation
1021   */
1022
1023  /*
1024   * Arguably we don't need a timed getUnchecked because any operation slow
1025   * enough to require a timeout is heavyweight enough to throw a checked
1026   * exception and therefore be inappropriate to use with getUnchecked. Further,
1027   * it's not clear that converting the checked TimeoutException to a
1028   * RuntimeException -- especially to an UncheckedExecutionException, since it
1029   * wasn't thrown by the computation -- makes sense, and if we don't convert
1030   * it, the user still has to write a try-catch block.
1031   *
1032   * If you think you would use this method, let us know.
1033   */
1034
1035  private static <X extends Exception> X newWithCause(
1036      Class<X> exceptionClass, Throwable cause) {
1037    // getConstructors() guarantees this as long as we don't modify the array.
1038    @SuppressWarnings("unchecked")
1039    List<Constructor<X>> constructors =
1040        (List) Arrays.asList(exceptionClass.getConstructors());
1041    for (Constructor<X> constructor : preferringStrings(constructors)) {
1042      @Nullable X instance = newFromConstructor(constructor, cause);
1043      if (instance != null) {
1044        if (instance.getCause() == null) {
1045          instance.initCause(cause);
1046        }
1047        return instance;
1048      }
1049    }
1050    throw new IllegalArgumentException(
1051        "No appropriate constructor for exception of type " + exceptionClass
1052            + " in response to chained exception", cause);
1053  }
1054
1055  private static <X extends Exception> List<Constructor<X>>
1056      preferringStrings(List<Constructor<X>> constructors) {
1057    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1058  }
1059
1060  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1061      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1062        @Override public Boolean apply(Constructor<?> input) {
1063          return asList(input.getParameterTypes()).contains(String.class);
1064        }
1065      }).reverse();
1066
1067  @Nullable private static <X> X newFromConstructor(
1068      Constructor<X> constructor, Throwable cause) {
1069    Class<?>[] paramTypes = constructor.getParameterTypes();
1070    Object[] params = new Object[paramTypes.length];
1071    for (int i = 0; i < paramTypes.length; i++) {
1072      Class<?> paramType = paramTypes[i];
1073      if (paramType.equals(String.class)) {
1074        params[i] = cause.toString();
1075      } else if (paramType.equals(Throwable.class)) {
1076        params[i] = cause;
1077      } else {
1078        return null;
1079      }
1080    }
1081    try {
1082      return constructor.newInstance(params);
1083    } catch (IllegalArgumentException e) {
1084      return null;
1085    } catch (InstantiationException e) {
1086      return null;
1087    } catch (IllegalAccessException e) {
1088      return null;
1089    } catch (InvocationTargetException e) {
1090      return null;
1091    }
1092  }
1093
1094  /**
1095   * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1096   * The idea is to create a (null-filled) List and register a listener with
1097   * each component future to fill out the value in the List when that future
1098   * completes.
1099   */
1100  private static class ListFuture<V> extends AbstractFuture<List<V>> {
1101    ImmutableList<? extends ListenableFuture<? extends V>> futures;
1102    final boolean allMustSucceed;
1103    final AtomicInteger remaining;
1104    List<V> values;
1105
1106    /**
1107     * Constructor.
1108     *
1109     * @param futures all the futures to build the list from
1110     * @param allMustSucceed whether a single failure or cancellation should
1111     *        propagate to this future
1112     * @param listenerExecutor used to run listeners on all the passed in
1113     *        futures.
1114     */
1115    ListFuture(
1116        final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1117        final boolean allMustSucceed, final Executor listenerExecutor) {
1118      this.futures = futures;
1119      this.values = Lists.newArrayListWithCapacity(futures.size());
1120      this.allMustSucceed = allMustSucceed;
1121      this.remaining = new AtomicInteger(futures.size());
1122
1123      init(listenerExecutor);
1124    }
1125
1126    private void init(final Executor listenerExecutor) {
1127      // First, schedule cleanup to execute when the Future is done.
1128      addListener(new Runnable() {
1129        @Override
1130        public void run() {
1131          // By now the values array has either been set as the Future's value,
1132          // or (in case of failure) is no longer useful.
1133          ListFuture.this.values = null;
1134
1135          // Let go of the memory held by other futures
1136          ListFuture.this.futures = null;
1137        }
1138      }, MoreExecutors.sameThreadExecutor());
1139
1140      // Now begin the "real" initialization.
1141
1142      // Corner case: List is empty.
1143      if (futures.isEmpty()) {
1144        set(Lists.newArrayList(values));
1145        return;
1146      }
1147
1148      // Populate the results list with null initially.
1149      for (int i = 0; i < futures.size(); ++i) {
1150        values.add(null);
1151      }
1152
1153      // Register a listener on each Future in the list to update
1154      // the state of this future.
1155      // Note that if all the futures on the list are done prior to completing
1156      // this loop, the last call to addListener() will callback to
1157      // setOneValue(), transitively call our cleanup listener, and set
1158      // this.futures to null.
1159      // We store a reference to futures to avoid the NPE.
1160      ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1161      for (int i = 0; i < localFutures.size(); i++) {
1162        final ListenableFuture<? extends V> listenable = localFutures.get(i);
1163        final int index = i;
1164        listenable.addListener(new Runnable() {
1165          @Override
1166          public void run() {
1167            setOneValue(index, listenable);
1168          }
1169        }, listenerExecutor);
1170      }
1171    }
1172
1173    /**
1174     * Sets the value at the given index to that of the given future.
1175     */
1176    private void setOneValue(int index, Future<? extends V> future) {
1177      List<V> localValues = values;
1178      if (isDone() || localValues == null) {
1179        // Some other future failed or has been cancelled, causing this one to
1180        // also be cancelled or have an exception set. This should only happen
1181        // if allMustSucceed is true.
1182        checkState(allMustSucceed,
1183            "Future was done before all dependencies completed");
1184        return;
1185      }
1186
1187      try {
1188        checkState(future.isDone(),
1189            "Tried to set value from future which is not done");
1190        localValues.set(index, getUninterruptibly(future));
1191      } catch (CancellationException e) {
1192        if (allMustSucceed) {
1193          // Set ourselves as cancelled. Let the input futures keep running
1194          // as some of them may be used elsewhere.
1195          // (Currently we don't override interruptTask, so
1196          // mayInterruptIfRunning==false isn't technically necessary.)
1197          cancel(false);
1198        }
1199      } catch (ExecutionException e) {
1200        if (allMustSucceed) {
1201          // As soon as the first one fails, throw the exception up.
1202          // The result of all other inputs is then ignored.
1203          setException(e.getCause());
1204        }
1205      } catch (RuntimeException e) {
1206        if (allMustSucceed) {
1207          setException(e);
1208        }
1209      } catch (Error e) {
1210        // Propagate errors up ASAP - our superclass will rethrow the error
1211        setException(e);
1212      } finally {
1213        int newRemaining = remaining.decrementAndGet();
1214        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1215        if (newRemaining == 0) {
1216          localValues = values;
1217          if (localValues != null) {
1218            set(Lists.newArrayList(localValues));
1219          } else {
1220            checkState(isDone());
1221          }
1222        }
1223      }
1224    }
1225
1226  }
1227
1228  /**
1229   * A checked future that uses a function to map from exceptions to the
1230   * appropriate checked type.
1231   */
1232  private static class MappingCheckedFuture<V, X extends Exception> extends
1233      AbstractCheckedFuture<V, X> {
1234
1235    final Function<Exception, X> mapper;
1236
1237    MappingCheckedFuture(ListenableFuture<V> delegate,
1238        Function<Exception, X> mapper) {
1239      super(delegate);
1240
1241      this.mapper = checkNotNull(mapper);
1242    }
1243
1244    @Override
1245    protected X mapException(Exception e) {
1246      return mapper.apply(e);
1247    }
1248  }
1249}