001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.CamelContext; 022import org.apache.camel.CamelContextAware; 023import org.apache.camel.CamelExchangeException; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.Expression; 027import org.apache.camel.PollingConsumer; 028import org.apache.camel.impl.ConsumerCache; 029import org.apache.camel.impl.EmptyConsumerCache; 030import org.apache.camel.processor.aggregate.AggregationStrategy; 031import org.apache.camel.spi.EndpointUtilizationStatistics; 032import org.apache.camel.spi.IdAware; 033import org.apache.camel.support.ServiceSupport; 034import org.apache.camel.util.AsyncProcessorHelper; 035import org.apache.camel.util.ExchangeHelper; 036import org.apache.camel.util.ServiceHelper; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 041 042/** 043 * A content enricher that enriches input data by first obtaining additional 044 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 045 * and second by aggregating input data and additional data. Aggregation of 046 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy} 047 * object. 048 * <p/> 049 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher} 050 * that uses a {@link org.apache.camel.Producer}. 051 * 052 * @see Enricher 053 */ 054public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 055 056 private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); 057 private CamelContext camelContext; 058 private ConsumerCache consumerCache; 059 private String id; 060 private AggregationStrategy aggregationStrategy; 061 private final Expression expression; 062 private long timeout; 063 private boolean aggregateOnException; 064 private int cacheSize; 065 private boolean ignoreInvalidEndpoint; 066 067 /** 068 * Creates a new {@link PollEnricher}. 069 * 070 * @param expression expression to use to compute the endpoint to poll from. 071 * @param timeout timeout in millis 072 */ 073 public PollEnricher(Expression expression, long timeout) { 074 this.expression = expression; 075 this.timeout = timeout; 076 } 077 078 public CamelContext getCamelContext() { 079 return camelContext; 080 } 081 082 public void setCamelContext(CamelContext camelContext) { 083 this.camelContext = camelContext; 084 } 085 086 public String getId() { 087 return id; 088 } 089 090 public void setId(String id) { 091 this.id = id; 092 } 093 094 public Expression getExpression() { 095 return expression; 096 } 097 098 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 099 return consumerCache.getEndpointUtilizationStatistics(); 100 } 101 102 public AggregationStrategy getAggregationStrategy() { 103 return aggregationStrategy; 104 } 105 106 /** 107 * Sets the aggregation strategy for this poll enricher. 108 * 109 * @param aggregationStrategy the aggregationStrategy to set 110 */ 111 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 112 this.aggregationStrategy = aggregationStrategy; 113 } 114 115 public long getTimeout() { 116 return timeout; 117 } 118 119 /** 120 * Sets the timeout to use when polling. 121 * <p/> 122 * Use 0 to use receiveNoWait, 123 * Use -1 to use receive with no timeout (which will block until data is available). 124 * 125 * @param timeout timeout in millis. 126 */ 127 public void setTimeout(long timeout) { 128 this.timeout = timeout; 129 } 130 131 public boolean isAggregateOnException() { 132 return aggregateOnException; 133 } 134 135 public void setAggregateOnException(boolean aggregateOnException) { 136 this.aggregateOnException = aggregateOnException; 137 } 138 139 /** 140 * Sets the default aggregation strategy for this poll enricher. 141 */ 142 public void setDefaultAggregationStrategy() { 143 this.aggregationStrategy = defaultAggregationStrategy(); 144 } 145 146 public int getCacheSize() { 147 return cacheSize; 148 } 149 150 public void setCacheSize(int cacheSize) { 151 this.cacheSize = cacheSize; 152 } 153 154 public boolean isIgnoreInvalidEndpoint() { 155 return ignoreInvalidEndpoint; 156 } 157 158 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 159 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 160 } 161 162 public void process(Exchange exchange) throws Exception { 163 AsyncProcessorHelper.process(this, exchange); 164 } 165 166 /** 167 * Enriches the input data (<code>exchange</code>) by first obtaining 168 * additional data from an endpoint represented by an endpoint 169 * <code>producer</code> and second by aggregating input data and additional 170 * data. Aggregation of input data and additional data is delegated to an 171 * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the 172 * message exchange with the resource endpoint fails then no aggregation 173 * will be done and the failed exchange content is copied over to the 174 * original message exchange. 175 * 176 * @param exchange input data. 177 */ 178 @Override 179 public boolean process(Exchange exchange, AsyncCallback callback) { 180 try { 181 preCheckPoll(exchange); 182 } catch (Exception e) { 183 exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e)); 184 callback.done(true); 185 return true; 186 } 187 188 // which consumer to use 189 PollingConsumer consumer; 190 Endpoint endpoint; 191 192 // use dynamic endpoint so calculate the endpoint to use 193 Object recipient = null; 194 try { 195 recipient = expression.evaluate(exchange, Object.class); 196 endpoint = resolveEndpoint(exchange, recipient); 197 // acquire the consumer from the cache 198 consumer = consumerCache.acquirePollingConsumer(endpoint); 199 } catch (Throwable e) { 200 if (isIgnoreInvalidEndpoint()) { 201 if (LOG.isDebugEnabled()) { 202 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 203 } 204 } else { 205 exchange.setException(e); 206 } 207 callback.done(true); 208 return true; 209 } 210 211 Exchange resourceExchange; 212 try { 213 if (timeout < 0) { 214 LOG.debug("Consumer receive: {}", consumer); 215 resourceExchange = consumer.receive(); 216 } else if (timeout == 0) { 217 LOG.debug("Consumer receiveNoWait: {}", consumer); 218 resourceExchange = consumer.receiveNoWait(); 219 } else { 220 LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); 221 resourceExchange = consumer.receive(timeout); 222 } 223 224 if (resourceExchange == null) { 225 LOG.debug("Consumer received no exchange"); 226 } else { 227 LOG.debug("Consumer received: {}", resourceExchange); 228 } 229 } catch (Exception e) { 230 exchange.setException(new CamelExchangeException("Error during poll", exchange, e)); 231 callback.done(true); 232 return true; 233 } finally { 234 // return the consumer back to the cache 235 consumerCache.releasePollingConsumer(endpoint, consumer); 236 } 237 238 try { 239 if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) { 240 // copy resource exchange onto original exchange (preserving pattern) 241 copyResultsPreservePattern(exchange, resourceExchange); 242 } else { 243 prepareResult(exchange); 244 245 // prepare the exchanges for aggregation 246 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 247 // must catch any exception from aggregation 248 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 249 if (aggregatedExchange != null) { 250 // copy aggregation result onto original exchange (preserving pattern) 251 copyResultsPreservePattern(exchange, aggregatedExchange); 252 // handover any synchronization 253 if (resourceExchange != null) { 254 resourceExchange.handoverCompletions(exchange); 255 } 256 } 257 } 258 259 // set header with the uri of the endpoint enriched so we can use that for tracing etc 260 if (exchange.hasOut()) { 261 exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 262 } else { 263 exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 264 } 265 } catch (Throwable e) { 266 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 267 callback.done(true); 268 return true; 269 } 270 271 callback.done(true); 272 return true; 273 } 274 275 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 276 // trim strings as end users might have added spaces between separators 277 if (recipient instanceof String) { 278 recipient = ((String)recipient).trim(); 279 } 280 return ExchangeHelper.resolveEndpoint(exchange, recipient); 281 } 282 283 /** 284 * Strategy to pre check polling. 285 * <p/> 286 * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also 287 * started from a file based endpoint as that is not currently supported. 288 * 289 * @param exchange the current exchange 290 */ 291 protected void preCheckPoll(Exchange exchange) throws Exception { 292 // noop 293 } 294 295 private static void prepareResult(Exchange exchange) { 296 if (exchange.getPattern().isOutCapable()) { 297 exchange.getOut().copyFrom(exchange.getIn()); 298 } 299 } 300 301 private static AggregationStrategy defaultAggregationStrategy() { 302 return new CopyAggregationStrategy(); 303 } 304 305 @Override 306 public String toString() { 307 return "PollEnrich[" + expression + "]"; 308 } 309 310 protected void doStart() throws Exception { 311 if (consumerCache == null) { 312 // create consumer cache if we use dynamic expressions for computing the endpoints to poll 313 if (cacheSize < 0) { 314 consumerCache = new EmptyConsumerCache(this, camelContext); 315 LOG.debug("PollEnrich {} is not using ConsumerCache", this); 316 } else if (cacheSize == 0) { 317 consumerCache = new ConsumerCache(this, camelContext); 318 LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this); 319 } else { 320 consumerCache = new ConsumerCache(this, camelContext, cacheSize); 321 LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize); 322 } 323 } 324 ServiceHelper.startServices(consumerCache, aggregationStrategy); 325 } 326 327 protected void doStop() throws Exception { 328 ServiceHelper.stopServices(aggregationStrategy, consumerCache); 329 } 330 331 protected void doShutdown() throws Exception { 332 ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache); 333 } 334 335 private static class CopyAggregationStrategy implements AggregationStrategy { 336 337 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 338 if (newExchange != null) { 339 copyResultsPreservePattern(oldExchange, newExchange); 340 } else { 341 // if no newExchange then there was no message from the external resource 342 // and therefore we should set an empty body to indicate this fact 343 // but keep headers/attachments as we want to propagate those 344 oldExchange.getIn().setBody(null); 345 oldExchange.setOut(null); 346 } 347 return oldExchange; 348 } 349 350 } 351 352}