001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Rejectable;
028    import org.apache.camel.ThreadPoolRejectedPolicy;
029    import org.apache.camel.support.ServiceSupport;
030    import org.apache.camel.util.AsyncProcessorHelper;
031    import org.apache.camel.util.ObjectHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s
037     * using the asynchronous routing engine.
038     * <p/>
039     * Pay attention to how this processor handles rejected tasks.
040     * <ul>
041     * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception,
042     * and marked to stop continue routing.
043     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li>
044     * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set).
045     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li>
046     * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set).
047     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.
048     * And the current exchange will be added to the task queue.</li>
049     * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread
050     * will not be free to process a new exchange, as its processing the current exchange.</li>
051     * </ul>
052     */
053    public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
056        private final CamelContext camelContext;
057        private final ExecutorService executorService;
058        private volatile boolean shutdownExecutorService;
059        private final AtomicBoolean shutdown = new AtomicBoolean(true);
060        private boolean callerRunsWhenRejected = true;
061        private ThreadPoolRejectedPolicy rejectedPolicy;
062    
063        private final class ProcessCall implements Runnable, Rejectable {
064            private final Exchange exchange;
065            private final AsyncCallback callback;
066    
067            public ProcessCall(Exchange exchange, AsyncCallback callback) {
068                this.exchange = exchange;
069                this.callback = callback;
070            }
071    
072            @Override
073            public void run() {
074                LOG.trace("Continue routing exchange {} ", exchange);
075                if (shutdown.get()) {
076                    exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
077                }
078                callback.done(false);
079            }
080    
081            @Override
082            public void reject() {
083                // abort should mark the exchange with an rejected exception
084                boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
085                if (abort) {
086                    exchange.setException(new RejectedExecutionException());
087                }
088    
089                LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
090                // we should not continue routing, and no redelivery should be performed
091                exchange.setProperty(Exchange.ROUTE_STOP, true);
092                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
093    
094                if (shutdown.get()) {
095                    exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
096                }
097                callback.done(false);
098            }
099    
100            @Override
101            public String toString() {
102                return "ProcessCall[" + exchange + "]";
103            }
104        }
105    
106        public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
107            ObjectHelper.notNull(camelContext, "camelContext");
108            ObjectHelper.notNull(executorService, "executorService");
109            this.camelContext = camelContext;
110            this.executorService = executorService;
111            this.shutdownExecutorService = shutdownExecutorService;
112        }
113    
114        public void process(final Exchange exchange) throws Exception {
115            AsyncProcessorHelper.process(this, exchange);
116        }
117    
118        public boolean process(Exchange exchange, AsyncCallback callback) {
119            if (shutdown.get()) {
120                throw new IllegalStateException("ThreadsProcessor is not running.");
121            }
122    
123            ProcessCall call = new ProcessCall(exchange, callback);
124            try {
125                LOG.trace("Submitting task {}", call);
126                executorService.submit(call);
127                // tell Camel routing engine we continue routing asynchronous
128                return false;
129            } catch (RejectedExecutionException e) {
130                boolean callerRuns = isCallerRunsWhenRejected();
131                if (!callerRuns) {
132                    exchange.setException(e);
133                }
134    
135                LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
136                if (shutdown.get()) {
137                    exchange.setException(new RejectedExecutionException());
138                }
139                callback.done(true);
140                return true;
141            }
142        }
143    
144        public boolean isCallerRunsWhenRejected() {
145            return callerRunsWhenRejected;
146        }
147    
148        public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
149            this.callerRunsWhenRejected = callerRunsWhenRejected;
150        }
151    
152        public ThreadPoolRejectedPolicy getRejectedPolicy() {
153            return rejectedPolicy;
154        }
155    
156        public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
157            this.rejectedPolicy = rejectedPolicy;
158        }
159    
160        public String toString() {
161            return "Threads";
162        }
163    
164        protected void doStart() throws Exception {
165            shutdown.set(false);
166        }
167    
168        protected void doStop() throws Exception {
169            shutdown.set(true);
170        }
171    
172        protected void doShutdown() throws Exception {
173            if (shutdownExecutorService) {
174                camelContext.getExecutorServiceManager().shutdownNow(executorService);
175            }
176            super.doShutdown();
177        }
178    
179    }