/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;

/**
 * Default {@link ConsumerTemplate}: a convenience facade for pulling an
 * {@link Exchange} (or just its message body) from an {@link Endpoint}.
 * <p/>
 * All receive operations are delegated to a {@link ConsumerCache} that is created
 * when this service is started and shut down when it is stopped. Calling a receive
 * method before the template has been started fails with an
 * {@link IllegalStateException}.
 *
 * @version
 */
public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class);
    private final CamelContext camelContext;
    private ConsumerCache consumerCache;
    private int maximumCacheSize;

    /**
     * Creates a template bound to the given context. The template still has to be
     * started before it can receive anything.
     *
     * @param camelContext the context used to resolve endpoints and convert bodies
     */
    public DefaultConsumerTemplate(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * @return the configured maximum cache size; a value of zero or less means the
     *         consumer cache is created without an explicit size
     */
    public int getMaximumCacheSize() {
        return maximumCacheSize;
    }

    /**
     * Sets the maximum cache size. The value is only read when the consumer cache
     * is created in {@link #doStart()}.
     *
     * @param maximumCacheSize the size handed to the consumer cache when positive
     */
    public void setMaximumCacheSize(int maximumCacheSize) {
        this.maximumCacheSize = maximumCacheSize;
    }

    /**
     * @return the size reported by the consumer cache, or <tt>0</tt> when no cache exists
     */
    public int getCurrentCacheSize() {
        return consumerCache == null ? 0 : consumerCache.size();
    }

    /**
     * Asks the consumer cache to clean itself up; does nothing when no cache exists.
     */
    public void cleanUp() {
        if (consumerCache == null) {
            return;
        }
        consumerCache.cleanUp();
    }

    /**
     * @deprecated use {@link #getCamelContext()}
     */
    @Deprecated
    public CamelContext getContext() {
        return getCamelContext();
    }

    /**
     * @return the context this template was created with
     */
    public CamelContext getCamelContext() {
        return camelContext;
    }

    /**
     * Receives an exchange from the endpoint with the given uri.
     *
     * @param endpointUri uri of the endpoint, which must be resolvable
     * @return whatever the consumer cache returns for the endpoint
     */
    public Exchange receive(String endpointUri) {
        // resolve first, so an unknown endpoint is reported before the started check
        Endpoint target = resolveMandatoryEndpoint(endpointUri);
        return getConsumerCache().receive(target);
    }

    /**
     * Same as {@link #receive(String)}, using the uri of the given endpoint.
     */
    public Exchange receive(Endpoint endpoint) {
        return receive(endpoint.getEndpointUri());
    }

    /**
     * Receives an exchange from the endpoint with the given uri, passing the
     * timeout on to the consumer cache.
     *
     * @param endpointUri uri of the endpoint, which must be resolvable
     * @param timeout     timeout value handed to the consumer cache unchanged
     * @return whatever the consumer cache returns for the endpoint
     */
    public Exchange receive(String endpointUri, long timeout) {
        // resolve first, so an unknown endpoint is reported before the started check
        Endpoint target = resolveMandatoryEndpoint(endpointUri);
        return getConsumerCache().receive(target, timeout);
    }

    /**
     * Same as {@link #receive(String, long)}, using the uri of the given endpoint.
     */
    public Exchange receive(Endpoint endpoint, long timeout) {
        return receive(endpoint.getEndpointUri(), timeout);
    }

    /**
     * Receives an exchange from the endpoint with the given uri using the
     * no-wait operation of the consumer cache.
     *
     * @param endpointUri uri of the endpoint, which must be resolvable
     * @return whatever the consumer cache returns for the endpoint
     */
    public Exchange receiveNoWait(String endpointUri) {
        // resolve first, so an unknown endpoint is reported before the started check
        Endpoint target = resolveMandatoryEndpoint(endpointUri);
        return getConsumerCache().receiveNoWait(target);
    }

    /**
     * Same as {@link #receiveNoWait(String)}, using the uri of the given endpoint.
     */
    public Exchange receiveNoWait(Endpoint endpoint) {
        return receiveNoWait(endpoint.getEndpointUri());
    }

    /**
     * Receives from the endpoint and returns the body of the received exchange.
     * The exchange is always handed to {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @return the body, or <tt>null</tt> if no exchange was received
     */
    public Object receiveBody(String endpointUri) {
        return extractBodyThenDone(receive(endpointUri));
    }

    /**
     * Same as {@link #receiveBody(String)}, using the uri of the given endpoint.
     */
    public Object receiveBody(Endpoint endpoint) {
        return receiveBody(endpoint.getEndpointUri());
    }

    /**
     * Receives from the endpoint with a timeout and returns the body of the received
     * exchange. The exchange is always handed to {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @param timeout     timeout value handed on to {@link #receive(String, long)}
     * @return the body, or <tt>null</tt> if no exchange was received
     */
    public Object receiveBody(String endpointUri, long timeout) {
        return extractBodyThenDone(receive(endpointUri, timeout));
    }

    /**
     * Same as {@link #receiveBody(String, long)}, using the uri of the given endpoint.
     */
    public Object receiveBody(Endpoint endpoint, long timeout) {
        return receiveBody(endpoint.getEndpointUri(), timeout);
    }

    /**
     * Receives from the endpoint without waiting and returns the body of the received
     * exchange. The exchange is always handed to {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @return the body, or <tt>null</tt> if no exchange was received
     */
    public Object receiveBodyNoWait(String endpointUri) {
        return extractBodyThenDone(receiveNoWait(endpointUri));
    }

    /**
     * Same as {@link #receiveBodyNoWait(String)}, using the uri of the given endpoint.
     */
    public Object receiveBodyNoWait(Endpoint endpoint) {
        return receiveBodyNoWait(endpoint.getEndpointUri());
    }

    /**
     * Receives from the endpoint and returns the body converted to the given type
     * by the type converter of the context. The exchange is always handed to
     * {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @param type        the type to convert the body to
     * @return the converted body
     */
    public <T> T receiveBody(String endpointUri, Class<T> type) {
        return convertBodyThenDone(receive(endpointUri), type);
    }

    /**
     * Same as {@link #receiveBody(String, Class)}, using the uri of the given endpoint.
     */
    public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
        return receiveBody(endpoint.getEndpointUri(), type);
    }

    /**
     * Receives from the endpoint with a timeout and returns the body converted to the
     * given type by the type converter of the context. The exchange is always handed
     * to {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @param timeout     timeout value handed on to {@link #receive(String, long)}
     * @param type        the type to convert the body to
     * @return the converted body
     */
    public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
        return convertBodyThenDone(receive(endpointUri, timeout), type);
    }

    /**
     * Same as {@link #receiveBody(String, long, Class)}, using the uri of the given endpoint.
     */
    public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
        return receiveBody(endpoint.getEndpointUri(), timeout, type);
    }

    /**
     * Receives from the endpoint without waiting and returns the body converted to
     * the given type by the type converter of the context. The exchange is always
     * handed to {@link #doneUoW(Exchange)} afterwards.
     *
     * @param endpointUri uri of the endpoint
     * @param type        the type to convert the body to
     * @return the converted body
     */
    public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
        return convertBodyThenDone(receiveNoWait(endpointUri), type);
    }

    /**
     * Same as {@link #receiveBodyNoWait(String, Class)}, using the uri of the given endpoint.
     */
    public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
        return receiveBodyNoWait(endpoint.getEndpointUri(), type);
    }

    /**
     * Completes the work attached to a received exchange.
     * <p/>
     * If the exchange has a unit of work it is marked done; otherwise the completions
     * are handed over from the exchange and run manually. A <tt>null</tt> exchange is
     * accepted and ignored. Anything thrown while completing is logged at WARN level
     * and not propagated.
     *
     * @param exchange the exchange to complete, may be <tt>null</tt>
     */
    public void doneUoW(Exchange exchange) {
        // the receiveBody methods pass null when nothing was received
        if (exchange == null) {
            return;
        }
        try {
            if (exchange.getUnitOfWork() != null) {
                exchange.getUnitOfWork().done(exchange);
            } else {
                // no unit of work: take the completions over and run them ourselves
                // so they are guaranteed to be executed
                List<Synchronization> pending = exchange.handoverCompletions();
                UnitOfWorkHelper.doneSynchronizations(exchange, pending, LOG);
            }
        } catch (Throwable e) {
            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
                    + ". This exception will be ignored.", e);
        }
    }

    /**
     * Looks up the endpoint for the uri through {@link CamelContextHelper#getMandatoryEndpoint}.
     *
     * @param endpointUri uri of the endpoint
     * @return the resolved endpoint
     */
    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
        return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
    }

    /**
     * Picks the response body out of a received exchange.
     * <p/>
     * An exception stored on the exchange is rethrown wrapped as a runtime exception.
     * Otherwise the OUT body is used when the exchange has an OUT message, and the
     * IN body if it has not.
     *
     * @param result the received exchange, may be <tt>null</tt>
     * @return the body, can be <tt>null</tt>; always <tt>null</tt> for a <tt>null</tt> exchange
     */
    protected Object extractResultBody(Exchange result) {
        if (result == null) {
            return null;
        }
        // a failed exchange is reported to the caller instead of returning a body
        if (result.getException() != null) {
            throw wrapRuntimeCamelException(result.getException());
        }
        return result.hasOut() ? result.getOut().getBody() : result.getIn().getBody();
    }

    /**
     * Extracts the body of the exchange and, whether or not that succeeds,
     * completes the exchange through {@link #doneUoW(Exchange)}.
     */
    private Object extractBodyThenDone(Exchange exchange) {
        try {
            return extractResultBody(exchange);
        } finally {
            doneUoW(exchange);
        }
    }

    /**
     * Extracts the body of the exchange, converts it to the requested type and,
     * whether or not that succeeds, completes the exchange through
     * {@link #doneUoW(Exchange)}.
     */
    private <T> T convertBodyThenDone(Exchange exchange, Class<T> type) {
        try {
            Object body = extractResultBody(exchange);
            return camelContext.getTypeConverter().convertTo(type, exchange, body);
        } finally {
            doneUoW(exchange);
        }
    }

    /**
     * Returns the consumer cache, insisting that this template has been started.
     */
    private ConsumerCache getConsumerCache() {
        if (isStarted()) {
            return consumerCache;
        }
        throw new IllegalStateException("ConsumerTemplate has not been started");
    }

    /**
     * Creates the consumer cache if there is none yet (sized by the maximum cache
     * size when that is positive) and starts it.
     */
    protected void doStart() throws Exception {
        if (consumerCache == null) {
            consumerCache = maximumCacheSize > 0
                    ? new ConsumerCache(this, camelContext, maximumCacheSize)
                    : new ConsumerCache(this, camelContext);
        }
        ServiceHelper.startService(consumerCache);
    }

    /**
     * Stops and shuts down the consumer cache and drops the reference to it.
     */
    protected void doStop() throws Exception {
        // shut down rather than merely stop: the cached services are not meant to be re-used
        ServiceHelper.stopAndShutdownService(consumerCache);
        consumerCache = null;
    }

}