001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.HashMap;
020import java.util.Map;
021
022import org.apache.camel.CamelContext;
023import org.apache.camel.CamelContextAware;
024import org.apache.camel.Component;
025import org.apache.camel.Consumer;
026import org.apache.camel.Endpoint;
027import org.apache.camel.EndpointConfiguration;
028import org.apache.camel.Exchange;
029import org.apache.camel.ExchangePattern;
030import org.apache.camel.PollingConsumer;
031import org.apache.camel.ResolveEndpointFailedException;
032import org.apache.camel.spi.ExceptionHandler;
033import org.apache.camel.spi.HasId;
034import org.apache.camel.spi.UriParam;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.EndpointHelper;
037import org.apache.camel.util.IntrospectionSupport;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.URISupport;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
/**
 * A default endpoint useful for implementation inheritance.
 * <p/>
 * Components which leverage the <a
 * href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous
 * processing model</a> should check {@link #isSynchronous()} to determine
 * if asynchronous processing is allowed. The <tt>synchronous</tt> option on the
 * endpoint allows Camel end users to dictate whether they want the asynchronous
 * model or not. The option defaults to <tt>false</tt>, which means asynchronous
 * processing is allowed.
 * 
 * @version
 */
056public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
057
    // logger shared by all instances of this class
    private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
    // cached "Endpoint[...]" text, built on the first call to toString()
    private transient String endpointUriToString;
    // the endpoint URI; may stay null until getEndpointUri() lazily creates it
    private String endpointUri;
    // lazily created by getEndpointConfiguration() unless set explicitly
    private EndpointConfiguration endpointConfiguration;
    // the Camel context this endpoint belongs to, may be null
    private CamelContext camelContext;
    // the component that created this endpoint, or null if none
    private Component component;
    @UriParam(label = "consumer", optionalPrefix = "consumer.", description = "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while"
                    + " the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler."
                    + " By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN/ERROR level and ignored.")
    private boolean bridgeErrorHandler;
    @UriParam(label = "consumer,advanced", optionalPrefix = "consumer.", description = "To let the consumer use a custom ExceptionHandler."
            + " Notice if the option bridgeErrorHandler is enabled then this options is not in use."
            + " By default the consumer will deal with exceptions, that will be logged at WARN/ERROR level and ignored.")
    private ExceptionHandler exceptionHandler;
    @UriParam(defaultValue = "InOnly", label = "advanced",
            description = "Sets the default exchange pattern when creating an exchange")
    private ExchangePattern exchangePattern = ExchangePattern.InOnly;
    // option to allow end user to dictate whether async processing should be
    // used or not (if possible)
    @UriParam(defaultValue = "false", label = "advanced",
            description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).")
    private boolean synchronous;
    // identifier returned by getId(), assigned once per instance
    private final String id = EndpointHelper.createEndpointId();
    // properties applied to consumers in configureConsumer(); null until first needed
    private Map<String, Object> consumerProperties;
    // settings handed to the EventDrivenPollingConsumer in createPollingConsumer()
    private int pollingConsumerQueueSize = 1000;
    private boolean pollingConsumerBlockWhenFull = true;
    private long pollingConsumerBlockTimeout;
085
086    /**
087     * Constructs a fully-initialized DefaultEndpoint instance. This is the
088     * preferred method of constructing an object from Java code (as opposed to
089     * Spring beans, etc.).
090     * 
091     * @param endpointUri the full URI used to create this endpoint
092     * @param component the component that created this endpoint
093     */
094    protected DefaultEndpoint(String endpointUri, Component component) {
095        this.camelContext = component == null ? null : component.getCamelContext();
096        this.component = component;
097        this.setEndpointUri(endpointUri);
098    }
099
    /**
     * Constructs a DefaultEndpoint instance which has <b>not</b> been created
     * using a {@link Component}.
     * <p/>
     * <b>Note:</b> It is preferred to create endpoints using the associated
     * component.
     * 
     * @param endpointUri the full URI used to create this endpoint
     * @param camelContext the Camel Context in which this endpoint is operating
     * @deprecated it is preferred to create endpoints using the associated component,
     *             see {@link #DefaultEndpoint(String, Component)}
     */
    @Deprecated
    protected DefaultEndpoint(String endpointUri, CamelContext camelContext) {
        this(endpointUri);
        this.camelContext = camelContext;
    }
115
    /**
     * Constructs a partially-initialized DefaultEndpoint instance: only the
     * endpoint URI is set, neither the component nor the Camel context.
     * <p/>
     * <b>Note:</b> It is preferred to create endpoints using the associated
     * component.
     * 
     * @param endpointUri the full URI used to create this endpoint
     * @deprecated it is preferred to create endpoints using the associated component,
     *             see {@link #DefaultEndpoint(String, Component)}
     */
    @Deprecated
    protected DefaultEndpoint(String endpointUri) {
        this.setEndpointUri(endpointUri);
    }
128
    /**
     * Constructs a partially-initialized DefaultEndpoint instance. Useful when
     * creating endpoints manually (e.g., as beans in Spring).
     * <p/>
     * Please note that the endpoint URI must be set through properties (or by
     * overriding {@link #createEndpointUri()}) if one uses this constructor.
     * <p/>
     * <b>Note:</b> It is preferred to create endpoints using the associated
     * component.
     */
    protected DefaultEndpoint() {
    }
141
142    public int hashCode() {
143        return getEndpointUri().hashCode() * 37 + 1;
144    }
145
146    @Override
147    public boolean equals(Object object) {
148        if (object instanceof DefaultEndpoint) {
149            DefaultEndpoint that = (DefaultEndpoint)object;
150            // must also match the same CamelContext in case we compare endpoints from different contexts
151            String thisContextName = this.getCamelContext() != null ? this.getCamelContext().getName() : null;
152            String thatContextName = that.getCamelContext() != null ? that.getCamelContext().getName() : null;
153            return ObjectHelper.equal(this.getEndpointUri(), that.getEndpointUri()) && ObjectHelper.equal(thisContextName, thatContextName);
154        }
155        return false;
156    }
157
158    @Override
159    public String toString() {
160        if (endpointUriToString == null) {
161            String value = null;
162            try {
163                value = getEndpointUri();
164            } catch (RuntimeException e) {
165                // ignore any exception and use null for building the string value
166            }
167            endpointUriToString = String.format("Endpoint[%s]", URISupport.sanitizeUri(value));
168        }
169        return endpointUriToString;
170    }
171
172    /**
173     * Returns a unique String ID which can be used for aliasing without having
174     * to use the whole URI which is not unique
175     */
176    public String getId() {
177        return id;
178    }
179
180    public String getEndpointUri() {
181        if (endpointUri == null) {
182            endpointUri = createEndpointUri();
183            if (endpointUri == null) {
184                throw new IllegalArgumentException("endpointUri is not specified and " + getClass().getName()
185                    + " does not implement createEndpointUri() to create a default value");
186            }
187        }
188        return endpointUri;
189    }
190
191    public EndpointConfiguration getEndpointConfiguration() {
192        if (endpointConfiguration == null) {
193            endpointConfiguration = createEndpointConfiguration(getEndpointUri());
194        }
195        return endpointConfiguration;
196    }
197
    /**
     * Sets a custom {@link EndpointConfiguration}, replacing any configuration
     * that was previously set or lazily created.
     *
     * @param endpointConfiguration a custom endpoint configuration to be used.
     */
    public void setEndpointConfiguration(EndpointConfiguration endpointConfiguration) {
        this.endpointConfiguration = endpointConfiguration;
    }
206
207    public String getEndpointKey() {
208        if (isLenientProperties()) {
209            // only use the endpoint uri without parameters as the properties is
210            // lenient
211            String uri = getEndpointUri();
212            if (uri.indexOf('?') != -1) {
213                return ObjectHelper.before(uri, "?");
214            } else {
215                return uri;
216            }
217        } else {
218            // use the full endpoint uri
219            return getEndpointUri();
220        }
221    }
222
223    public CamelContext getCamelContext() {
224        return camelContext;
225    }
226
227    /**
228     * Returns the component that created this endpoint.
229     * 
230     * @return the component that created this endpoint, or <tt>null</tt> if
231     *         none set
232     */
233    public Component getComponent() {
234        return component;
235    }
236
    /**
     * Sets the Camel context this endpoint belongs to.
     *
     * @param camelContext the Camel context
     */
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
240
241    public PollingConsumer createPollingConsumer() throws Exception {
242        // should not call configurePollingConsumer when its EventDrivenPollingConsumer
243        if (LOG.isDebugEnabled()) {
244            LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
245                    new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()});
246        }
247        EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
248        consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
249        consumer.setBlockTimeout(getPollingConsumerBlockTimeout());
250        return consumer;
251    }
252
    /**
     * Creates a new exchange by copying the given exchange.
     *
     * @param exchange the exchange to copy; it is dereferenced directly, so it must not be <tt>null</tt>
     * @return the result of {@link Exchange#copy()}
     */
    public Exchange createExchange(Exchange exchange) {
        return exchange.copy();
    }
256
    /**
     * Creates a new exchange using the default exchange pattern of this endpoint,
     * as returned by {@link #getExchangePattern()}.
     *
     * @return the new exchange
     */
    public Exchange createExchange() {
        return createExchange(getExchangePattern());
    }
260
    /**
     * Creates a new {@link DefaultExchange} for this endpoint with the given pattern.
     *
     * @param pattern the exchange pattern to use
     * @return the new exchange
     */
    public Exchange createExchange(ExchangePattern pattern) {
        return new DefaultExchange(this, pattern);
    }
264
265    /**
266     * Returns the default exchange pattern to use when creating an exchange.
267     */
268    public ExchangePattern getExchangePattern() {
269        return exchangePattern;
270    }
271
    /**
     * Sets the default exchange pattern when creating an exchange.
     *
     * @param exchangePattern the pattern used by {@link #createExchange()}
     */
    public void setExchangePattern(ExchangePattern exchangePattern) {
        this.exchangePattern = exchangePattern;
    }
278
279    /**
280     * Returns whether synchronous processing should be strictly used.
281     */
282    public boolean isSynchronous() {
283        return synchronous;
284    }
285
    /**
     * Sets whether synchronous processing should be strictly used, or Camel is
     * allowed to use asynchronous processing (if supported).
     * 
     * @param synchronous <tt>true</tt> to enforce synchronous processing,
     *                    <tt>false</tt> to allow asynchronous processing
     */
    public void setSynchronous(boolean synchronous) {
        this.synchronous = synchronous;
    }
295
296    public boolean isBridgeErrorHandler() {
297        return bridgeErrorHandler;
298    }
299
    /**
     * Allows for bridging the consumer to the Camel routing Error Handler, which means that any exceptions
     * that occur while the consumer is trying to pick up incoming messages, or the like, will now be
     * processed as a message and handled by the routing Error Handler.
     * <p/>
     * By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions,
     * which will be logged at WARN/ERROR level and ignored.
     *
     * @param bridgeErrorHandler <tt>true</tt> to enable bridging
     */
    public void setBridgeErrorHandler(boolean bridgeErrorHandler) {
        this.bridgeErrorHandler = bridgeErrorHandler;
    }
311
312    public ExceptionHandler getExceptionHandler() {
313        return exceptionHandler;
314    }
315
    /**
     * To let the consumer use a custom ExceptionHandler.
     * <p/>
     * Notice that if the option bridgeErrorHandler is enabled then this option is not in use.
     * By default the consumer will deal with exceptions, which will be logged at WARN/ERROR level and ignored.
     *
     * @param exceptionHandler the custom exception handler
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }
324
325    /**
326     * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
327     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
328     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
329     * <p/>
330     * The default value is <tt>1000</tt>
331     */
332    public int getPollingConsumerQueueSize() {
333        return pollingConsumerQueueSize;
334    }
335
    /**
     * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
     * <p/>
     * The default value is <tt>1000</tt>
     *
     * @param pollingConsumerQueueSize the queue size to use
     */
    public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) {
        this.pollingConsumerQueueSize = pollingConsumerQueueSize;
    }
346
347    /**
348     * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
349     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
350     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
351     * <p/>
352     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
353     * when trying to add to the queue, and its full.
354     * <p/>
355     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
356     */
357    public boolean isPollingConsumerBlockWhenFull() {
358        return pollingConsumerBlockWhenFull;
359    }
360
    /**
     * Set whether to block when adding to the internal queue of the {@link org.apache.camel.impl.EventDrivenPollingConsumer}
     * when it is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
     * <p/>
     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
     * when trying to add to the queue, and it is full.
     * <p/>
     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
     *
     * @param pollingConsumerBlockWhenFull <tt>true</tt> to block when the queue is full
     */
    public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) {
        this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
    }
374
375    /**
376     * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
377     * is being used.
378     *
379     * @see #setPollingConsumerBlockWhenFull(boolean)
380     */
381    public long getPollingConsumerBlockTimeout() {
382        return pollingConsumerBlockTimeout;
383    }
384
    /**
     * Sets the timeout in millis to use when adding to the internal queue of the {@link org.apache.camel.impl.EventDrivenPollingConsumer}
     * when it is being used.
     *
     * @param pollingConsumerBlockTimeout the timeout in millis
     * @see #setPollingConsumerBlockWhenFull(boolean)
     */
    public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) {
        this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout;
    }
394
395    public void configureProperties(Map<String, Object> options) {
396        Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
397        if (consumerProperties != null && !consumerProperties.isEmpty()) {
398            setConsumerProperties(consumerProperties);
399        }
400    }
401
    /**
     * Sets the bean properties on the given bean.
     * <p/>
     * This is the same logical implementation as {@link DefaultComponent#setProperties(Object, java.util.Map)}
     *
     * @param bean  the bean
     * @param parameters  properties to set
     * @throws Exception if one of the {@link EndpointHelper} calls fails
     */
    protected void setProperties(Object bean, Map<String, Object> parameters) throws Exception {
        // set reference properties first as they use # syntax that fools the regular properties setter
        EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters);
        EndpointHelper.setProperties(getCamelContext(), bean, parameters);
    }
415
    /**
     * A factory method to lazily create the endpointUri if none is specified.
     * <p/>
     * This default implementation returns <tt>null</tt>, which makes
     * {@link #getEndpointUri()} fail when no URI has been set; subclasses may
     * override it to supply a default URI.
     *
     * @return the default endpoint URI, or <tt>null</tt> if none can be created
     */
    protected String createEndpointUri() {
        return null;
    }
422
423    /**
424     * A factory method to lazily create the endpoint configuration if none is specified
425     */
426    protected EndpointConfiguration createEndpointConfiguration(String uri) {
427        // using this factory method to be backwards compatible with the old code
428        if (getComponent() != null) {
429            // prefer to use component endpoint configuration
430            try {
431                return getComponent().createConfiguration(uri);
432            } catch (Exception e) {
433                throw ObjectHelper.wrapRuntimeCamelException(e);
434            }
435        } else if (getCamelContext() != null) {
436            // fallback and use a mapped endpoint configuration
437            return new MappedEndpointConfiguration(getCamelContext(), uri);
438        }
439        // not configuration possible
440        return null;
441    }
442
443    /**
444     * Sets the endpointUri if it has not been specified yet via some kind of
445     * dependency injection mechanism. This allows dependency injection
446     * frameworks such as Spring or Guice to set the default endpoint URI in
447     * cases where it has not been explicitly configured using the name/context
448     * in which an Endpoint is created.
449     */
450    public void setEndpointUriIfNotSpecified(String value) {
451        if (endpointUri == null) {
452            setEndpointUri(value);
453        }
454    }
455
    /**
     * Sets the URI that created this endpoint.
     *
     * @param endpointUri the endpoint URI
     */
    protected void setEndpointUri(String endpointUri) {
        this.endpointUri = endpointUri;
    }
462
    /**
     * Tells whether this endpoint is lenient about its properties.
     * <p/>
     * Within this class the flag affects {@link #getEndpointKey()} (the key omits
     * the URI parameters when lenient) and the consumer configuration (leftover
     * consumer parameters are only rejected when not lenient).
     *
     * @return <tt>false</tt> in this default implementation
     */
    public boolean isLenientProperties() {
        // default should be false for most components
        return false;
    }
467
468    public Map<String, Object> getConsumerProperties() {
469        if (consumerProperties == null) {
470            // must create empty if none exists
471            consumerProperties = new HashMap<String, Object>();
472        }
473        return consumerProperties;
474    }
475
476    public void setConsumerProperties(Map<String, Object> consumerProperties) {
477        // append consumer properties
478        if (consumerProperties != null && !consumerProperties.isEmpty()) {
479            if (this.consumerProperties == null) {
480                this.consumerProperties = new HashMap<String, Object>(consumerProperties);
481            } else {
482                this.consumerProperties.putAll(consumerProperties);
483            }
484        }
485    }
486
487    protected void configureConsumer(Consumer consumer) throws Exception {
488        if (consumerProperties != null) {
489            // use a defensive copy of the consumer properties as the methods below will remove the used properties
490            // and in case we restart routes, we need access to the original consumer properties again
491            Map<String, Object> copy = new HashMap<String, Object>(consumerProperties);
492
493            // set reference properties first as they use # syntax that fools the regular properties setter
494            EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
495            EndpointHelper.setProperties(getCamelContext(), consumer, copy);
496
497            // special consumer.bridgeErrorHandler option
498            Object bridge = copy.remove("bridgeErrorHandler");
499            if (bridge != null && "true".equals(bridge)) {
500                if (consumer instanceof DefaultConsumer) {
501                    DefaultConsumer defaultConsumer = (DefaultConsumer) consumer;
502                    defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer));
503                } else {
504                    throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints,"
505                            + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class.");
506                }
507            }
508
509            if (!this.isLenientProperties() && copy.size() > 0) {
510                throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
511                    + " parameters that couldn't be set on the endpoint consumer."
512                    + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
513                    + " Unknown consumer parameters=[" + copy + "]");
514            }
515        }
516    }
517
    /**
     * Configures the given polling consumer by delegating to
     * {@link #configureConsumer(Consumer)}.
     *
     * @param consumer the polling consumer to configure
     * @throws Exception if the consumer could not be configured
     */
    protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
        configureConsumer(consumer);
    }
521
522    @Override
523    protected void doStart() throws Exception {
524        // the bridgeErrorHandler/exceptionHandler was originally configured with consumer. prefix, such as consumer.bridgeErrorHandler=true
525        // so if they have been configured on the endpoint then map to the old naming style
526        if (bridgeErrorHandler) {
527            getConsumerProperties().put("bridgeErrorHandler", "true");
528        }
529        if (exceptionHandler != null) {
530            getConsumerProperties().put("exceptionHandler", exceptionHandler);
531        }
532    }
533
    /**
     * Does nothing; this class has no work to do on stop.
     */
    @Override
    protected void doStop() throws Exception {
        // noop
    }
538}