001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import javax.xml.bind.annotation.XmlAccessType; 020import javax.xml.bind.annotation.XmlAccessorType; 021import javax.xml.bind.annotation.XmlAttribute; 022import javax.xml.bind.annotation.XmlRootElement; 023import javax.xml.bind.annotation.XmlTransient; 024 025import org.apache.camel.CamelContextAware; 026import org.apache.camel.Expression; 027import org.apache.camel.Processor; 028import org.apache.camel.model.language.ExpressionDefinition; 029import org.apache.camel.processor.PollEnricher; 030import org.apache.camel.processor.aggregate.AggregationStrategy; 031import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 032import org.apache.camel.spi.Metadata; 033import org.apache.camel.spi.RouteContext; 034 035/** 036 * Enriches messages with data polled from a secondary resource 037 * 038 * @see org.apache.camel.processor.Enricher 039 */ 040@Metadata(label = "eip,transformation") 041@XmlRootElement(name = "pollEnrich") 042@XmlAccessorType(XmlAccessType.FIELD) 043public class PollEnrichDefinition extends NoOutputExpressionNode { 044 @XmlAttribute @Metadata(defaultValue = "-1") 045 private Long timeout; 046 @XmlAttribute(name = "strategyRef") 047 private String aggregationStrategyRef; 048 @XmlAttribute(name = "strategyMethodName") 049 private String aggregationStrategyMethodName; 050 @XmlAttribute(name = "strategyMethodAllowNull") 051 private Boolean aggregationStrategyMethodAllowNull; 052 @XmlAttribute 053 private Boolean aggregateOnException; 054 @XmlTransient 055 private AggregationStrategy aggregationStrategy; 056 @XmlAttribute 057 private Integer cacheSize; 058 @XmlAttribute 059 private Boolean ignoreInvalidEndpoint; 060 061 public PollEnrichDefinition() { 062 } 063 064 public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) { 065 this.aggregationStrategy = aggregationStrategy; 066 this.timeout = timeout; 067 } 068 069 @Override 070 public String toString() { 071 return "PollEnrich[" + getExpression() + "]"; 072 } 073 074 @Override 075 public String getLabel() { 076 return "pollEnrich[" + getExpression() + "]"; 077 } 078 079 @Override 080 public Processor createProcessor(RouteContext routeContext) throws Exception { 081 082 // if no timeout then we should block, and there use a negative timeout 083 long time = timeout != null ? timeout : -1; 084 boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint(); 085 Expression exp = getExpression().createExpression(routeContext); 086 087 PollEnricher enricher = new PollEnricher(exp, time); 088 089 AggregationStrategy strategy = createAggregationStrategy(routeContext); 090 if (strategy == null) { 091 enricher.setDefaultAggregationStrategy(); 092 } else { 093 enricher.setAggregationStrategy(strategy); 094 } 095 if (getAggregateOnException() != null) { 096 enricher.setAggregateOnException(getAggregateOnException()); 097 } 098 if (getCacheSize() != null) { 099 enricher.setCacheSize(getCacheSize()); 100 } 101 enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); 102 103 return enricher; 104 } 105 106 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 107 AggregationStrategy strategy = getAggregationStrategy(); 108 if (strategy == null && aggregationStrategyRef != null) { 109 Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class); 110 if (aggStrategy instanceof AggregationStrategy) { 111 strategy = (AggregationStrategy) aggStrategy; 112 } else if (aggStrategy != null) { 113 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 114 if (getAggregationStrategyMethodAllowNull() != null) { 115 adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull()); 116 adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull()); 117 } 118 strategy = adapter; 119 } else { 120 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef); 121 } 122 } 123 124 if (strategy instanceof CamelContextAware) { 125 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 126 } 127 128 return strategy; 129 } 130 131 // Fluent API 132 // ------------------------------------------------------------------------- 133 134 /** 135 * Timeout in millis when polling from the external service. 136 * <p/> 137 * The timeout has influence about the poll enrich behavior. It basically operations in three different modes: 138 * <ul> 139 * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li> 140 * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li> 141 * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li> 142 * </ul> 143 * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value 144 */ 145 public PollEnrichDefinition timeout(long timeout) { 146 setTimeout(timeout); 147 return this; 148 } 149 150 /** 151 * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. 152 * By default Camel will use the reply from the external service as outgoing message. 153 */ 154 public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 155 setAggregationStrategy(aggregationStrategy); 156 return this; 157 } 158 159 /** 160 * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. 161 * By default Camel will use the reply from the external service as outgoing message. 162 */ 163 public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) { 164 setAggregationStrategyRef(aggregationStrategyRef); 165 return this; 166 } 167 168 /** 169 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 170 */ 171 public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) { 172 setAggregationStrategyMethodName(aggregationStrategyMethodName); 173 return this; 174 } 175 176 /** 177 * If this option is false then the aggregate method is not used if there was no data to enrich. 178 * If this option is true then null values is used as the oldExchange (when no data to enrich), 179 * when using POJOs as the AggregationStrategy. 180 */ 181 public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) { 182 setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull); 183 return this; 184 } 185 186 /** 187 * If this option is false then the aggregate method is not used if there was an exception thrown while trying 188 * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what 189 * to do if there was an exception in the aggregate method. For example to suppress the exception 190 * or set a custom message body etc. 191 */ 192 public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) { 193 setAggregateOnException(aggregateOnException); 194 return this; 195 } 196 197 /** 198 * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used 199 * to cache and reuse consumers when uris are reused. 200 * 201 * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. 202 * @return the builder 203 */ 204 public PollEnrichDefinition cacheSize(int cacheSize) { 205 setCacheSize(cacheSize); 206 return this; 207 } 208 209 /** 210 * Ignore the invalidate endpoint exception when try to create a producer with that endpoint 211 * 212 * @return the builder 213 */ 214 public PollEnrichDefinition ignoreInvalidEndpoint() { 215 setIgnoreInvalidEndpoint(true); 216 return this; 217 } 218 219 // Properties 220 // ------------------------------------------------------------------------- 221 222 /** 223 * Expression that computes the endpoint uri to use as the resource endpoint to enrich from 224 */ 225 @Override 226 public void setExpression(ExpressionDefinition expression) { 227 // override to include javadoc what the expression is used for 228 super.setExpression(expression); 229 } 230 231 public Long getTimeout() { 232 return timeout; 233 } 234 235 public void setTimeout(Long timeout) { 236 this.timeout = timeout; 237 } 238 239 public String getAggregationStrategyRef() { 240 return aggregationStrategyRef; 241 } 242 243 public void setAggregationStrategyRef(String aggregationStrategyRef) { 244 this.aggregationStrategyRef = aggregationStrategyRef; 245 } 246 247 public String getAggregationStrategyMethodName() { 248 return aggregationStrategyMethodName; 249 } 250 251 public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) { 252 this.aggregationStrategyMethodName = aggregationStrategyMethodName; 253 } 254 255 public Boolean getAggregationStrategyMethodAllowNull() { 256 return aggregationStrategyMethodAllowNull; 257 } 258 259 public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { 260 this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; 261 } 262 263 public AggregationStrategy getAggregationStrategy() { 264 return aggregationStrategy; 265 } 266 267 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 268 this.aggregationStrategy = aggregationStrategy; 269 } 270 271 public Boolean getAggregateOnException() { 272 return aggregateOnException; 273 } 274 275 public void setAggregateOnException(Boolean aggregateOnException) { 276 this.aggregateOnException = aggregateOnException; 277 } 278 279 public Integer getCacheSize() { 280 return cacheSize; 281 } 282 283 public void setCacheSize(Integer cacheSize) { 284 this.cacheSize = cacheSize; 285 } 286 287 public Boolean getIgnoreInvalidEndpoint() { 288 return ignoreInvalidEndpoint; 289 } 290 291 public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { 292 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 293 } 294}