001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022
023import com.google.common.annotations.Beta;
024import com.google.common.collect.Lists;
025import com.google.common.collect.Queues;
026import com.google.common.util.concurrent.Service.State; // javadoc needs this
027
028import java.util.List;
029import java.util.Queue;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Executor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.concurrent.locks.ReentrantLock;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038import javax.annotation.Nullable;
039import javax.annotation.concurrent.GuardedBy;
040import javax.annotation.concurrent.Immutable;
041
042/**
043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
045 * callbacks. Its subclasses must manage threads manually; consider
046 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
047 *
048 * @author Jesse Wilson
049 * @author Luke Sandberg
050 * @since 1.0
051 */
052@Beta
053public abstract class AbstractService implements Service {
  /** Logger used to report exceptions thrown when handing a listener callback to its executor. */
  private static final Logger logger = Logger.getLogger(AbstractService.class.getName());

  /**
   * Lock that guards {@link #listeners}, writes to {@link #snapshot}, and enqueue operations on
   * {@link #queuedListeners}. It must not be held while listener callbacks are executed.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * The future returned by {@link #start}. It is completed by the listener that the constructor
   * registers.
   */
  private final Transition startup = new Transition();

  /**
   * The future returned by {@link #stop}. It is completed by the listener that the constructor
   * registers.
   */
  private final Transition shutdown = new Transition();

  /**
   * The listeners to notify during a state transition.
   */
  @GuardedBy("lock")
  private final List<ListenerExecutorPair> listeners = Lists.newArrayList();

  /**
   * The queue of listeners that are waiting to be executed.
   *
   * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
   * protected by the implicit lock on this object. Dequeue operations should be executed atomically
   * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
   * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
   * This is necessary to ensure that elements on the queue are executed in the correct order.
   * Enqueue operations should be protected so that listeners are added in the correct order. We use
   * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
   */
  @GuardedBy("queuedListeners")
  private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();

  /**
   * The current state of the service.  This should be written with the lock held but can be read
   * without it because it is an immutable object in a volatile field.  This is desirable so that
   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
   * without grabbing the lock.
   *
   * <p>To update this field correctly the lock must be held to guarantee that the state is
   * consistent.
   */
  @GuardedBy("lock")
  private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
091
092  protected AbstractService() {
093    // Add a listener to update the futures. This needs to be added first so that it is executed
094    // before the other listeners. This way the other listeners can access the completed futures.
095    addListener(
096        new Listener() {
097          @Override public void starting() {}
098
099          @Override public void running() {
100            startup.set(State.RUNNING);
101          }
102
103          @Override public void stopping(State from) {
104            if (from == State.STARTING) {
105              startup.set(State.STOPPING);
106            }
107          }
108
109          @Override public void terminated(State from) {
110            if (from == State.NEW) {
111              startup.set(State.TERMINATED);
112            }
113            shutdown.set(State.TERMINATED);
114          }
115
116          @Override public void failed(State from, Throwable failure) {
117            switch (from) {
118              case STARTING:
119                startup.setException(failure);
120                shutdown.setException(new Exception("Service failed to start.", failure));
121                break;
122              case RUNNING:
123                shutdown.setException(new Exception("Service failed while running", failure));
124                break;
125              case STOPPING:
126                shutdown.setException(failure);
127                break;
128              case TERMINATED:  /* fall-through */
129              case FAILED:  /* fall-through */
130              case NEW:  /* fall-through */
131              default:
132                throw new AssertionError("Unexpected from state: " + from);
133            }
134          }
135        },
136        MoreExecutors.sameThreadExecutor());
137  }
138
  /**
   * This method is called by {@link #start} to initiate service startup. The invocation of this
   * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
   * after it has returned. If startup fails, the invocation should cause a call to
   * {@link #notifyFailed(Throwable)} instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where it is
   * convenient. It is invoked at most once, even when {@link #start} is called multiple times:
   * only a {@link #start} call made while the service is {@link State#NEW} triggers it.
   *
   * <p>Note that {@link #start} invokes this method while holding the service's internal lock,
   * and that anything it throws is passed to {@link #notifyFailed(Throwable)} by {@link #start}.
   */
  protected abstract void doStart();
150
  /**
   * This method is called to initiate service shutdown: by {@link #stop} when the service is
   * {@link State#RUNNING}, or by {@link #notifyStarted()} when a stop was requested while the
   * service was still starting. The invocation of this method should cause a call to
   * {@link #notifyStopped()}, either during this method's run, or after it has returned. If
   * shutdown fails, the invocation should cause a call to {@link #notifyFailed(Throwable)}
   * instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where it is
   * convenient. It is invoked at most once, even when {@link #stop} is called multiple times, and
   * not at all when {@link #stop} is called on a service that is still {@link State#NEW}.
   *
   * <p>Note that this method is invoked while the service's internal lock is held.
   */
  protected abstract void doStop();
162
  /**
   * Initiates startup if the service is still {@link State#NEW}; in any other state the call
   * changes nothing. Anything thrown while starting is handed to
   * {@link #notifyFailed(Throwable)}.
   *
   * @return the startup future; the same instance is returned on every call
   */
  @Override
  public final ListenableFuture<State> start() {
    lock.lock();
    try {
      // Only a call made while the service is NEW triggers startup.
      if (snapshot.state == State.NEW) {
        snapshot = new StateSnapshot(State.STARTING);
        // Queue the starting() callbacks before doStart() so that they precede any callbacks that
        // doStart() itself causes to be queued (e.g. by calling notifyStarted() synchronously).
        starting();
        doStart();
      }
    } catch (Throwable startupFailure) {
      // Report the problem as a service failure instead of rethrowing it to the caller.
      notifyFailed(startupFailure);
    } finally {
      lock.unlock();
      // Run the queued callbacks now that this method's hold on the lock has been released;
      // executeListeners() does nothing if this thread still holds the lock reentrantly.
      executeListeners();
    }

    return startup;
  }
181
  /**
   * Initiates shutdown according to the current state. A {@link State#NEW} service is terminated
   * directly, a {@link State#STARTING} service is flagged to stop once startup finishes, and a
   * {@link State#RUNNING} service is moved to {@link State#STOPPING} and {@link #doStop} is
   * called. In the remaining states the call changes nothing. Anything thrown while stopping is
   * handed to {@link #notifyFailed(Throwable)}.
   *
   * @return the shutdown future; the same instance is returned on every call
   */
  @Override
  public final ListenableFuture<State> stop() {
    lock.lock();
    try {
      switch (snapshot.state) {
        case NEW:
          // Never started: go straight to TERMINATED without calling doStop().
          snapshot = new StateSnapshot(State.TERMINATED);
          terminated(State.NEW);
          break;
        case STARTING:
          // Startup is still in progress. Keep the internal state as STARTING but record the stop
          // request; notifyStarted() calls doStop() when startup completes. Listeners are told
          // about the stop right away.
          snapshot = new StateSnapshot(State.STARTING, true, null);
          stopping(State.STARTING);
          break;
        case RUNNING:
          snapshot = new StateSnapshot(State.STOPPING);
          stopping(State.RUNNING);
          doStop();
          break;
        case STOPPING:
        case TERMINATED:
        case FAILED:
          // do nothing
          break;
        default:
          throw new AssertionError("Unexpected state: " + snapshot.state);
      }
    } catch (Throwable shutdownFailure) {
      // Report the problem as a service failure instead of rethrowing it to the caller.
      notifyFailed(shutdownFailure);
    } finally {
      lock.unlock();
      // Run the queued callbacks now that this method's hold on the lock has been released;
      // executeListeners() does nothing if this thread still holds the lock reentrantly.
      executeListeners();
    }

    return shutdown;
  }
217
218  @Override
219  public State startAndWait() {
220    return Futures.getUnchecked(start());
221  }
222
223  @Override
224  public State stopAndWait() {
225    return Futures.getUnchecked(stop());
226  }
227
  /**
   * Implementing classes should invoke this method once their service has started. It will cause
   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}, or, if a stop
   * was requested while the service was starting, to {@link State#STOPPING} followed by a call to
   * {@link #doStop}.
   *
   * @throws IllegalStateException if the service is not {@link State#STARTING}.
   */
  protected final void notifyStarted() {
    lock.lock();
    try {
      // Examine the internal state: a stop requested during startup leaves it as STARTING.
      if (snapshot.state != State.STARTING) {
        IllegalStateException failure = new IllegalStateException(
            "Cannot notifyStarted() when the service is " + snapshot.state);
        // Record the misuse as a service failure before reporting it to the caller. Note that
        // notifyFailed() itself throws if the service is NEW or TERMINATED, in which case the
        // throw below is not reached.
        notifyFailed(failure);
        throw failure;
      }

      if (snapshot.shutdownWhenStartupFinishes) {
        snapshot = new StateSnapshot(State.STOPPING);
        // We don't call listeners here because we already did that when we set the
        // shutdownWhenStartupFinishes flag.
        doStop();
      } else {
        snapshot = new StateSnapshot(State.RUNNING);
        running();
      }
    } finally {
      lock.unlock();
      // Run the queued callbacks now that this method's hold on the lock has been released;
      // executeListeners() does nothing if this thread still holds the lock reentrantly.
      executeListeners();
    }
  }
258
  /**
   * Implementing classes should invoke this method once their service has stopped. It will cause
   * the service to transition from {@link State#STOPPING} (or directly from
   * {@link State#RUNNING}) to {@link State#TERMINATED}.
   *
   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
   *         {@link State#RUNNING}.
   */
  protected final void notifyStopped() {
    lock.lock();
    try {
      if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
        IllegalStateException failure = new IllegalStateException(
            "Cannot notifyStopped() when the service is " + snapshot.state);
        // Record the misuse as a service failure before reporting it to the caller. Note that
        // notifyFailed() itself throws if the service is NEW or TERMINATED, in which case the
        // throw below is not reached.
        notifyFailed(failure);
        throw failure;
      }
      // Remember the state being left so that listeners are told where the service came from.
      State previous = snapshot.state;
      snapshot = new StateSnapshot(State.TERMINATED);
      terminated(previous);
    } finally {
      lock.unlock();
      // Run the queued callbacks now that this method's hold on the lock has been released;
      // executeListeners() does nothing if this thread still holds the lock reentrantly.
      executeListeners();
    }
  }
283
  /**
   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
   * or otherwise cannot be started nor stopped.
   *
   * <p>If the service has already failed, the call changes nothing and the original failure cause
   * is retained.
   *
   * @param cause the reason for the failure; must not be null
   * @throws NullPointerException if {@code cause} is null
   * @throws IllegalStateException if the service is {@link State#NEW} or
   *         {@link State#TERMINATED}; {@code cause} is attached as the exception's cause
   */
  protected final void notifyFailed(Throwable cause) {
    checkNotNull(cause);

    lock.lock();
    try {
      switch (snapshot.state) {
        case NEW:
        case TERMINATED:
          // A service that has not been started, or has already terminated, cannot fail.
          throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
        case RUNNING:
        case STARTING:
        case STOPPING:
          // Remember the state being left so that listeners are told where the failure occurred.
          State previous = snapshot.state;
          snapshot = new StateSnapshot(State.FAILED, false, cause);
          failed(previous, cause);
          break;
        case FAILED:
          // Do nothing
          break;
        default:
          throw new AssertionError("Unexpected state: " + snapshot.state);
      }
    } finally {
      lock.unlock();
      // Run the queued callbacks now that this method's hold on the lock has been released;
      // executeListeners() does nothing if this thread still holds the lock reentrantly.
      executeListeners();
    }
  }
316
317  @Override
318  public final boolean isRunning() {
319    return state() == State.RUNNING;
320  }
321
322  @Override
323  public final State state() {
324    return snapshot.externalState();
325  }
326
327  @Override
328  public final void addListener(Listener listener, Executor executor) {
329    checkNotNull(listener, "listener");
330    checkNotNull(executor, "executor");
331    lock.lock();
332    try {
333      if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
334        listeners.add(new ListenerExecutorPair(listener, executor));
335      }
336    } finally {
337      lock.unlock();
338    }
339  }
340
341  @Override public String toString() {
342    return getClass().getSimpleName() + " [" + state() + "]";
343  }
344
345  /**
346   * A change from one service state to another, plus the result of the change.
347   */
348  private class Transition extends AbstractFuture<State> {
349    @Override
350    public State get(long timeout, TimeUnit unit)
351        throws InterruptedException, TimeoutException, ExecutionException {
352      try {
353        return super.get(timeout, unit);
354      } catch (TimeoutException e) {
355        throw new TimeoutException(AbstractService.this.toString());
356      }
357    }
358  }
359
360  /**
361   * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
362   * {@link #lock}.
363   */
364  private void executeListeners() {
365    if (!lock.isHeldByCurrentThread()) {
366      synchronized (queuedListeners) {
367        Runnable listener;
368        while ((listener = queuedListeners.poll()) != null) {
369          listener.run();
370        }
371      }
372    }
373  }
374
375  @GuardedBy("lock")
376  private void starting() {
377    for (final ListenerExecutorPair pair : listeners) {
378      queuedListeners.add(new Runnable() {
379        @Override public void run() {
380          pair.execute(new Runnable() {
381            @Override public void run() {
382              pair.listener.starting();
383            }
384          });
385        }
386      });
387    }
388  }
389
390  @GuardedBy("lock")
391  private void running() {
392    for (final ListenerExecutorPair pair : listeners) {
393      queuedListeners.add(new Runnable() {
394        @Override public void run() {
395          pair.execute(new Runnable() {
396            @Override public void run() {
397              pair.listener.running();
398            }
399          });
400        }
401      });
402    }
403  }
404
405  @GuardedBy("lock")
406  private void stopping(final State from) {
407    for (final ListenerExecutorPair pair : listeners) {
408      queuedListeners.add(new Runnable() {
409        @Override public void run() {
410          pair.execute(new Runnable() {
411            @Override public void run() {
412              pair.listener.stopping(from);
413            }
414          });
415        }
416      });
417    }
418  }
419
420  @GuardedBy("lock")
421  private void terminated(final State from) {
422    for (final ListenerExecutorPair pair : listeners) {
423      queuedListeners.add(new Runnable() {
424        @Override public void run() {
425          pair.execute(new Runnable() {
426            @Override public void run() {
427              pair.listener.terminated(from);
428            }
429          });
430        }
431      });
432    }
433    // There are no more state transitions so we can clear this out.
434    listeners.clear();
435  }
436
437  @GuardedBy("lock")
438  private void failed(final State from, final Throwable cause) {
439    for (final ListenerExecutorPair pair : listeners) {
440      queuedListeners.add(new Runnable() {
441        @Override public void run() {
442          pair.execute(new Runnable() {
443            @Override public void run() {
444              pair.listener.failed(from, cause);
445            }
446          });
447        }
448      });
449    }
450    // There are no more state transitions so we can clear this out.
451    listeners.clear();
452  }
453
454  /** A simple holder for a listener and its executor. */
455  private static class ListenerExecutorPair {
456    final Listener listener;
457    final Executor executor;
458
459    ListenerExecutorPair(Listener listener, Executor executor) {
460      this.listener = listener;
461      this.executor = executor;
462    }
463
464    /**
465     * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
466     * exceptions
467     */
468    void execute(Runnable runnable) {
469      try {
470        executor.execute(runnable);
471      } catch (Exception e) {
472        logger.log(Level.SEVERE, "Exception while executing listener " + listener
473            + " with executor " + executor, e);
474      }
475    }
476  }
477
478  /**
479   * An immutable snapshot of the current state of the service. This class represents a consistent
480   * snapshot of the state and therefore it can be used to answer simple queries without needing to
481   * grab a lock.
482   */
483  @Immutable
484  private static final class StateSnapshot {
485    /**
486     * The internal state, which equals external state unless
487     * shutdownWhenStartupFinishes is true.
488     */
489    final State state;
490
491    /**
492     * If true, the user requested a shutdown while the service was still starting
493     * up.
494     */
495    final boolean shutdownWhenStartupFinishes;
496
497    /**
498     * The exception that caused this service to fail.  This will be {@code null}
499     * unless the service has failed.
500     */
501    @Nullable
502    final Throwable failure;
503
504    StateSnapshot(State internalState) {
505      this(internalState, false, null);
506    }
507
508    StateSnapshot(State internalState, boolean shutdownWhenStartupFinishes, Throwable failure) {
509      checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING,
510          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
511          internalState);
512      checkArgument(!(failure != null ^ internalState == State.FAILED),
513          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
514          + "instead.", internalState, failure);
515      this.state = internalState;
516      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
517      this.failure = failure;
518    }
519
520    /** @see Service#state() */
521    State externalState() {
522      if (shutdownWhenStartupFinishes && state == State.STARTING) {
523        return State.STOPPING;
524      } else {
525        return state;
526      }
527    }
528
529    /** @see Service#failureCause() */
530    Throwable failureCause() {
531      checkState(state == State.FAILED,
532          "failureCause() is only valid if the service has failed, service is %s", state);
533      return failure;
534    }
535  }
536}