001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.AsyncProducerCallback; 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.ExchangePattern; 027import org.apache.camel.Expression; 028import org.apache.camel.NoTypeConversionAvailableException; 029import org.apache.camel.Producer; 030import org.apache.camel.impl.EmptyProducerCache; 031import org.apache.camel.impl.ProducerCache; 032import org.apache.camel.spi.EndpointUtilizationStatistics; 033import org.apache.camel.spi.IdAware; 034import org.apache.camel.support.ServiceSupport; 035import org.apache.camel.util.AsyncProcessorHelper; 036import org.apache.camel.util.EndpointHelper; 037import org.apache.camel.util.ExchangeHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Processor for forwarding exchanges to a dynamic endpoint destination. 044 * 045 * @see org.apache.camel.processor.SendProcessor 046 */ 047public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 048 protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class); 049 protected CamelContext camelContext; 050 protected final String uri; 051 protected final Expression expression; 052 protected ExchangePattern pattern; 053 protected ProducerCache producerCache; 054 protected String id; 055 protected boolean ignoreInvalidEndpoint; 056 protected int cacheSize; 057 058 public SendDynamicProcessor(Expression expression) { 059 this.uri = null; 060 this.expression = expression; 061 } 062 063 public SendDynamicProcessor(String uri, Expression expression) { 064 this.uri = uri; 065 this.expression = expression; 066 } 067 068 @Override 069 public String toString() { 070 return "sendTo(" + getExpression() + ")"; 071 } 072 073 public String getId() { 074 return id; 075 } 076 077 public void setId(String id) { 078 this.id = id; 079 } 080 081 public void process(final Exchange exchange) throws Exception { 082 AsyncProcessorHelper.process(this, exchange); 083 } 084 085 public boolean process(Exchange exchange, final AsyncCallback callback) { 086 if (!isStarted()) { 087 exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); 088 callback.done(true); 089 return true; 090 } 091 092 // we should preserve existing MEP so remember old MEP 093 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 094 final ExchangePattern existingPattern = exchange.getPattern(); 095 096 // which endpoint to send to 097 final Endpoint endpoint; 098 final ExchangePattern destinationExchangePattern; 099 100 // use dynamic endpoint so calculate the endpoint to use 101 Object recipient = null; 102 try { 103 recipient = expression.evaluate(exchange, Object.class); 104 endpoint = resolveEndpoint(exchange, recipient); 105 if (endpoint == null) { 106 if (LOG.isDebugEnabled()) { 107 LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); 108 } 109 // no endpoint to send to, so ignore 110 callback.done(true); 111 return true; 112 } 113 destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); 114 } catch (Throwable e) { 115 if (isIgnoreInvalidEndpoint()) { 116 if (LOG.isDebugEnabled()) { 117 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 118 } 119 } else { 120 exchange.setException(e); 121 } 122 callback.done(true); 123 return true; 124 } 125 126 // send the exchange to the destination using the producer cache 127 return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { 128 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 129 ExchangePattern pattern, final AsyncCallback callback) { 130 final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); 131 LOG.debug(">>>> {} {}", endpoint, exchange); 132 return asyncProducer.process(target, new AsyncCallback() { 133 public void done(boolean doneSync) { 134 // restore previous MEP 135 target.setPattern(existingPattern); 136 // signal we are done 137 callback.done(doneSync); 138 } 139 }); 140 } 141 }); 142 } 143 144 protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { 145 // trim strings as end users might have added spaces between separators 146 if (recipient instanceof String) { 147 recipient = ((String) recipient).trim(); 148 } else if (recipient instanceof Endpoint) { 149 return (Endpoint) recipient; 150 } else if (recipient != null) { 151 // convert to a string type we can work with 152 recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); 153 } 154 155 if (recipient != null) { 156 return ExchangeHelper.resolveEndpoint(exchange, recipient); 157 } else { 158 return null; 159 } 160 } 161 162 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) { 163 // destination exchange pattern overrides pattern 164 if (destinationExchangePattern != null) { 165 exchange.setPattern(destinationExchangePattern); 166 } else if (pattern != null) { 167 exchange.setPattern(pattern); 168 } 169 // set property which endpoint we send to 170 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 171 return exchange; 172 } 173 174 protected void doStart() throws Exception { 175 if (producerCache == null) { 176 if (cacheSize < 0) { 177 producerCache = new EmptyProducerCache(this, camelContext); 178 LOG.debug("DynamicSendTo {} is not using ProducerCache", this); 179 } else if (cacheSize == 0) { 180 producerCache = new ProducerCache(this, camelContext); 181 LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this); 182 } else { 183 producerCache = new ProducerCache(this, camelContext, cacheSize); 184 LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); 185 } 186 } 187 ServiceHelper.startService(producerCache); 188 } 189 190 protected void doStop() throws Exception { 191 ServiceHelper.stopServices(producerCache); 192 } 193 194 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 195 return producerCache.getEndpointUtilizationStatistics(); 196 } 197 198 public CamelContext getCamelContext() { 199 return camelContext; 200 } 201 202 public void setCamelContext(CamelContext camelContext) { 203 this.camelContext = camelContext; 204 } 205 206 public String getUri() { 207 return uri; 208 } 209 210 public Expression getExpression() { 211 return expression; 212 } 213 214 public ExchangePattern getPattern() { 215 return pattern; 216 } 217 218 public void setPattern(ExchangePattern pattern) { 219 this.pattern = pattern; 220 } 221 222 public boolean isIgnoreInvalidEndpoint() { 223 return ignoreInvalidEndpoint; 224 } 225 226 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 227 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 228 } 229 230 public int getCacheSize() { 231 return cacheSize; 232 } 233 234 public void setCacheSize(int cacheSize) { 235 this.cacheSize = cacheSize; 236 } 237}