001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.lang.reflect.Method; 020 import javax.xml.bind.annotation.XmlTransient; 021 022 import org.apache.camel.CamelContext; 023 import org.apache.camel.CamelContextAware; 024 import org.apache.camel.Consume; 025 import org.apache.camel.Consumer; 026 import org.apache.camel.ConsumerTemplate; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.IsSingleton; 029 import org.apache.camel.PollingConsumer; 030 import org.apache.camel.Processor; 031 import org.apache.camel.Producer; 032 import org.apache.camel.ProducerTemplate; 033 import org.apache.camel.ProxyInstantiationException; 034 import org.apache.camel.Service; 035 import org.apache.camel.component.bean.BeanInfo; 036 import org.apache.camel.component.bean.BeanProcessor; 037 import org.apache.camel.component.bean.ProxyHelper; 038 import org.apache.camel.processor.UnitOfWorkProcessor; 039 import org.apache.camel.processor.UnitOfWorkProducer; 040 import org.apache.camel.util.CamelContextHelper; 041 import org.apache.camel.util.ObjectHelper; 042 import org.apache.camel.util.ServiceHelper; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 /** 047 * A helper class for Camel based injector or post processing hooks which can be reused by 048 * both the <a href="http://camel.apache.org/spring.html">Spring</a>, 049 * <a href="http://camel.apache.org/guice.html">Guice</a> and 050 * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support. 051 * 052 * @version 053 */ 054 public class CamelPostProcessorHelper implements CamelContextAware { 055 private static final transient Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class); 056 057 @XmlTransient 058 private CamelContext camelContext; 059 060 public CamelPostProcessorHelper() { 061 } 062 063 public CamelPostProcessorHelper(CamelContext camelContext) { 064 this.setCamelContext(camelContext); 065 } 066 067 public CamelContext getCamelContext() { 068 return camelContext; 069 } 070 071 public void setCamelContext(CamelContext camelContext) { 072 this.camelContext = camelContext; 073 } 074 075 /** 076 * Does the given context match this camel context 077 */ 078 public boolean matchContext(String context) { 079 if (ObjectHelper.isNotEmpty(context)) { 080 if (!getCamelContext().getName().equals(context)) { 081 return false; 082 } 083 } 084 return true; 085 } 086 087 public void consumerInjection(Method method, Object bean, String beanName) { 088 Consume consume = method.getAnnotation(Consume.class); 089 if (consume != null && matchContext(consume.context())) { 090 LOG.info("Creating a consumer for: " + consume); 091 subscribeMethod(method, bean, beanName, consume.uri(), consume.ref()); 092 } 093 } 094 095 public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName) { 096 // lets bind this method to a listener 097 String injectionPointName = method.getName(); 098 Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true); 099 if (endpoint != null) { 100 try { 101 Processor processor = createConsumerProcessor(bean, method, endpoint); 102 Consumer consumer = endpoint.createConsumer(processor); 103 LOG.debug("Created processor: {} for consumer: {}", processor, consumer); 104 startService(consumer, bean, beanName); 105 } catch (Exception e) { 106 throw ObjectHelper.wrapRuntimeCamelException(e); 107 } 108 } 109 } 110 111 /** 112 * Stats the given service 113 */ 114 protected void startService(Service service, Object bean, String beanName) throws Exception { 115 if (isSingleton(bean, beanName)) { 116 getCamelContext().addService(service); 117 } else { 118 LOG.debug("Service is not singleton so you must remember to stop it manually {}", service); 119 ServiceHelper.startService(service); 120 } 121 } 122 123 /** 124 * Create a processor which invokes the given method when an incoming 125 * message exchange is received 126 */ 127 protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) { 128 BeanInfo info = new BeanInfo(getCamelContext(), method); 129 BeanProcessor answer = new BeanProcessor(pojo, info); 130 // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked 131 return new UnitOfWorkProcessor(answer); 132 } 133 134 protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) { 135 return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory); 136 } 137 138 /** 139 * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point 140 */ 141 public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName, 142 Object bean, String beanName) { 143 if (type.isAssignableFrom(ProducerTemplate.class)) { 144 return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName); 145 } else if (type.isAssignableFrom(ConsumerTemplate.class)) { 146 return createInjectionConsumerTemplate(endpointUri, endpointRef, injectionPointName); 147 } else { 148 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true); 149 if (endpoint != null) { 150 if (type.isInstance(endpoint)) { 151 return endpoint; 152 } else if (type.isAssignableFrom(Producer.class)) { 153 return createInjectionProducer(endpoint, bean, beanName); 154 } else if (type.isAssignableFrom(PollingConsumer.class)) { 155 return createInjectionPollingConsumer(endpoint, bean, beanName); 156 } else if (type.isInterface()) { 157 // lets create a proxy 158 try { 159 return ProxyHelper.createProxy(endpoint, type); 160 } catch (Exception e) { 161 throw createProxyInstantiationRuntimeException(type, endpoint, e); 162 } 163 } else { 164 throw new IllegalArgumentException("Invalid type: " + type.getName() 165 + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint); 166 } 167 } 168 return null; 169 } 170 } 171 172 /** 173 * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO 174 */ 175 protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) { 176 // endpoint is optional for this injection point 177 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false); 178 ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint); 179 // start the template so its ready to use 180 try { 181 answer.start(); 182 } catch (Exception e) { 183 throw ObjectHelper.wrapRuntimeCamelException(e); 184 } 185 return answer; 186 } 187 188 /** 189 * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO 190 */ 191 protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String injectionPointName) { 192 ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext()); 193 // start the template so its ready to use 194 try { 195 answer.start(); 196 } catch (Exception e) { 197 throw ObjectHelper.wrapRuntimeCamelException(e); 198 } 199 return answer; 200 } 201 202 /** 203 * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO 204 */ 205 protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) { 206 try { 207 PollingConsumer pollingConsumer = endpoint.createPollingConsumer(); 208 startService(pollingConsumer, bean, beanName); 209 return pollingConsumer; 210 } catch (Exception e) { 211 throw ObjectHelper.wrapRuntimeCamelException(e); 212 } 213 } 214 215 /** 216 * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO 217 */ 218 protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) { 219 try { 220 Producer producer = endpoint.createProducer(); 221 startService(producer, bean, beanName); 222 return new UnitOfWorkProducer(producer); 223 } catch (Exception e) { 224 throw ObjectHelper.wrapRuntimeCamelException(e); 225 } 226 } 227 228 protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) { 229 return new ProxyInstantiationException(type, endpoint, e); 230 } 231 232 /** 233 * Implementations can override this method to determine if the bean is singleton. 234 * 235 * @param bean the bean 236 * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned. 237 */ 238 protected boolean isSingleton(Object bean, String beanName) { 239 if (bean instanceof IsSingleton) { 240 IsSingleton singleton = (IsSingleton) bean; 241 return singleton.isSingleton(); 242 } 243 return true; 244 } 245 }