001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.lang.reflect.Method; 020import java.util.Set; 021import javax.xml.bind.annotation.XmlTransient; 022 023import org.apache.camel.CamelContext; 024import org.apache.camel.CamelContextAware; 025import org.apache.camel.Consume; 026import org.apache.camel.Consumer; 027import org.apache.camel.ConsumerTemplate; 028import org.apache.camel.Endpoint; 029import org.apache.camel.IsSingleton; 030import org.apache.camel.NoSuchBeanException; 031import org.apache.camel.PollingConsumer; 032import org.apache.camel.Processor; 033import org.apache.camel.Producer; 034import org.apache.camel.ProducerTemplate; 035import org.apache.camel.ProxyInstantiationException; 036import org.apache.camel.Service; 037import org.apache.camel.component.bean.BeanInfo; 038import org.apache.camel.component.bean.BeanProcessor; 039import org.apache.camel.component.bean.ProxyHelper; 040import org.apache.camel.processor.CamelInternalProcessor; 041import org.apache.camel.processor.DeferServiceFactory; 042import org.apache.camel.processor.UnitOfWorkProducer; 043import org.apache.camel.util.CamelContextHelper; 044import org.apache.camel.util.IntrospectionSupport; 045import org.apache.camel.util.ObjectHelper; 046import org.apache.camel.util.ServiceHelper; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A helper class for Camel based injector or post processing hooks which can be reused by 052 * both the <a href="http://camel.apache.org/spring.html">Spring</a>, 053 * <a href="http://camel.apache.org/guice.html">Guice</a> and 054 * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support. 055 * 056 * @version 057 */ 058public class CamelPostProcessorHelper implements CamelContextAware { 059 private static final Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class); 060 061 @XmlTransient 062 private CamelContext camelContext; 063 064 public CamelPostProcessorHelper() { 065 } 066 067 public CamelPostProcessorHelper(CamelContext camelContext) { 068 this.setCamelContext(camelContext); 069 } 070 071 public CamelContext getCamelContext() { 072 return camelContext; 073 } 074 075 public void setCamelContext(CamelContext camelContext) { 076 this.camelContext = camelContext; 077 } 078 079 /** 080 * Does the given context match this camel context 081 */ 082 public boolean matchContext(String context) { 083 if (ObjectHelper.isNotEmpty(context)) { 084 if (!getCamelContext().getName().equals(context)) { 085 return false; 086 } 087 } 088 return true; 089 } 090 091 public void consumerInjection(Method method, Object bean, String beanName) { 092 Consume consume = method.getAnnotation(Consume.class); 093 if (consume != null && matchContext(consume.context())) { 094 LOG.debug("Creating a consumer for: " + consume); 095 subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property()); 096 } 097 } 098 099 public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty) { 100 // lets bind this method to a listener 101 String injectionPointName = method.getName(); 102 Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true); 103 if (endpoint != null) { 104 try { 105 Processor processor = createConsumerProcessor(bean, method, endpoint); 106 Consumer consumer = endpoint.createConsumer(processor); 107 LOG.debug("Created processor: {} for consumer: {}", processor, consumer); 108 startService(consumer, endpoint.getCamelContext(), bean, beanName); 109 } catch (Exception e) { 110 throw ObjectHelper.wrapRuntimeCamelException(e); 111 } 112 } 113 } 114 115 /** 116 * Stats the given service 117 */ 118 protected void startService(Service service, CamelContext camelContext, Object bean, String beanName) throws Exception { 119 // defer starting the service until CamelContext has started all its initial services 120 if (camelContext != null) { 121 camelContext.deferStartService(service, true); 122 } else { 123 // mo CamelContext then start service manually 124 ServiceHelper.startService(service); 125 } 126 127 boolean singleton = isSingleton(bean, beanName); 128 if (!singleton) { 129 LOG.debug("Service is not singleton so you must remember to stop it manually {}", service); 130 } 131 } 132 133 /** 134 * Create a processor which invokes the given method when an incoming 135 * message exchange is received 136 */ 137 protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) { 138 BeanInfo info = new BeanInfo(getCamelContext(), method); 139 BeanProcessor answer = new BeanProcessor(pojo, info); 140 // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked 141 CamelInternalProcessor internal = new CamelInternalProcessor(answer); 142 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); 143 return internal; 144 } 145 146 public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName, 147 String injectionPointName, boolean mandatory) { 148 if (ObjectHelper.isEmpty(uri) && ObjectHelper.isEmpty(name)) { 149 // if no uri or ref, then fallback and try the endpoint property 150 return doGetEndpointInjection(bean, propertyName, injectionPointName); 151 } else { 152 return doGetEndpointInjection(uri, name, injectionPointName, mandatory); 153 } 154 } 155 156 private Endpoint doGetEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) { 157 return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory); 158 } 159 160 /** 161 * Gets the injection endpoint from a bean property. 162 * @param bean the bean 163 * @param propertyName the property name on the bean 164 */ 165 private Endpoint doGetEndpointInjection(Object bean, String propertyName, String injectionPointName) { 166 // fall back and use the method name if no explicit property name was given 167 if (ObjectHelper.isEmpty(propertyName)) { 168 propertyName = injectionPointName; 169 } 170 171 // we have a property name, try to lookup a getter method on the bean with that name using this strategy 172 // 1. first the getter with the name as given 173 // 2. then the getter with Endpoint as postfix 174 // 3. then if start with on then try step 1 and 2 again, but omit the on prefix 175 try { 176 Object value = IntrospectionSupport.getOrElseProperty(bean, propertyName, null); 177 if (value == null) { 178 // try endpoint as postfix 179 value = IntrospectionSupport.getOrElseProperty(bean, propertyName + "Endpoint", null); 180 } 181 if (value == null && propertyName.startsWith("on")) { 182 // retry but without the on as prefix 183 propertyName = propertyName.substring(2); 184 return doGetEndpointInjection(bean, propertyName, injectionPointName); 185 } 186 if (value == null) { 187 return null; 188 } else if (value instanceof Endpoint) { 189 return (Endpoint) value; 190 } else { 191 String uriOrRef = getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, value); 192 return getCamelContext().getEndpoint(uriOrRef); 193 } 194 } catch (Exception e) { 195 throw new IllegalArgumentException("Error getting property " + propertyName + " from bean " + bean + " due " + e.getMessage(), e); 196 } 197 } 198 199 /** 200 * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point 201 */ 202 public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty, 203 String injectionPointName, Object bean, String beanName) { 204 if (type.isAssignableFrom(ProducerTemplate.class)) { 205 return createInjectionProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean); 206 } else if (type.isAssignableFrom(ConsumerTemplate.class)) { 207 return createInjectionConsumerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName); 208 } else { 209 Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, true); 210 if (endpoint != null) { 211 if (type.isInstance(endpoint)) { 212 return endpoint; 213 } else if (type.isAssignableFrom(Producer.class)) { 214 return createInjectionProducer(endpoint, bean, beanName); 215 } else if (type.isAssignableFrom(PollingConsumer.class)) { 216 return createInjectionPollingConsumer(endpoint, bean, beanName); 217 } else if (type.isInterface()) { 218 // lets create a proxy 219 try { 220 return ProxyHelper.createProxy(endpoint, type); 221 } catch (Exception e) { 222 throw createProxyInstantiationRuntimeException(type, endpoint, e); 223 } 224 } else { 225 throw new IllegalArgumentException("Invalid type: " + type.getName() 226 + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint); 227 } 228 } 229 return null; 230 } 231 } 232 233 public Object getInjectionPropertyValue(Class<?> type, String propertyName, String propertyDefaultValue, 234 String injectionPointName, Object bean, String beanName) { 235 try { 236 // enforce a properties component to be created if none existed 237 CamelContextHelper.lookupPropertiesComponent(getCamelContext(), true); 238 239 String key; 240 String prefix = getCamelContext().getPropertyPrefixToken(); 241 String suffix = getCamelContext().getPropertySuffixToken(); 242 if (!propertyName.contains(prefix)) { 243 // must enclose the property name with prefix/suffix to have it resolved 244 key = prefix + propertyName + suffix; 245 } else { 246 // key has already prefix/suffix so use it as-is as it may be a compound key 247 key = propertyName; 248 } 249 String value = getCamelContext().resolvePropertyPlaceholders(key); 250 if (value != null) { 251 return getCamelContext().getTypeConverter().mandatoryConvertTo(type, value); 252 } else { 253 return null; 254 } 255 } catch (Exception e) { 256 if (ObjectHelper.isNotEmpty(propertyDefaultValue)) { 257 try { 258 return getCamelContext().getTypeConverter().mandatoryConvertTo(type, propertyDefaultValue); 259 } catch (Exception e2) { 260 throw ObjectHelper.wrapRuntimeCamelException(e2); 261 } 262 } 263 throw ObjectHelper.wrapRuntimeCamelException(e); 264 } 265 } 266 267 public Object getInjectionBeanValue(Class<?> type, String name) { 268 if (ObjectHelper.isEmpty(name)) { 269 Set<?> found = getCamelContext().getRegistry().findByType(type); 270 if (found == null || found.isEmpty()) { 271 throw new NoSuchBeanException(name, type.getName()); 272 } else if (found.size() > 1) { 273 throw new NoSuchBeanException("Found " + found.size() + " beans of type: " + type + ". Only one bean expected."); 274 } else { 275 // we found only one 276 return found.iterator().next(); 277 } 278 } else { 279 return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type); 280 } 281 } 282 283 /** 284 * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO 285 */ 286 protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String endpointProperty, 287 String injectionPointName, Object bean) { 288 // endpoint is optional for this injection point 289 Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false); 290 CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext(); 291 ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint); 292 // start the template so its ready to use 293 try { 294 // no need to defer the template as it can adjust to the endpoint at runtime 295 startService(answer, context, bean, null); 296 } catch (Exception e) { 297 throw ObjectHelper.wrapRuntimeCamelException(e); 298 } 299 return answer; 300 } 301 302 /** 303 * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO 304 */ 305 protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String endpointProperty, 306 String injectionPointName) { 307 ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext()); 308 // start the template so its ready to use 309 try { 310 startService(answer, null, null, null); 311 } catch (Exception e) { 312 throw ObjectHelper.wrapRuntimeCamelException(e); 313 } 314 return answer; 315 } 316 317 /** 318 * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO 319 */ 320 protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) { 321 try { 322 PollingConsumer consumer = endpoint.createPollingConsumer(); 323 startService(consumer, endpoint.getCamelContext(), bean, beanName); 324 return consumer; 325 } catch (Exception e) { 326 throw ObjectHelper.wrapRuntimeCamelException(e); 327 } 328 } 329 330 /** 331 * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO 332 */ 333 protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) { 334 try { 335 Producer producer = DeferServiceFactory.createProducer(endpoint); 336 return new UnitOfWorkProducer(producer); 337 } catch (Exception e) { 338 throw ObjectHelper.wrapRuntimeCamelException(e); 339 } 340 } 341 342 protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) { 343 return new ProxyInstantiationException(type, endpoint, e); 344 } 345 346 /** 347 * Implementations can override this method to determine if the bean is singleton. 348 * 349 * @param bean the bean 350 * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned. 351 */ 352 protected boolean isSingleton(Object bean, String beanName) { 353 if (bean instanceof IsSingleton) { 354 IsSingleton singleton = (IsSingleton) bean; 355 return singleton.isSingleton(); 356 } 357 return true; 358 } 359}