001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.ExecutorService; 020 import java.util.concurrent.RejectedExecutionException; 021 import java.util.concurrent.atomic.AtomicBoolean; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Rejectable; 028 import org.apache.camel.ThreadPoolRejectedPolicy; 029 import org.apache.camel.support.ServiceSupport; 030 import org.apache.camel.util.AsyncProcessorHelper; 031 import org.apache.camel.util.ObjectHelper; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 /** 036 * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s 037 * using the asynchronous routing engine. 038 * <p/> 039 * Pay attention to how this processor handles rejected tasks. 040 * <ul> 041 * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception, 042 * and marked to stop continue routing. 043 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li> 044 * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set). 045 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li> 046 * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set). 047 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set. 048 * And the current exchange will be added to the task queue.</li> 049 * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread 050 * will not be free to process a new exchange, as its processing the current exchange.</li> 051 * </ul> 052 */ 053 public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor { 054 055 private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class); 056 private final CamelContext camelContext; 057 private final ExecutorService executorService; 058 private volatile boolean shutdownExecutorService; 059 private final AtomicBoolean shutdown = new AtomicBoolean(true); 060 private boolean callerRunsWhenRejected = true; 061 private ThreadPoolRejectedPolicy rejectedPolicy; 062 063 private final class ProcessCall implements Runnable, Rejectable { 064 private final Exchange exchange; 065 private final AsyncCallback callback; 066 067 public ProcessCall(Exchange exchange, AsyncCallback callback) { 068 this.exchange = exchange; 069 this.callback = callback; 070 } 071 072 @Override 073 public void run() { 074 LOG.trace("Continue routing exchange {} ", exchange); 075 if (shutdown.get()) { 076 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 077 } 078 callback.done(false); 079 } 080 081 @Override 082 public void reject() { 083 // abort should mark the exchange with an rejected exception 084 boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy; 085 if (abort) { 086 exchange.setException(new RejectedExecutionException()); 087 } 088 089 LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange); 090 // we should not continue routing, and no redelivery should be performed 091 exchange.setProperty(Exchange.ROUTE_STOP, true); 092 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true); 093 094 if (shutdown.get()) { 095 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 096 } 097 callback.done(false); 098 } 099 100 @Override 101 public String toString() { 102 return "ProcessCall[" + exchange + "]"; 103 } 104 } 105 106 public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) { 107 ObjectHelper.notNull(camelContext, "camelContext"); 108 ObjectHelper.notNull(executorService, "executorService"); 109 this.camelContext = camelContext; 110 this.executorService = executorService; 111 this.shutdownExecutorService = shutdownExecutorService; 112 } 113 114 public void process(final Exchange exchange) throws Exception { 115 AsyncProcessorHelper.process(this, exchange); 116 } 117 118 public boolean process(Exchange exchange, AsyncCallback callback) { 119 if (shutdown.get()) { 120 throw new IllegalStateException("ThreadsProcessor is not running."); 121 } 122 123 ProcessCall call = new ProcessCall(exchange, callback); 124 try { 125 LOG.trace("Submitting task {}", call); 126 executorService.submit(call); 127 // tell Camel routing engine we continue routing asynchronous 128 return false; 129 } catch (RejectedExecutionException e) { 130 boolean callerRuns = isCallerRunsWhenRejected(); 131 if (!callerRuns) { 132 exchange.setException(e); 133 } 134 135 LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call); 136 if (shutdown.get()) { 137 exchange.setException(new RejectedExecutionException()); 138 } 139 callback.done(true); 140 return true; 141 } 142 } 143 144 public boolean isCallerRunsWhenRejected() { 145 return callerRunsWhenRejected; 146 } 147 148 public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { 149 this.callerRunsWhenRejected = callerRunsWhenRejected; 150 } 151 152 public ThreadPoolRejectedPolicy getRejectedPolicy() { 153 return rejectedPolicy; 154 } 155 156 public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 157 this.rejectedPolicy = rejectedPolicy; 158 } 159 160 public String toString() { 161 return "Threads"; 162 } 163 164 protected void doStart() throws Exception { 165 shutdown.set(false); 166 } 167 168 protected void doStop() throws Exception { 169 shutdown.set(true); 170 } 171 172 protected void doShutdown() throws Exception { 173 if (shutdownExecutorService) { 174 camelContext.getExecutorServiceManager().shutdownNow(executorService); 175 } 176 super.doShutdown(); 177 } 178 179 }