/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
 * pattern where the list of actual endpoints to send a message exchange to are
 * dependent on some dynamic expression.
 * <p/>
 * For every exchange a new {@link RecipientListProcessor} is created, started and
 * asked to process the exchange. The producer cache and the aggregate executor
 * service are owned by this instance and shared by all of those processors.
 *
 * @version 
 */
public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware {

    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
    // a delimiter equal to this value (ignoring case) means: do not pass a delimiter to the iterator
    private static final String IGNORE_DELIMITER_MARKER = "false";
    private final CamelContext camelContext;
    private String id;
    // created in doStart() unless already set
    private ProducerCache producerCache;
    private Expression expression;
    private final String delimiter;
    private boolean parallelProcessing;
    private boolean parallelAggregate;
    private boolean stopOnAggregateException;
    private boolean stopOnException;
    private boolean ignoreInvalidEndpoints;
    private boolean streaming;
    private long timeout;
    private int cacheSize;
    private Processor onPrepare;
    private boolean shareUnitOfWork;
    private ExecutorService executorService;
    private boolean shutdownExecutorService;
    // guards aggregateExecutorService, which is shared by the per-exchange processors
    private final Object aggregateExecutorLock = new Object();
    // lazily created on first demand by a per-exchange processor; released in doShutdown()
    private ExecutorService aggregateExecutorService;
    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();

    /**
     * Creates a recipient list without an expression, using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext) {
        // use comma by default as delimiter
        this(camelContext, ",");
    }

    /**
     * Creates a recipient list without an expression.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param delimiter    the delimiter used when iterating the recipients, must not be empty
     */
    public RecipientList(CamelContext camelContext, String delimiter) {
        notNull(camelContext, "camelContext");
        StringHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.delimiter = delimiter;
    }

    /**
     * Creates a recipient list using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext, Expression expression) {
        // use comma by default as delimiter
        this(camelContext, expression, ",");
    }

    /**
     * Creates a recipient list.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     * @param delimiter    the delimiter used when iterating the recipients, must not be empty
     */
    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
        notNull(camelContext, "camelContext");
        ObjectHelper.notNull(expression, "expression");
        StringHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.expression = expression;
        this.delimiter = delimiter;
    }

    @Override
    public String toString() {
        return "RecipientList[" + (expression != null ? expression : "") + "]";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /**
     * Processes the exchange by delegating to
     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)}.
     */
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Determines the recipients for the exchange and sends the exchange to them.
     * <p/>
     * A value stored under the {@link Exchange#EVALUATE_EXPRESSION_RESULT} property is
     * removed from the exchange and used as the recipients; only when there is no such
     * value is the configured expression (if any) evaluated.
     *
     * @param exchange the exchange to send
     * @param callback the callback handed on to the recipient list processing
     * @return the value returned by {@link #sendToRecipientList(Exchange, Object, AsyncCallback)}
     * @throws IllegalStateException if this recipient list has not been started
     */
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!isStarted()) {
            throw new IllegalStateException("RecipientList has not been started: " + this);
        }

        // use the evaluate expression result if exists
        Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
        if (recipientList == null && expression != null) {
            // fallback and evaluate the expression
            recipientList = expression.evaluate(exchange, Object.class);
        }

        return sendToRecipientList(exchange, recipientList, callback);
    }

    /**
     * Sends the given exchange to the recipient list
     * <p/>
     * A new {@link RecipientListProcessor} is created from the current configuration of
     * this instance and started. If starting fails, the exception is set on the exchange,
     * the callback is completed synchronously and <tt>true</tt> is returned.
     *
     * @param exchange      the exchange to send
     * @param recipientList the recipients, turned into an iterator using the configured delimiter
     * @param callback      the callback handed on to the created processor
     * @return <tt>true</tt> if starting the processor failed, otherwise the value returned by
     *         the created processor
     */
    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
        Iterator<Object> iter;

        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
            // the marker value means no delimiter is passed on (presumably disables splitting of the value)
            iter = ObjectHelper.createIterator(recipientList, null);
        } else {
            iter = ObjectHelper.createIterator(recipientList, delimiter);
        }

        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
                isStopOnAggregateException()) {
            @Override
            protected synchronized ExecutorService createAggregateExecutorService(String name) {
                // use a shared executor service to avoid creating new thread pools
                // the method's own monitor is this per-exchange processor, which does not protect
                // the field shared through the enclosing RecipientList, so lock on the shared lock
                synchronized (aggregateExecutorLock) {
                    if (aggregateExecutorService == null) {
                        aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
                    }
                    return aggregateExecutorService;
                }
            }
        };
        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());

        // start the service
        try {
            ServiceHelper.startService(rlp);
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        // now let the multicast process the exchange
        return rlp.process(exchange, callback);
    }

    /**
     * Resolves the endpoint for the given recipient using
     * {@link ExchangeHelper#resolveEndpoint(Exchange, Object)}.
     *
     * @param exchange  the exchange
     * @param recipient the recipient; a String is trimmed before being resolved
     * @return the resolved endpoint
     */
    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            recipient = ((String)recipient).trim();
        }
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Gets the endpoint utilization statistics of the producer cache.
     *
     * @return the statistics, or <tt>null</tt> if the producer cache has not been created yet
     *         (it is created when this recipient list is started)
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        // the cache is only assigned in doStart(), so guard against being called before start
        return producerCache != null ? producerCache.getEndpointUtilizationStatistics() : null;
    }

    /**
     * Creates the producer cache if there is none yet, then starts the aggregation strategy
     * and the producer cache.
     * <p/>
     * A negative cache size selects an {@link EmptyProducerCache}, zero selects a
     * {@link ProducerCache} created without an explicit size, and a positive value is passed
     * on as the size of the {@link ProducerCache}.
     */
    protected void doStart() throws Exception {
        if (producerCache == null) {
            if (cacheSize < 0) {
                producerCache = new EmptyProducerCache(this, camelContext);
                LOG.debug("RecipientList {} is not using ProducerCache", this);
            } else if (cacheSize == 0) {
                producerCache = new ProducerCache(this, camelContext);
                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
            } else {
                producerCache = new ProducerCache(this, camelContext, cacheSize);
                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
            }
        }
        ServiceHelper.startServices(aggregationStrategy, producerCache);
    }

    /**
     * Stops the producer cache and the aggregation strategy.
     */
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(producerCache, aggregationStrategy);
    }

    /**
     * Stops and shuts down the producer cache and the aggregation strategy, and releases
     * the executor services owned by this instance.
     * <p/>
     * The configured executor service is only shut down when
     * {@link #isShutdownExecutorService()} is <tt>true</tt>. The shared aggregate executor
     * service is always shut down, as it is only ever created by this class.
     */
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);

        if (shutdownExecutorService && executorService != null) {
            camelContext.getExecutorServiceManager().shutdownNow(executorService);
        }

        // detach the shared aggregate executor under the lock, and shut it down outside of it
        ExecutorService aggregateExecutor;
        synchronized (aggregateExecutorLock) {
            aggregateExecutor = aggregateExecutorService;
            aggregateExecutorService = null;
        }
        if (aggregateExecutor != null) {
            camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutor);
        }
    }

    public Expression getExpression() {
        return expression;
    }

    public String getDelimiter() {
        return delimiter;
    }

    public boolean isStreaming() {
        return streaming;
    }

    public void setStreaming(boolean streaming) {
        this.streaming = streaming;
    }

    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    public boolean isParallelProcessing() {
        return parallelProcessing;
    }

    public void setParallelProcessing(boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }

    public boolean isParallelAggregate() {
        return parallelAggregate;
    }

    public void setParallelAggregate(boolean parallelAggregate) {
        this.parallelAggregate = parallelAggregate;
    }

    public boolean isStopOnAggregateException() {
        return stopOnAggregateException;
    }

    public void setStopOnAggregateException(boolean stopOnAggregateException) {
        this.stopOnAggregateException = stopOnAggregateException;
    }

    public boolean isStopOnException() {
        return stopOnException;
    }

    public void setStopOnException(boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public boolean isShutdownExecutorService() {
        return shutdownExecutorService;
    }

    public void setShutdownExecutorService(boolean shutdownExecutorService) {
        this.shutdownExecutorService = shutdownExecutorService;
    }

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public boolean isShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public void setShareUnitOfWork(boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }
}