001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.CountDownLatch;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.DelegateProcessor;
026import org.apache.camel.Exchange;
027import org.apache.camel.Navigate;
028import org.apache.camel.Processor;
029import org.apache.camel.spi.AsyncProcessorAwaitManager;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.AsyncProcessorConverterHelper;
032import org.apache.camel.util.ServiceHelper;
033
034/**
035 * A Delegate pattern which delegates processing to a nested {@link AsyncProcessor} which can
036 * be useful for implementation inheritance when writing an {@link org.apache.camel.spi.Policy}
037 * <p/>
038 * <b>Important:</b> This implementation <b>does</b> support the asynchronous routing engine.
039 * If you are implementing a EIP pattern please use this as the delegate.
040 *
041 * @version
042 * @see DelegateSyncProcessor
043 * @see org.apache.camel.processor.DelegateProcessor
044 */
045public class DelegateAsyncProcessor extends ServiceSupport implements DelegateProcessor, AsyncProcessor, Navigate<Processor> {
046    protected AsyncProcessor processor;
047
048    public DelegateAsyncProcessor() {
049    }
050
051    public DelegateAsyncProcessor(AsyncProcessor processor) {
052        if (processor == this) {
053            throw new IllegalArgumentException("Recursive DelegateAsyncProcessor!");
054        }
055        this.processor = processor;
056    }
057
058    public DelegateAsyncProcessor(Processor processor) {
059        this(AsyncProcessorConverterHelper.convert(processor));
060    }
061
062    @Override
063    public String toString() {
064        return "DelegateAsync[" + processor + "]";
065    }
066
067    public AsyncProcessor getProcessor() {
068        return processor;
069    }
070
071    public void setProcessor(AsyncProcessor processor) {
072        this.processor = processor;
073    }
074
075    public void setProcessor(Processor processor) {
076        this.processor = AsyncProcessorConverterHelper.convert(processor);
077    }
078
079    protected void doStart() throws Exception {
080        ServiceHelper.startServices(processor);
081    }
082
083    protected void doStop() throws Exception {
084        ServiceHelper.stopServices(processor);
085    }
086
087    protected void doShutdown() throws Exception {
088        ServiceHelper.stopAndShutdownServices(processor);
089    }
090
091    public void process(Exchange exchange) throws Exception {
092        // inline org.apache.camel.util.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
093        // to optimize and reduce stacktrace lengths
094        final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
095        final CountDownLatch latch = new CountDownLatch(1);
096        // call the asynchronous method and wait for it to be done
097        boolean sync = process(exchange, new AsyncCallback() {
098            public void done(boolean doneSync) {
099                if (!doneSync) {
100                    awaitManager.countDown(exchange, latch);
101                }
102            }
103        });
104        if (!sync) {
105            awaitManager.await(exchange, latch);
106        }
107    }
108
109    public boolean process(final Exchange exchange, final AsyncCallback callback) {
110        return processor.process(exchange, callback);
111    }
112
113    /**
114     * @deprecated use {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} instead
115     */
116    @Deprecated
117    protected boolean processNext(Exchange exchange, AsyncCallback callback) {
118        throw new UnsupportedOperationException("This method is deprecated, use process(Exchange, AsyncCallback) instead");
119    }
120
121    public boolean hasNext() {
122        return processor != null;
123    }
124
125    public List<Processor> next() {
126        if (!hasNext()) {
127            return null;
128        }
129        List<Processor> answer = new ArrayList<Processor>(1);
130        answer.add(processor);
131        return answer;
132    }
133
134}