/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.lang.reflect.Method;
import java.util.Set;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consume;
import org.apache.camel.Consumer;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.IsSingleton;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ProxyInstantiationException;
import org.apache.camel.Service;
import org.apache.camel.builder.DefaultFluentProducerTemplate;
import org.apache.camel.component.bean.BeanInfo;
import org.apache.camel.component.bean.BeanProcessor;
import org.apache.camel.component.bean.ProxyHelper;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.DeferServiceFactory;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A helper class for Camel based injector or post processing hooks which can be reused by
 * both the <a href="http://camel.apache.org/spring.html">Spring</a>,
 * <a href="http://camel.apache.org/guice.html">Guice</a> and
 * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support.
 *
 * @version
 */
public class CamelPostProcessorHelper implements CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class);

    @XmlTransient
    private CamelContext camelContext;

    public CamelPostProcessorHelper() {
    }

    public CamelPostProcessorHelper(CamelContext camelContext) {
        this.setCamelContext(camelContext);
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Does the given context match this camel context
     *
     * @param context the context name to compare with; an empty value always matches
     * @return <tt>false</tt> only if a non-empty name was given and it differs from the name of this camel context
     */
    public boolean matchContext(String context) {
        if (ObjectHelper.isNotEmpty(context)) {
            if (!getCamelContext().getName().equals(context)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Subscribes the given method as a consumer if it is annotated with {@link Consume}
     * and the context named in the annotation matches this camel context.
     *
     * @param method   the method to inspect
     * @param bean     the bean that owns the method
     * @param beanName the name of the bean
     */
    public void consumerInjection(Method method, Object bean, String beanName) {
        Consume consume = method.getAnnotation(Consume.class);
        if (consume != null && matchContext(consume.context())) {
            // parameterized logging so the annotation is only rendered when debug is enabled
            LOG.debug("Creating a consumer for: {}", consume);
            subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property());
        }
    }

    /**
     * Creates a consumer on the resolved endpoint which invokes the given method, and starts it.
     * Nothing is done if no endpoint could be resolved.
     *
     * @param method           the method to invoke for incoming exchanges
     * @param bean             the bean that owns the method
     * @param beanName         the name of the bean
     * @param endpointUri      the endpoint uri, may be empty
     * @param endpointName     the endpoint reference name, may be empty
     * @param endpointProperty the name of a bean property holding the endpoint, may be empty
     */
    public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty) {
        // lets bind this method to a listener
        String injectionPointName = method.getName();
        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true);
        if (endpoint != null) {
            try {
                Processor processor = createConsumerProcessor(bean, method, endpoint);
                Consumer consumer = endpoint.createConsumer(processor);
                LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
                startService(consumer, endpoint.getCamelContext(), bean, beanName);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        }
    }

    /**
     * Starts the given service
     *
     * @param service      the service to start
     * @param camelContext the camel context to hand the service to, or <tt>null</tt> to start it directly
     * @param bean         the bean the service was created for
     * @param beanName     the name of the bean
     */
    protected void startService(Service service, CamelContext camelContext, Object bean, String beanName) throws Exception {
        // defer starting the service until CamelContext has started all its initial services
        if (camelContext != null) {
            camelContext.deferStartService(service, true);
        } else {
            // no CamelContext then start service manually
            ServiceHelper.startService(service);
        }

        boolean singleton = isSingleton(bean, beanName);
        if (!singleton) {
            LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
        }
    }

    /**
     * Create a processor which invokes the given method when an incoming
     * message exchange is received
     */
    protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
        BeanInfo info = new BeanInfo(getCamelContext(), method);
        BeanProcessor answer = new BeanProcessor(pojo, info);
        // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
        return internal;
    }

    /**
     * Resolves the endpoint for an injection point.
     * <p/>
     * If neither an uri nor a reference name is given, the endpoint is looked up from a property
     * on the bean; otherwise the uri/name is resolved using the camel context.
     *
     * @param bean               the bean
     * @param uri                the endpoint uri, may be empty
     * @param name               the endpoint reference name, may be empty
     * @param propertyName       the name of a bean property holding the endpoint, may be empty
     * @param injectionPointName the name of the injection point (such as the method name)
     * @param mandatory          passed on to the uri/name based lookup
     * @return the endpoint; the property based lookup returns <tt>null</tt> if no value was found
     */
    public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName,
                                         String injectionPointName, boolean mandatory) {
        if (ObjectHelper.isEmpty(uri) && ObjectHelper.isEmpty(name)) {
            // if no uri or ref, then fallback and try the endpoint property
            return doGetEndpointInjection(bean, propertyName, injectionPointName);
        } else {
            return doGetEndpointInjection(uri, name, injectionPointName, mandatory);
        }
    }

    private Endpoint doGetEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
        return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
    }

    /**
     * Gets the injection endpoint from a bean property.
     *
     * @param bean               the bean
     * @param propertyName       the property name on the bean
     * @param injectionPointName the name used as property name if no explicit property name was given
     * @return the endpoint, or <tt>null</tt> if no property value was found
     * @throws IllegalArgumentException if the property could not be read or converted
     */
    private Endpoint doGetEndpointInjection(Object bean, String propertyName, String injectionPointName) {
        // fall back and use the method name if no explicit property name was given
        if (ObjectHelper.isEmpty(propertyName)) {
            propertyName = injectionPointName;
        }

        // we have a property name, try to lookup a getter method on the bean with that name using this strategy
        // 1. first the getter with the name as given
        // 2. then the getter with Endpoint as postfix
        // 3. then if start with on then try step 1 and 2 again, but omit the on prefix
        try {
            Object value = IntrospectionSupport.getOrElseProperty(bean, propertyName, null);
            if (value == null) {
                // try endpoint as postfix
                value = IntrospectionSupport.getOrElseProperty(bean, propertyName + "Endpoint", null);
            }
            // only strip the prefix when something remains: an empty name would be replaced by the
            // injection point name again in the nested call, which never terminates when that name
            // consists solely of "on" (StackOverflowError is not an Exception, so it is not caught below)
            if (value == null && propertyName.length() > 2 && propertyName.startsWith("on")) {
                // retry but without the on as prefix
                propertyName = propertyName.substring(2);
                return doGetEndpointInjection(bean, propertyName, injectionPointName);
            }
            if (value == null) {
                return null;
            } else if (value instanceof Endpoint) {
                return (Endpoint) value;
            } else {
                // any other value is converted to a String and resolved as an endpoint by the camel context
                String uriOrRef = getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, value);
                return getCamelContext().getEndpoint(uriOrRef);
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Error getting property " + propertyName + " from bean " + bean + " due " + e.getMessage(), e);
        }
    }

    /**
     * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
     *
     * @param type               the type of the injection point
     * @param endpointUri        the endpoint uri, may be empty
     * @param endpointRef        the endpoint reference name, may be empty
     * @param endpointProperty   the name of a bean property holding the endpoint, may be empty
     * @param injectionPointName the name of the injection point
     * @param bean               the bean being injected
     * @param beanName           the name of the bean
     * @return the value to inject, or <tt>null</tt> if no endpoint could be resolved
     * @throws IllegalArgumentException if the type is not supported for injection
     */
    public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty,
                                    String injectionPointName, Object bean, String beanName) {
        if (type.isAssignableFrom(ProducerTemplate.class)) {
            return createInjectionProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean);
        } else if (type.isAssignableFrom(FluentProducerTemplate.class)) {
            return createInjectionFluentProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean);
        } else if (type.isAssignableFrom(ConsumerTemplate.class)) {
            return createInjectionConsumerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName);
        } else {
            Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, true);
            if (endpoint != null) {
                if (type.isInstance(endpoint)) {
                    return endpoint;
                } else if (type.isAssignableFrom(Producer.class)) {
                    return createInjectionProducer(endpoint, bean, beanName);
                } else if (type.isAssignableFrom(PollingConsumer.class)) {
                    return createInjectionPollingConsumer(endpoint, bean, beanName);
                } else if (type.isInterface()) {
                    // lets create a proxy
                    try {
                        return ProxyHelper.createProxy(endpoint, type);
                    } catch (Exception e) {
                        throw createProxyInstantiationRuntimeException(type, endpoint, e);
                    }
                } else {
                    throw new IllegalArgumentException("Invalid type: " + type.getName()
                            + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
                }
            }
            return null;
        }
    }

    /**
     * Resolves a property placeholder and converts the value to the type of the injection point.
     * <p/>
     * If resolving or converting fails and a non-empty default value was given, the default value
     * is converted and returned instead.
     *
     * @param type                 the type of the injection point
     * @param propertyName         the property key, with or without the placeholder prefix/suffix tokens
     * @param propertyDefaultValue the default value, may be empty
     * @param injectionPointName   the name of the injection point
     * @param bean                 the bean being injected
     * @param beanName             the name of the bean
     * @return the converted value, or <tt>null</tt> if the placeholder resolved to <tt>null</tt>
     */
    public Object getInjectionPropertyValue(Class<?> type, String propertyName, String propertyDefaultValue,
                                            String injectionPointName, Object bean, String beanName) {
        try {
            // enforce a properties component to be created if none existed
            CamelContextHelper.lookupPropertiesComponent(getCamelContext(), true);

            String key;
            String prefix = getCamelContext().getPropertyPrefixToken();
            String suffix = getCamelContext().getPropertySuffixToken();
            if (!propertyName.contains(prefix)) {
                // must enclose the property name with prefix/suffix to have it resolved
                key = prefix + propertyName + suffix;
            } else {
                // key has already prefix/suffix so use it as-is as it may be a compound key
                key = propertyName;
            }
            String value = getCamelContext().resolvePropertyPlaceholders(key);
            if (value != null) {
                return getCamelContext().getTypeConverter().mandatoryConvertTo(type, value);
            } else {
                return null;
            }
        } catch (Exception e) {
            if (ObjectHelper.isNotEmpty(propertyDefaultValue)) {
                try {
                    return getCamelContext().getTypeConverter().mandatoryConvertTo(type, propertyDefaultValue);
                } catch (Exception e2) {
                    throw ObjectHelper.wrapRuntimeCamelException(e2);
                }
            }
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Looks up the bean to inject from the registry of the camel context.
     * <p/>
     * If no name is given the lookup is by type, and exactly one bean of that type must exist.
     *
     * @param type the type of the injection point
     * @param name the bean name, may be empty
     * @return the bean
     * @throws NoSuchBeanException if looking up by type and none, or more than one, bean was found
     */
    public Object getInjectionBeanValue(Class<?> type, String name) {
        if (ObjectHelper.isEmpty(name)) {
            Set<?> found = getCamelContext().getRegistry().findByType(type);
            if (found == null || found.isEmpty()) {
                throw new NoSuchBeanException(name, type.getName());
            } else if (found.size() > 1) {
                throw new NoSuchBeanException("Found " + found.size() + " beans of type: " + type + ". Only one bean expected.");
            } else {
                // we found only one
                return found.iterator().next();
            }
        } else {
            return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
        }
    }

    /**
     * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
     */
    protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String endpointProperty,
                                                               String injectionPointName, Object bean) {
        // endpoint is optional for this injection point
        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
        CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
        ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint);
        // start the template so its ready to use
        try {
            // no need to defer the template as it can adjust to the endpoint at runtime
            startService(answer, context, bean, null);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return answer;
    }

    /**
     * Factory method to create a {@link org.apache.camel.FluentProducerTemplate} to be injected into a POJO
     */
    protected FluentProducerTemplate createInjectionFluentProducerTemplate(String endpointUri, String endpointRef, String endpointProperty,
                                                                           String injectionPointName, Object bean) {
        // endpoint is optional for this injection point
        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
        CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
        FluentProducerTemplate answer = new DefaultFluentProducerTemplate(context);
        answer.setDefaultEndpoint(endpoint);
        // start the template so its ready to use
        try {
            // no need to defer the template as it can adjust to the endpoint at runtime
            startService(answer, context, bean, null);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return answer;
    }

    /**
     * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
     */
    protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String endpointProperty,
                                                               String injectionPointName) {
        ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
        // start the template so its ready to use
        try {
            // no camel context is passed on, so the template is started directly
            startService(answer, null, null, null);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return answer;
    }

    /**
     * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
     */
    protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
        try {
            PollingConsumer consumer = endpoint.createPollingConsumer();
            startService(consumer, endpoint.getCamelContext(), bean, beanName);
            return consumer;
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
     */
    protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
        try {
            Producer producer = DeferServiceFactory.createProducer(endpoint);
            return new UnitOfWorkProducer(producer);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Creates the exception to throw when a proxy for the given type could not be created.
     */
    protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
        return new ProxyInstantiationException(type, endpoint, e);
    }

    /**
     * Implementations can override this method to determine if the bean is singleton.
     *
     * @param bean the bean
     * @param beanName the name of the bean
     * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned.
     */
    protected boolean isSingleton(Object bean, String beanName) {
        if (bean instanceof IsSingleton) {
            IsSingleton singleton = (IsSingleton) bean;
            return singleton.isSingleton();
        }
        return true;
    }
}