001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.CamelExchangeException; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.PollingConsumer; 022 import org.apache.camel.Processor; 023 import org.apache.camel.processor.aggregate.AggregationStrategy; 024 import org.apache.camel.support.ServiceSupport; 025 import org.apache.camel.util.ExchangeHelper; 026 import org.apache.camel.util.ServiceHelper; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 031 032 /** 033 * A content enricher that enriches input data by first obtaining additional 034 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 035 * and second by aggregating input data and additional data. Aggregation of 036 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy} 037 * object. 038 * <p/> 039 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher} 040 * that uses a {@link org.apache.camel.Producer}. 041 * 042 * @see Enricher 043 */ 044 public class PollEnricher extends ServiceSupport implements Processor { 045 046 private static final transient Logger LOG = LoggerFactory.getLogger(PollEnricher.class); 047 private AggregationStrategy aggregationStrategy; 048 private PollingConsumer consumer; 049 private long timeout; 050 051 /** 052 * Creates a new {@link PollEnricher}. The default aggregation strategy is to 053 * copy the additional data obtained from the enricher's resource over the 054 * input data. When using the copy aggregation strategy the enricher 055 * degenerates to a normal transformer. 056 * 057 * @param consumer consumer to resource endpoint. 058 */ 059 public PollEnricher(PollingConsumer consumer) { 060 this(defaultAggregationStrategy(), consumer, 0); 061 } 062 063 /** 064 * Creates a new {@link PollEnricher}. 065 * 066 * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 067 * @param consumer consumer to resource endpoint. 068 * @param timeout timeout in millis 069 */ 070 public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) { 071 this.aggregationStrategy = aggregationStrategy; 072 this.consumer = consumer; 073 this.timeout = timeout; 074 } 075 076 /** 077 * Sets the aggregation strategy for this poll enricher. 078 * 079 * @param aggregationStrategy the aggregationStrategy to set 080 */ 081 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 082 this.aggregationStrategy = aggregationStrategy; 083 } 084 085 /** 086 * Sets the default aggregation strategy for this poll enricher. 087 */ 088 public void setDefaultAggregationStrategy() { 089 this.aggregationStrategy = defaultAggregationStrategy(); 090 } 091 092 /** 093 * Sets the timeout to use when polling. 094 * <p/> 095 * Use 0 or negative to not use timeout and block until data is available. 096 * 097 * @param timeout timeout in millis. 098 */ 099 public void setTimeout(long timeout) { 100 this.timeout = timeout; 101 } 102 103 /** 104 * Enriches the input data (<code>exchange</code>) by first obtaining 105 * additional data from an endpoint represented by an endpoint 106 * <code>producer</code> and second by aggregating input data and additional 107 * data. Aggregation of input data and additional data is delegated to an 108 * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the 109 * message exchange with the resource endpoint fails then no aggregation 110 * will be done and the failed exchange content is copied over to the 111 * original message exchange. 112 * 113 * @param exchange input data. 114 */ 115 public void process(Exchange exchange) throws Exception { 116 preCheckPoll(exchange); 117 118 Exchange resourceExchange; 119 if (timeout < 0) { 120 LOG.debug("Consumer receive: {}", consumer); 121 resourceExchange = consumer.receive(); 122 } else if (timeout == 0) { 123 LOG.debug("Consumer receiveNoWait: {}", consumer); 124 resourceExchange = consumer.receiveNoWait(); 125 } else { 126 LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); 127 resourceExchange = consumer.receive(timeout); 128 } 129 130 if (resourceExchange == null) { 131 LOG.debug("Consumer received no exchange"); 132 } else { 133 LOG.debug("Consumer received: {}", resourceExchange); 134 } 135 136 if (resourceExchange != null && resourceExchange.isFailed()) { 137 // copy resource exchange onto original exchange (preserving pattern) 138 copyResultsPreservePattern(exchange, resourceExchange); 139 } else { 140 prepareResult(exchange); 141 142 // prepare the exchanges for aggregation 143 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 144 // must catch any exception from aggregation 145 Exchange aggregatedExchange; 146 try { 147 aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 148 } catch (Throwable e) { 149 throw new CamelExchangeException("Error occurred during aggregation", exchange, e); 150 } 151 if (aggregatedExchange != null) { 152 // copy aggregation result onto original exchange (preserving pattern) 153 copyResultsPreservePattern(exchange, aggregatedExchange); 154 // handover any synchronization 155 if (resourceExchange != null) { 156 resourceExchange.handoverCompletions(exchange); 157 } 158 } 159 } 160 161 // set header with the uri of the endpoint enriched so we can use that for tracing etc 162 if (exchange.hasOut()) { 163 exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 164 } else { 165 exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 166 } 167 } 168 169 /** 170 * Strategy to pre check polling. 171 * <p/> 172 * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also 173 * started from a file based endpoint as that is not currently supported. 174 * 175 * @param exchange the current exchange 176 */ 177 protected void preCheckPoll(Exchange exchange) throws Exception { 178 // noop 179 } 180 181 private static void prepareResult(Exchange exchange) { 182 if (exchange.getPattern().isOutCapable()) { 183 exchange.getOut().copyFrom(exchange.getIn()); 184 } 185 } 186 187 private static AggregationStrategy defaultAggregationStrategy() { 188 return new CopyAggregationStrategy(); 189 } 190 191 @Override 192 public String toString() { 193 return "PollEnrich[" + consumer + "]"; 194 } 195 196 protected void doStart() throws Exception { 197 ServiceHelper.startService(consumer); 198 } 199 200 protected void doStop() throws Exception { 201 ServiceHelper.stopService(consumer); 202 } 203 204 private static class CopyAggregationStrategy implements AggregationStrategy { 205 206 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 207 if (newExchange != null) { 208 copyResultsPreservePattern(oldExchange, newExchange); 209 } else { 210 // if no newExchange then there was no message from the external resource 211 // and therefore we should set an empty body to indicate this fact 212 // but keep headers/attachments as we want to propagate those 213 oldExchange.getIn().setBody(null); 214 oldExchange.setOut(null); 215 } 216 return oldExchange; 217 } 218 219 } 220 221 }