/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
 * pattern where the list of actual endpoints to send a message exchange to is
 * dependent on some dynamic expression.
 * <p/>
 * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
 * on recipient lists. It has to handle the fact that the processors are not known at design time
 * but are evaluated at runtime from the dynamic recipient list. Therefore this implementation has to
 * look up endpoints at runtime and create producers which act as the processors for the multicast processor
 * that runs under the hood. This implementation also supports the asynchronous routing engine, which makes
 * the code trickier.
 *
 * @version
 */
public class RecipientListProcessor extends MulticastProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);

    /** The recipients to send to; consumed by {@link #createProcessorExchangePairs(Exchange)}. */
    private final Iterator<Object> iter;
    private boolean ignoreInvalidEndpoints;
    /** Cache the producers are acquired from and released to; created in {@link #doStart()} if not supplied. */
    private ProducerCache producerCache;

    /**
     * Class that represents each step in the recipient list to do.
     * <p/>
     * This implementation ensures the provided producer is released back to the producer cache when
     * it is done using it.
     */
    static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Endpoint endpoint;
        private final Producer producer;
        private final Processor prepared;
        private final Exchange exchange;
        private final ProducerCache producerCache;
        private final ExchangePattern pattern;
        // the pattern the exchange had before begin() replaced it; null when begin() did not change it
        private volatile ExchangePattern originalPattern;

        private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
                                               Processor prepared, Exchange exchange, ExchangePattern pattern) {
            this.index = index;
            this.producerCache = producerCache;
            this.endpoint = endpoint;
            this.producer = producer;
            this.prepared = prepared;
            this.exchange = exchange;
            this.pattern = pattern;
        }

        public int getIndex() {
            return index;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public Producer getProducer() {
            return producer;
        }

        public Processor getProcessor() {
            return prepared;
        }

        /**
         * Prepares the exchange for being sent: records the recipient endpoint uri as an exchange property,
         * resets the stream cache of the IN message, and applies the recipient specific exchange pattern (if any).
         */
        public void begin() {
            // we have already acquired and prepared the producer
            LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
            exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri());
            // ensure stream caching is reset
            MessageHelper.resetStreamCache(exchange.getIn());
            // if a MEP was configured for this recipient then use it, remembering the current one so done() can restore it
            if (pattern != null) {
                originalPattern = exchange.getPattern();
                LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange);
                exchange.setPattern(pattern);
            }
        }

        /**
         * Restores the exchange pattern changed by {@link #begin()} and releases the producer back to the
         * producer cache. Any exception thrown while doing so is logged at DEBUG level and otherwise ignored.
         */
        public void done() {
            LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
            try {
                // preserve original MEP
                if (originalPattern != null) {
                    exchange.setPattern(originalPattern);
                }
                // when we are done we should release back in pool
                producerCache.releaseProducer(endpoint, producer);
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
                }
            }
        }

    }

    /**
     * Creates a recipient list processor using the default multicast settings of the super class.
     *
     * @param camelContext  the camel context
     * @param producerCache cache to acquire producers from, may be <tt>null</tt> in which case one is created on start
     * @param iter          the recipients to send to
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
        super(camelContext, null);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Creates a recipient list processor which uses the given aggregation strategy.
     *
     * @param camelContext        the camel context
     * @param producerCache       cache to acquire producers from, may be <tt>null</tt> in which case one is created on start
     * @param iter                the recipients to send to
     * @param aggregationStrategy strategy handed to the multicast super class
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
        super(camelContext, null, aggregationStrategy);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Creates a fully configured recipient list processor with <tt>parallelAggregate</tt> set to <tt>false</tt>.
     *
     * @deprecated use the constructor which also accepts the <tt>parallelAggregate</tt> option
     */
    @Deprecated
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Creates a fully configured recipient list processor. All options besides <tt>producerCache</tt> and
     * <tt>iter</tt> are passed on unchanged to the multicast super class.
     *
     * @param camelContext  the camel context
     * @param producerCache cache to acquire producers from, may be <tt>null</tt> in which case one is created on start
     * @param iter          the recipients to send to
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Whether recipients which cannot be resolved to an endpoint/producer are skipped instead of failing.
     */
    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    /**
     * Iterates the recipient list and creates one exchange pair per recipient, each backed by a producer
     * acquired from the producer cache.
     * <p/>
     * A recipient which fails to resolve is skipped when {@link #isIgnoreInvalidEndpoints()} is enabled;
     * otherwise the failure is rethrown. If this method does not complete normally, every producer acquired
     * so far is released again, because the pairs (whose <tt>done</tt> callback normally performs the release)
     * are never handed to the caller in that case.
     *
     * @param exchange the exchange being routed
     * @return the exchange pairs, one per successfully resolved recipient
     * @throws Exception if a recipient could not be resolved and invalid endpoints are not ignored
     */
    @Override
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        // here we iterate the recipient lists and create the exchange pair for each of those
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();

        // the producers acquired so far and their endpoints (same positions), so they can be released on failure
        List<Endpoint> acquiredEndpoints = new ArrayList<Endpoint>();
        List<Producer> acquiredProducers = new ArrayList<Producer>();

        boolean completed = false;
        try {
            // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
            int index = 0;
            while (iter.hasNext()) {
                Object recipient = iter.next();
                Endpoint endpoint;
                Producer producer;
                ExchangePattern pattern;
                try {
                    endpoint = resolveEndpoint(exchange, recipient);
                    pattern = resolveExchangePattern(recipient);
                    producer = producerCache.acquireProducer(endpoint);
                } catch (Exception e) {
                    if (isIgnoreInvalidEndpoints()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
                        }
                        continue;
                    } else {
                        // failure so break out
                        throw e;
                    }
                }
                acquiredEndpoints.add(endpoint);
                acquiredProducers.add(producer);

                // then create the exchange pair
                result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
            }

            completed = true;
            return result;
        } finally {
            if (!completed) {
                // nobody will ever call done() on the pairs created so far, so give the producers back ourselves
                releaseProducers(acquiredEndpoints, acquiredProducers);
            }
        }
    }

    /**
     * Releases the given producers back to the producer cache; failures are logged at DEBUG level and ignored.
     *
     * @param endpoints the endpoints the producers were acquired for
     * @param producers the producers to release, at the same positions as their endpoints
     */
    private void releaseProducers(List<Endpoint> endpoints, List<Producer> producers) {
        for (int i = 0; i < producers.size(); i++) {
            Producer producer = producers.get(i);
            try {
                producerCache.releaseProducer(endpoints.get(i), producer);
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
                }
            }
        }
    }

    /**
     * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
     *
     * @param index    position of the recipient among the successfully resolved recipients
     * @param endpoint the endpoint to send to
     * @param producer the producer acquired for the endpoint
     * @param exchange the original exchange, which is copied for this recipient
     * @param pattern  exchange pattern to use for this recipient, or <tt>null</tt> to leave the pattern untouched
     * @return the exchange pair for the recipient
     */
    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
        Processor prepared = producer;

        // copy exchange, and do not share the unit of work
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

        // if we share unit of work, we need to prepare the child exchange
        if (isShareUnitOfWork()) {
            prepareSharedUnitOfWork(copy, exchange);
        }

        // set property which endpoint we send to
        setToEndpoint(copy, prepared);

        // rework error handling to support fine grained error handling
        RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
        prepared = createErrorHandler(routeContext, copy, prepared);

        // invoke on prepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(copy);
            } catch (Exception e) {
                // record the failure on the copy instead of failing the creation of the pair
                copy.setException(e);
            }
        }

        // and create the pair
        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
    }

    /**
     * Resolves the recipient to an endpoint using {@link ExchangeHelper#resolveEndpoint(Exchange, Object)}.
     *
     * @param exchange  the exchange
     * @param recipient the recipient; a String is trimmed before being resolved
     * @return the resolved endpoint
     */
    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            recipient = ((String) recipient).trim();
        }
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Resolves the exchange pattern configured as a parameter in the recipient uri.
     *
     * @param recipient the recipient; only String recipients are inspected
     * @return the value returned by {@link EndpointHelper#resolveExchangePatternFromUrl(String)} for the
     *         normalized uri, or <tt>null</tt> if the recipient is not a String
     */
    protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            String s = ((String) recipient).trim();
            // see if exchangePattern is a parameter in the url
            s = URISupport.normalizeUri(s);
            return EndpointHelper.resolveExchangePatternFromUrl(s);
        }
        return null;
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        // create our own producer cache if none was provided in the constructor
        if (producerCache == null) {
            producerCache = new ProducerCache(this, getCamelContext());
        }
        ServiceHelper.startService(producerCache);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(producerCache);
        super.doStop();
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(producerCache);
        super.doShutdown();
    }

    @Override
    public String toString() {
        return "RecipientList";
    }

    @Override
    public String getTraceLabel() {
        return "recipientList";
    }
}