001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.List; 020 021import org.apache.camel.CamelContext; 022import org.apache.camel.ConsumerTemplate; 023import org.apache.camel.Endpoint; 024import org.apache.camel.Exchange; 025import org.apache.camel.spi.Synchronization; 026import org.apache.camel.support.ServiceSupport; 027import org.apache.camel.util.CamelContextHelper; 028import org.apache.camel.util.ServiceHelper; 029import org.apache.camel.util.UnitOfWorkHelper; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; 034 035/** 036 * Template (named like Spring's TransactionTemplate & JmsTemplate 037 * et al) for working with Camel and consuming {@link org.apache.camel.Message} instances in an 038 * {@link Exchange} from an {@link Endpoint}. 039 * 040 * @version 041 */ 042public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate { 043 044 private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class); 045 private final CamelContext camelContext; 046 private ConsumerCache consumerCache; 047 private int maximumCacheSize; 048 049 public DefaultConsumerTemplate(CamelContext camelContext) { 050 this.camelContext = camelContext; 051 } 052 053 public int getMaximumCacheSize() { 054 return maximumCacheSize; 055 } 056 057 public void setMaximumCacheSize(int maximumCacheSize) { 058 this.maximumCacheSize = maximumCacheSize; 059 } 060 061 public int getCurrentCacheSize() { 062 if (consumerCache == null) { 063 return 0; 064 } 065 return consumerCache.size(); 066 } 067 068 public void cleanUp() { 069 if (consumerCache != null) { 070 consumerCache.cleanUp(); 071 } 072 } 073 074 /** 075 * @deprecated use {@link #getCamelContext()} 076 */ 077 @Deprecated 078 public CamelContext getContext() { 079 return getCamelContext(); 080 } 081 082 public CamelContext getCamelContext() { 083 return camelContext; 084 } 085 086 public Exchange receive(String endpointUri) { 087 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 088 return getConsumerCache().receive(endpoint); 089 } 090 091 public Exchange receive(Endpoint endpoint) { 092 return receive(endpoint.getEndpointUri()); 093 } 094 095 public Exchange receive(String endpointUri, long timeout) { 096 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 097 return getConsumerCache().receive(endpoint, timeout); 098 } 099 100 public Exchange receive(Endpoint endpoint, long timeout) { 101 return receive(endpoint.getEndpointUri(), timeout); 102 } 103 104 public Exchange receiveNoWait(String endpointUri) { 105 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 106 return getConsumerCache().receiveNoWait(endpoint); 107 } 108 109 public Exchange receiveNoWait(Endpoint endpoint) { 110 return receiveNoWait(endpoint.getEndpointUri()); 111 } 112 113 public Object receiveBody(String endpointUri) { 114 Object answer = null; 115 Exchange exchange = receive(endpointUri); 116 try { 117 answer = extractResultBody(exchange); 118 } finally { 119 doneUoW(exchange); 120 } 121 return answer; 122 } 123 124 public Object receiveBody(Endpoint endpoint) { 125 return receiveBody(endpoint.getEndpointUri()); 126 } 127 128 public Object receiveBody(String endpointUri, long timeout) { 129 Object answer = null; 130 Exchange exchange = receive(endpointUri, timeout); 131 try { 132 answer = extractResultBody(exchange); 133 } finally { 134 doneUoW(exchange); 135 } 136 return answer; 137 } 138 139 public Object receiveBody(Endpoint endpoint, long timeout) { 140 return receiveBody(endpoint.getEndpointUri(), timeout); 141 } 142 143 public Object receiveBodyNoWait(String endpointUri) { 144 Object answer = null; 145 Exchange exchange = receiveNoWait(endpointUri); 146 try { 147 answer = extractResultBody(exchange); 148 } finally { 149 doneUoW(exchange); 150 } 151 return answer; 152 } 153 154 public Object receiveBodyNoWait(Endpoint endpoint) { 155 return receiveBodyNoWait(endpoint.getEndpointUri()); 156 } 157 158 @SuppressWarnings("unchecked") 159 public <T> T receiveBody(String endpointUri, Class<T> type) { 160 Object answer = null; 161 Exchange exchange = receive(endpointUri); 162 try { 163 answer = extractResultBody(exchange); 164 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 165 } finally { 166 doneUoW(exchange); 167 } 168 return (T) answer; 169 } 170 171 public <T> T receiveBody(Endpoint endpoint, Class<T> type) { 172 return receiveBody(endpoint.getEndpointUri(), type); 173 } 174 175 @SuppressWarnings("unchecked") 176 public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) { 177 Object answer = null; 178 Exchange exchange = receive(endpointUri, timeout); 179 try { 180 answer = extractResultBody(exchange); 181 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 182 } finally { 183 doneUoW(exchange); 184 } 185 return (T) answer; 186 } 187 188 public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) { 189 return receiveBody(endpoint.getEndpointUri(), timeout, type); 190 } 191 192 @SuppressWarnings("unchecked") 193 public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) { 194 Object answer = null; 195 Exchange exchange = receiveNoWait(endpointUri); 196 try { 197 answer = extractResultBody(exchange); 198 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 199 } finally { 200 doneUoW(exchange); 201 } 202 return (T) answer; 203 } 204 205 public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) { 206 return receiveBodyNoWait(endpoint.getEndpointUri(), type); 207 } 208 209 public void doneUoW(Exchange exchange) { 210 try { 211 // The receiveBody method will get a null exchange 212 if (exchange == null) { 213 return; 214 } 215 if (exchange.getUnitOfWork() == null) { 216 // handover completions and done them manually to ensure they are being executed 217 List<Synchronization> synchronizations = exchange.handoverCompletions(); 218 UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG); 219 } else { 220 // done the unit of work 221 exchange.getUnitOfWork().done(exchange); 222 } 223 } catch (Throwable e) { 224 LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange 225 + ". This exception will be ignored.", e); 226 } 227 } 228 229 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 230 return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri); 231 } 232 233 /** 234 * Extracts the body from the given result. 235 * <p/> 236 * If the exchange pattern is provided it will try to honor it and retrieve the body 237 * from either IN or OUT according to the pattern. 238 * 239 * @param result the result 240 * @return the result, can be <tt>null</tt>. 241 */ 242 protected Object extractResultBody(Exchange result) { 243 Object answer = null; 244 if (result != null) { 245 // rethrow if there was an exception 246 if (result.getException() != null) { 247 throw wrapRuntimeCamelException(result.getException()); 248 } 249 250 // okay no fault then return the response 251 if (result.hasOut()) { 252 // use OUT as the response 253 answer = result.getOut().getBody(); 254 } else { 255 // use IN as the response 256 answer = result.getIn().getBody(); 257 } 258 } 259 return answer; 260 } 261 262 private ConsumerCache getConsumerCache() { 263 if (!isStarted()) { 264 throw new IllegalStateException("ConsumerTemplate has not been started"); 265 } 266 return consumerCache; 267 } 268 269 protected void doStart() throws Exception { 270 if (consumerCache == null) { 271 if (maximumCacheSize > 0) { 272 consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize); 273 } else { 274 consumerCache = new ConsumerCache(this, camelContext); 275 } 276 } 277 ServiceHelper.startService(consumerCache); 278 } 279 280 protected void doStop() throws Exception { 281 // we should shutdown the services as this is our intention, to not re-use the services anymore 282 ServiceHelper.stopAndShutdownService(consumerCache); 283 consumerCache = null; 284 } 285 286}