/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
 * pattern where the list of actual endpoints to send a message exchange to is
 * dependent on some dynamic expression.
 * <p/>
 * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
 * on recipient lists. It has to handle the fact that the processors are not known at design time
 * but are evaluated at runtime from the dynamic recipient list. Therefore this implementation has to, at runtime,
 * look up endpoints and create producers which act as the processors for the multicast processor which
 * runs under the hood. This implementation also supports the asynchronous routing engine, which makes the code
 * trickier.
 *
 * @version 
 */
public class RecipientListProcessor extends MulticastProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
    private final Iterator<Object> iter;
    private boolean ignoreInvalidEndpoints;
    private ProducerCache producerCache;

    /**
     * Class that represents each step in the recipient list to do
     * <p/>
     * This implementation ensures the provided producer is released back to the producer cache when
     * it is done being used.
     */
    static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Endpoint endpoint;
        private final Producer producer;
        private final Processor prepared;
        private final Exchange exchange;
        private final ProducerCache producerCache;

        private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
                                               Processor prepared, Exchange exchange) {
            this.index = index;
            this.producerCache = producerCache;
            this.endpoint = endpoint;
            this.producer = producer;
            this.prepared = prepared;
            this.exchange = exchange;
        }

        public int getIndex() {
            return index;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public Producer getProducer() {
            return producer;
        }

        /**
         * Returns the prepared processor (the producer after error handling has been wrapped around it),
         * which is not necessarily the same instance as {@link #getProducer()}.
         */
        public Processor getProcessor() {
            return prepared;
        }

        public void begin() {
            // we have already acquired and prepared the producer when this pair was created,
            // so there is nothing left to do here besides tracing
            LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
        }

        public void done() {
            LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
            // when we are done we should release back in pool
            try {
                producerCache.releaseProducer(endpoint, producer);
            } catch (Exception e) {
                // releasing is best effort, a failure here must not affect the routing outcome
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
                }
            }
        }

    }

    /**
     * Creates a recipient list processor without an aggregation strategy.
     *
     * @param camelContext  the camel context
     * @param producerCache cache to acquire producers from, may be <tt>null</tt> in which case
     *                      a cache is created when this processor is started
     * @param iter          iterator over the recipients (endpoints or endpoint uris) to send to
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
        // the second argument is left null; presumably the parent's fixed processor list,
        // which does not apply as recipients are resolved per exchange
        super(camelContext, null);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Creates a recipient list processor using the given aggregation strategy.
     *
     * @param camelContext        the camel context
     * @param producerCache       cache to acquire producers from, may be <tt>null</tt> in which case
     *                            a cache is created when this processor is started
     * @param iter                iterator over the recipients (endpoints or endpoint uris) to send to
     * @param aggregationStrategy strategy passed on to the parent multicast processor
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
        super(camelContext, null, aggregationStrategy);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Creates a fully configured recipient list processor. All options besides <tt>producerCache</tt>
     * and <tt>iter</tt> are passed on unchanged to the parent multicast processor.
     *
     * @param camelContext  the camel context
     * @param producerCache cache to acquire producers from, may be <tt>null</tt> in which case
     *                      a cache is created when this processor is started
     * @param iter          iterator over the recipients (endpoints or endpoint uris) to send to
     */
    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
        this.producerCache = producerCache;
        this.iter = iter;
    }

    /**
     * Whether recipients that cannot be resolved, or for which no producer can be acquired, are skipped
     * instead of failing the exchange.
     */
    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    /**
     * Walks the recipient iterator and, for each recipient, resolves the endpoint, acquires a producer
     * and creates the exchange pair to be processed.
     * <p/>
     * If this method fails half way, the producers acquired so far are released again before the
     * exception is rethrown. The pairs built so far are never handed to the caller, so nobody else
     * would be able to invoke <tt>done()</tt> on them to release the producers.
     *
     * @param exchange the original exchange
     * @return the pairs, in recipient order; ignored invalid recipients are left out
     * @throws Exception if a recipient is invalid (and invalid endpoints are not ignored)
     *                   or a pair could not be created
     */
    @Override
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        // here we iterate the recipient lists and create the exchange pair for each of those
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();

        // remember what we have acquired, so it can be released again if we fail before returning
        List<Endpoint> acquiredEndpoints = new ArrayList<Endpoint>();
        List<Producer> acquiredProducers = new ArrayList<Producer>();

        try {
            // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
            int index = 0;
            while (iter.hasNext()) {
                Object recipient = iter.next();
                Endpoint endpoint;
                Producer producer;
                try {
                    endpoint = resolveEndpoint(exchange, recipient);
                    producer = producerCache.acquireProducer(endpoint);
                } catch (Exception e) {
                    if (isIgnoreInvalidEndpoints()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
                        }
                        continue;
                    } else {
                        // failure so break out
                        throw e;
                    }
                }

                // track before creating the pair, as creating the pair may fail as well
                acquiredEndpoints.add(endpoint);
                acquiredProducers.add(producer);

                // then create the exchange pair
                result.add(createProcessorExchangePair(index++, endpoint, producer, exchange));
            }
        } catch (Exception e) {
            // do not leak the producers we already acquired
            releaseProducers(acquiredEndpoints, acquiredProducers);
            throw e;
        }

        return result;
    }

    /**
     * Releases the given producers back to the producer cache. This is best effort; any failure is
     * logged at debug level and otherwise ignored, the same way as when a pair is done.
     *
     * @param endpoints the endpoints the producers were acquired for
     * @param producers the producers, at the same positions as their endpoints
     */
    private void releaseProducers(List<Endpoint> endpoints, List<Producer> producers) {
        for (int i = 0; i < producers.size(); i++) {
            Producer producer = producers.get(i);
            try {
                producerCache.releaseProducer(endpoints.get(i), producer);
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
                }
            }
        }
    }

    /**
     * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
     *
     * @param index    position of the recipient among the accepted recipients
     * @param endpoint the endpoint the producer was acquired for
     * @param producer the acquired producer which sends to the endpoint
     * @param exchange the original exchange, which is copied for this recipient
     * @return the pair holding the prepared processor and the exchange copy
     */
    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) {
        Processor prepared = producer;

        // copy exchange, and do not share the unit of work
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

        // if we share unit of work, we need to prepare the child exchange
        if (isShareUnitOfWork()) {
            prepareSharedUnitOfWork(copy, exchange);
        }

        // set property which endpoint we send to
        setToEndpoint(copy, prepared);

        // rework error handling to support fine grained error handling
        RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
        prepared = createErrorHandler(routeContext, copy, prepared);

        // invoke on prepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(copy);
            } catch (Exception e) {
                // a failing onPrepare is recorded on the copy rather than thrown from here
                copy.setException(e);
            }
        }

        // and create the pair
        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
    }

    /**
     * Resolves the given recipient into an endpoint.
     *
     * @param exchange  the exchange used for resolving
     * @param recipient the recipient; if it is a String it is trimmed before being resolved
     * @return the resolved endpoint
     */
    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            recipient = ((String) recipient).trim();
        }
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Starts the parent and then the producer cache, creating the cache first if none was provided.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (producerCache == null) {
            producerCache = new ProducerCache(this, getCamelContext());
        }
        ServiceHelper.startService(producerCache);
    }

    /**
     * Stops the producer cache and then the parent.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(producerCache);
        super.doStop();
    }

    /**
     * Stops and shuts down the producer cache and then shuts down the parent.
     */
    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(producerCache);
        super.doShutdown();
    }

    @Override
    public String toString() {
        return "RecipientList";
    }

    @Override
    public String getTraceLabel() {
        return "recipientList";
    }
}