/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.bean;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.apache.camel.Body;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.Header;
import org.apache.camel.Headers;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link InvocationHandler} that turns a call on a proxied interface into
 * an {@link Exchange} which is sent through a {@link Producer}.
 * <p/>
 * Methods that override a public {@link Object} method are never sent to the
 * producer; see {@link #invoke(Object, Method, Object[])}.
 */
public abstract class AbstractCamelInvocationHandler implements InvocationHandler {

    private static final Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
    private static final List<Method> EXCLUDED_METHODS = new ArrayList<Method>();
    private static ExecutorService executorService;
    protected final Endpoint endpoint;
    protected final Producer producer;

    static {
        // exclude all java.lang.Object methods as we don't want to invoke them
        EXCLUDED_METHODS.addAll(Arrays.asList(Object.class.getMethods()));
    }

    /**
     * Creates the handler.
     *
     * @param endpoint the endpoint used when creating exchanges
     * @param producer the producer which processes the created exchanges
     */
    public AbstractCamelInvocationHandler(Endpoint endpoint, Producer producer) {
        this.endpoint = endpoint;
        this.producer = producer;
    }

    /**
     * Reads the reply body from the exchange, taken from the OUT message when the
     * exchange has one and otherwise from the IN message.
     *
     * @param exchange the processed exchange
     * @param type     the type the body must be converted to
     * @return the converted body, or <tt>null</tt> if the selected message has no body
     * @throws InvalidPayloadException if a non-null body cannot be provided as the given type
     */
    private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
        // get the body from the Exchange from either OUT or IN
        if (exchange.hasOut()) {
            if (exchange.getOut().getBody() != null) {
                return exchange.getOut().getMandatoryBody(type);
            } else {
                return null;
            }
        } else {
            if (exchange.getIn().getBody() != null) {
                return exchange.getIn().getMandatoryBody(type);
            } else {
                return null;
            }
        }
    }

    /**
     * Dispatches a call made on the proxy.
     * <p/>
     * Valid methods (see {@link #isValidMethod(Method)}) are delegated to
     * {@link #doInvokeProxy(Object, Method, Object[])}. Excluded methods are
     * answered locally: <tt>toString</tt> and <tt>hashCode</tt> are answered by
     * this handler, <tt>equals</tt> compares the proxy by identity, and any other
     * excluded method returns <tt>null</tt>.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked method
     * @param args   the arguments of the call, may be <tt>null</tt>
     * @return the result of the call
     * @throws Throwable if the invocation fails
     */
    @Override
    public final Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        if (isValidMethod(method)) {
            return doInvokeProxy(proxy, method, args);
        } else {
            // invalid method then invoke methods on this instead
            if ("toString".equals(method.getName())) {
                return this.toString();
            } else if ("hashCode".equals(method.getName())) {
                return this.hashCode();
            } else if ("equals".equals(method.getName())) {
                // a proxy must be equal to itself (Object.equals is reflexive); always answering
                // false would make the proxy impossible to find in collections
                return Boolean.valueOf(args != null && args.length == 1 && proxy == args[0]);
            }
            return null;
        }
    }

    /**
     * Performs the actual invocation for a method that passed {@link #isValidMethod(Method)}.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked method
     * @param args   the arguments of the call, may be <tt>null</tt>
     * @return the result of the call
     * @throws Throwable if the invocation fails
     */
    public abstract Object doInvokeProxy(final Object proxy, final Method method, final Object[] args) throws Throwable;

    /**
     * Creates an exchange for the method call, populates it from the arguments and sends it
     * via {@link #doInvoke(Method, Exchange)}.
     *
     * @param method  the invoked method
     * @param pattern the exchange pattern of the created exchange
     * @param args    the arguments of the call
     * @param binding <tt>true</tt> to bind each argument according to its Camel parameter
     *                annotation, <tt>false</tt> to send a {@link BeanInvocation} as the body
     * @return the result of the call
     * @throws Throwable if the invocation fails
     */
    @SuppressWarnings("unchecked")
    protected Object invokeProxy(final Method method, final ExchangePattern pattern, Object[] args, boolean binding) throws Throwable {
        final Exchange exchange = new DefaultExchange(endpoint, pattern);

        if (binding) {
            // in binding mode we bind the passed in arguments (args) to the created exchange
            // using the existing Camel @Body, @Header, @Headers, @ExchangeProperty annotations
            // if no annotation then it's bound as the message body
            int index = 0;
            for (Annotation[] row : method.getParameterAnnotations()) {
                Object value = args[index];
                if (row == null || row.length == 0) {
                    // assume it's the message body when there are no annotations
                    exchange.getIn().setBody(value);
                } else {
                    for (Annotation ann : row) {
                        if (ann.annotationType().isAssignableFrom(Header.class)) {
                            Header header = (Header) ann;
                            String name = header.value();
                            exchange.getIn().setHeader(name, value);
                        } else if (ann.annotationType().isAssignableFrom(Headers.class)) {
                            Map map = exchange.getContext().getTypeConverter().tryConvertTo(Map.class, exchange, value);
                            if (map != null) {
                                exchange.getIn().getHeaders().putAll(map);
                            }
                        } else if (ann.annotationType().isAssignableFrom(ExchangeProperty.class)) {
                            ExchangeProperty ep = (ExchangeProperty) ann;
                            String name = ep.value();
                            exchange.setProperty(name, value);
                        } else if (ann.annotationType().isAssignableFrom(Body.class)) {
                            exchange.getIn().setBody(value);
                        } else {
                            // assume it's the message body when the annotation is not a Camel binding annotation
                            exchange.getIn().setBody(value);
                        }
                    }
                }
                index++;
            }
            LOG.trace("Binding to service interface as @Body,@Header,@ExchangeProperty detected when calling proxy method: {}", method);
        } else {
            // no binding so use the old behavior with BeanInvocation as the body
            BeanInvocation invocation = new BeanInvocation(method, args);
            exchange.getIn().setBody(invocation);
            // the method must be passed as argument, otherwise the {} placeholder is never filled in
            LOG.trace("No binding to service interface as @Body,@Header,@ExchangeProperty not detected. Using BeanInvocation as message body when calling proxy method: {}", method);
        }

        return doInvoke(method, exchange);
    }

    /**
     * Creates an exchange with the given body as the IN message body and sends it
     * via {@link #doInvoke(Method, Exchange)}.
     *
     * @param method  the invoked method
     * @param body    the message body
     * @param pattern the exchange pattern of the created exchange
     * @return the result of the call
     * @throws Throwable if the invocation fails
     */
    protected Object invokeWithBody(final Method method, Object body, final ExchangePattern pattern) throws Throwable {
        final Exchange exchange = new DefaultExchange(endpoint, pattern);
        exchange.getIn().setBody(body);

        return doInvoke(method, exchange);
    }

    /**
     * Lets the producer process the exchange and gathers the reply.
     * <p/>
     * When the method return type is exactly {@link Future} the work is submitted to the shared
     * executor service and the task is returned right away. Otherwise the task is run on the
     * calling thread and its result is returned.
     *
     * @param method   the invoked method
     * @param exchange the exchange to process
     * @return the reply, or the task when the method returns a {@link Future}
     * @throws Throwable the cause of the failure, unwrapped from the {@link ExecutionException}
     */
    protected Object doInvoke(final Method method, final Exchange exchange) throws Throwable {

        // is the return type a future
        final boolean isFuture = method.getReturnType() == Future.class;

        // create task to execute the proxy and gather the reply
        FutureTask<Object> task = new FutureTask<Object>(new Callable<Object>() {
            public Object call() throws Exception {
                // process the exchange
                LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
                producer.process(exchange);

                Object answer = afterInvoke(method, exchange, exchange.getPattern(), isFuture);
                LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
                return answer;
            }
        });

        if (isFuture) {
            // submit task and return future
            if (LOG.isTraceEnabled()) {
                LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
            }
            getExecutorService(exchange.getContext()).submit(task);
            return task;
        } else {
            // execute task now
            try {
                task.run();
                return task.get();
            } catch (ExecutionException e) {
                // we don't want the wrapped exception from JDK
                throw e.getCause();
            }
        }
    }

    /**
     * Turns the processed exchange into the value to return from the proxied method,
     * or throws the exception the exchange failed with.
     *
     * @param method   the invoked method
     * @param exchange the processed exchange
     * @param pattern  the exchange pattern (not used by this implementation)
     * @param isFuture whether the method returns a {@link Future}, in which case the reply
     *                 type is taken from the generic return type
     * @return the reply body, or <tt>null</tt> if the method is void or there is no body
     * @throws Exception the exception set on the exchange, see the inline comments for
     *                   how it is selected
     */
    protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
        // check if we had an exception
        Throwable cause = exchange.getException();
        if (cause != null) {
            Throwable found = findSuitableException(cause, method);
            if (found != null) {
                if (found instanceof Exception) {
                    throw (Exception) found;
                } else {
                    // wrap as exception
                    throw new CamelExchangeException("Error processing exchange", exchange, cause);
                }
            }
            // special for runtime camel exceptions as they can be nested
            if (cause instanceof RuntimeCamelException) {
                // if the inner cause is a runtime exception we can throw it
                // directly
                if (cause.getCause() instanceof RuntimeException) {
                    throw (RuntimeException) ((RuntimeCamelException) cause).getCause();
                }
                throw (RuntimeCamelException) cause;
            }
            // okay just throw the exception as is
            if (cause instanceof Exception) {
                throw (Exception) cause;
            } else {
                // wrap as exception
                throw new CamelExchangeException("Error processing exchange", exchange, cause);
            }
        }

        Class<?> to = isFuture ? getGenericType(exchange.getContext(), method.getGenericReturnType()) : method.getReturnType();

        // do not return a reply if the method is VOID
        if (to == Void.TYPE) {
            return null;
        }

        return getBody(exchange, to);
    }

    /**
     * Resolves the class of the first type argument of the given generic type.
     *
     * @param context the camel context whose class resolver is used
     * @param type    the generic type, may be <tt>null</tt>
     * @return the resolved class, or <tt>Object.class</tt> if the type is <tt>null</tt>
     *         or its string representation has no type argument
     * @throws ClassNotFoundException if the type argument cannot be resolved as a class
     */
    protected static Class<?> getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
        if (type == null) {
            // fallback and use object
            return Object.class;
        }

        // unfortunately java doesn't provide a nice api for getting the generic
        // type of the return type
        // due to type erasure, so we have to gather it based on a String
        // representation
        String name = ObjectHelper.between(type.toString(), "<", ">");
        if (name != null) {
            if (name.contains("<")) {
                // we only need the outer type
                name = ObjectHelper.before(name, "<");
            }
            return context.getClassResolver().resolveMandatoryClass(name);
        } else {
            // fallback and use object
            return Object.class;
        }
    }

    /**
     * Gets the shared executor service used for methods returning a {@link Future},
     * (re)creating it when there is none or the current one has been shut down.
     *
     * @param context the camel context used to lookup or create the thread pool
     * @return the executor service
     */
    @SuppressWarnings("deprecation")
    protected static synchronized ExecutorService getExecutorService(CamelContext context) {
        // CamelContext will shutdown thread pool when it shutdown so we can
        // lazy create it on demand
        // but in case of hot-deploy or the likes we need to be able to
        // re-create it (it's a shared static instance)
        if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) {
            // try to lookup a pool first based on id/profile
            executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
            if (executorService == null) {
                executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
            }
        }
        return executorService;
    }

    /**
     * Tries to find the best suited exception to throw.
     * <p/>
     * It looks in the exception hierarchy from the caused exception and matches
     * this against the declared exceptions being thrown on the method.
     *
     * @param cause  the caused exception
     * @param method the method
     * @return the exception to throw, or <tt>null</tt> if not possible to find
     *         a suitable exception
     */
    protected Throwable findSuitableException(Throwable cause, Method method) {
        if (method.getExceptionTypes() == null || method.getExceptionTypes().length == 0) {
            return null;
        }

        // see if there is any exception which matches the declared exception on
        // the method
        for (Class<?> type : method.getExceptionTypes()) {
            Object fault = ObjectHelper.getException(type, cause);
            if (fault != null) {
                return Throwable.class.cast(fault);
            }
        }

        return null;
    }

    /**
     * Checks whether the method may be sent to the producer.
     *
     * @param method the invoked method
     * @return <tt>false</tt> if the method overrides one of the public
     *         {@link Object} methods, <tt>true</tt> otherwise
     */
    protected boolean isValidMethod(Method method) {
        // must not be in the excluded list
        for (Method excluded : EXCLUDED_METHODS) {
            if (ObjectHelper.isOverridingMethod(excluded, method)) {
                // the method is overriding an excluded method so it's not valid
                return false;
            }
        }
        return true;
    }

}