001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.CamelContext; 022import org.apache.camel.CamelContextAware; 023import org.apache.camel.CamelExchangeException; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.ExchangePattern; 027import org.apache.camel.Expression; 028import org.apache.camel.Producer; 029import org.apache.camel.impl.DefaultExchange; 030import org.apache.camel.impl.EmptyProducerCache; 031import org.apache.camel.impl.ProducerCache; 032import org.apache.camel.processor.aggregate.AggregationStrategy; 033import org.apache.camel.spi.EndpointUtilizationStatistics; 034import org.apache.camel.spi.IdAware; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.AsyncProcessorConverterHelper; 037import org.apache.camel.util.AsyncProcessorHelper; 038import org.apache.camel.util.EventHelper; 039import org.apache.camel.util.ExchangeHelper; 040import org.apache.camel.util.ServiceHelper; 041import org.apache.camel.util.StopWatch; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 046 047/** 048 * A content enricher that enriches input data by first obtaining additional 049 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 050 * and second by aggregating input data and additional data. Aggregation of 051 * input data and additional data is delegated to an {@link AggregationStrategy} 052 * object. 053 * <p/> 054 * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher} 055 * that uses a {@link org.apache.camel.PollingConsumer}. 056 * 057 * @see PollEnricher 058 */ 059public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 060 061 private static final Logger LOG = LoggerFactory.getLogger(Enricher.class); 062 private CamelContext camelContext; 063 private String id; 064 private ProducerCache producerCache; 065 private final Expression expression; 066 private AggregationStrategy aggregationStrategy; 067 private boolean aggregateOnException; 068 private boolean shareUnitOfWork; 069 private int cacheSize; 070 private boolean ignoreInvalidEndpoint; 071 072 public Enricher(Expression expression) { 073 this.expression = expression; 074 } 075 076 public CamelContext getCamelContext() { 077 return camelContext; 078 } 079 080 public void setCamelContext(CamelContext camelContext) { 081 this.camelContext = camelContext; 082 } 083 084 public String getId() { 085 return id; 086 } 087 088 public void setId(String id) { 089 this.id = id; 090 } 091 092 public Expression getExpression() { 093 return expression; 094 } 095 096 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 097 return producerCache.getEndpointUtilizationStatistics(); 098 } 099 100 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 101 this.aggregationStrategy = aggregationStrategy; 102 } 103 104 public AggregationStrategy getAggregationStrategy() { 105 return aggregationStrategy; 106 } 107 108 public boolean isAggregateOnException() { 109 return aggregateOnException; 110 } 111 112 public void setAggregateOnException(boolean aggregateOnException) { 113 this.aggregateOnException = aggregateOnException; 114 } 115 116 public boolean isShareUnitOfWork() { 117 return shareUnitOfWork; 118 } 119 120 public void setShareUnitOfWork(boolean shareUnitOfWork) { 121 this.shareUnitOfWork = shareUnitOfWork; 122 } 123 124 public int getCacheSize() { 125 return cacheSize; 126 } 127 128 public void setCacheSize(int cacheSize) { 129 this.cacheSize = cacheSize; 130 } 131 132 public boolean isIgnoreInvalidEndpoint() { 133 return ignoreInvalidEndpoint; 134 } 135 136 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 137 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 138 } 139 140 public void process(Exchange exchange) throws Exception { 141 AsyncProcessorHelper.process(this, exchange); 142 } 143 144 /** 145 * Enriches the input data (<code>exchange</code>) by first obtaining 146 * additional data from an endpoint represented by an endpoint 147 * <code>producer</code> and second by aggregating input data and additional 148 * data. Aggregation of input data and additional data is delegated to an 149 * {@link AggregationStrategy} object set at construction time. If the 150 * message exchange with the resource endpoint fails then no aggregation 151 * will be done and the failed exchange content is copied over to the 152 * original message exchange. 153 * 154 * @param exchange input data. 155 */ 156 public boolean process(final Exchange exchange, final AsyncCallback callback) { 157 // which producer to use 158 final Producer producer; 159 final Endpoint endpoint; 160 161 // use dynamic endpoint so calculate the endpoint to use 162 Object recipient = null; 163 try { 164 recipient = expression.evaluate(exchange, Object.class); 165 endpoint = resolveEndpoint(exchange, recipient); 166 // acquire the consumer from the cache 167 producer = producerCache.acquireProducer(endpoint); 168 } catch (Throwable e) { 169 if (isIgnoreInvalidEndpoint()) { 170 if (LOG.isDebugEnabled()) { 171 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 172 } 173 } else { 174 exchange.setException(e); 175 } 176 callback.done(true); 177 return true; 178 } 179 180 final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); 181 final Endpoint destination = producer.getEndpoint(); 182 183 EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination); 184 // record timing for sending the exchange using the producer 185 final StopWatch watch = new StopWatch(); 186 AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer); 187 boolean sync = ap.process(resourceExchange, new AsyncCallback() { 188 public void done(boolean doneSync) { 189 // we only have to handle async completion of the routing slip 190 if (doneSync) { 191 return; 192 } 193 194 // emit event that the exchange was sent to the endpoint 195 long timeTaken = watch.stop(); 196 EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); 197 198 if (!isAggregateOnException() && resourceExchange.isFailed()) { 199 // copy resource exchange onto original exchange (preserving pattern) 200 copyResultsPreservePattern(exchange, resourceExchange); 201 } else { 202 prepareResult(exchange); 203 try { 204 // prepare the exchanges for aggregation 205 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 206 207 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 208 if (aggregatedExchange != null) { 209 // copy aggregation result onto original exchange (preserving pattern) 210 copyResultsPreservePattern(exchange, aggregatedExchange); 211 } 212 } catch (Throwable e) { 213 // if the aggregationStrategy threw an exception, set it on the original exchange 214 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 215 callback.done(false); 216 // we failed so break out now 217 return; 218 } 219 } 220 221 // set property with the uri of the endpoint enriched so we can use that for tracing etc 222 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 223 224 // return the producer back to the cache 225 try { 226 producerCache.releaseProducer(endpoint, producer); 227 } catch (Exception e) { 228 // ignore 229 } 230 231 callback.done(false); 232 } 233 }); 234 235 if (!sync) { 236 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 237 // the remainder of the routing slip will be completed async 238 // so we break out now, then the callback will be invoked which then continue routing from where we left here 239 return false; 240 } 241 242 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 243 244 // emit event that the exchange was sent to the endpoint 245 long timeTaken = watch.stop(); 246 EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); 247 248 if (!isAggregateOnException() && resourceExchange.isFailed()) { 249 // copy resource exchange onto original exchange (preserving pattern) 250 copyResultsPreservePattern(exchange, resourceExchange); 251 } else { 252 prepareResult(exchange); 253 254 try { 255 // prepare the exchanges for aggregation 256 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 257 258 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 259 if (aggregatedExchange != null) { 260 // copy aggregation result onto original exchange (preserving pattern) 261 copyResultsPreservePattern(exchange, aggregatedExchange); 262 } 263 } catch (Throwable e) { 264 // if the aggregationStrategy threw an exception, set it on the original exchange 265 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 266 callback.done(true); 267 // we failed so break out now 268 return true; 269 } 270 } 271 272 // set property with the uri of the endpoint enriched so we can use that for tracing etc 273 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 274 275 // return the producer back to the cache 276 try { 277 producerCache.releaseProducer(endpoint, producer); 278 } catch (Exception e) { 279 // ignore 280 } 281 282 callback.done(true); 283 return true; 284 } 285 286 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 287 // trim strings as end users might have added spaces between separators 288 if (recipient instanceof String) { 289 recipient = ((String)recipient).trim(); 290 } 291 return ExchangeHelper.resolveEndpoint(exchange, recipient); 292 } 293 294 /** 295 * Creates a new {@link DefaultExchange} instance from the given 296 * <code>exchange</code>. The resulting exchange's pattern is defined by 297 * <code>pattern</code>. 298 * 299 * @param source exchange to copy from. 300 * @param pattern exchange pattern to set. 301 * @return created exchange. 302 */ 303 protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) { 304 // copy exchange, and do not share the unit of work 305 Exchange target = ExchangeHelper.createCorrelatedCopy(source, false); 306 target.setPattern(pattern); 307 308 // if we share unit of work, we need to prepare the resource exchange 309 if (isShareUnitOfWork()) { 310 target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork()); 311 // and then share the unit of work 312 target.setUnitOfWork(source.getUnitOfWork()); 313 } 314 return target; 315 } 316 317 private static void prepareResult(Exchange exchange) { 318 if (exchange.getPattern().isOutCapable()) { 319 exchange.getOut().copyFrom(exchange.getIn()); 320 } 321 } 322 323 private static AggregationStrategy defaultAggregationStrategy() { 324 return new CopyAggregationStrategy(); 325 } 326 327 @Override 328 public String toString() { 329 return "Enrich[" + expression + "]"; 330 } 331 332 protected void doStart() throws Exception { 333 if (aggregationStrategy == null) { 334 aggregationStrategy = defaultAggregationStrategy(); 335 } 336 337 if (producerCache == null) { 338 if (cacheSize < 0) { 339 producerCache = new EmptyProducerCache(this, camelContext); 340 LOG.debug("Enricher {} is not using ProducerCache", this); 341 } else if (cacheSize == 0) { 342 producerCache = new ProducerCache(this, camelContext); 343 LOG.debug("Enricher {} using ProducerCache with default cache size", this); 344 } else { 345 producerCache = new ProducerCache(this, camelContext, cacheSize); 346 LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize); 347 } 348 } 349 350 ServiceHelper.startServices(producerCache, aggregationStrategy); 351 } 352 353 protected void doStop() throws Exception { 354 ServiceHelper.stopServices(aggregationStrategy, producerCache); 355 } 356 357 private static class CopyAggregationStrategy implements AggregationStrategy { 358 359 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 360 if (newExchange != null) { 361 copyResultsPreservePattern(oldExchange, newExchange); 362 } 363 return oldExchange; 364 } 365 366 } 367 368}