001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.HashMap; 020import java.util.Map; 021 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Component; 025import org.apache.camel.Consumer; 026import org.apache.camel.Endpoint; 027import org.apache.camel.EndpointConfiguration; 028import org.apache.camel.Exchange; 029import org.apache.camel.ExchangePattern; 030import org.apache.camel.PollingConsumer; 031import org.apache.camel.ResolveEndpointFailedException; 032import org.apache.camel.spi.ExceptionHandler; 033import org.apache.camel.spi.HasId; 034import org.apache.camel.spi.UriParam; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.EndpointHelper; 037import org.apache.camel.util.IntrospectionSupport; 038import org.apache.camel.util.ObjectHelper; 039import org.apache.camel.util.URISupport; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * A default endpoint useful for implementation inheritance. 045 * <p/> 046 * Components which leverages <a 047 * href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous 048 * processing model</a> should check the {@link #isSynchronous()} to determine 049 * if asynchronous processing is allowed. The <tt>synchronous</tt> option on the 050 * endpoint allows Camel end users to dictate whether they want the asynchronous 051 * model or not. The option is default <tt>false</tt> which means asynchronous 052 * processing is allowed. 053 * 054 * @version 055 */ 056public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware { 057 058 private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class); 059 private transient String endpointUriToString; 060 private String endpointUri; 061 private EndpointConfiguration endpointConfiguration; 062 private CamelContext camelContext; 063 private Component component; 064 @UriParam(label = "consumer", optionalPrefix = "consumer.", description = "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while" 065 + " the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler." 066 + " By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN/ERROR level and ignored.") 067 private boolean bridgeErrorHandler; 068 @UriParam(label = "consumer,advanced", optionalPrefix = "consumer.", description = "To let the consumer use a custom ExceptionHandler." 069 + " Notice if the option bridgeErrorHandler is enabled then this options is not in use." 070 + " By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored.") 071 private ExceptionHandler exceptionHandler; 072 @UriParam(defaultValue = "InOnly", label = "advanced", 073 description = "Sets the default exchange pattern when creating an exchange") 074 private ExchangePattern exchangePattern = ExchangePattern.InOnly; 075 // option to allow end user to dictate whether async processing should be 076 // used or not (if possible) 077 @UriParam(defaultValue = "false", label = "advanced", 078 description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).") 079 private boolean synchronous; 080 private final String id = EndpointHelper.createEndpointId(); 081 private Map<String, Object> consumerProperties; 082 private int pollingConsumerQueueSize = 1000; 083 private boolean pollingConsumerBlockWhenFull = true; 084 private long pollingConsumerBlockTimeout; 085 086 /** 087 * Constructs a fully-initialized DefaultEndpoint instance. This is the 088 * preferred method of constructing an object from Java code (as opposed to 089 * Spring beans, etc.). 090 * 091 * @param endpointUri the full URI used to create this endpoint 092 * @param component the component that created this endpoint 093 */ 094 protected DefaultEndpoint(String endpointUri, Component component) { 095 this.camelContext = component == null ? null : component.getCamelContext(); 096 this.component = component; 097 this.setEndpointUri(endpointUri); 098 } 099 100 /** 101 * Constructs a DefaultEndpoint instance which has <b>not</b> been created 102 * using a {@link Component}. 103 * <p/> 104 * <b>Note:</b> It is preferred to create endpoints using the associated 105 * component. 106 * 107 * @param endpointUri the full URI used to create this endpoint 108 * @param camelContext the Camel Context in which this endpoint is operating 109 */ 110 @Deprecated 111 protected DefaultEndpoint(String endpointUri, CamelContext camelContext) { 112 this(endpointUri); 113 this.camelContext = camelContext; 114 } 115 116 /** 117 * Constructs a partially-initialized DefaultEndpoint instance. 118 * <p/> 119 * <b>Note:</b> It is preferred to create endpoints using the associated 120 * component. 121 * 122 * @param endpointUri the full URI used to create this endpoint 123 */ 124 @Deprecated 125 protected DefaultEndpoint(String endpointUri) { 126 this.setEndpointUri(endpointUri); 127 } 128 129 /** 130 * Constructs a partially-initialized DefaultEndpoint instance. Useful when 131 * creating endpoints manually (e.g., as beans in Spring). 132 * <p/> 133 * Please note that the endpoint URI must be set through properties (or 134 * overriding {@link #createEndpointUri()} if one uses this constructor. 135 * <p/> 136 * <b>Note:</b> It is preferred to create endpoints using the associated 137 * component. 138 */ 139 protected DefaultEndpoint() { 140 } 141 142 public int hashCode() { 143 return getEndpointUri().hashCode() * 37 + 1; 144 } 145 146 @Override 147 public boolean equals(Object object) { 148 if (object instanceof DefaultEndpoint) { 149 DefaultEndpoint that = (DefaultEndpoint)object; 150 // must also match the same CamelContext in case we compare endpoints from different contexts 151 String thisContextName = this.getCamelContext() != null ? this.getCamelContext().getName() : null; 152 String thatContextName = that.getCamelContext() != null ? that.getCamelContext().getName() : null; 153 return ObjectHelper.equal(this.getEndpointUri(), that.getEndpointUri()) && ObjectHelper.equal(thisContextName, thatContextName); 154 } 155 return false; 156 } 157 158 @Override 159 public String toString() { 160 if (endpointUriToString == null) { 161 String value = null; 162 try { 163 value = getEndpointUri(); 164 } catch (RuntimeException e) { 165 // ignore any exception and use null for building the string value 166 } 167 endpointUriToString = String.format("Endpoint[%s]", URISupport.sanitizeUri(value)); 168 } 169 return endpointUriToString; 170 } 171 172 /** 173 * Returns a unique String ID which can be used for aliasing without having 174 * to use the whole URI which is not unique 175 */ 176 public String getId() { 177 return id; 178 } 179 180 public String getEndpointUri() { 181 if (endpointUri == null) { 182 endpointUri = createEndpointUri(); 183 if (endpointUri == null) { 184 throw new IllegalArgumentException("endpointUri is not specified and " + getClass().getName() 185 + " does not implement createEndpointUri() to create a default value"); 186 } 187 } 188 return endpointUri; 189 } 190 191 public EndpointConfiguration getEndpointConfiguration() { 192 if (endpointConfiguration == null) { 193 endpointConfiguration = createEndpointConfiguration(getEndpointUri()); 194 } 195 return endpointConfiguration; 196 } 197 198 /** 199 * Sets a custom {@link EndpointConfiguration} 200 * 201 * @param endpointConfiguration a custom endpoint configuration to be used. 202 */ 203 public void setEndpointConfiguration(EndpointConfiguration endpointConfiguration) { 204 this.endpointConfiguration = endpointConfiguration; 205 } 206 207 public String getEndpointKey() { 208 if (isLenientProperties()) { 209 // only use the endpoint uri without parameters as the properties is 210 // lenient 211 String uri = getEndpointUri(); 212 if (uri.indexOf('?') != -1) { 213 return ObjectHelper.before(uri, "?"); 214 } else { 215 return uri; 216 } 217 } else { 218 // use the full endpoint uri 219 return getEndpointUri(); 220 } 221 } 222 223 public CamelContext getCamelContext() { 224 return camelContext; 225 } 226 227 /** 228 * Returns the component that created this endpoint. 229 * 230 * @return the component that created this endpoint, or <tt>null</tt> if 231 * none set 232 */ 233 public Component getComponent() { 234 return component; 235 } 236 237 public void setCamelContext(CamelContext camelContext) { 238 this.camelContext = camelContext; 239 } 240 241 public PollingConsumer createPollingConsumer() throws Exception { 242 // should not call configurePollingConsumer when its EventDrivenPollingConsumer 243 if (LOG.isDebugEnabled()) { 244 LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}", 245 new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()}); 246 } 247 EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize()); 248 consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull()); 249 consumer.setBlockTimeout(getPollingConsumerBlockTimeout()); 250 return consumer; 251 } 252 253 public Exchange createExchange(Exchange exchange) { 254 return exchange.copy(); 255 } 256 257 public Exchange createExchange() { 258 return createExchange(getExchangePattern()); 259 } 260 261 public Exchange createExchange(ExchangePattern pattern) { 262 return new DefaultExchange(this, pattern); 263 } 264 265 /** 266 * Returns the default exchange pattern to use when creating an exchange. 267 */ 268 public ExchangePattern getExchangePattern() { 269 return exchangePattern; 270 } 271 272 /** 273 * Sets the default exchange pattern when creating an exchange. 274 */ 275 public void setExchangePattern(ExchangePattern exchangePattern) { 276 this.exchangePattern = exchangePattern; 277 } 278 279 /** 280 * Returns whether synchronous processing should be strictly used. 281 */ 282 public boolean isSynchronous() { 283 return synchronous; 284 } 285 286 /** 287 * Sets whether synchronous processing should be strictly used, or Camel is 288 * allowed to use asynchronous processing (if supported). 289 * 290 * @param synchronous <tt>true</tt> to enforce synchronous processing 291 */ 292 public void setSynchronous(boolean synchronous) { 293 this.synchronous = synchronous; 294 } 295 296 public boolean isBridgeErrorHandler() { 297 return bridgeErrorHandler; 298 } 299 300 /** 301 * Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while 302 * the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and 303 * handled by the routing Error Handler. 304 * <p/> 305 * By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, 306 * that will be logged at WARN/ERROR level and ignored. 307 */ 308 public void setBridgeErrorHandler(boolean bridgeErrorHandler) { 309 this.bridgeErrorHandler = bridgeErrorHandler; 310 } 311 312 public ExceptionHandler getExceptionHandler() { 313 return exceptionHandler; 314 } 315 316 /** 317 * To let the consumer use a custom ExceptionHandler. 318 + Notice if the option bridgeErrorHandler is enabled then this options is not in use. 319 + By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored. 320 */ 321 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 322 this.exceptionHandler = exceptionHandler; 323 } 324 325 /** 326 * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 327 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 328 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 329 * <p/> 330 * The default value is <tt>1000</tt> 331 */ 332 public int getPollingConsumerQueueSize() { 333 return pollingConsumerQueueSize; 334 } 335 336 /** 337 * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 338 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 339 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 340 * <p/> 341 * The default value is <tt>1000</tt> 342 */ 343 public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) { 344 this.pollingConsumerQueueSize = pollingConsumerQueueSize; 345 } 346 347 /** 348 * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 349 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 350 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 351 * <p/> 352 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 353 * when trying to add to the queue, and its full. 354 * <p/> 355 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 356 */ 357 public boolean isPollingConsumerBlockWhenFull() { 358 return pollingConsumerBlockWhenFull; 359 } 360 361 /** 362 * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 363 * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and 364 * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation. 365 * <p/> 366 * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown 367 * when trying to add to the queue, and its full. 368 * <p/> 369 * The default value is <tt>true</tt> which will block the producer queue until the queue has space. 370 */ 371 public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) { 372 this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull; 373 } 374 375 /** 376 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 377 * is being used. 378 * 379 * @see #setPollingConsumerBlockWhenFull(boolean) 380 */ 381 public long getPollingConsumerBlockTimeout() { 382 return pollingConsumerBlockTimeout; 383 } 384 385 /** 386 * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer} 387 * is being used. 388 * 389 * @see #setPollingConsumerBlockWhenFull(boolean) 390 */ 391 public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) { 392 this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout; 393 } 394 395 public void configureProperties(Map<String, Object> options) { 396 Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer."); 397 if (consumerProperties != null && !consumerProperties.isEmpty()) { 398 setConsumerProperties(consumerProperties); 399 } 400 } 401 402 /** 403 * Sets the bean properties on the given bean. 404 * <p/> 405 * This is the same logical implementation as {@link DefaultComponent#setProperties(Object, java.util.Map)} 406 * 407 * @param bean the bean 408 * @param parameters properties to set 409 */ 410 protected void setProperties(Object bean, Map<String, Object> parameters) throws Exception { 411 // set reference properties first as they use # syntax that fools the regular properties setter 412 EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters); 413 EndpointHelper.setProperties(getCamelContext(), bean, parameters); 414 } 415 416 /** 417 * A factory method to lazily create the endpointUri if none is specified 418 */ 419 protected String createEndpointUri() { 420 return null; 421 } 422 423 /** 424 * A factory method to lazily create the endpoint configuration if none is specified 425 */ 426 protected EndpointConfiguration createEndpointConfiguration(String uri) { 427 // using this factory method to be backwards compatible with the old code 428 if (getComponent() != null) { 429 // prefer to use component endpoint configuration 430 try { 431 return getComponent().createConfiguration(uri); 432 } catch (Exception e) { 433 throw ObjectHelper.wrapRuntimeCamelException(e); 434 } 435 } else if (getCamelContext() != null) { 436 // fallback and use a mapped endpoint configuration 437 return new MappedEndpointConfiguration(getCamelContext(), uri); 438 } 439 // not configuration possible 440 return null; 441 } 442 443 /** 444 * Sets the endpointUri if it has not been specified yet via some kind of 445 * dependency injection mechanism. This allows dependency injection 446 * frameworks such as Spring or Guice to set the default endpoint URI in 447 * cases where it has not been explicitly configured using the name/context 448 * in which an Endpoint is created. 449 */ 450 public void setEndpointUriIfNotSpecified(String value) { 451 if (endpointUri == null) { 452 setEndpointUri(value); 453 } 454 } 455 456 /** 457 * Sets the URI that created this endpoint. 458 */ 459 protected void setEndpointUri(String endpointUri) { 460 this.endpointUri = endpointUri; 461 } 462 463 public boolean isLenientProperties() { 464 // default should be false for most components 465 return false; 466 } 467 468 public Map<String, Object> getConsumerProperties() { 469 if (consumerProperties == null) { 470 // must create empty if none exists 471 consumerProperties = new HashMap<String, Object>(); 472 } 473 return consumerProperties; 474 } 475 476 public void setConsumerProperties(Map<String, Object> consumerProperties) { 477 // append consumer properties 478 if (consumerProperties != null && !consumerProperties.isEmpty()) { 479 if (this.consumerProperties == null) { 480 this.consumerProperties = new HashMap<String, Object>(consumerProperties); 481 } else { 482 this.consumerProperties.putAll(consumerProperties); 483 } 484 } 485 } 486 487 protected void configureConsumer(Consumer consumer) throws Exception { 488 if (consumerProperties != null) { 489 // use a defensive copy of the consumer properties as the methods below will remove the used properties 490 // and in case we restart routes, we need access to the original consumer properties again 491 Map<String, Object> copy = new HashMap<String, Object>(consumerProperties); 492 493 // set reference properties first as they use # syntax that fools the regular properties setter 494 EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy); 495 EndpointHelper.setProperties(getCamelContext(), consumer, copy); 496 497 // special consumer.bridgeErrorHandler option 498 Object bridge = copy.remove("bridgeErrorHandler"); 499 if (bridge != null && "true".equals(bridge)) { 500 if (consumer instanceof DefaultConsumer) { 501 DefaultConsumer defaultConsumer = (DefaultConsumer) consumer; 502 defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer)); 503 } else { 504 throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints," 505 + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class."); 506 } 507 } 508 509 if (!this.isLenientProperties() && copy.size() > 0) { 510 throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size() 511 + " parameters that couldn't be set on the endpoint consumer." 512 + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 513 + " Unknown consumer parameters=[" + copy + "]"); 514 } 515 } 516 } 517 518 protected void configurePollingConsumer(PollingConsumer consumer) throws Exception { 519 configureConsumer(consumer); 520 } 521 522 @Override 523 protected void doStart() throws Exception { 524 // the bridgeErrorHandler/exceptionHandler was originally configured with consumer. prefix, such as consumer.bridgeErrorHandler=true 525 // so if they have been configured on the endpoint then map to the old naming style 526 if (bridgeErrorHandler) { 527 getConsumerProperties().put("bridgeErrorHandler", "true"); 528 } 529 if (exceptionHandler != null) { 530 getConsumerProperties().put("exceptionHandler", exceptionHandler); 531 } 532 } 533 534 @Override 535 protected void doStop() throws Exception { 536 // noop 537 } 538}