001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.List;
020
021import org.apache.camel.CamelContext;
022import org.apache.camel.ConsumerTemplate;
023import org.apache.camel.Endpoint;
024import org.apache.camel.Exchange;
025import org.apache.camel.spi.Synchronization;
026import org.apache.camel.support.ServiceSupport;
027import org.apache.camel.util.CamelContextHelper;
028import org.apache.camel.util.ServiceHelper;
029import org.apache.camel.util.UnitOfWorkHelper;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
034
035/**
036 * Template (named like Spring's TransactionTemplate & JmsTemplate
037 * et al) for working with Camel and consuming {@link org.apache.camel.Message} instances in an
038 * {@link Exchange} from an {@link Endpoint}.
039 *
040 * @version 
041 */
042public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
043
044    private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class);
045    private final CamelContext camelContext;
046    private ConsumerCache consumerCache;
047    private int maximumCacheSize;
048
049    public DefaultConsumerTemplate(CamelContext camelContext) {
050        this.camelContext = camelContext;
051    }
052
053    public int getMaximumCacheSize() {
054        return maximumCacheSize;
055    }
056
057    public void setMaximumCacheSize(int maximumCacheSize) {
058        this.maximumCacheSize = maximumCacheSize;
059    }
060
061    public int getCurrentCacheSize() {
062        if (consumerCache == null) {
063            return 0;
064        }
065        return consumerCache.size();
066    }
067
068    public void cleanUp() {
069        if (consumerCache != null) {
070            consumerCache.cleanUp();
071        }
072    }
073
074    /**
075     * @deprecated use {@link #getCamelContext()}
076     */
077    @Deprecated
078    public CamelContext getContext() {
079        return getCamelContext();
080    }
081
082    public CamelContext getCamelContext() {
083        return camelContext;
084    }
085
086    public Exchange receive(String endpointUri) {
087        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
088        return getConsumerCache().receive(endpoint);
089    }
090
091    public Exchange receive(Endpoint endpoint) {
092        return receive(endpoint.getEndpointUri());
093    }
094
095    public Exchange receive(String endpointUri, long timeout) {
096        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
097        return getConsumerCache().receive(endpoint, timeout);
098    }
099
100    public Exchange receive(Endpoint endpoint, long timeout) {
101        return receive(endpoint.getEndpointUri(), timeout);
102    }
103
104    public Exchange receiveNoWait(String endpointUri) {
105        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
106        return getConsumerCache().receiveNoWait(endpoint);
107    }
108
109    public Exchange receiveNoWait(Endpoint endpoint) {
110        return receiveNoWait(endpoint.getEndpointUri());
111    }
112
113    public Object receiveBody(String endpointUri) {
114        Object answer = null;
115        Exchange exchange = receive(endpointUri);
116        try {
117            answer = extractResultBody(exchange);
118        } finally {
119            doneUoW(exchange);
120        }
121        return answer;
122    }
123
124    public Object receiveBody(Endpoint endpoint) {
125        return receiveBody(endpoint.getEndpointUri());
126    }
127
128    public Object receiveBody(String endpointUri, long timeout) {
129        Object answer = null;
130        Exchange exchange = receive(endpointUri, timeout);
131        try {
132            answer = extractResultBody(exchange);
133        } finally {
134            doneUoW(exchange);
135        }
136        return answer;
137    }
138
139    public Object receiveBody(Endpoint endpoint, long timeout) {
140        return receiveBody(endpoint.getEndpointUri(), timeout);
141    }
142
143    public Object receiveBodyNoWait(String endpointUri) {
144        Object answer = null;
145        Exchange exchange = receiveNoWait(endpointUri);
146        try {
147            answer = extractResultBody(exchange);
148        } finally {
149            doneUoW(exchange);
150        }
151        return answer;
152    }
153
154    public Object receiveBodyNoWait(Endpoint endpoint) {
155        return receiveBodyNoWait(endpoint.getEndpointUri());
156    }
157
158    @SuppressWarnings("unchecked")
159    public <T> T receiveBody(String endpointUri, Class<T> type) {
160        Object answer = null;
161        Exchange exchange = receive(endpointUri);
162        try {
163            answer = extractResultBody(exchange);
164            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
165        } finally {
166            doneUoW(exchange);
167        }
168        return (T) answer;
169    }
170
171    public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
172        return receiveBody(endpoint.getEndpointUri(), type);
173    }
174
175    @SuppressWarnings("unchecked")
176    public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
177        Object answer = null;
178        Exchange exchange = receive(endpointUri, timeout);
179        try {
180            answer = extractResultBody(exchange);
181            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
182        } finally {
183            doneUoW(exchange);
184        }
185        return (T) answer;
186    }
187
188    public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
189        return receiveBody(endpoint.getEndpointUri(), timeout, type);
190    }
191
192    @SuppressWarnings("unchecked")
193    public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
194        Object answer = null;
195        Exchange exchange = receiveNoWait(endpointUri);
196        try {
197            answer = extractResultBody(exchange);
198            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
199        } finally {
200            doneUoW(exchange);
201        }
202        return (T) answer;
203    }
204
205    public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
206        return receiveBodyNoWait(endpoint.getEndpointUri(), type);
207    }
208
209    public void doneUoW(Exchange exchange) {
210        try {
211            // The receiveBody method will get a null exchange
212            if (exchange == null) {
213                return;
214            }
215            if (exchange.getUnitOfWork() == null) {
216                // handover completions and done them manually to ensure they are being executed
217                List<Synchronization> synchronizations = exchange.handoverCompletions();
218                UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
219            } else {
220                // done the unit of work
221                exchange.getUnitOfWork().done(exchange);
222            }
223        } catch (Throwable e) {
224            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
225                    + ". This exception will be ignored.", e);
226        }
227    }
228
229    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
230        return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
231    }
232
233    /**
234     * Extracts the body from the given result.
235     * <p/>
236     * If the exchange pattern is provided it will try to honor it and retrieve the body
237     * from either IN or OUT according to the pattern.
238     *
239     * @param result   the result
240     * @return  the result, can be <tt>null</tt>.
241     */
242    protected Object extractResultBody(Exchange result) {
243        Object answer = null;
244        if (result != null) {
245            // rethrow if there was an exception
246            if (result.getException() != null) {
247                throw wrapRuntimeCamelException(result.getException());
248            }
249
250            // okay no fault then return the response
251            if (result.hasOut()) {
252                // use OUT as the response
253                answer = result.getOut().getBody();
254            } else {
255                // use IN as the response
256                answer = result.getIn().getBody();
257            }
258        }
259        return answer;
260    }
261
262    private ConsumerCache getConsumerCache() {
263        if (!isStarted()) {
264            throw new IllegalStateException("ConsumerTemplate has not been started");
265        }
266        return consumerCache;
267    }
268
269    protected void doStart() throws Exception {
270        if (consumerCache == null) {
271            if (maximumCacheSize > 0) {
272                consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize);
273            } else {
274                consumerCache = new ConsumerCache(this, camelContext);
275            }
276        }
277        ServiceHelper.startService(consumerCache);
278    }
279
280    protected void doStop() throws Exception {
281        // we should shutdown the services as this is our intention, to not re-use the services anymore
282        ServiceHelper.stopAndShutdownService(consumerCache);
283        consumerCache = null;
284    }
285
286}