001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import org.apache.camel.AsyncProcessor;
020import org.apache.camel.Consumer;
021import org.apache.camel.Endpoint;
022import org.apache.camel.Exchange;
023import org.apache.camel.Processor;
024import org.apache.camel.Route;
025import org.apache.camel.RouteAware;
026import org.apache.camel.spi.ExceptionHandler;
027import org.apache.camel.spi.UnitOfWork;
028import org.apache.camel.support.LoggingExceptionHandler;
029import org.apache.camel.support.ServiceSupport;
030import org.apache.camel.util.AsyncProcessorConverterHelper;
031import org.apache.camel.util.ServiceHelper;
032import org.apache.camel.util.URISupport;
033import org.apache.camel.util.UnitOfWorkHelper;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * A default consumer useful for implementation inheritance.
039 *
040 * @version 
041 */
042public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
043    protected final Logger log = LoggerFactory.getLogger(getClass());
044    private transient String consumerToString;
045    private final Endpoint endpoint;
046    private final Processor processor;
047    private volatile AsyncProcessor asyncProcessor;
048    private ExceptionHandler exceptionHandler;
049    private Route route;
050
051    public DefaultConsumer(Endpoint endpoint, Processor processor) {
052        this.endpoint = endpoint;
053        this.processor = processor;
054        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
055    }
056
057    @Override
058    public String toString() {
059        if (consumerToString == null) {
060            consumerToString = "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]";
061        }
062        return consumerToString;
063    }
064
065    public Route getRoute() {
066        return route;
067    }
068
069    public void setRoute(Route route) {
070        this.route = route;
071    }
072
073    /**
074     * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
075     * the processed {@link Exchange} then this method should be use to create and start
076     * the {@link UnitOfWork} on the exchange.
077     *
078     * @param exchange the exchange
079     * @return the created and started unit of work
080     * @throws Exception is thrown if error starting the unit of work
081     *
082     * @see #doneUoW(org.apache.camel.Exchange)
083     */
084    public UnitOfWork createUoW(Exchange exchange) throws Exception {
085        // if the exchange doesn't have from route id set, then set it if it originated
086        // from this unit of work
087        if (route != null && exchange.getFromRouteId() == null) {
088            exchange.setFromRouteId(route.getId());
089        }
090
091        UnitOfWork uow = endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
092        exchange.setUnitOfWork(uow);
093        uow.start();
094        return uow;
095    }
096
097    /**
098     * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
099     * the processed {@link Exchange} then this method should be executed when the consumer
100     * is finished processing the message.
101     *
102     * @param exchange the exchange
103     *
104     * @see #createUoW(org.apache.camel.Exchange)
105     */
106    public void doneUoW(Exchange exchange) {
107        UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange);
108    }
109
110    public Endpoint getEndpoint() {
111        return endpoint;
112    }
113
114    public Processor getProcessor() {
115        return processor;
116    }
117
118    /**
119     * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured
120     * processor on the consumer. If the processor does not implement the interface,
121     * it will be adapted so that it does.
122     */
123    public synchronized AsyncProcessor getAsyncProcessor() {
124        if (asyncProcessor == null) {            
125            asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
126        }
127        return asyncProcessor;
128    }
129
130    public ExceptionHandler getExceptionHandler() {
131        return exceptionHandler;
132    }
133
134    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
135        this.exceptionHandler = exceptionHandler;
136    }
137
138    protected void doStop() throws Exception {
139        log.debug("Stopping consumer: {}", this);
140        ServiceHelper.stopServices(processor);
141    }
142
143    protected void doStart() throws Exception {
144        log.debug("Starting consumer: {}", this);
145        ServiceHelper.startServices(processor);
146    }
147
148    /**
149     * Handles the given exception using the {@link #getExceptionHandler()}
150     * 
151     * @param t the exception to handle
152     */
153    protected void handleException(Throwable t) {
154        Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
155        getExceptionHandler().handleException(newt);
156    }
157
158    /**
159     * Handles the given exception using the {@link #getExceptionHandler()}
160     *
161     * @param message additional message about the exception
162     * @param t the exception to handle
163     */
164    protected void handleException(String message, Throwable t) {
165        Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
166        getExceptionHandler().handleException(message, newt);
167    }
168}