001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.HashMap; 020import java.util.Map; 021 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Component; 025import org.apache.camel.Consumer; 026import org.apache.camel.Endpoint; 027import org.apache.camel.EndpointConfiguration; 028import org.apache.camel.Exchange; 029import org.apache.camel.ExchangePattern; 030import org.apache.camel.PollingConsumer; 031import org.apache.camel.ResolveEndpointFailedException; 032import org.apache.camel.spi.ExceptionHandler; 033import org.apache.camel.spi.HasId; 034import org.apache.camel.spi.UriParam; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.EndpointHelper; 037import org.apache.camel.util.IntrospectionSupport; 038import org.apache.camel.util.ObjectHelper; 039import org.apache.camel.util.URISupport; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * A default endpoint useful for implementation inheritance. 045 * <p/> 046 * Components which leverages <a 047 * href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous 048 * processing model</a> should check the {@link #isSynchronous()} to determine 049 * if asynchronous processing is allowed. The <tt>synchronous</tt> option on the 050 * endpoint allows Camel end users to dictate whether they want the asynchronous 051 * model or not. The option is default <tt>false</tt> which means asynchronous 052 * processing is allowed. 053 * 054 * @version 055 */ 056public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware { 057 058 private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class); 059 private final String id = EndpointHelper.createEndpointId(); 060 private transient String endpointUriToString; 061 private String endpointUri; 062 private EndpointConfiguration endpointConfiguration; 063 private CamelContext camelContext; 064 private Component component; 065 @UriParam(label = "consumer", optionalPrefix = "consumer.", description = "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while" 066 + " the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler." 067 + " By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN/ERROR level and ignored.") 068 private boolean bridgeErrorHandler; 069 @UriParam(label = "consumer,advanced", optionalPrefix = "consumer.", description = "To let the consumer use a custom ExceptionHandler." 070 + " Notice if the option bridgeErrorHandler is enabled then this options is not in use." 071 + " By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored.") 072 private ExceptionHandler exceptionHandler; 073 @UriParam(label = "consumer,advanced", 074 description = "Sets the exchange pattern when the consumer creates an exchange.") 075 // no default value set on @UriParam as the MEP is sometimes InOnly or InOut depending on the component in use 076 private ExchangePattern exchangePattern = ExchangePattern.InOnly; 077 // option to allow end user to dictate whether async processing should be 078 // used or not (if possible) 079 @UriParam(defaultValue = "false", label = "advanced", 080 description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).") 081 private boolean synchronous; 082 // these options are not really in use any option related to the consumer has a specific option on the endpoint 083 // and consumerProperties was added from the very start of Camel. 084 private Map<String, Object> consumerProperties; 085 // pooling consumer options only related to EventDrivenPollingConsumer which are very seldom in use 086 // so lets not expose them in the component docs as it will be included in every component 087 private int pollingConsumerQueueSize = 1000; 088 private boolean pollingConsumerBlockWhenFull = true; 089 private long pollingConsumerBlockTimeout; 090 091 /** 092 * Constructs a fully-initialized DefaultEndpoint instance. This is the 093 * preferred method of constructing an object from Java code (as opposed to 094 * Spring beans, etc.). 095 * 096 * @param endpointUri the full URI used to create this endpoint 097 * @param component the component that created this endpoint 098 */ 099 protected DefaultEndpoint(String endpointUri, Component component) { 100 this.camelContext = component == null ? null : component.getCamelContext(); 101 this.component = component; 102 this.setEndpointUri(endpointUri); 103 } 104 105 /** 106 * Constructs a DefaultEndpoint instance which has <b>not</b> been created 107 * using a {@link Component}. 108 * <p/> 109 * <b>Note:</b> It is preferred to create endpoints using the associated 110 * component. 111 * 112 * @param endpointUri the full URI used to create this endpoint 113 * @param camelContext the Camel Context in which this endpoint is operating 114 */ 115 @Deprecated 116 protected DefaultEndpoint(String endpointUri, CamelContext camelContext) { 117 this(endpointUri); 118 this.camelContext = camelContext; 119 } 120 121 /** 122 * Constructs a partially-initialized DefaultEndpoint instance. 123 * <p/> 124 * <b>Note:</b> It is preferred to create endpoints using the associated 125 * component. 126 * 127 * @param endpointUri the full URI used to create this endpoint 128 */ 129 @Deprecated 130 protected DefaultEndpoint(String endpointUri) { 131 this.setEndpointUri(endpointUri); 132 } 133 134 /** 135 * Constructs a partially-initialized DefaultEndpoint instance. Useful when 136 * creating endpoints manually (e.g., as beans in Spring). 137 * <p/> 138 * Please note that the endpoint URI must be set through properties (or 139 * overriding {@link #createEndpointUri()} if one uses this constructor. 140 * <p/> 141 * <b>Note:</b> It is preferred to create endpoints using the associated 142 * component. 143 */ 144 protected DefaultEndpoint() { 145 } 146 147 public int hashCode() { 148 return getEndpointUri().hashCode() * 37 + 1; 149 } 150 151 @Override 152 public boolean equals(Object object) { 153 if (object instanceof DefaultEndpoint) { 154 DefaultEndpoint that = (DefaultEndpoint)object; 155 // must also match the same CamelContext in case we compare endpoints from different contexts 156 String thisContextName = this.getCamelContext() != null ? this.getCamelContext().getName() : null; 157 String thatContextName = that.getCamelContext() != null ? that.getCamelContext().getName() : null; 158 return ObjectHelper.equal(this.getEndpointUri(), that.getEndpointUri()) && ObjectHelper.equal(thisContextName, thatContextName); 159 } 160 return false; 161 } 162 163 @Override 164 public String toString() { 165 if (endpointUriToString == null) { 166 String value = null; 167 try { 168 value = getEndpointUri(); 169 } catch (RuntimeException e) { 170 // ignore any exception and use null for building the string value 171 } 172 // ensure to sanitize uri so we do not show sensitive information such as passwords 173 endpointUriToString = URISupport.sanitizeUri(value); 174 } 175 return endpointUriToString; 176 } 177 178 /** 179 * Returns a unique String ID which can be used for aliasing without having 180 * to use the whole URI which is not unique 181 */ 182 public String getId() { 183 return id; 184 } 185 186 public String getEndpointUri() { 187 if (endpointUri == null) { 188 endpointUri = createEndpointUri(); 189 if (endpointUri == null) { 190 throw new IllegalArgumentException("endpointUri is not specified and " + getClass().getName() 191 + " does not implement createEndpointUri() to create a default value"); 192 } 193 } 194 return endpointUri; 195 } 196 197 public EndpointConfiguration getEndpointConfiguration() { 198 if (endpointConfiguration == null) { 199 endpointConfiguration = createEndpointConfiguration(getEndpointUri()); 200 } 201 return endpointConfiguration; 202 } 203 204 /** 205 * Sets a custom {@link EndpointConfiguration} 206 * 207 * @param endpointConfiguration a custom endpoint configuration to be used. 208 */ 209 public void setEndpointConfiguration(EndpointConfiguration endpointConfiguration) { 210 this.endpointConfiguration = endpointConfiguration; 211 } 212 213 public String getEndpointKey() { 214 if (isLenientProperties()) { 215 // only use the endpoint uri without parameters as the properties is 216 // lenient 217 String uri = getEndpointUri(); 218 if (uri.indexOf('?') != -1) { 219 return ObjectHelper.before(uri, "?"); 220 } else { 221 return uri; 222 } 223 } else { 224 // use the full endpoint uri 225 return getEndpointUri(); 226 } 227 } 228 229 public CamelContext getCamelContext() { 230 return camelContext; 231 } 232 233 /** 234 * Returns the component that created this endpoint. 235 * 236 * @return the component that created this endpoint, or <tt>null</tt> if 237 * none set 238 */ 239 public Component getComponent() { 240 return component; 241 } 242 243 public void setCamelContext(CamelContext camelContext) { 244 this.camelContext = camelContext; 245 } 246 247 public PollingConsumer createPollingConsumer() throws Exception { 248 // should not call configurePollingConsumer when its EventDrivenPollingConsumer 249 if (LOG.isDebugEnabled()) { 250 LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}", 251 new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()}); 252 } 253 EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); 254 consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); 255 consumer.setBlockTimeout(getPollingConsumerBlockTimeout()); 256 return consumer; 257 } 258 259 public Exchange createExchange(Exchange exchange) { 260 return exchange.copy(); 261 } 262 263 public Exchange createExchange() { 264 return createExchange(getExchangePattern()); 265 } 266 267 public Exchange createExchange(ExchangePattern pattern) { 268 return new DefaultExchange(this, pattern); 269 } 270 271 /** 272 * Returns the default exchange pattern to use when creating an exchange. 273 */ 274 public ExchangePattern getExchangePattern() { 275 return exchangePattern; 276 } 277 278 /** 279 * Sets the default exchange pattern when creating an exchange. 280 */ 281 public void setExchangePattern(ExchangePattern exchangePattern) { 282 this.exchangePattern = exchangePattern; 283 } 284 285 /** 286 * Returns whether synchronous processing should be strictly used. 287 */ 288 public boolean isSynchronous() { 289 return synchronous; 290 } 291 292 /** 293 * Sets whether synchronous processing should be strictly used, or Camel is 294 * allowed to use asynchronous processing (if supported). 295 * 296 * @param synchronous <tt>true</tt> to enforce synchronous processing 297 */ 298 public void setSynchronous(boolean synchronous) { 299 this.synchronous = synchronous; 300 } 301 302 public boolean isBridgeErrorHandler() { 303 return bridgeErrorHandler; 304 } 305 306 /** 307 * Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while 308 * the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and 309 * handled by the routing Error Handler. 310 * <p/> 311 * By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, 312 * that will be logged at WARN/ERROR level and ignored. 313 */ 314 public void setBridgeErrorHandler(boolean bridgeErrorHandler) { 315 this.bridgeErrorHandler = bridgeErrorHandler; 316 } 317 318 public ExceptionHandler getExceptionHandler() { 319 return exceptionHandler; 320 } 321 322 /** 323 * To let the consumer use a custom ExceptionHandler. 324 + Notice if the option bridgeErrorHandler is enabled then this options is not in use. 325 + By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored. 326 */ 327 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 328 this.exceptionHandler = exceptionHandler; 329 } 330 331 /** 332 * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 333 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 334 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 335 * <p/> 336 * The default value is <tt>1000</tt> 337 */ 338 public int getPollingConsumerQueueSize() { 339 return pollingConsumerQueueSize; 340 } 341 342 /** 343 * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 344 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 345 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 346 * <p/> 347 * The default value is <tt>1000</tt> 348 */ 349 public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) { 350 this.pollingConsumerQueueSize = pollingConsumerQueueSize; 351 } 352 353 /** 354 * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 355 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 356 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 357 * <p/> 358 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 359 * when trying to add to the queue, and its full. 360 * <p/> 361 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 362 */ 363 public boolean isPollingConsumerBlockWhenFull() { 364 return pollingConsumerBlockWhenFull; 365 } 366 367 /** 368 * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 369 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 370 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 371 * <p/> 372 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 373 * when trying to add to the queue, and its full. 374 * <p/> 375 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 376 */ 377 public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) { 378 this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull; 379 } 380 381 /** 382 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 383 * is being used. 384 * 385 * @see #setPollingConsumerBlockWhenFull(boolean) 386 */ 387 public long getPollingConsumerBlockTimeout() { 388 return pollingConsumerBlockTimeout; 389 } 390 391 /** 392 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 393 * is being used. 394 * 395 * @see #setPollingConsumerBlockWhenFull(boolean) 396 */ 397 public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) { 398 this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout; 399 } 400 401 public void configureProperties(Map<String, Object> options) { 402 Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer."); 403 if (consumerProperties != null && !consumerProperties.isEmpty()) { 404 setConsumerProperties(consumerProperties); 405 } 406 } 407 408 /** 409 * Sets the bean properties on the given bean. 410 * <p/> 411 * This is the same logical implementation as {@link DefaultComponent#setProperties(Object, java.util.Map)} 412 * 413 * @param bean the bean 414 * @param parameters properties to set 415 */ 416 protected void setProperties(Object bean, Map<String, Object> parameters) throws Exception { 417 // set reference properties first as they use # syntax that fools the regular properties setter 418 EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters); 419 EndpointHelper.setProperties(getCamelContext(), bean, parameters); 420 } 421 422 /** 423 * A factory method to lazily create the endpointUri if none is specified 424 */ 425 protected String createEndpointUri() { 426 return null; 427 } 428 429 /** 430 * A factory method to lazily create the endpoint configuration if none is specified 431 */ 432 protected EndpointConfiguration createEndpointConfiguration(String uri) { 433 // using this factory method to be backwards compatible with the old code 434 if (getComponent() != null) { 435 // prefer to use component endpoint configuration 436 try { 437 return getComponent().createConfiguration(uri); 438 } catch (Exception e) { 439 throw ObjectHelper.wrapRuntimeCamelException(e); 440 } 441 } else if (getCamelContext() != null) { 442 // fallback and use a mapped endpoint configuration 443 return new MappedEndpointConfiguration(getCamelContext(), uri); 444 } 445 // not configuration possible 446 return null; 447 } 448 449 /** 450 * Sets the endpointUri if it has not been specified yet via some kind of 451 * dependency injection mechanism. This allows dependency injection 452 * frameworks such as Spring or Guice to set the default endpoint URI in 453 * cases where it has not been explicitly configured using the name/context 454 * in which an Endpoint is created. 455 */ 456 public void setEndpointUriIfNotSpecified(String value) { 457 if (endpointUri == null) { 458 setEndpointUri(value); 459 } 460 } 461 462 /** 463 * Sets the URI that created this endpoint. 464 */ 465 protected void setEndpointUri(String endpointUri) { 466 this.endpointUri = endpointUri; 467 } 468 469 public boolean isLenientProperties() { 470 // default should be false for most components 471 return false; 472 } 473 474 public Map<String, Object> getConsumerProperties() { 475 if (consumerProperties == null) { 476 // must create empty if none exists 477 consumerProperties = new HashMap<String, Object>(); 478 } 479 return consumerProperties; 480 } 481 482 public void setConsumerProperties(Map<String, Object> consumerProperties) { 483 // append consumer properties 484 if (consumerProperties != null && !consumerProperties.isEmpty()) { 485 if (this.consumerProperties == null) { 486 this.consumerProperties = new HashMap<String, Object>(consumerProperties); 487 } else { 488 this.consumerProperties.putAll(consumerProperties); 489 } 490 } 491 } 492 493 protected void configureConsumer(Consumer consumer) throws Exception { 494 if (consumerProperties != null) { 495 // use a defensive copy of the consumer properties as the methods below will remove the used properties 496 // and in case we restart routes, we need access to the original consumer properties again 497 Map<String, Object> copy = new HashMap<String, Object>(consumerProperties); 498 499 // set reference properties first as they use # syntax that fools the regular properties setter 500 EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy); 501 EndpointHelper.setProperties(getCamelContext(), consumer, copy); 502 503 // special consumer.bridgeErrorHandler option 504 Object bridge = copy.remove("bridgeErrorHandler"); 505 if (bridge != null && "true".equals(bridge)) { 506 if (consumer instanceof DefaultConsumer) { 507 DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; 508 defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer)); 509 } else { 510 throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints," 511 + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class."); 512 } 513 } 514 515 if (!this.isLenientProperties() && copy.size() > 0) { 516 throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() 517 + " parameters that couldn't be set on the endpoint consumer." 518 + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 519 + " Unknown consumer parameters=[" + copy + "]"); 520 } 521 } 522 } 523 524 protected void configurePollingConsumer(PollingConsumer consumer) throws Exception { 525 configureConsumer(consumer); 526 } 527 528 @Override 529 protected void doStart() throws Exception { 530 // the bridgeErrorHandler/exceptionHandler was originally configured with consumer. prefix, such as consumer.bridgeErrorHandler=true 531 // so if they have been configured on the endpoint then map to the old naming style 532 if (bridgeErrorHandler) { 533 getConsumerProperties().put("bridgeErrorHandler", "true"); 534 } 535 if (exceptionHandler != null) { 536 getConsumerProperties().put("exceptionHandler", exceptionHandler); 537 } 538 } 539 540 @Override 541 protected void doStop() throws Exception { 542 // noop 543 } 544}