001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021
022import com.google.common.annotations.Beta;
023import com.google.common.collect.Lists;
024import com.google.common.collect.Queues;
025
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Iterator;
029import java.util.List;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.Callable;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.RejectedExecutionException;
037import java.util.concurrent.ScheduledExecutorService;
038import java.util.concurrent.ScheduledFuture;
039import java.util.concurrent.ScheduledThreadPoolExecutor;
040import java.util.concurrent.ThreadFactory;
041import java.util.concurrent.ThreadPoolExecutor;
042import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
043import java.util.concurrent.TimeUnit;
044import java.util.concurrent.TimeoutException;
045import java.util.concurrent.locks.Condition;
046import java.util.concurrent.locks.Lock;
047import java.util.concurrent.locks.ReentrantLock;
048
049/**
050 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
051 * ExecutorService}, and {@link ThreadFactory}.
052 *
053 * @author Eric Fellheimer
054 * @author Kyle Littlefield
055 * @author Justin Mahoney
056 * @since 3.0
057 */
058public final class MoreExecutors {
  // Private constructor: this class only exposes static members.
  private MoreExecutors() {}
060
061  /**
062   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
063   * when the application is complete.  It does so by using daemon threads and
064   * adding a shutdown hook to wait for their completion.
065   *
066   * <p>This is mainly for fixed thread pools.
067   * See {@link Executors#newFixedThreadPool(int)}.
068   *
069   * @param executor the executor to modify to make sure it exits when the
070   *        application is finished
071   * @param terminationTimeout how long to wait for the executor to
072   *        finish before terminating the JVM
073   * @param timeUnit unit of time for the time parameter
074   * @return an unmodifiable version of the input which will not hang the JVM
075   */
076  @Beta
077  public static ExecutorService getExitingExecutorService(
078      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
079    executor.setThreadFactory(new ThreadFactoryBuilder()
080        .setDaemon(true)
081        .setThreadFactory(executor.getThreadFactory())
082        .build());
083
084    ExecutorService service = Executors.unconfigurableExecutorService(executor);
085
086    addDelayedShutdownHook(service, terminationTimeout, timeUnit);
087
088    return service;
089  }
090
091  /**
092   * Converts the given ScheduledThreadPoolExecutor into a
093   * ScheduledExecutorService that exits when the application is complete.  It
094   * does so by using daemon threads and adding a shutdown hook to wait for
095   * their completion.
096   *
097   * <p>This is mainly for fixed thread pools.
098   * See {@link Executors#newScheduledThreadPool(int)}.
099   *
100   * @param executor the executor to modify to make sure it exits when the
101   *        application is finished
102   * @param terminationTimeout how long to wait for the executor to
103   *        finish before terminating the JVM
104   * @param timeUnit unit of time for the time parameter
105   * @return an unmodifiable version of the input which will not hang the JVM
106   */
107  @Beta
108  public static ScheduledExecutorService getExitingScheduledExecutorService(
109      ScheduledThreadPoolExecutor executor, long terminationTimeout,
110      TimeUnit timeUnit) {
111    executor.setThreadFactory(new ThreadFactoryBuilder()
112        .setDaemon(true)
113        .setThreadFactory(executor.getThreadFactory())
114        .build());
115
116    ScheduledExecutorService service =
117        Executors.unconfigurableScheduledExecutorService(executor);
118
119    addDelayedShutdownHook(service, terminationTimeout, timeUnit);
120
121    return service;
122  }
123
124  /**
125   * Add a shutdown hook to wait for thread completion in the given
126   * {@link ExecutorService service}.  This is useful if the given service uses
127   * daemon threads, and we want to keep the JVM from exiting immediately on
128   * shutdown, instead giving these daemon threads a chance to terminate
129   * normally.
130   * @param service ExecutorService which uses daemon threads
131   * @param terminationTimeout how long to wait for the executor to finish
132   *        before terminating the JVM
133   * @param timeUnit unit of time for the time parameter
134   */
135  @Beta
136  public static void addDelayedShutdownHook(
137      final ExecutorService service, final long terminationTimeout,
138      final TimeUnit timeUnit) {
139    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
140      @Override
141      public void run() {
142        try {
143          // We'd like to log progress and failures that may arise in the
144          // following code, but unfortunately the behavior of logging
145          // is undefined in shutdown hooks.
146          // This is because the logging code installs a shutdown hook of its
147          // own. See Cleaner class inside {@link LogManager}.
148          service.shutdown();
149          service.awaitTermination(terminationTimeout, timeUnit);
150        } catch (InterruptedException ignored) {
151          // We're shutting down anyway, so just ignore.
152        }
153      }
154    }, "DelayedShutdownHook-for-" + service));
155  }
156
157  /**
158   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
159   * when the application is complete.  It does so by using daemon threads and
160   * adding a shutdown hook to wait for their completion.
161   *
162   * <p>This method waits 120 seconds before continuing with JVM termination,
163   * even if the executor has not finished its work.
164   *
165   * <p>This is mainly for fixed thread pools.
166   * See {@link Executors#newFixedThreadPool(int)}.
167   *
168   * @param executor the executor to modify to make sure it exits when the
169   *        application is finished
170   * @return an unmodifiable version of the input which will not hang the JVM
171   */
172  @Beta
173  public static ExecutorService getExitingExecutorService(
174      ThreadPoolExecutor executor) {
175    return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
176  }
177
178  /**
179   * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
180   * exits when the application is complete.  It does so by using daemon threads
181   * and adding a shutdown hook to wait for their completion.
182   *
183   * <p>This method waits 120 seconds before continuing with JVM termination,
184   * even if the executor has not finished its work.
185   *
186   * <p>This is mainly for fixed thread pools.
187   * See {@link Executors#newScheduledThreadPool(int)}.
188   *
189   * @param executor the executor to modify to make sure it exits when the
190   *        application is finished
191   * @return an unmodifiable version of the input which will not hang the JVM
192   */
193  @Beta
194  public static ScheduledExecutorService getExitingScheduledExecutorService(
195      ScheduledThreadPoolExecutor executor) {
196    return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
197  }
198
199  /**
200   * Creates an executor service that runs each task in the thread
201   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
202   * applies both to individually submitted tasks and to collections of tasks
203   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
204   * tasks will run serially on the calling thread.  Tasks are run to
205   * completion before a {@code Future} is returned to the caller (unless the
206   * executor has been shutdown).
207   *
208   * <p>Although all tasks are immediately executed in the thread that
209   * submitted the task, this {@code ExecutorService} imposes a small
210   * locking overhead on each task submission in order to implement shutdown
211   * and termination behavior.
212   *
213   * <p>The implementation deviates from the {@code ExecutorService}
214   * specification with regards to the {@code shutdownNow} method.  First,
215   * "best-effort" with regards to canceling running tasks is implemented
216   * as "no-effort".  No interrupts or other attempts are made to stop
217   * threads executing tasks.  Second, the returned list will always be empty,
218   * as any submitted task is considered to have started execution.
219   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
220   * which are pending serial execution, even the subset of the tasks that
221   * have not yet started execution.  It is unclear from the
222   * {@code ExecutorService} specification if these should be included, and
223   * it's much easier to implement the interpretation that they not be.
224   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
225   * in concurrent calls to {@code invokeAll/invokeAny} throwing
226   * RejectedExecutionException, although a subset of the tasks may already
227   * have been executed.
228   *
229   * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
230   *        >mostly source-compatible</a> since 3.0)
231   */
232  public static ListeningExecutorService sameThreadExecutor() {
233    return new SameThreadExecutorService();
234  }
235
236  // See sameThreadExecutor javadoc for behavioral notes.
237  private static class SameThreadExecutorService
238      extends AbstractListeningExecutorService {
239    /**
240     * Lock used whenever accessing the state variables
241     * (runningTasks, shutdown, terminationCondition) of the executor
242     */
243    private final Lock lock = new ReentrantLock();
244
245    /** Signaled after the executor is shutdown and running tasks are done */
246    private final Condition termination = lock.newCondition();
247
248    /*
249     * Conceptually, these two variables describe the executor being in
250     * one of three states:
251     *   - Active: shutdown == false
252     *   - Shutdown: runningTasks > 0 and shutdown == true
253     *   - Terminated: runningTasks == 0 and shutdown == true
254     */
255    private int runningTasks = 0;
256    private boolean shutdown = false;
257
258    @Override
259    public void execute(Runnable command) {
260      startTask();
261      try {
262        command.run();
263      } finally {
264        endTask();
265      }
266    }
267
268    @Override
269    public boolean isShutdown() {
270      lock.lock();
271      try {
272        return shutdown;
273      } finally {
274        lock.unlock();
275      }
276    }
277
278    @Override
279    public void shutdown() {
280      lock.lock();
281      try {
282        shutdown = true;
283      } finally {
284        lock.unlock();
285      }
286    }
287
288    // See sameThreadExecutor javadoc for unusual behavior of this method.
289    @Override
290    public List<Runnable> shutdownNow() {
291      shutdown();
292      return Collections.emptyList();
293    }
294
295    @Override
296    public boolean isTerminated() {
297      lock.lock();
298      try {
299        return shutdown && runningTasks == 0;
300      } finally {
301        lock.unlock();
302      }
303    }
304
305    @Override
306    public boolean awaitTermination(long timeout, TimeUnit unit)
307        throws InterruptedException {
308      long nanos = unit.toNanos(timeout);
309      lock.lock();
310      try {
311        for (;;) {
312          if (isTerminated()) {
313            return true;
314          } else if (nanos <= 0) {
315            return false;
316          } else {
317            nanos = termination.awaitNanos(nanos);
318          }
319        }
320      } finally {
321        lock.unlock();
322      }
323    }
324
325    /**
326     * Checks if the executor has been shut down and increments the running
327     * task count.
328     *
329     * @throws RejectedExecutionException if the executor has been previously
330     *         shutdown
331     */
332    private void startTask() {
333      lock.lock();
334      try {
335        if (isShutdown()) {
336          throw new RejectedExecutionException("Executor already shutdown");
337        }
338        runningTasks++;
339      } finally {
340        lock.unlock();
341      }
342    }
343
344    /**
345     * Decrements the running task count.
346     */
347    private void endTask() {
348      lock.lock();
349      try {
350        runningTasks--;
351        if (isTerminated()) {
352          termination.signalAll();
353        }
354      } finally {
355        lock.unlock();
356      }
357    }
358  }
359
360  /**
361   * Creates an {@link ExecutorService} whose {@code submit} and {@code
362   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
363   * given delegate executor. Those methods, as well as {@code execute} and
364   * {@code invokeAny}, are implemented in terms of calls to {@code
365   * delegate.execute}. All other methods are forwarded unchanged to the
366   * delegate. This implies that the returned {@code ListeningExecutorService}
367   * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
368   * invokeAny} methods, so any special handling of tasks must be implemented in
369   * the delegate's {@code execute} method or by wrapping the returned {@code
370   * ListeningExecutorService}.
371   *
372   * <p>If the delegate executor was already an instance of {@code
373   * ListeningExecutorService}, it is returned untouched, and the rest of this
374   * documentation does not apply.
375   *
376   * @since 10.0
377   */
378  public static ListeningExecutorService listeningDecorator(
379      ExecutorService delegate) {
380    return (delegate instanceof ListeningExecutorService)
381        ? (ListeningExecutorService) delegate
382        : (delegate instanceof ScheduledExecutorService)
383        ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
384        : new ListeningDecorator(delegate);
385  }
386
387  /**
388   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
389   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
390   * given delegate executor. Those methods, as well as {@code execute} and
391   * {@code invokeAny}, are implemented in terms of calls to {@code
392   * delegate.execute}. All other methods are forwarded unchanged to the
393   * delegate. This implies that the returned {@code
394   * SchedulingListeningExecutorService} never calls the delegate's {@code
395   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
396   * handling of tasks must be implemented in the delegate's {@code execute}
397   * method or by wrapping the returned {@code
398   * SchedulingListeningExecutorService}.
399   *
400   * <p>If the delegate executor was already an instance of {@code
401   * ListeningScheduledExecutorService}, it is returned untouched, and the rest
402   * of this documentation does not apply.
403   *
404   * @since 10.0
405   */
406  public static ListeningScheduledExecutorService listeningDecorator(
407      ScheduledExecutorService delegate) {
408    return (delegate instanceof ListeningScheduledExecutorService)
409        ? (ListeningScheduledExecutorService) delegate
410        : new ScheduledListeningDecorator(delegate);
411  }
412
413  private static class ListeningDecorator
414      extends AbstractListeningExecutorService {
415    final ExecutorService delegate;
416
417    ListeningDecorator(ExecutorService delegate) {
418      this.delegate = checkNotNull(delegate);
419    }
420
421    @Override
422    public boolean awaitTermination(long timeout, TimeUnit unit)
423        throws InterruptedException {
424      return delegate.awaitTermination(timeout, unit);
425    }
426
427    @Override
428    public boolean isShutdown() {
429      return delegate.isShutdown();
430    }
431
432    @Override
433    public boolean isTerminated() {
434      return delegate.isTerminated();
435    }
436
437    @Override
438    public void shutdown() {
439      delegate.shutdown();
440    }
441
442    @Override
443    public List<Runnable> shutdownNow() {
444      return delegate.shutdownNow();
445    }
446
447    @Override
448    public void execute(Runnable command) {
449      delegate.execute(command);
450    }
451  }
452
453  private static class ScheduledListeningDecorator
454      extends ListeningDecorator implements ListeningScheduledExecutorService {
455    @SuppressWarnings("hiding")
456    final ScheduledExecutorService delegate;
457
458    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
459      super(delegate);
460      this.delegate = checkNotNull(delegate);
461    }
462
463    @Override
464    public ScheduledFuture<?> schedule(
465        Runnable command, long delay, TimeUnit unit) {
466      return delegate.schedule(command, delay, unit);
467    }
468
469    @Override
470    public <V> ScheduledFuture<V> schedule(
471        Callable<V> callable, long delay, TimeUnit unit) {
472      return delegate.schedule(callable, delay, unit);
473    }
474
475    @Override
476    public ScheduledFuture<?> scheduleAtFixedRate(
477        Runnable command, long initialDelay, long period, TimeUnit unit) {
478      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
479    }
480
481    @Override
482    public ScheduledFuture<?> scheduleWithFixedDelay(
483        Runnable command, long initialDelay, long delay, TimeUnit unit) {
484      return delegate.scheduleWithFixedDelay(
485          command, initialDelay, delay, unit);
486    }
487  }
488
489  /*
   * The following method is a modified version of one found in
491   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
492   * which contained the following notice:
493   *
494   * Written by Doug Lea with assistance from members of JCP JSR-166
495   * Expert Group and released to the public domain, as explained at
496   * http://creativecommons.org/publicdomain/zero/1.0/
497   * Other contributors include Andrew Wright, Jeffrey Hayes,
498   * Pat Fisher, Mike Judd.
499   */
500
501  /**
502   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
503   * implementations.
504   */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
505      Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
506          throws InterruptedException, ExecutionException, TimeoutException {
507    int ntasks = tasks.size();
508    checkArgument(ntasks > 0);
509    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
510    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
511
512    // For efficiency, especially in executors with limited
513    // parallelism, check to see if previously submitted tasks are
514    // done before submitting more of them. This interleaving
515    // plus the exception mechanics account for messiness of main
516    // loop.
517
518    try {
519      // Record exceptions so that if we fail to obtain any
520      // result, we can throw the last exception we got.
521      ExecutionException ee = null;
522      long lastTime = timed ? System.nanoTime() : 0;
523      Iterator<? extends Callable<T>> it = tasks.iterator();
524
525      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
526      --ntasks;
527      int active = 1;
528
529      for (;;) {
530        Future<T> f = futureQueue.poll();
531        if (f == null) {
532          if (ntasks > 0) {
533            --ntasks;
534            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
535            ++active;
536          } else if (active == 0) {
537            break;
538          } else if (timed) {
539            f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
540            if (f == null) {
541              throw new TimeoutException();
542            }
543            long now = System.nanoTime();
544            nanos -= now - lastTime;
545            lastTime = now;
546          } else {
547            f = futureQueue.take();
548          }
549        }
550        if (f != null) {
551          --active;
552          try {
553            return f.get();
554          } catch (ExecutionException eex) {
555            ee = eex;
556          } catch (RuntimeException rex) {
557            ee = new ExecutionException(rex);
558          }
559        }
560      }
561
562      if (ee == null) {
563        ee = new ExecutionException(null);
564      }
565      throw ee;
566    } finally {
567      for (Future<T> f : futures) {
568        f.cancel(true);
569      }
570    }
571  }
572
573  /**
574   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
575   */
576  private static <T> ListenableFuture<T> submitAndAddQueueListener(
577      ListeningExecutorService executorService, Callable<T> task,
578      final BlockingQueue<Future<T>> queue) {
579    final ListenableFuture<T> future = executorService.submit(task);
580    future.addListener(new Runnable() {
581      @Override public void run() {
582        queue.add(future);
583      }
584    }, MoreExecutors.sameThreadExecutor());
585    return future;
586  }
587}