001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.lang.reflect.Method;
020import java.util.ArrayList;
021import java.util.LinkedHashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.camel.AsyncCallback;
026import org.apache.camel.AsyncProcessor;
027import org.apache.camel.Endpoint;
028import org.apache.camel.Exchange;
029import org.apache.camel.Navigate;
030import org.apache.camel.Predicate;
031import org.apache.camel.Processor;
032import org.apache.camel.builder.PredicateBuilder;
033import org.apache.camel.component.bean.BeanInfo;
034import org.apache.camel.component.bean.BeanProcessor;
035import org.apache.camel.processor.CamelInternalProcessor;
036import org.apache.camel.support.ServiceSupport;
037import org.apache.camel.util.AsyncProcessorHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.ServiceHelper;
040
041/**
042 * A {@link Processor} which is used for POJO @Consume where you can have multiple @Consume on the same endpoint/consumer
043 * and via predicate's can filter and call different methods.
044 */
045public final class SubscribeMethodProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
046
047    private final Endpoint endpoint;
048    private final Map<AsyncProcessor, Predicate> methods = new LinkedHashMap<>();
049
050    public SubscribeMethodProcessor(Endpoint endpoint) {
051        this.endpoint = endpoint;
052    }
053
054    public Endpoint getEndpoint() {
055        return endpoint;
056    }
057
058    protected void addMethod(final Object pojo, final Method method, final Endpoint endpoint, String predicate) {
059        BeanInfo info = new BeanInfo(endpoint.getCamelContext(), method);
060        BeanProcessor answer = new BeanProcessor(pojo, info);
061        // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
062        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
063        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
064
065        Predicate p;
066        if (ObjectHelper.isEmpty(predicate)) {
067            p = PredicateBuilder.constant(true);
068        } else {
069            p = endpoint.getCamelContext().resolveLanguage("simple").createPredicate(predicate);
070        }
071        methods.put(internal, p);
072    }
073
074    @Override
075    public void process(Exchange exchange) throws Exception {
076        AsyncProcessorHelper.process(this, exchange);
077    }
078
079    @Override
080    public boolean process(Exchange exchange, AsyncCallback callback) {
081        try {
082            // evaluate which predicate matches and call the method
083            for (Map.Entry<AsyncProcessor, Predicate> entry : methods.entrySet()) {
084                Predicate predicate = entry.getValue();
085                if (predicate.matches(exchange)) {
086                    return entry.getKey().process(exchange, callback);
087                }
088            }
089        } catch (Throwable e) {
090            exchange.setException(e);
091        }
092        callback.done(true);
093        return true;
094    }
095
096    @Override
097    protected void doStart() throws Exception {
098        ServiceHelper.startServices(methods.keySet());
099    }
100
101    @Override
102    protected void doStop() throws Exception {
103        ServiceHelper.stopServices(methods.keySet());
104    }
105
106    @Override
107    protected void doShutdown() throws Exception {
108        ServiceHelper.stopAndShutdownServices(methods.keySet());
109    }
110
111    @Override
112    public String toString() {
113        return "SubscribeMethodProcessor[" + endpoint + "]";
114    }
115
116    @Override
117    public List<Processor> next() {
118        return new ArrayList<>(methods.keySet());
119    }
120
121    @Override
122    public boolean hasNext() {
123        return !methods.isEmpty();
124    }
125}