001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.AsyncProducerCallback; 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.ExchangePattern; 027import org.apache.camel.Expression; 028import org.apache.camel.NoTypeConversionAvailableException; 029import org.apache.camel.Producer; 030import org.apache.camel.impl.EmptyProducerCache; 031import org.apache.camel.impl.ProducerCache; 032import org.apache.camel.spi.EndpointUtilizationStatistics; 033import org.apache.camel.spi.IdAware; 034import org.apache.camel.support.ServiceSupport; 035import org.apache.camel.util.AsyncProcessorHelper; 036import org.apache.camel.util.EndpointHelper; 037import org.apache.camel.util.ExchangeHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Processor for forwarding exchanges to a dynamic endpoint destination. 044 * 045 * @see org.apache.camel.processor.SendProcessor 046 */ 047public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 048 protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class); 049 protected CamelContext camelContext; 050 protected final String uri; 051 protected final Expression expression; 052 protected ExchangePattern pattern; 053 protected ProducerCache producerCache; 054 protected String id; 055 protected boolean ignoreInvalidEndpoint; 056 protected int cacheSize; 057 058 public SendDynamicProcessor(Expression expression) { 059 this.uri = null; 060 this.expression = expression; 061 } 062 063 public SendDynamicProcessor(String uri, Expression expression) { 064 this.uri = uri; 065 this.expression = expression; 066 } 067 068 @Override 069 public String toString() { 070 return "sendTo(" + getExpression() + ")"; 071 } 072 073 public String getId() { 074 return id; 075 } 076 077 public void setId(String id) { 078 this.id = id; 079 } 080 081 public void process(final Exchange exchange) throws Exception { 082 AsyncProcessorHelper.process(this, exchange); 083 } 084 085 public boolean process(Exchange exchange, final AsyncCallback callback) { 086 if (!isStarted()) { 087 exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); 088 callback.done(true); 089 return true; 090 } 091 092 // we should preserve existing MEP so remember old MEP 093 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 094 final ExchangePattern existingPattern = exchange.getPattern(); 095 096 // which endpoint to send to 097 final Endpoint endpoint; 098 final ExchangePattern destinationExchangePattern; 099 100 // use dynamic endpoint so calculate the endpoint to use 101 Object recipient = null; 102 try { 103 recipient = expression.evaluate(exchange, Object.class); 104 endpoint = resolveEndpoint(exchange, recipient); 105 destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); 106 } catch (Throwable e) { 107 if (isIgnoreInvalidEndpoint()) { 108 if (LOG.isDebugEnabled()) { 109 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 110 } 111 } else { 112 exchange.setException(e); 113 } 114 callback.done(true); 115 return true; 116 } 117 118 // send the exchange to the destination using the producer cache 119 return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { 120 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 121 ExchangePattern pattern, final AsyncCallback callback) { 122 final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); 123 LOG.debug(">>>> {} {}", endpoint, exchange); 124 return asyncProducer.process(target, new AsyncCallback() { 125 public void done(boolean doneSync) { 126 // restore previous MEP 127 target.setPattern(existingPattern); 128 // signal we are done 129 callback.done(doneSync); 130 } 131 }); 132 } 133 }); 134 } 135 136 protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { 137 // trim strings as end users might have added spaces between separators 138 if (recipient instanceof String) { 139 recipient = ((String) recipient).trim(); 140 } else if (recipient instanceof Endpoint) { 141 return (Endpoint) recipient; 142 } else { 143 // convert to a string type we can work with 144 recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); 145 } 146 147 return ExchangeHelper.resolveEndpoint(exchange, recipient); 148 } 149 150 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) { 151 // destination exchange pattern overrides pattern 152 if (destinationExchangePattern != null) { 153 exchange.setPattern(destinationExchangePattern); 154 } else if (pattern != null) { 155 exchange.setPattern(pattern); 156 } 157 // set property which endpoint we send to 158 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 159 return exchange; 160 } 161 162 protected void doStart() throws Exception { 163 if (producerCache == null) { 164 if (cacheSize < 0) { 165 producerCache = new EmptyProducerCache(this, camelContext); 166 LOG.debug("DynamicSendTo {} is not using ProducerCache", this); 167 } else if (cacheSize == 0) { 168 producerCache = new ProducerCache(this, camelContext); 169 LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this); 170 } else { 171 producerCache = new ProducerCache(this, camelContext, cacheSize); 172 LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); 173 } 174 } 175 ServiceHelper.startService(producerCache); 176 } 177 178 protected void doStop() throws Exception { 179 ServiceHelper.stopServices(producerCache); 180 } 181 182 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 183 return producerCache.getEndpointUtilizationStatistics(); 184 } 185 186 public CamelContext getCamelContext() { 187 return camelContext; 188 } 189 190 public void setCamelContext(CamelContext camelContext) { 191 this.camelContext = camelContext; 192 } 193 194 public String getUri() { 195 return uri; 196 } 197 198 public Expression getExpression() { 199 return expression; 200 } 201 202 public ExchangePattern getPattern() { 203 return pattern; 204 } 205 206 public void setPattern(ExchangePattern pattern) { 207 this.pattern = pattern; 208 } 209 210 public boolean isIgnoreInvalidEndpoint() { 211 return ignoreInvalidEndpoint; 212 } 213 214 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 215 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 216 } 217 218 public int getCacheSize() { 219 return cacheSize; 220 } 221 222 public void setCacheSize(int cacheSize) { 223 this.cacheSize = cacheSize; 224 } 225}