001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Iterator; 020 import java.util.concurrent.ExecutorService; 021 022 import org.apache.camel.AsyncCallback; 023 import org.apache.camel.AsyncProcessor; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Expression; 028 import org.apache.camel.Processor; 029 import org.apache.camel.impl.ProducerCache; 030 import org.apache.camel.processor.aggregate.AggregationStrategy; 031 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 032 import org.apache.camel.support.ServiceSupport; 033 import org.apache.camel.util.AsyncProcessorHelper; 034 import org.apache.camel.util.ExchangeHelper; 035 import org.apache.camel.util.ObjectHelper; 036 import org.apache.camel.util.ServiceHelper; 037 038 import static org.apache.camel.util.ObjectHelper.notNull; 039 040 /** 041 * Implements a dynamic <a 042 * href="http://camel.apache.org/recipient-list.html">Recipient List</a> 043 * pattern where the list of actual endpoints to send a message exchange to are 044 * dependent on some dynamic expression. 045 * 046 * @version 047 */ 048 public class RecipientList extends ServiceSupport implements AsyncProcessor { 049 private final CamelContext camelContext; 050 private ProducerCache producerCache; 051 private Expression expression; 052 private final String delimiter; 053 private boolean parallelProcessing; 054 private boolean stopOnException; 055 private boolean ignoreInvalidEndpoints; 056 private boolean streaming; 057 private long timeout; 058 private Processor onPrepare; 059 private boolean shareUnitOfWork; 060 private ExecutorService executorService; 061 private boolean shutdownExecutorService; 062 private ExecutorService aggregateExecutorService; 063 private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy(); 064 065 public RecipientList(CamelContext camelContext) { 066 // use comma by default as delimiter 067 this(camelContext, ","); 068 } 069 070 public RecipientList(CamelContext camelContext, String delimiter) { 071 notNull(camelContext, "camelContext"); 072 ObjectHelper.notEmpty(delimiter, "delimiter"); 073 this.camelContext = camelContext; 074 this.delimiter = delimiter; 075 } 076 077 public RecipientList(CamelContext camelContext, Expression expression) { 078 // use comma by default as delimiter 079 this(camelContext, expression, ","); 080 } 081 082 public RecipientList(CamelContext camelContext, Expression expression, String delimiter) { 083 notNull(camelContext, "camelContext"); 084 ObjectHelper.notNull(expression, "expression"); 085 ObjectHelper.notEmpty(delimiter, "delimiter"); 086 this.camelContext = camelContext; 087 this.expression = expression; 088 this.delimiter = delimiter; 089 } 090 091 @Override 092 public String toString() { 093 return "RecipientList[" + (expression != null ? expression : "") + "]"; 094 } 095 096 public void process(Exchange exchange) throws Exception { 097 AsyncProcessorHelper.process(this, exchange); 098 } 099 100 public boolean process(Exchange exchange, AsyncCallback callback) { 101 if (!isStarted()) { 102 throw new IllegalStateException("RecipientList has not been started: " + this); 103 } 104 105 // use the evaluate expression result if exists 106 Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT); 107 if (recipientList == null && expression != null) { 108 // fallback and evaluate the expression 109 recipientList = expression.evaluate(exchange, Object.class); 110 } 111 112 return sendToRecipientList(exchange, recipientList, callback); 113 } 114 115 /** 116 * Sends the given exchange to the recipient list 117 */ 118 public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) { 119 Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter); 120 121 RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(), 122 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), 123 isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) { 124 @Override 125 protected synchronized ExecutorService createAggregateExecutorService(String name) { 126 // use a shared executor service to avoid creating new thread pools 127 if (aggregateExecutorService == null) { 128 aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask"); 129 } 130 return aggregateExecutorService; 131 } 132 }; 133 rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints()); 134 135 // start the service 136 try { 137 ServiceHelper.startService(rlp); 138 } catch (Exception e) { 139 exchange.setException(e); 140 callback.done(true); 141 return true; 142 } 143 144 AsyncProcessor target = rlp; 145 if (isShareUnitOfWork()) { 146 // wrap answer in a sub unit of work, since we share the unit of work 147 target = new SubUnitOfWorkProcessor(rlp); 148 } 149 150 // now let the multicast process the exchange 151 return AsyncProcessorHelper.process(target, exchange, callback); 152 } 153 154 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 155 // trim strings as end users might have added spaces between separators 156 if (recipient instanceof String) { 157 recipient = ((String)recipient).trim(); 158 } 159 return ExchangeHelper.resolveEndpoint(exchange, recipient); 160 } 161 162 protected void doStart() throws Exception { 163 if (producerCache == null) { 164 producerCache = new ProducerCache(this, camelContext); 165 } 166 ServiceHelper.startService(producerCache); 167 } 168 169 protected void doStop() throws Exception { 170 ServiceHelper.stopService(producerCache); 171 } 172 173 protected void doShutdown() throws Exception { 174 ServiceHelper.stopAndShutdownService(producerCache); 175 176 if (shutdownExecutorService && executorService != null) { 177 camelContext.getExecutorServiceManager().shutdownNow(executorService); 178 } 179 } 180 181 public boolean isStreaming() { 182 return streaming; 183 } 184 185 public void setStreaming(boolean streaming) { 186 this.streaming = streaming; 187 } 188 189 public boolean isIgnoreInvalidEndpoints() { 190 return ignoreInvalidEndpoints; 191 } 192 193 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 194 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 195 } 196 197 public boolean isParallelProcessing() { 198 return parallelProcessing; 199 } 200 201 public void setParallelProcessing(boolean parallelProcessing) { 202 this.parallelProcessing = parallelProcessing; 203 } 204 205 public boolean isStopOnException() { 206 return stopOnException; 207 } 208 209 public void setStopOnException(boolean stopOnException) { 210 this.stopOnException = stopOnException; 211 } 212 213 public ExecutorService getExecutorService() { 214 return executorService; 215 } 216 217 public void setExecutorService(ExecutorService executorService) { 218 this.executorService = executorService; 219 } 220 221 public boolean isShutdownExecutorService() { 222 return shutdownExecutorService; 223 } 224 225 public void setShutdownExecutorService(boolean shutdownExecutorService) { 226 this.shutdownExecutorService = shutdownExecutorService; 227 } 228 229 public AggregationStrategy getAggregationStrategy() { 230 return aggregationStrategy; 231 } 232 233 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 234 this.aggregationStrategy = aggregationStrategy; 235 } 236 237 public long getTimeout() { 238 return timeout; 239 } 240 241 public void setTimeout(long timeout) { 242 this.timeout = timeout; 243 } 244 245 public Processor getOnPrepare() { 246 return onPrepare; 247 } 248 249 public void setOnPrepare(Processor onPrepare) { 250 this.onPrepare = onPrepare; 251 } 252 253 public boolean isShareUnitOfWork() { 254 return shareUnitOfWork; 255 } 256 257 public void setShareUnitOfWork(boolean shareUnitOfWork) { 258 this.shareUnitOfWork = shareUnitOfWork; 259 } 260 }