001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.Method;
020    import javax.xml.bind.annotation.XmlTransient;
021    
022    import org.apache.camel.CamelContext;
023    import org.apache.camel.CamelContextAware;
024    import org.apache.camel.Consume;
025    import org.apache.camel.Consumer;
026    import org.apache.camel.ConsumerTemplate;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.IsSingleton;
029    import org.apache.camel.PollingConsumer;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Producer;
032    import org.apache.camel.ProducerTemplate;
033    import org.apache.camel.ProxyInstantiationException;
034    import org.apache.camel.Service;
035    import org.apache.camel.component.bean.BeanInfo;
036    import org.apache.camel.component.bean.BeanProcessor;
037    import org.apache.camel.component.bean.ProxyHelper;
038    import org.apache.camel.processor.UnitOfWorkProcessor;
039    import org.apache.camel.processor.UnitOfWorkProducer;
040    import org.apache.camel.util.CamelContextHelper;
041    import org.apache.camel.util.ObjectHelper;
042    import org.apache.camel.util.ServiceHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * A helper class for Camel based injector or post processing hooks which can be reused by
048     * both the <a href="http://camel.apache.org/spring.html">Spring</a>,
049     * <a href="http://camel.apache.org/guice.html">Guice</a> and
050     * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support.
051     *
052     * @version 
053     */
054    public class CamelPostProcessorHelper implements CamelContextAware {
055        private static final transient Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class);
056    
057        @XmlTransient
058        private CamelContext camelContext;
059    
060        public CamelPostProcessorHelper() {
061        }
062    
063        public CamelPostProcessorHelper(CamelContext camelContext) {
064            this.setCamelContext(camelContext);
065        }
066    
067        public CamelContext getCamelContext() {
068            return camelContext;
069        }
070    
071        public void setCamelContext(CamelContext camelContext) {
072            this.camelContext = camelContext;
073        }
074    
075        /**
076         * Does the given context match this camel context
077         */
078        public boolean matchContext(String context) {
079            if (ObjectHelper.isNotEmpty(context)) {
080                if (!getCamelContext().getName().equals(context)) {
081                    return false;
082                }
083            }
084            return true;
085        }
086    
087        public void consumerInjection(Method method, Object bean, String beanName) {
088            Consume consume = method.getAnnotation(Consume.class);
089            if (consume != null && matchContext(consume.context())) {
090                LOG.info("Creating a consumer for: " + consume);
091                subscribeMethod(method, bean, beanName, consume.uri(), consume.ref());
092            }
093        }
094    
095        public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName) {
096            // lets bind this method to a listener
097            String injectionPointName = method.getName();
098            Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
099            if (endpoint != null) {
100                try {
101                    Processor processor = createConsumerProcessor(bean, method, endpoint);
102                    Consumer consumer = endpoint.createConsumer(processor);
103                    LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
104                    startService(consumer, bean, beanName);
105                } catch (Exception e) {
106                    throw ObjectHelper.wrapRuntimeCamelException(e);
107                }
108            }
109        }
110    
111        /**
112         * Stats the given service
113         */
114        protected void startService(Service service, Object bean, String beanName) throws Exception {
115            if (isSingleton(bean, beanName)) {
116                getCamelContext().addService(service);
117            } else {
118                LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
119                ServiceHelper.startService(service);
120            }
121        }
122    
123        /**
124         * Create a processor which invokes the given method when an incoming
125         * message exchange is received
126         */
127        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
128            BeanInfo info = new BeanInfo(getCamelContext(), method);
129            BeanProcessor answer = new BeanProcessor(pojo, info);
130            // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
131            return new UnitOfWorkProcessor(answer);
132        }
133    
134        protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
135            return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
136        }
137    
138        /**
139         * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
140         */
141        public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName,
142                                        Object bean, String beanName) {
143            if (type.isAssignableFrom(ProducerTemplate.class)) {
144                return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName);
145            } else if (type.isAssignableFrom(ConsumerTemplate.class)) {
146                return createInjectionConsumerTemplate(endpointUri, endpointRef, injectionPointName);
147            } else {
148                Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
149                if (endpoint != null) {
150                    if (type.isInstance(endpoint)) {
151                        return endpoint;
152                    } else if (type.isAssignableFrom(Producer.class)) {
153                        return createInjectionProducer(endpoint, bean, beanName);
154                    } else if (type.isAssignableFrom(PollingConsumer.class)) {
155                        return createInjectionPollingConsumer(endpoint, bean, beanName);
156                    } else if (type.isInterface()) {
157                        // lets create a proxy
158                        try {
159                            return ProxyHelper.createProxy(endpoint, type);
160                        } catch (Exception e) {
161                            throw createProxyInstantiationRuntimeException(type, endpoint, e);
162                        }
163                    } else {
164                        throw new IllegalArgumentException("Invalid type: " + type.getName()
165                                + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
166                    }
167                }
168                return null;
169            }
170        }
171    
172        /**
173         * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
174         */
175        protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
176            // endpoint is optional for this injection point
177            Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
178            ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
179            // start the template so its ready to use
180            try {
181                answer.start();
182            } catch (Exception e) {
183                throw ObjectHelper.wrapRuntimeCamelException(e);
184            }
185            return answer;
186        }
187    
188        /**
189         * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
190         */
191        protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
192            ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
193            // start the template so its ready to use
194            try {
195                answer.start();
196            } catch (Exception e) {
197                throw ObjectHelper.wrapRuntimeCamelException(e);
198            }
199            return answer;
200        }
201    
202        /**
203         * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
204         */
205        protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
206            try {
207                PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
208                startService(pollingConsumer, bean, beanName);
209                return pollingConsumer;
210            } catch (Exception e) {
211                throw ObjectHelper.wrapRuntimeCamelException(e);
212            }
213        }
214    
215        /**
216         * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
217         */
218        protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
219            try {
220                Producer producer = endpoint.createProducer();
221                startService(producer, bean, beanName);
222                return new UnitOfWorkProducer(producer);
223            } catch (Exception e) {
224                throw ObjectHelper.wrapRuntimeCamelException(e);
225            }
226        }
227    
228        protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
229            return new ProxyInstantiationException(type, endpoint, e);
230        }
231    
232        /**
233         * Implementations can override this method to determine if the bean is singleton.
234         *
235         * @param bean the bean
236         * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned.
237         */
238        protected boolean isSingleton(Object bean, String beanName) {
239            if (bean instanceof IsSingleton) {
240                IsSingleton singleton = (IsSingleton) bean;
241                return singleton.isSingleton();
242            }
243            return true;
244        }
245    }