001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.concurrent.ExecutorService; 020import java.util.concurrent.RejectedExecutionException; 021import java.util.concurrent.RejectedExecutionHandler; 022import java.util.concurrent.ThreadPoolExecutor; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.camel.AsyncCallback; 026import org.apache.camel.AsyncProcessor; 027import org.apache.camel.CamelContext; 028import org.apache.camel.Exchange; 029import org.apache.camel.Rejectable; 030import org.apache.camel.ThreadPoolRejectedPolicy; 031import org.apache.camel.spi.IdAware; 032import org.apache.camel.support.ServiceSupport; 033import org.apache.camel.util.AsyncProcessorHelper; 034import org.apache.camel.util.ObjectHelper; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s 040 * using the asynchronous routing engine. 041 * <p/> 042 * <b>Notice:</b> For transacted routes then this {@link ThreadsProcessor} is not in use, as we want to 043 * process messages using the same thread to support all work done in the same transaction. The reason 044 * is that the transaction manager that orchestrate the transaction, requires all the work to be done 045 * on the same thread. 046 * <p/> 047 * Pay attention to how this processor handles rejected tasks. 048 * <ul> 049 * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception, 050 * and marked to stop continue routing. 051 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li> 052 * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set). 053 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li> 054 * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set). 055 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set. 056 * And the current exchange will be added to the task queue.</li> 057 * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread 058 * will not be free to process a new exchange, as its processing the current exchange.</li> 059 * </ul> 060 */ 061public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, IdAware { 062 063 private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class); 064 private String id; 065 private final CamelContext camelContext; 066 private final ExecutorService executorService; 067 private final ThreadPoolRejectedPolicy rejectedPolicy; 068 private volatile boolean shutdownExecutorService; 069 private final AtomicBoolean shutdown = new AtomicBoolean(true); 070 071 private final class ProcessCall implements Runnable, Rejectable { 072 private final Exchange exchange; 073 private final AsyncCallback callback; 074 private final boolean done; 075 076 ProcessCall(Exchange exchange, AsyncCallback callback, boolean done) { 077 this.exchange = exchange; 078 this.callback = callback; 079 this.done = done; 080 } 081 082 @Override 083 public void run() { 084 LOG.trace("Continue routing exchange {}", exchange); 085 if (shutdown.get()) { 086 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 087 } 088 callback.done(done); 089 } 090 091 @Override 092 public void reject() { 093 // reject should mark the exchange with an rejected exception and mark not to route anymore 094 exchange.setException(new RejectedExecutionException()); 095 LOG.trace("Rejected routing exchange {}", exchange); 096 if (shutdown.get()) { 097 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 098 } 099 callback.done(done); 100 } 101 102 @Override 103 public String toString() { 104 return "ProcessCall[" + exchange + "]"; 105 } 106 } 107 108 public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService, ThreadPoolRejectedPolicy rejectedPolicy) { 109 ObjectHelper.notNull(camelContext, "camelContext"); 110 ObjectHelper.notNull(executorService, "executorService"); 111 ObjectHelper.notNull(rejectedPolicy, "rejectedPolicy"); 112 this.camelContext = camelContext; 113 this.executorService = executorService; 114 this.shutdownExecutorService = shutdownExecutorService; 115 this.rejectedPolicy = rejectedPolicy; 116 } 117 118 public void process(final Exchange exchange) throws Exception { 119 AsyncProcessorHelper.process(this, exchange); 120 } 121 122 public boolean process(Exchange exchange, AsyncCallback callback) { 123 if (shutdown.get()) { 124 throw new IllegalStateException("ThreadsProcessor is not running."); 125 } 126 127 // we cannot execute this asynchronously for transacted exchanges, as the transaction manager doesn't support 128 // using different threads in the same transaction 129 if (exchange.isTransacted()) { 130 LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 131 callback.done(true); 132 return true; 133 } 134 135 try { 136 // process the call in asynchronous mode 137 ProcessCall call = new ProcessCall(exchange, callback, false); 138 LOG.trace("Submitting task {}", call); 139 executorService.submit(call); 140 // tell Camel routing engine we continue routing asynchronous 141 return false; 142 } catch (Throwable e) { 143 if (executorService instanceof ThreadPoolExecutor) { 144 ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService; 145 // process the call in synchronous mode 146 ProcessCall call = new ProcessCall(exchange, callback, true); 147 rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe); 148 return true; 149 } else { 150 exchange.setException(e); 151 callback.done(true); 152 return true; 153 } 154 } 155 } 156 157 public ExecutorService getExecutorService() { 158 return executorService; 159 } 160 161 public String toString() { 162 return "Threads"; 163 } 164 165 public String getId() { 166 return id; 167 } 168 169 public void setId(String id) { 170 this.id = id; 171 } 172 173 public ThreadPoolRejectedPolicy getRejectedPolicy() { 174 return rejectedPolicy; 175 } 176 177 protected void doStart() throws Exception { 178 shutdown.set(false); 179 } 180 181 protected void doStop() throws Exception { 182 shutdown.set(true); 183 } 184 185 protected void doShutdown() throws Exception { 186 if (shutdownExecutorService) { 187 camelContext.getExecutorServiceManager().shutdownNow(executorService); 188 } 189 super.doShutdown(); 190 } 191 192}