001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.List;
020
021import org.apache.camel.CamelContext;
022import org.apache.camel.ConsumerTemplate;
023import org.apache.camel.Endpoint;
024import org.apache.camel.Exchange;
025import org.apache.camel.PollingConsumer;
026import org.apache.camel.spi.Synchronization;
027import org.apache.camel.support.ServiceSupport;
028import org.apache.camel.util.CamelContextHelper;
029import org.apache.camel.util.LRUCache;
030import org.apache.camel.util.ServiceHelper;
031import org.apache.camel.util.UnitOfWorkHelper;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
036
037/**
038 * Template (named like Spring's TransactionTemplate & JmsTemplate
039 * et al) for working with Camel and consuming {@link org.apache.camel.Message} instances in an
040 * {@link Exchange} from an {@link Endpoint}.
041 *
042 * @version 
043 */
044public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
045
046    private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class);
047    private final CamelContext camelContext;
048    private ConsumerCache consumerCache;
049    private int maximumCacheSize;
050
051    public DefaultConsumerTemplate(CamelContext camelContext) {
052        this.camelContext = camelContext;
053    }
054
055    public int getMaximumCacheSize() {
056        return maximumCacheSize;
057    }
058
059    public void setMaximumCacheSize(int maximumCacheSize) {
060        this.maximumCacheSize = maximumCacheSize;
061    }
062
063    public int getCurrentCacheSize() {
064        if (consumerCache == null) {
065            return 0;
066        }
067        return consumerCache.size();
068    }
069
070    public void cleanUp() {
071        if (consumerCache != null) {
072            consumerCache.cleanUp();
073        }
074    }
075
076    /**
077     * @deprecated use {@link #getCamelContext()}
078     */
079    @Deprecated
080    public CamelContext getContext() {
081        return getCamelContext();
082    }
083
084    public CamelContext getCamelContext() {
085        return camelContext;
086    }
087
088    public Exchange receive(String endpointUri) {
089        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
090        return getConsumerCache().receive(endpoint);
091    }
092
093    public Exchange receive(Endpoint endpoint) {
094        return receive(endpoint.getEndpointUri());
095    }
096
097    public Exchange receive(String endpointUri, long timeout) {
098        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
099        return getConsumerCache().receive(endpoint, timeout);
100    }
101
102    public Exchange receive(Endpoint endpoint, long timeout) {
103        return receive(endpoint.getEndpointUri(), timeout);
104    }
105
106    public Exchange receiveNoWait(String endpointUri) {
107        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
108        return getConsumerCache().receiveNoWait(endpoint);
109    }
110
111    public Exchange receiveNoWait(Endpoint endpoint) {
112        return receiveNoWait(endpoint.getEndpointUri());
113    }
114
115    public Object receiveBody(String endpointUri) {
116        Object answer = null;
117        Exchange exchange = receive(endpointUri);
118        try {
119            answer = extractResultBody(exchange);
120        } finally {
121            doneUoW(exchange);
122        }
123        return answer;
124    }
125
126    public Object receiveBody(Endpoint endpoint) {
127        return receiveBody(endpoint.getEndpointUri());
128    }
129
130    public Object receiveBody(String endpointUri, long timeout) {
131        Object answer = null;
132        Exchange exchange = receive(endpointUri, timeout);
133        try {
134            answer = extractResultBody(exchange);
135        } finally {
136            doneUoW(exchange);
137        }
138        return answer;
139    }
140
141    public Object receiveBody(Endpoint endpoint, long timeout) {
142        return receiveBody(endpoint.getEndpointUri(), timeout);
143    }
144
145    public Object receiveBodyNoWait(String endpointUri) {
146        Object answer = null;
147        Exchange exchange = receiveNoWait(endpointUri);
148        try {
149            answer = extractResultBody(exchange);
150        } finally {
151            doneUoW(exchange);
152        }
153        return answer;
154    }
155
156    public Object receiveBodyNoWait(Endpoint endpoint) {
157        return receiveBodyNoWait(endpoint.getEndpointUri());
158    }
159
160    @SuppressWarnings("unchecked")
161    public <T> T receiveBody(String endpointUri, Class<T> type) {
162        Object answer = null;
163        Exchange exchange = receive(endpointUri);
164        try {
165            answer = extractResultBody(exchange);
166            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
167        } finally {
168            doneUoW(exchange);
169        }
170        return (T) answer;
171    }
172
173    public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
174        return receiveBody(endpoint.getEndpointUri(), type);
175    }
176
177    @SuppressWarnings("unchecked")
178    public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
179        Object answer = null;
180        Exchange exchange = receive(endpointUri, timeout);
181        try {
182            answer = extractResultBody(exchange);
183            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
184        } finally {
185            doneUoW(exchange);
186        }
187        return (T) answer;
188    }
189
190    public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
191        return receiveBody(endpoint.getEndpointUri(), timeout, type);
192    }
193
194    @SuppressWarnings("unchecked")
195    public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
196        Object answer = null;
197        Exchange exchange = receiveNoWait(endpointUri);
198        try {
199            answer = extractResultBody(exchange);
200            answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
201        } finally {
202            doneUoW(exchange);
203        }
204        return (T) answer;
205    }
206
207    public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
208        return receiveBodyNoWait(endpoint.getEndpointUri(), type);
209    }
210
211    public void doneUoW(Exchange exchange) {
212        try {
213            // The receiveBody method will get a null exchange
214            if (exchange == null) {
215                return;
216            }
217            if (exchange.getUnitOfWork() == null) {
218                // handover completions and done them manually to ensure they are being executed
219                List<Synchronization> synchronizations = exchange.handoverCompletions();
220                UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
221            } else {
222                // done the unit of work
223                exchange.getUnitOfWork().done(exchange);
224            }
225        } catch (Throwable e) {
226            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
227                    + ". This exception will be ignored.", e);
228        }
229    }
230
231    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
232        return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
233    }
234
235    /**
236     * Extracts the body from the given result.
237     * <p/>
238     * If the exchange pattern is provided it will try to honor it and retrieve the body
239     * from either IN or OUT according to the pattern.
240     *
241     * @param result   the result
242     * @return  the result, can be <tt>null</tt>.
243     */
244    protected Object extractResultBody(Exchange result) {
245        Object answer = null;
246        if (result != null) {
247            // rethrow if there was an exception
248            if (result.getException() != null) {
249                throw wrapRuntimeCamelException(result.getException());
250            }
251
252            // okay no fault then return the response
253            if (result.hasOut()) {
254                // use OUT as the response
255                answer = result.getOut().getBody();
256            } else {
257                // use IN as the response
258                answer = result.getIn().getBody();
259            }
260        }
261        return answer;
262    }
263
264    private ConsumerCache getConsumerCache() {
265        if (!isStarted()) {
266            throw new IllegalStateException("ConsumerTemplate has not been started");
267        }
268        return consumerCache;
269    }
270
271    protected void doStart() throws Exception {
272        if (consumerCache == null) {
273            if (maximumCacheSize > 0) {
274                consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize);
275            } else {
276                consumerCache = new ConsumerCache(this, camelContext);
277            }
278        }
279        ServiceHelper.startService(consumerCache);
280    }
281
282    protected void doStop() throws Exception {
283        // we should shutdown the services as this is our intention, to not re-use the services anymore
284        ServiceHelper.stopAndShutdownService(consumerCache);
285        consumerCache = null;
286    }
287
288}