001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.CamelContext; 022import org.apache.camel.CamelContextAware; 023import org.apache.camel.CamelExchangeException; 024import org.apache.camel.Consumer; 025import org.apache.camel.Endpoint; 026import org.apache.camel.Exchange; 027import org.apache.camel.Expression; 028import org.apache.camel.PollingConsumer; 029import org.apache.camel.impl.BridgeExceptionHandlerToErrorHandler; 030import org.apache.camel.impl.ConsumerCache; 031import org.apache.camel.impl.DefaultConsumer; 032import org.apache.camel.impl.EmptyConsumerCache; 033import org.apache.camel.impl.EventDrivenPollingConsumer; 034import org.apache.camel.processor.aggregate.AggregationStrategy; 035import org.apache.camel.spi.EndpointUtilizationStatistics; 036import org.apache.camel.spi.ExceptionHandler; 037import org.apache.camel.spi.IdAware; 038import org.apache.camel.support.ServiceSupport; 039import org.apache.camel.util.AsyncProcessorHelper; 040import org.apache.camel.util.ExchangeHelper; 041import org.apache.camel.util.ServiceHelper; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 046 047/** 048 * A content enricher that enriches input data by first obtaining additional 049 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 050 * and second by aggregating input data and additional data. Aggregation of 051 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy} 052 * object. 053 * <p/> 054 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher} 055 * that uses a {@link org.apache.camel.Producer}. 056 * 057 * @see Enricher 058 */ 059public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 060 061 private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); 062 private CamelContext camelContext; 063 private ConsumerCache consumerCache; 064 private String id; 065 private AggregationStrategy aggregationStrategy; 066 private final Expression expression; 067 private long timeout; 068 private boolean aggregateOnException; 069 private int cacheSize; 070 private boolean ignoreInvalidEndpoint; 071 072 /** 073 * Creates a new {@link PollEnricher}. 074 * 075 * @param expression expression to use to compute the endpoint to poll from. 076 * @param timeout timeout in millis 077 */ 078 public PollEnricher(Expression expression, long timeout) { 079 this.expression = expression; 080 this.timeout = timeout; 081 } 082 083 public CamelContext getCamelContext() { 084 return camelContext; 085 } 086 087 public void setCamelContext(CamelContext camelContext) { 088 this.camelContext = camelContext; 089 } 090 091 public String getId() { 092 return id; 093 } 094 095 public void setId(String id) { 096 this.id = id; 097 } 098 099 public Expression getExpression() { 100 return expression; 101 } 102 103 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 104 return consumerCache.getEndpointUtilizationStatistics(); 105 } 106 107 public AggregationStrategy getAggregationStrategy() { 108 return aggregationStrategy; 109 } 110 111 /** 112 * Sets the aggregation strategy for this poll enricher. 113 * 114 * @param aggregationStrategy the aggregationStrategy to set 115 */ 116 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 117 this.aggregationStrategy = aggregationStrategy; 118 } 119 120 public long getTimeout() { 121 return timeout; 122 } 123 124 /** 125 * Sets the timeout to use when polling. 126 * <p/> 127 * Use 0 to use receiveNoWait, 128 * Use -1 to use receive with no timeout (which will block until data is available). 129 * 130 * @param timeout timeout in millis. 131 */ 132 public void setTimeout(long timeout) { 133 this.timeout = timeout; 134 } 135 136 public boolean isAggregateOnException() { 137 return aggregateOnException; 138 } 139 140 public void setAggregateOnException(boolean aggregateOnException) { 141 this.aggregateOnException = aggregateOnException; 142 } 143 144 /** 145 * Sets the default aggregation strategy for this poll enricher. 146 */ 147 public void setDefaultAggregationStrategy() { 148 this.aggregationStrategy = defaultAggregationStrategy(); 149 } 150 151 public int getCacheSize() { 152 return cacheSize; 153 } 154 155 public void setCacheSize(int cacheSize) { 156 this.cacheSize = cacheSize; 157 } 158 159 public boolean isIgnoreInvalidEndpoint() { 160 return ignoreInvalidEndpoint; 161 } 162 163 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 164 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 165 } 166 167 public void process(Exchange exchange) throws Exception { 168 AsyncProcessorHelper.process(this, exchange); 169 } 170 171 /** 172 * Enriches the input data (<code>exchange</code>) by first obtaining 173 * additional data from an endpoint represented by an endpoint 174 * <code>producer</code> and second by aggregating input data and additional 175 * data. Aggregation of input data and additional data is delegated to an 176 * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the 177 * message exchange with the resource endpoint fails then no aggregation 178 * will be done and the failed exchange content is copied over to the 179 * original message exchange. 180 * 181 * @param exchange input data. 182 */ 183 @Override 184 public boolean process(Exchange exchange, AsyncCallback callback) { 185 try { 186 preCheckPoll(exchange); 187 } catch (Exception e) { 188 exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e)); 189 callback.done(true); 190 return true; 191 } 192 193 // which consumer to use 194 PollingConsumer consumer; 195 Endpoint endpoint; 196 197 // use dynamic endpoint so calculate the endpoint to use 198 Object recipient = null; 199 try { 200 recipient = expression.evaluate(exchange, Object.class); 201 endpoint = resolveEndpoint(exchange, recipient); 202 // acquire the consumer from the cache 203 consumer = consumerCache.acquirePollingConsumer(endpoint); 204 } catch (Throwable e) { 205 if (isIgnoreInvalidEndpoint()) { 206 if (LOG.isDebugEnabled()) { 207 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 208 } 209 } else { 210 exchange.setException(e); 211 } 212 callback.done(true); 213 return true; 214 } 215 216 // grab the real delegate consumer that performs the actual polling 217 Consumer delegate = consumer; 218 if (consumer instanceof EventDrivenPollingConsumer) { 219 delegate = ((EventDrivenPollingConsumer) consumer).getDelegateConsumer(); 220 } 221 222 // is the consumer bridging the error handler? 223 boolean bridgeErrorHandler = false; 224 if (delegate instanceof DefaultConsumer) { 225 ExceptionHandler handler = ((DefaultConsumer) delegate).getExceptionHandler(); 226 if (handler != null && handler instanceof BridgeExceptionHandlerToErrorHandler) { 227 bridgeErrorHandler = true; 228 } 229 } 230 231 Exchange resourceExchange; 232 try { 233 if (timeout < 0) { 234 LOG.debug("Consumer receive: {}", consumer); 235 resourceExchange = consumer.receive(); 236 } else if (timeout == 0) { 237 LOG.debug("Consumer receiveNoWait: {}", consumer); 238 resourceExchange = consumer.receiveNoWait(); 239 } else { 240 LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); 241 resourceExchange = consumer.receive(timeout); 242 } 243 244 if (resourceExchange == null) { 245 LOG.debug("Consumer received no exchange"); 246 } else { 247 LOG.debug("Consumer received: {}", resourceExchange); 248 } 249 } catch (Exception e) { 250 exchange.setException(new CamelExchangeException("Error during poll", exchange, e)); 251 callback.done(true); 252 return true; 253 } finally { 254 // return the consumer back to the cache 255 consumerCache.releasePollingConsumer(endpoint, consumer); 256 } 257 258 // remember current redelivery stats 259 Object redeliveried = exchange.getIn().getHeader(Exchange.REDELIVERED); 260 Object redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER); 261 Object redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER); 262 263 // if we are bridging error handler and failed then remember the caused exception 264 Throwable cause = null; 265 if (resourceExchange != null && bridgeErrorHandler) { 266 cause = resourceExchange.getException(); 267 } 268 269 try { 270 if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) { 271 // copy resource exchange onto original exchange (preserving pattern) 272 // and preserve redelivery headers 273 copyResultsPreservePattern(exchange, resourceExchange); 274 } else { 275 prepareResult(exchange); 276 277 // prepare the exchanges for aggregation 278 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 279 // must catch any exception from aggregation 280 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 281 if (aggregatedExchange != null) { 282 // copy aggregation result onto original exchange (preserving pattern) 283 copyResultsPreservePattern(exchange, aggregatedExchange); 284 // handover any synchronization 285 if (resourceExchange != null) { 286 resourceExchange.handoverCompletions(exchange); 287 } 288 } 289 } 290 291 // if we failed then restore caused exception 292 if (cause != null) { 293 // restore caused exception 294 exchange.setException(cause); 295 // remove the exhausted marker as we want to be able to perform redeliveries with the error handler 296 exchange.removeProperties(Exchange.REDELIVERY_EXHAUSTED); 297 298 // preserve the redelivery stats 299 if (redeliveried != null) { 300 if (exchange.hasOut()) { 301 exchange.getOut().setHeader(Exchange.REDELIVERED, redeliveried); 302 } else { 303 exchange.getIn().setHeader(Exchange.REDELIVERED, redeliveried); 304 } 305 } 306 if (redeliveryCounter != null) { 307 if (exchange.hasOut()) { 308 exchange.getOut().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 309 } else { 310 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 311 } 312 } 313 if (redeliveryMaxCounter != null) { 314 if (exchange.hasOut()) { 315 exchange.getOut().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 316 } else { 317 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 318 } 319 } 320 } 321 322 // set header with the uri of the endpoint enriched so we can use that for tracing etc 323 if (exchange.hasOut()) { 324 exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 325 } else { 326 exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 327 } 328 } catch (Throwable e) { 329 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 330 callback.done(true); 331 return true; 332 } 333 334 callback.done(true); 335 return true; 336 } 337 338 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 339 // trim strings as end users might have added spaces between separators 340 if (recipient instanceof String) { 341 recipient = ((String)recipient).trim(); 342 } 343 return ExchangeHelper.resolveEndpoint(exchange, recipient); 344 } 345 346 /** 347 * Strategy to pre check polling. 348 * <p/> 349 * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also 350 * started from a file based endpoint as that is not currently supported. 351 * 352 * @param exchange the current exchange 353 */ 354 protected void preCheckPoll(Exchange exchange) throws Exception { 355 // noop 356 } 357 358 private static void prepareResult(Exchange exchange) { 359 if (exchange.getPattern().isOutCapable()) { 360 exchange.getOut().copyFrom(exchange.getIn()); 361 } 362 } 363 364 private static AggregationStrategy defaultAggregationStrategy() { 365 return new CopyAggregationStrategy(); 366 } 367 368 @Override 369 public String toString() { 370 return "PollEnrich[" + expression + "]"; 371 } 372 373 protected void doStart() throws Exception { 374 if (consumerCache == null) { 375 // create consumer cache if we use dynamic expressions for computing the endpoints to poll 376 if (cacheSize < 0) { 377 consumerCache = new EmptyConsumerCache(this, camelContext); 378 LOG.debug("PollEnrich {} is not using ConsumerCache", this); 379 } else if (cacheSize == 0) { 380 consumerCache = new ConsumerCache(this, camelContext); 381 LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this); 382 } else { 383 consumerCache = new ConsumerCache(this, camelContext, cacheSize); 384 LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize); 385 } 386 } 387 ServiceHelper.startServices(consumerCache, aggregationStrategy); 388 } 389 390 protected void doStop() throws Exception { 391 ServiceHelper.stopServices(aggregationStrategy, consumerCache); 392 } 393 394 protected void doShutdown() throws Exception { 395 ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache); 396 } 397 398 private static class CopyAggregationStrategy implements AggregationStrategy { 399 400 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 401 if (newExchange != null) { 402 copyResultsPreservePattern(oldExchange, newExchange); 403 } else { 404 // if no newExchange then there was no message from the external resource 405 // and therefore we should set an empty body to indicate this fact 406 // but keep headers/attachments as we want to propagate those 407 oldExchange.getIn().setBody(null); 408 oldExchange.setOut(null); 409 } 410 return oldExchange; 411 } 412 413 } 414 415}