001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.directvm;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.Exchange;
021import org.apache.camel.impl.DefaultAsyncProducer;
022import org.apache.camel.util.StopWatch;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
/**
 * The direct-vm blocking producer.
 * <p/>
 * If blocking is enabled ({@code DirectVmEndpoint#isBlock}) then the
 * DirectVmEndpoint will create an instance of this class instead of
 * {@code DirectVmProducer}. This producer's {@code process} method will block
 * for the configured duration ({@code DirectVmEndpoint#getTimeout}, default to
 * 30 seconds), after which, if a consumer is still unavailable, a
 * DirectVmConsumerNotAvailableException will be thrown. If
 * {@code DirectVmEndpoint#isFailIfNoConsumers} is enabled, the exception is
 * thrown immediately without waiting.
 * <p/>
 * Implementation note: Concurrent producers will block for the duration it
 * takes to determine if a consumer is available, but actual consumer execution
 * will happen concurrently.
 */
040public class DirectVmBlockingProducer extends DefaultAsyncProducer {
041    private static final Logger LOG = LoggerFactory.getLogger(DirectVmBlockingProducer.class);
042    private final DirectVmEndpoint endpoint;
043
044    public DirectVmBlockingProducer(DirectVmEndpoint endpoint) {
045        super(endpoint);
046        this.endpoint = endpoint;
047    }
048
049    public void process(Exchange exchange) throws Exception {
050        getConsumer(exchange).getProcessor().process(exchange);
051    }
052
053    public boolean process(Exchange exchange, AsyncCallback callback) {
054        try {
055            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
056        } catch (Exception e) {
057            exchange.setException(e);
058            callback.done(true);
059            return true;
060        }
061    }
062
063    protected DirectVmConsumer getConsumer(Exchange exchange) throws Exception {
064        DirectVmConsumer answer = endpoint.getConsumer();
065        if (answer == null) {
066            // okay then await until we have a consumer or we timed out
067            if (endpoint.isFailIfNoConsumers()) {
068                throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
069            } else {
070                answer = awaitConsumer();
071                if (answer == null) {
072                    throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
073                }
074            }
075        }
076
077        return answer;
078    }
079
080    private DirectVmConsumer awaitConsumer() throws InterruptedException {
081        DirectVmConsumer answer = null;
082
083        StopWatch watch = new StopWatch();
084        boolean done = false;
085        while (!done) {
086            // sleep a bit to give chance for the consumer to be ready
087            Thread.sleep(500);
088            if (LOG.isDebugEnabled()) {
089                LOG.debug("Waited {} for consumer to be ready", watch.taken());
090            }
091
092            answer = endpoint.getConsumer();
093            if (answer != null) {
094                return answer;
095            }
096            // we are done if we hit the timeout
097            done = watch.taken() >= endpoint.getTimeout();
098        }
099        return answer;
100    }
101
102}