001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.lang.reflect.Method;
020import java.util.Set;
021import javax.xml.bind.annotation.XmlTransient;
022
023import org.apache.camel.CamelContext;
024import org.apache.camel.CamelContextAware;
025import org.apache.camel.Consume;
026import org.apache.camel.Consumer;
027import org.apache.camel.ConsumerTemplate;
028import org.apache.camel.Endpoint;
029import org.apache.camel.FluentProducerTemplate;
030import org.apache.camel.IsSingleton;
031import org.apache.camel.NoSuchBeanException;
032import org.apache.camel.PollingConsumer;
033import org.apache.camel.Processor;
034import org.apache.camel.Producer;
035import org.apache.camel.ProducerTemplate;
036import org.apache.camel.ProxyInstantiationException;
037import org.apache.camel.Service;
038import org.apache.camel.builder.DefaultFluentProducerTemplate;
039import org.apache.camel.component.bean.BeanInfo;
040import org.apache.camel.component.bean.BeanProcessor;
041import org.apache.camel.component.bean.ProxyHelper;
042import org.apache.camel.processor.CamelInternalProcessor;
043import org.apache.camel.processor.DeferServiceFactory;
044import org.apache.camel.processor.UnitOfWorkProducer;
045import org.apache.camel.util.CamelContextHelper;
046import org.apache.camel.util.IntrospectionSupport;
047import org.apache.camel.util.ObjectHelper;
048import org.apache.camel.util.ServiceHelper;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A helper class for Camel based injector or post processing hooks which can be reused by
054 * both the <a href="http://camel.apache.org/spring.html">Spring</a>,
055 * <a href="http://camel.apache.org/guice.html">Guice</a> and
056 * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support.
057 *
058 * @version 
059 */
060public class CamelPostProcessorHelper implements CamelContextAware {
061    private static final Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class);
062
063    @XmlTransient
064    private CamelContext camelContext;
065
066    public CamelPostProcessorHelper() {
067    }
068
069    public CamelPostProcessorHelper(CamelContext camelContext) {
070        this.setCamelContext(camelContext);
071    }
072
073    public CamelContext getCamelContext() {
074        return camelContext;
075    }
076
077    public void setCamelContext(CamelContext camelContext) {
078        this.camelContext = camelContext;
079    }
080
081    /**
082     * Does the given context match this camel context
083     */
084    public boolean matchContext(String context) {
085        if (ObjectHelper.isNotEmpty(context)) {
086            if (!getCamelContext().getName().equals(context)) {
087                return false;
088            }
089        }
090        return true;
091    }
092
093    public void consumerInjection(Method method, Object bean, String beanName) {
094        Consume consume = method.getAnnotation(Consume.class);
095        if (consume != null && matchContext(consume.context())) {
096            LOG.debug("Creating a consumer for: " + consume);
097            subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property());
098        }
099    }
100
101    public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty) {
102        // lets bind this method to a listener
103        String injectionPointName = method.getName();
104        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true);
105        if (endpoint != null) {
106            try {
107                Processor processor = createConsumerProcessor(bean, method, endpoint);
108                Consumer consumer = endpoint.createConsumer(processor);
109                LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
110                startService(consumer, endpoint.getCamelContext(), bean, beanName);
111            } catch (Exception e) {
112                throw ObjectHelper.wrapRuntimeCamelException(e);
113            }
114        }
115    }
116
117    /**
118     * Stats the given service
119     */
120    protected void startService(Service service, CamelContext camelContext, Object bean, String beanName) throws Exception {
121        // defer starting the service until CamelContext has started all its initial services
122        if (camelContext != null) {
123            camelContext.deferStartService(service, true);
124        } else {
125            // mo CamelContext then start service manually
126            ServiceHelper.startService(service);
127        }
128
129        boolean singleton = isSingleton(bean, beanName);
130        if (!singleton) {
131            LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
132        }
133    }
134
135    /**
136     * Create a processor which invokes the given method when an incoming
137     * message exchange is received
138     */
139    protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
140        BeanInfo info = new BeanInfo(getCamelContext(), method);
141        BeanProcessor answer = new BeanProcessor(pojo, info);
142        // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
143        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
144        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
145        return internal;
146    }
147
148    public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName,
149                                         String injectionPointName, boolean mandatory) {
150        if (ObjectHelper.isEmpty(uri) && ObjectHelper.isEmpty(name)) {
151            // if no uri or ref, then fallback and try the endpoint property
152            return doGetEndpointInjection(bean, propertyName, injectionPointName);
153        } else {
154            return doGetEndpointInjection(uri, name, injectionPointName, mandatory);
155        }
156    }
157
158    private Endpoint doGetEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
159        return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
160    }
161
162    /**
163     * Gets the injection endpoint from a bean property.
164     * @param bean the bean
165     * @param propertyName the property name on the bean
166     */
167    private Endpoint doGetEndpointInjection(Object bean, String propertyName, String injectionPointName) {
168        // fall back and use the method name if no explicit property name was given
169        if (ObjectHelper.isEmpty(propertyName)) {
170            propertyName = injectionPointName;
171        }
172
173        // we have a property name, try to lookup a getter method on the bean with that name using this strategy
174        // 1. first the getter with the name as given
175        // 2. then the getter with Endpoint as postfix
176        // 3. then if start with on then try step 1 and 2 again, but omit the on prefix
177        try {
178            Object value = IntrospectionSupport.getOrElseProperty(bean, propertyName, null);
179            if (value == null) {
180                // try endpoint as postfix
181                value = IntrospectionSupport.getOrElseProperty(bean, propertyName + "Endpoint", null);
182            }
183            if (value == null && propertyName.startsWith("on")) {
184                // retry but without the on as prefix
185                propertyName = propertyName.substring(2);
186                return doGetEndpointInjection(bean, propertyName, injectionPointName);
187            }
188            if (value == null) {
189                return null;
190            } else if (value instanceof Endpoint) {
191                return (Endpoint) value;
192            } else {
193                String uriOrRef = getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, value);
194                return getCamelContext().getEndpoint(uriOrRef);
195            }
196        } catch (Exception e) {
197            throw new IllegalArgumentException("Error getting property " + propertyName + " from bean " + bean + " due " + e.getMessage(), e);
198        }
199    }
200
201    /**
202     * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
203     */
204    public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty,
205                                    String injectionPointName, Object bean, String beanName) {
206        if (type.isAssignableFrom(ProducerTemplate.class)) {
207            return createInjectionProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean);
208        } else if (type.isAssignableFrom(FluentProducerTemplate.class)) {
209            return createInjectionFluentProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean);
210        } else if (type.isAssignableFrom(ConsumerTemplate.class)) {
211            return createInjectionConsumerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName);
212        } else {
213            Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, true);
214            if (endpoint != null) {
215                if (type.isInstance(endpoint)) {
216                    return endpoint;
217                } else if (type.isAssignableFrom(Producer.class)) {
218                    return createInjectionProducer(endpoint, bean, beanName);
219                } else if (type.isAssignableFrom(PollingConsumer.class)) {
220                    return createInjectionPollingConsumer(endpoint, bean, beanName);
221                } else if (type.isInterface()) {
222                    // lets create a proxy
223                    try {
224                        return ProxyHelper.createProxy(endpoint, type);
225                    } catch (Exception e) {
226                        throw createProxyInstantiationRuntimeException(type, endpoint, e);
227                    }
228                } else {
229                    throw new IllegalArgumentException("Invalid type: " + type.getName()
230                            + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
231                }
232            }
233            return null;
234        }
235    }
236
237    public Object getInjectionPropertyValue(Class<?> type, String propertyName, String propertyDefaultValue,
238                                            String injectionPointName, Object bean, String beanName) {
239        try {
240            // enforce a properties component to be created if none existed
241            CamelContextHelper.lookupPropertiesComponent(getCamelContext(), true);
242
243            String key;
244            String prefix = getCamelContext().getPropertyPrefixToken();
245            String suffix = getCamelContext().getPropertySuffixToken();
246            if (!propertyName.contains(prefix)) {
247                // must enclose the property name with prefix/suffix to have it resolved
248                key = prefix + propertyName + suffix;
249            } else {
250                // key has already prefix/suffix so use it as-is as it may be a compound key
251                key = propertyName;
252            }
253            String value = getCamelContext().resolvePropertyPlaceholders(key);
254            if (value != null) {
255                return getCamelContext().getTypeConverter().mandatoryConvertTo(type, value);
256            } else {
257                return null;
258            }
259        } catch (Exception e) {
260            if (ObjectHelper.isNotEmpty(propertyDefaultValue)) {
261                try {
262                    return getCamelContext().getTypeConverter().mandatoryConvertTo(type, propertyDefaultValue);
263                } catch (Exception e2) {
264                    throw ObjectHelper.wrapRuntimeCamelException(e2);
265                }
266            }
267            throw ObjectHelper.wrapRuntimeCamelException(e);
268        }
269    }
270
271    public Object getInjectionBeanValue(Class<?> type, String name) {
272        if (ObjectHelper.isEmpty(name)) {
273            Set<?> found = getCamelContext().getRegistry().findByType(type);
274            if (found == null || found.isEmpty()) {
275                throw new NoSuchBeanException(name, type.getName());
276            } else if (found.size() > 1) {
277                throw new NoSuchBeanException("Found " + found.size() + " beans of type: " + type + ". Only one bean expected.");
278            } else {
279                // we found only one
280                return found.iterator().next();
281            }
282        } else {
283            return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
284        }
285    }
286
287    /**
288     * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
289     */
290    protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String endpointProperty,
291                                                               String injectionPointName, Object bean) {
292        // endpoint is optional for this injection point
293        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
294        CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
295        ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint);
296        // start the template so its ready to use
297        try {
298            // no need to defer the template as it can adjust to the endpoint at runtime
299            startService(answer, context, bean, null);
300        } catch (Exception e) {
301            throw ObjectHelper.wrapRuntimeCamelException(e);
302        }
303        return answer;
304    }
305
306    /**
307     * Factory method to create a {@link org.apache.camel.FluentProducerTemplate} to be injected into a POJO
308     */
309    protected FluentProducerTemplate createInjectionFluentProducerTemplate(String endpointUri, String endpointRef, String endpointProperty,
310                                                                           String injectionPointName, Object bean) {
311        // endpoint is optional for this injection point
312        Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
313        CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
314        FluentProducerTemplate answer = new DefaultFluentProducerTemplate(context);
315        answer.setDefaultEndpoint(endpoint);
316        // start the template so its ready to use
317        try {
318            // no need to defer the template as it can adjust to the endpoint at runtime
319            startService(answer, context, bean, null);
320        } catch (Exception e) {
321            throw ObjectHelper.wrapRuntimeCamelException(e);
322        }
323        return answer;
324    }
325
326    /**
327     * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
328     */
329    protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String endpointProperty,
330                                                               String injectionPointName) {
331        ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
332        // start the template so its ready to use
333        try {
334            startService(answer, null, null, null);
335        } catch (Exception e) {
336            throw ObjectHelper.wrapRuntimeCamelException(e);
337        }
338        return answer;
339    }
340
341    /**
342     * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
343     */
344    protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
345        try {
346            PollingConsumer consumer = endpoint.createPollingConsumer();
347            startService(consumer, endpoint.getCamelContext(), bean, beanName);
348            return consumer;
349        } catch (Exception e) {
350            throw ObjectHelper.wrapRuntimeCamelException(e);
351        }
352    }
353
354    /**
355     * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
356     */
357    protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
358        try {
359            Producer producer = DeferServiceFactory.createProducer(endpoint);
360            return new UnitOfWorkProducer(producer);
361        } catch (Exception e) {
362            throw ObjectHelper.wrapRuntimeCamelException(e);
363        }
364    }
365
366    protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
367        return new ProxyInstantiationException(type, endpoint, e);
368    }
369
370    /**
371     * Implementations can override this method to determine if the bean is singleton.
372     *
373     * @param bean the bean
374     * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned.
375     */
376    protected boolean isSingleton(Object bean, String beanName) {
377        if (bean instanceof IsSingleton) {
378            IsSingleton singleton = (IsSingleton) bean;
379            return singleton.isSingleton();
380        }
381        return true;
382    }
383}