001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.bean;
018
019import java.lang.annotation.Annotation;
020import java.lang.reflect.InvocationHandler;
021import java.lang.reflect.Method;
022import java.lang.reflect.Type;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Future;
031import java.util.concurrent.FutureTask;
032
033import org.apache.camel.Body;
034import org.apache.camel.CamelContext;
035import org.apache.camel.CamelExchangeException;
036import org.apache.camel.Endpoint;
037import org.apache.camel.Exchange;
038import org.apache.camel.ExchangePattern;
039import org.apache.camel.ExchangeProperty;
040import org.apache.camel.Header;
041import org.apache.camel.Headers;
042import org.apache.camel.InvalidPayloadException;
043import org.apache.camel.Producer;
044import org.apache.camel.RuntimeCamelException;
045import org.apache.camel.impl.DefaultExchange;
046import org.apache.camel.util.ObjectHelper;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050public abstract class AbstractCamelInvocationHandler implements InvocationHandler {
051
052    private static final Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
053    private static final List<Method> EXCLUDED_METHODS = new ArrayList<Method>();
054    private static ExecutorService executorService;
055    protected final Endpoint endpoint;
056    protected final Producer producer;
057
058    static {
059        // exclude all java.lang.Object methods as we dont want to invoke them
060        EXCLUDED_METHODS.addAll(Arrays.asList(Object.class.getMethods()));
061    }
062
063    public AbstractCamelInvocationHandler(Endpoint endpoint, Producer producer) {
064        this.endpoint = endpoint;
065        this.producer = producer;
066    }
067
068    private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
069        // get the body from the Exchange from either OUT or IN
070        if (exchange.hasOut()) {
071            if (exchange.getOut().getBody() != null) {
072                return exchange.getOut().getMandatoryBody(type);
073            } else {
074                return null;
075            }
076        } else {
077            if (exchange.getIn().getBody() != null) {
078                return exchange.getIn().getMandatoryBody(type);
079            } else {
080                return null;
081            }
082        }
083    }
084
085    @Override
086    public final Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
087        if (isValidMethod(method)) {
088            return doInvokeProxy(proxy, method, args);
089        } else {
090            // invalid method then invoke methods on this instead
091            if ("toString".equals(method.getName())) {
092                return this.toString();
093            } else if ("hashCode".equals(method.getName())) {
094                return this.hashCode();
095            } else if ("equals".equals(method.getName())) {
096                return Boolean.FALSE;
097            }
098            return null;
099        }
100    }
101
102    public abstract Object doInvokeProxy(final Object proxy, final Method method, final Object[] args) throws Throwable;
103
104    @SuppressWarnings("unchecked")
105    protected Object invokeProxy(final Method method, final ExchangePattern pattern, Object[] args, boolean binding) throws Throwable {
106        final Exchange exchange = new DefaultExchange(endpoint, pattern);
107
108        if (binding) {
109            // in binding mode we bind the passed in arguments (args) to the created exchange
110            // using the existing Camel @Body, @Header, @Headers, @ExchangeProperty annotations
111            // if no annotation then its bound as the message body
112            int index = 0;
113            for (Annotation[] row : method.getParameterAnnotations()) {
114                Object value = args[index];
115                if (row == null || row.length == 0) {
116                    // assume its message body when there is no annotations
117                    exchange.getIn().setBody(value);
118                } else {
119                    for (Annotation ann : row) {
120                        if (ann.annotationType().isAssignableFrom(Header.class)) {
121                            Header header = (Header) ann;
122                            String name = header.value();
123                            exchange.getIn().setHeader(name, value);
124                        } else if (ann.annotationType().isAssignableFrom(Headers.class)) {
125                            Map map = exchange.getContext().getTypeConverter().tryConvertTo(Map.class, exchange, value);
126                            if (map != null) {
127                                exchange.getIn().getHeaders().putAll(map);
128                            }
129                        } else if (ann.annotationType().isAssignableFrom(ExchangeProperty.class)) {
130                            ExchangeProperty ep = (ExchangeProperty) ann;
131                            String name = ep.value();
132                            exchange.setProperty(name, value);
133                        } else if (ann.annotationType().isAssignableFrom(Body.class)) {
134                            exchange.getIn().setBody(value);
135                        } else {
136                            // assume its message body when there is no annotations
137                            exchange.getIn().setBody(value);
138                        }
139                    }
140                }
141                index++;
142            }
143        } else {
144            // no binding so use the old behavior with BeanInvocation as the body
145            BeanInvocation invocation = new BeanInvocation(method, args);
146            exchange.getIn().setBody(invocation);
147        }
148
149        if (binding) {
150            LOG.trace("Binding to service interface as @Body,@Header,@ExchangeProperty detected when calling proxy method: {}", method);
151        } else {
152            LOG.trace("No binding to service interface as @Body,@Header,@ExchangeProperty not detected. Using BeanInvocation as message body when calling proxy method: {}");
153        }
154
155        return doInvoke(method, exchange);
156    }
157
158    protected Object invokeWithBody(final Method method, Object body, final ExchangePattern pattern) throws Throwable {
159        final Exchange exchange = new DefaultExchange(endpoint, pattern);
160        exchange.getIn().setBody(body);
161
162        return doInvoke(method, exchange);
163    }
164
165    protected Object doInvoke(final Method method, final Exchange exchange) throws Throwable {
166
167        // is the return type a future
168        final boolean isFuture = method.getReturnType() == Future.class;
169
170        // create task to execute the proxy and gather the reply
171        FutureTask<Object> task = new FutureTask<Object>(new Callable<Object>() {
172            public Object call() throws Exception {
173                // process the exchange
174                LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
175                producer.process(exchange);
176
177                Object answer = afterInvoke(method, exchange, exchange.getPattern(), isFuture);
178                LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
179                return answer;
180            }
181        });
182
183        if (isFuture) {
184            // submit task and return future
185            if (LOG.isTraceEnabled()) {
186                LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
187            }
188            getExecutorService(exchange.getContext()).submit(task);
189            return task;
190        } else {
191            // execute task now
192            try {
193                task.run();
194                return task.get();
195            } catch (ExecutionException e) {
196                // we don't want the wrapped exception from JDK
197                throw e.getCause();
198            }
199        }
200    }
201
202    protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
203        // check if we had an exception
204        Throwable cause = exchange.getException();
205        if (cause != null) {
206            Throwable found = findSuitableException(cause, method);
207            if (found != null) {
208                if (found instanceof Exception) {
209                    throw (Exception)found;
210                } else {
211                    // wrap as exception
212                    throw new CamelExchangeException("Error processing exchange", exchange, cause);
213                }
214            }
215            // special for runtime camel exceptions as they can be nested
216            if (cause instanceof RuntimeCamelException) {
217                // if the inner cause is a runtime exception we can throw it
218                // directly
219                if (cause.getCause() instanceof RuntimeException) {
220                    throw (RuntimeException)((RuntimeCamelException)cause).getCause();
221                }
222                throw (RuntimeCamelException)cause;
223            }
224            // okay just throw the exception as is
225            if (cause instanceof Exception) {
226                throw (Exception)cause;
227            } else {
228                // wrap as exception
229                throw new CamelExchangeException("Error processing exchange", exchange, cause);
230            }
231        }
232
233        Class<?> to = isFuture ? getGenericType(exchange.getContext(), method.getGenericReturnType()) : method.getReturnType();
234
235        // do not return a reply if the method is VOID
236        if (to == Void.TYPE) {
237            return null;
238        }
239
240        return getBody(exchange, to);
241    }
242
243    protected static Class<?> getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
244        if (type == null) {
245            // fallback and use object
246            return Object.class;
247        }
248
249        // unfortunately java dont provide a nice api for getting the generic
250        // type of the return type
251        // due type erasure, so we have to gather it based on a String
252        // representation
253        String name = ObjectHelper.between(type.toString(), "<", ">");
254        if (name != null) {
255            if (name.contains("<")) {
256                // we only need the outer type
257                name = ObjectHelper.before(name, "<");
258            }
259            return context.getClassResolver().resolveMandatoryClass(name);
260        } else {
261            // fallback and use object
262            return Object.class;
263        }
264    }
265
266    @SuppressWarnings("deprecation")
267    protected static synchronized ExecutorService getExecutorService(CamelContext context) {
268        // CamelContext will shutdown thread pool when it shutdown so we can
269        // lazy create it on demand
270        // but in case of hot-deploy or the likes we need to be able to
271        // re-create it (its a shared static instance)
272        if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) {
273            // try to lookup a pool first based on id/profile
274            executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
275            if (executorService == null) {
276                executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
277            }
278        }
279        return executorService;
280    }
281
282    /**
283     * Tries to find the best suited exception to throw.
284     * <p/>
285     * It looks in the exception hierarchy from the caused exception and matches
286     * this against the declared exceptions being thrown on the method.
287     * 
288     * @param cause the caused exception
289     * @param method the method
290     * @return the exception to throw, or <tt>null</tt> if not possible to find
291     *         a suitable exception
292     */
293    protected Throwable findSuitableException(Throwable cause, Method method) {
294        if (method.getExceptionTypes() == null || method.getExceptionTypes().length == 0) {
295            return null;
296        }
297
298        // see if there is any exception which matches the declared exception on
299        // the method
300        for (Class<?> type : method.getExceptionTypes()) {
301            Object fault = ObjectHelper.getException(type, cause);
302            if (fault != null) {
303                return Throwable.class.cast(fault);
304            }
305        }
306
307        return null;
308    }
309
310    protected boolean isValidMethod(Method method) {
311        // must not be in the excluded list
312        for (Method excluded : EXCLUDED_METHODS) {
313            if (ObjectHelper.isOverridingMethod(excluded, method)) {
314                // the method is overriding an excluded method so its not valid
315                return false;
316            }
317        }
318        return true;
319    }
320
321}