001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.HashMap; 020import java.util.Map; 021 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Component; 025import org.apache.camel.Consumer; 026import org.apache.camel.Endpoint; 027import org.apache.camel.EndpointConfiguration; 028import org.apache.camel.Exchange; 029import org.apache.camel.ExchangePattern; 030import org.apache.camel.PollingConsumer; 031import org.apache.camel.ResolveEndpointFailedException; 032import org.apache.camel.spi.ExceptionHandler; 033import org.apache.camel.spi.HasId; 034import org.apache.camel.spi.UriParam; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.EndpointHelper; 037import org.apache.camel.util.IntrospectionSupport; 038import org.apache.camel.util.ObjectHelper; 039import org.apache.camel.util.StringHelper; 040import org.apache.camel.util.URISupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * A default endpoint useful for implementation inheritance. 046 * <p/> 047 * Components which leverages <a 048 * href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous 049 * processing model</a> should check the {@link #isSynchronous()} to determine 050 * if asynchronous processing is allowed. The <tt>synchronous</tt> option on the 051 * endpoint allows Camel end users to dictate whether they want the asynchronous 052 * model or not. The option is default <tt>false</tt> which means asynchronous 053 * processing is allowed. 054 * 055 * @version 056 */ 057public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware { 058 059 private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class); 060 private final String id = EndpointHelper.createEndpointId(); 061 private transient String endpointUriToString; 062 private String endpointUri; 063 private EndpointConfiguration endpointConfiguration; 064 private CamelContext camelContext; 065 private Component component; 066 @UriParam(label = "consumer", optionalPrefix = "consumer.", description = "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while" 067 + " the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler." 068 + " By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.") 069 private boolean bridgeErrorHandler; 070 @UriParam(label = "consumer,advanced", optionalPrefix = "consumer.", description = "To let the consumer use a custom ExceptionHandler." 071 + " Notice if the option bridgeErrorHandler is enabled then this option is not in use." 072 + " By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.") 073 private ExceptionHandler exceptionHandler; 074 @UriParam(label = "consumer,advanced", 075 description = "Sets the exchange pattern when the consumer creates an exchange.") 076 // no default value set on @UriParam as the MEP is sometimes InOnly or InOut depending on the component in use 077 private ExchangePattern exchangePattern = ExchangePattern.InOnly; 078 // option to allow end user to dictate whether async processing should be 079 // used or not (if possible) 080 @UriParam(defaultValue = "false", label = "advanced", 081 description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).") 082 private boolean synchronous; 083 // these options are not really in use any option related to the consumer has a specific option on the endpoint 084 // and consumerProperties was added from the very start of Camel. 085 private Map<String, Object> consumerProperties; 086 // pooling consumer options only related to EventDrivenPollingConsumer which are very seldom in use 087 // so lets not expose them in the component docs as it will be included in every component 088 private int pollingConsumerQueueSize = 1000; 089 private boolean pollingConsumerBlockWhenFull = true; 090 private long pollingConsumerBlockTimeout; 091 092 /** 093 * Constructs a fully-initialized DefaultEndpoint instance. This is the 094 * preferred method of constructing an object from Java code (as opposed to 095 * Spring beans, etc.). 096 * 097 * @param endpointUri the full URI used to create this endpoint 098 * @param component the component that created this endpoint 099 */ 100 protected DefaultEndpoint(String endpointUri, Component component) { 101 this.camelContext = component == null ? null : component.getCamelContext(); 102 this.component = component; 103 this.setEndpointUri(endpointUri); 104 } 105 106 /** 107 * Constructs a DefaultEndpoint instance which has <b>not</b> been created 108 * using a {@link Component}. 109 * <p/> 110 * <b>Note:</b> It is preferred to create endpoints using the associated 111 * component. 112 * 113 * @param endpointUri the full URI used to create this endpoint 114 * @param camelContext the Camel Context in which this endpoint is operating 115 */ 116 @Deprecated 117 protected DefaultEndpoint(String endpointUri, CamelContext camelContext) { 118 this(endpointUri); 119 this.camelContext = camelContext; 120 } 121 122 /** 123 * Constructs a partially-initialized DefaultEndpoint instance. 124 * <p/> 125 * <b>Note:</b> It is preferred to create endpoints using the associated 126 * component. 127 * 128 * @param endpointUri the full URI used to create this endpoint 129 */ 130 @Deprecated 131 protected DefaultEndpoint(String endpointUri) { 132 this.setEndpointUri(endpointUri); 133 } 134 135 /** 136 * Constructs a partially-initialized DefaultEndpoint instance. Useful when 137 * creating endpoints manually (e.g., as beans in Spring). 138 * <p/> 139 * Please note that the endpoint URI must be set through properties (or 140 * overriding {@link #createEndpointUri()} if one uses this constructor. 141 * <p/> 142 * <b>Note:</b> It is preferred to create endpoints using the associated 143 * component. 144 */ 145 protected DefaultEndpoint() { 146 } 147 148 public int hashCode() { 149 return getEndpointUri().hashCode() * 37 + 1; 150 } 151 152 @Override 153 public boolean equals(Object object) { 154 if (object instanceof DefaultEndpoint) { 155 DefaultEndpoint that = (DefaultEndpoint)object; 156 // must also match the same CamelContext in case we compare endpoints from different contexts 157 String thisContextName = this.getCamelContext() != null ? this.getCamelContext().getName() : null; 158 String thatContextName = that.getCamelContext() != null ? that.getCamelContext().getName() : null; 159 return ObjectHelper.equal(this.getEndpointUri(), that.getEndpointUri()) && ObjectHelper.equal(thisContextName, thatContextName); 160 } 161 return false; 162 } 163 164 @Override 165 public String toString() { 166 if (endpointUriToString == null) { 167 String value = null; 168 try { 169 value = getEndpointUri(); 170 } catch (RuntimeException e) { 171 // ignore any exception and use null for building the string value 172 } 173 // ensure to sanitize uri so we do not show sensitive information such as passwords 174 endpointUriToString = URISupport.sanitizeUri(value); 175 } 176 return endpointUriToString; 177 } 178 179 /** 180 * Returns a unique String ID which can be used for aliasing without having 181 * to use the whole URI which is not unique 182 */ 183 public String getId() { 184 return id; 185 } 186 187 public String getEndpointUri() { 188 if (endpointUri == null) { 189 endpointUri = createEndpointUri(); 190 if (endpointUri == null) { 191 throw new IllegalArgumentException("endpointUri is not specified and " + getClass().getName() 192 + " does not implement createEndpointUri() to create a default value"); 193 } 194 } 195 return endpointUri; 196 } 197 198 public EndpointConfiguration getEndpointConfiguration() { 199 if (endpointConfiguration == null) { 200 endpointConfiguration = createEndpointConfiguration(getEndpointUri()); 201 } 202 return endpointConfiguration; 203 } 204 205 /** 206 * Sets a custom {@link EndpointConfiguration} 207 * 208 * @param endpointConfiguration a custom endpoint configuration to be used. 209 */ 210 @Deprecated 211 public void setEndpointConfiguration(EndpointConfiguration endpointConfiguration) { 212 this.endpointConfiguration = endpointConfiguration; 213 } 214 215 public String getEndpointKey() { 216 if (isLenientProperties()) { 217 // only use the endpoint uri without parameters as the properties are lenient 218 String uri = getEndpointUri(); 219 if (uri.indexOf('?') != -1) { 220 return StringHelper.before(uri, "?"); 221 } else { 222 return uri; 223 } 224 } else { 225 // use the full endpoint uri 226 return getEndpointUri(); 227 } 228 } 229 230 public CamelContext getCamelContext() { 231 return camelContext; 232 } 233 234 /** 235 * Returns the component that created this endpoint. 236 * 237 * @return the component that created this endpoint, or <tt>null</tt> if 238 * none set 239 */ 240 public Component getComponent() { 241 return component; 242 } 243 244 public void setCamelContext(CamelContext camelContext) { 245 this.camelContext = camelContext; 246 } 247 248 public PollingConsumer createPollingConsumer() throws Exception { 249 // should not call configurePollingConsumer when its EventDrivenPollingConsumer 250 if (LOG.isDebugEnabled()) { 251 LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}", 252 new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()}); 253 } 254 EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); 255 consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); 256 consumer.setBlockTimeout(getPollingConsumerBlockTimeout()); 257 return consumer; 258 } 259 260 public Exchange createExchange(Exchange exchange) { 261 return exchange.copy(); 262 } 263 264 public Exchange createExchange() { 265 return createExchange(getExchangePattern()); 266 } 267 268 public Exchange createExchange(ExchangePattern pattern) { 269 return new DefaultExchange(this, pattern); 270 } 271 272 /** 273 * Returns the default exchange pattern to use when creating an exchange. 274 */ 275 public ExchangePattern getExchangePattern() { 276 return exchangePattern; 277 } 278 279 /** 280 * Sets the default exchange pattern when creating an exchange. 281 */ 282 public void setExchangePattern(ExchangePattern exchangePattern) { 283 this.exchangePattern = exchangePattern; 284 } 285 286 /** 287 * Returns whether synchronous processing should be strictly used. 288 */ 289 public boolean isSynchronous() { 290 return synchronous; 291 } 292 293 /** 294 * Sets whether synchronous processing should be strictly used, or Camel is 295 * allowed to use asynchronous processing (if supported). 296 * 297 * @param synchronous <tt>true</tt> to enforce synchronous processing 298 */ 299 public void setSynchronous(boolean synchronous) { 300 this.synchronous = synchronous; 301 } 302 303 public boolean isBridgeErrorHandler() { 304 return bridgeErrorHandler; 305 } 306 307 /** 308 * Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while 309 * the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and 310 * handled by the routing Error Handler. 311 * <p/> 312 * By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, 313 * that will be logged at WARN/ERROR level and ignored. 314 */ 315 public void setBridgeErrorHandler(boolean bridgeErrorHandler) { 316 this.bridgeErrorHandler = bridgeErrorHandler; 317 } 318 319 public ExceptionHandler getExceptionHandler() { 320 return exceptionHandler; 321 } 322 323 /** 324 * To let the consumer use a custom ExceptionHandler. 325 + Notice if the option bridgeErrorHandler is enabled then this options is not in use. 326 + By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored. 327 */ 328 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 329 this.exceptionHandler = exceptionHandler; 330 } 331 332 /** 333 * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 334 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 335 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 336 * <p/> 337 * The default value is <tt>1000</tt> 338 */ 339 public int getPollingConsumerQueueSize() { 340 return pollingConsumerQueueSize; 341 } 342 343 /** 344 * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 345 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 346 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 347 * <p/> 348 * The default value is <tt>1000</tt> 349 */ 350 public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) { 351 this.pollingConsumerQueueSize = pollingConsumerQueueSize; 352 } 353 354 /** 355 * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 356 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 357 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 358 * <p/> 359 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 360 * when trying to add to the queue, and its full. 361 * <p/> 362 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 363 */ 364 public boolean isPollingConsumerBlockWhenFull() { 365 return pollingConsumerBlockWhenFull; 366 } 367 368 /** 369 * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 370 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 371 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 372 * <p/> 373 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 374 * when trying to add to the queue, and its full. 375 * <p/> 376 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 377 */ 378 public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) { 379 this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull; 380 } 381 382 /** 383 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 384 * is being used. 385 * 386 * @see #setPollingConsumerBlockWhenFull(boolean) 387 */ 388 public long getPollingConsumerBlockTimeout() { 389 return pollingConsumerBlockTimeout; 390 } 391 392 /** 393 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 394 * is being used. 395 * 396 * @see #setPollingConsumerBlockWhenFull(boolean) 397 */ 398 public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) { 399 this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout; 400 } 401 402 public void configureProperties(Map<String, Object> options) { 403 Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer."); 404 if (consumerProperties != null && !consumerProperties.isEmpty()) { 405 setConsumerProperties(consumerProperties); 406 } 407 } 408 409 /** 410 * Sets the bean properties on the given bean. 411 * <p/> 412 * This is the same logical implementation as {@link DefaultComponent#setProperties(Object, java.util.Map)} 413 * 414 * @param bean the bean 415 * @param parameters properties to set 416 */ 417 protected void setProperties(Object bean, Map<String, Object> parameters) throws Exception { 418 // set reference properties first as they use # syntax that fools the regular properties setter 419 EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters); 420 EndpointHelper.setProperties(getCamelContext(), bean, parameters); 421 } 422 423 /** 424 * A factory method to lazily create the endpointUri if none is specified 425 */ 426 protected String createEndpointUri() { 427 return null; 428 } 429 430 /** 431 * A factory method to lazily create the endpoint configuration if none is specified 432 */ 433 @Deprecated 434 protected EndpointConfiguration createEndpointConfiguration(String uri) { 435 // using this factory method to be backwards compatible with the old code 436 if (getComponent() != null) { 437 // prefer to use component endpoint configuration 438 try { 439 return getComponent().createConfiguration(uri); 440 } catch (Exception e) { 441 throw ObjectHelper.wrapRuntimeCamelException(e); 442 } 443 } else if (getCamelContext() != null) { 444 // fallback and use a mapped endpoint configuration 445 return new MappedEndpointConfiguration(getCamelContext(), uri); 446 } 447 // not configuration possible 448 return null; 449 } 450 451 /** 452 * Sets the endpointUri if it has not been specified yet via some kind of 453 * dependency injection mechanism. This allows dependency injection 454 * frameworks such as Spring or Guice to set the default endpoint URI in 455 * cases where it has not been explicitly configured using the name/context 456 * in which an Endpoint is created. 457 */ 458 public void setEndpointUriIfNotSpecified(String value) { 459 if (endpointUri == null) { 460 setEndpointUri(value); 461 } 462 } 463 464 /** 465 * Sets the URI that created this endpoint. 466 */ 467 protected void setEndpointUri(String endpointUri) { 468 this.endpointUri = endpointUri; 469 } 470 471 public boolean isLenientProperties() { 472 // default should be false for most components 473 return false; 474 } 475 476 public Map<String, Object> getConsumerProperties() { 477 if (consumerProperties == null) { 478 // must create empty if none exists 479 consumerProperties = new HashMap<>(); 480 } 481 return consumerProperties; 482 } 483 484 public void setConsumerProperties(Map<String, Object> consumerProperties) { 485 // append consumer properties 486 if (consumerProperties != null && !consumerProperties.isEmpty()) { 487 if (this.consumerProperties == null) { 488 this.consumerProperties = new HashMap<>(consumerProperties); 489 } else { 490 this.consumerProperties.putAll(consumerProperties); 491 } 492 } 493 } 494 495 protected void configureConsumer(Consumer consumer) throws Exception { 496 // inject CamelContext 497 if (consumer instanceof CamelContextAware) { 498 ((CamelContextAware) consumer).setCamelContext(getCamelContext()); 499 } 500 501 if (consumerProperties != null) { 502 // use a defensive copy of the consumer properties as the methods below will remove the used properties 503 // and in case we restart routes, we need access to the original consumer properties again 504 Map<String, Object> copy = new HashMap<>(consumerProperties); 505 506 // set reference properties first as they use # syntax that fools the regular properties setter 507 EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy); 508 EndpointHelper.setProperties(getCamelContext(), consumer, copy); 509 510 // special consumer.bridgeErrorHandler option 511 Object bridge = copy.remove("bridgeErrorHandler"); 512 if (bridge != null && "true".equals(bridge)) { 513 if (consumer instanceof DefaultConsumer) { 514 DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; 515 defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer)); 516 } else { 517 throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints," 518 + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class."); 519 } 520 } 521 522 if (!this.isLenientProperties() && copy.size() > 0) { 523 throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() 524 + " parameters that couldn't be set on the endpoint consumer." 525 + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 526 + " Unknown consumer parameters=[" + copy + "]"); 527 } 528 } 529 } 530 531 protected void configurePollingConsumer(PollingConsumer consumer) throws Exception { 532 configureConsumer(consumer); 533 } 534 535 @Override 536 protected void doStart() throws Exception { 537 // the bridgeErrorHandler/exceptionHandler was originally configured with consumer. prefix, such as consumer.bridgeErrorHandler=true 538 // so if they have been configured on the endpoint then map to the old naming style 539 if (bridgeErrorHandler) { 540 getConsumerProperties().put("bridgeErrorHandler", "true"); 541 } 542 if (exceptionHandler != null) { 543 getConsumerProperties().put("exceptionHandler", exceptionHandler); 544 } 545 } 546 547 @Override 548 protected void doStop() throws Exception { 549 // noop 550 } 551}