/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
 * pattern where the list of actual endpoints to send a message exchange to are
 * dependent on some dynamic expression.
 * <p/>
 * For every exchange a new {@link RecipientListProcessor} is created, started and asked to
 * process the exchange. The producer cache and the aggregate executor service are owned by
 * this instance and shared between those per-exchange processors.
 *
 * @version
 */
public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware {

    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);

    /**
     * Delimiter value (compared case-insensitively) for which a <tt>null</tt> delimiter is
     * handed to {@link ObjectHelper#createIterator(Object, String)} instead of the configured one.
     */
    private static final String IGNORE_DELIMITER_MARKER = "false";

    private final CamelContext camelContext;
    private final String delimiter;

    /**
     * Guards the lazy creation of {@link #aggregateExecutorService}. The lock must belong to this
     * instance because the processors that trigger the creation are created per exchange.
     */
    private final Object aggregateExecutorLock = new Object();

    private String id;
    // created in doStart(), so it is null until this service has been started
    private ProducerCache producerCache;
    private Expression expression;
    private boolean parallelProcessing;
    private boolean parallelAggregate;
    private boolean stopOnException;
    private boolean ignoreInvalidEndpoints;
    private boolean streaming;
    private long timeout;
    // < 0: no caching (EmptyProducerCache), 0: default cache size, > 0: explicit cache size
    private int cacheSize;
    private Processor onPrepare;
    private boolean shareUnitOfWork;
    private ExecutorService executorService;
    private boolean shutdownExecutorService;
    // lazily created and shared by all per-exchange processors, see sendToRecipientList
    private ExecutorService aggregateExecutorService;
    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();

    /**
     * Creates a recipient list without an expression, using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext) {
        // use comma by default as delimiter
        this(camelContext, ",");
    }

    /**
     * Creates a recipient list without an expression.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param delimiter    the delimiter used when iterating the recipients, must not be empty
     */
    public RecipientList(CamelContext camelContext, String delimiter) {
        notNull(camelContext, "camelContext");
        ObjectHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.delimiter = delimiter;
    }

    /**
     * Creates a recipient list using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext, Expression expression) {
        // use comma by default as delimiter
        this(camelContext, expression, ",");
    }

    /**
     * Creates a recipient list.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     * @param delimiter    the delimiter used when iterating the recipients, must not be empty
     */
    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
        notNull(camelContext, "camelContext");
        ObjectHelper.notNull(expression, "expression");
        ObjectHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.expression = expression;
        this.delimiter = delimiter;
    }

    @Override
    public String toString() {
        return "RecipientList[" + (expression != null ? expression : "") + "]";
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Processes the exchange by delegating to
     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)} with this processor.
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Determines the recipients for the exchange and sends the exchange to them.
     * <p/>
     * A value stored under {@link Exchange#EVALUATE_EXPRESSION_RESULT} is removed from the
     * exchange and used as the recipients; only when there is none is the configured
     * expression (if any) evaluated.
     *
     * @param exchange the exchange to send
     * @param callback the callback handed on to {@link #sendToRecipientList(Exchange, Object, AsyncCallback)}
     * @return the value returned by {@link #sendToRecipientList(Exchange, Object, AsyncCallback)}
     * @throws IllegalStateException if this recipient list has not been started
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!isStarted()) {
            throw new IllegalStateException("RecipientList has not been started: " + this);
        }

        // use the evaluate expression result if exists
        Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
        if (recipientList == null && expression != null) {
            // fallback and evaluate the expression
            recipientList = expression.evaluate(exchange, Object.class);
        }

        return sendToRecipientList(exchange, recipientList, callback);
    }

    /**
     * Sends the given exchange to the recipient list.
     * <p/>
     * A new {@link RecipientListProcessor} is created and started for the call. If starting it
     * fails, the exception is set on the exchange, the callback is completed synchronously and
     * <tt>true</tt> is returned.
     *
     * @param exchange      the exchange to send
     * @param recipientList the recipients, turned into an iterator using the configured delimiter
     * @param callback      the callback passed to the processor, or completed here if the processor fails to start
     * @return <tt>true</tt> if starting the processor failed, otherwise the value returned by the processor
     */
    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
        Iterator<Object> iter;

        // constant-first comparison is null-safe, the delimiter is validated as not empty in the constructors
        if (IGNORE_DELIMITER_MARKER.equalsIgnoreCase(delimiter)) {
            iter = ObjectHelper.createIterator(recipientList, null);
        } else {
            iter = ObjectHelper.createIterator(recipientList, delimiter);
        }

        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) {
            @Override
            protected synchronized ExecutorService createAggregateExecutorService(String name) {
                // use a shared executor service to avoid creating new thread pools
                // the synchronized modifier only locks this per-exchange processor, so the shared
                // field must be guarded by a lock owned by the enclosing RecipientList; otherwise
                // concurrent exchanges could each create their own thread pool
                synchronized (aggregateExecutorLock) {
                    if (aggregateExecutorService == null) {
                        aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
                    }
                    return aggregateExecutorService;
                }
            }
        };
        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());

        // start the service
        try {
            ServiceHelper.startService(rlp);
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        // now let the multicast process the exchange
        return rlp.process(exchange, callback);
    }

    /**
     * Resolves the endpoint for the given recipient.
     *
     * @param exchange  the exchange used for resolving
     * @param recipient the recipient; a <tt>String</tt> is trimmed before being resolved
     * @return the result of {@link ExchangeHelper#resolveEndpoint(Exchange, Object)}
     */
    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            recipient = ((String) recipient).trim();
        }
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Gets the endpoint utilization statistics of the producer cache.
     *
     * @return the statistics reported by the producer cache, or <tt>null</tt> if the producer
     *         cache has not been created yet (it is created when this service is started)
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        // the cache only exists after doStart(), avoid a NullPointerException before that
        return producerCache != null ? producerCache.getEndpointUtilizationStatistics() : null;
    }

    /**
     * Creates the producer cache according to the cache size, unless one already exists, and
     * starts the aggregation strategy and the producer cache.
     */
    @Override
    protected void doStart() throws Exception {
        if (producerCache == null) {
            if (cacheSize < 0) {
                producerCache = new EmptyProducerCache(this, camelContext);
                LOG.debug("RecipientList {} is not using ProducerCache", this);
            } else if (cacheSize == 0) {
                producerCache = new ProducerCache(this, camelContext);
                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
            } else {
                producerCache = new ProducerCache(this, camelContext, cacheSize);
                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
            }
        }
        ServiceHelper.startServices(aggregationStrategy, producerCache);
    }

    /**
     * Stops the producer cache and the aggregation strategy.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(producerCache, aggregationStrategy);
    }

    /**
     * Stops and shuts down the producer cache and the aggregation strategy, and shuts down the
     * executor service when this recipient list was configured to do so.
     */
    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);

        if (shutdownExecutorService && executorService != null) {
            camelContext.getExecutorServiceManager().shutdownNow(executorService);
        }
        // NOTE(review): the shared aggregateExecutorService is not shut down here; presumably it is
        // reclaimed elsewhere (e.g. by the ExecutorServiceManager on context stop) - confirm
    }

    public Expression getExpression() {
        return expression;
    }

    public String getDelimiter() {
        return delimiter;
    }

    public boolean isStreaming() {
        return streaming;
    }

    public void setStreaming(boolean streaming) {
        this.streaming = streaming;
    }

    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    public boolean isParallelProcessing() {
        return parallelProcessing;
    }

    public void setParallelProcessing(boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }

    public boolean isParallelAggregate() {
        return parallelAggregate;
    }

    public void setParallelAggregate(boolean parallelAggregate) {
        this.parallelAggregate = parallelAggregate;
    }

    public boolean isStopOnException() {
        return stopOnException;
    }

    public void setStopOnException(boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public boolean isShutdownExecutorService() {
        return shutdownExecutorService;
    }

    public void setShutdownExecutorService(boolean shutdownExecutorService) {
        this.shutdownExecutorService = shutdownExecutorService;
    }

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public boolean isShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public void setShareUnitOfWork(boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    /**
     * Sets the producer cache size; the value is read when this service is started. A negative
     * value disables caching, <tt>0</tt> uses the default cache size, and a positive value is
     * used as the cache size.
     */
    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }
}