001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.List;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.TimeUnit;
023import java.util.function.Predicate;
024
025import org.apache.camel.AsyncCallback;
026import org.apache.camel.Exchange;
027import org.apache.camel.ExchangeTimedOutException;
028import org.apache.camel.WaitForTaskToComplete;
029import org.apache.camel.impl.DefaultAsyncProducer;
030import org.apache.camel.spi.Synchronization;
031import org.apache.camel.spi.SynchronizationVetoable;
032import org.apache.camel.support.SynchronizationAdapter;
033import org.apache.camel.util.ExchangeHelper;
034
035/**
036 * @version 
037 */
038public class SedaProducer extends DefaultAsyncProducer {
039    
040    /**
041     * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used.
042     */
043    @Deprecated
044    protected final BlockingQueue<Exchange> queue;
045    private final SedaEndpoint endpoint;
046    private final WaitForTaskToComplete waitForTaskToComplete;
047    private final long timeout;
048    private final boolean blockWhenFull;
049
050    /**
051     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
052     */
053    @Deprecated
054    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
055        this(endpoint, waitForTaskToComplete, timeout, false);
056    }
057
058    /**
059     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
060     */
061    @Deprecated
062    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
063        this(endpoint, waitForTaskToComplete, timeout, blockWhenFull);
064    }
065
066    public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
067        super(endpoint);
068        this.queue = endpoint.getQueue();
069        this.endpoint = endpoint;
070        this.waitForTaskToComplete = waitForTaskToComplete;
071        this.timeout = timeout;
072        this.blockWhenFull = blockWhenFull;
073    }
074
075    @Override
076    public boolean process(final Exchange exchange, final AsyncCallback callback) {
077        WaitForTaskToComplete wait = waitForTaskToComplete;
078        if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
079            wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
080        }
081
082        if (wait == WaitForTaskToComplete.Always
083            || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
084
085            // do not handover the completion as we wait for the copy to complete, and copy its result back when it done
086            Exchange copy = prepareCopy(exchange, false);
087
088            // latch that waits until we are complete
089            final CountDownLatch latch = new CountDownLatch(1);
090
091            // we should wait for the reply so install a on completion so we know when its complete
092            copy.addOnCompletion(new SynchronizationAdapter() {
093                @Override
094                public void onDone(Exchange response) {
095                    // check for timeout, which then already would have invoked the latch
096                    if (latch.getCount() == 0) {
097                        if (log.isTraceEnabled()) {
098                            log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn());
099                        }
100                        return;
101                    } else {
102                        if (log.isTraceEnabled()) {
103                            log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn());
104                        }
105                        try {
106                            ExchangeHelper.copyResults(exchange, response);
107                        } finally {
108                            // always ensure latch is triggered
109                            latch.countDown();
110                        }
111                    }
112                }
113
114                @Override
115                public boolean allowHandover() {
116                    // do not allow handover as we want to seda producer to have its completion triggered
117                    // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
118                    return false;
119                }
120
121                @Override
122                public String toString() {
123                    return "onDone at endpoint: " + endpoint;
124                }
125            });
126
127            log.trace("Adding Exchange to queue: {}", copy);
128            try {
129                // do not copy as we already did the copy
130                addToQueue(copy, false);
131            } catch (SedaConsumerNotAvailableException e) {
132                exchange.setException(e);
133                callback.done(true);
134                return true;
135            }
136
137            if (timeout > 0) {
138                if (log.isTraceEnabled()) {
139                    log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri());
140                }
141                // lets see if we can get the task done before the timeout
142                boolean done = false;
143                try {
144                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
145                } catch (InterruptedException e) {
146                    // ignore
147                }
148                if (!done) {
149                    exchange.setException(new ExchangeTimedOutException(exchange, timeout));
150                    // remove timed out Exchange from queue
151                    endpoint.getQueue().remove(copy);
152                    // count down to indicate timeout
153                    latch.countDown();
154                }
155            } else {
156                if (log.isTraceEnabled()) {
157                    log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri());
158                }
159                // no timeout then wait until its done
160                try {
161                    latch.await();
162                } catch (InterruptedException e) {
163                    // ignore
164                }
165            }
166        } else {
167            // no wait, eg its a InOnly then just add to queue and return
168            try {
169                addToQueue(exchange, true);
170            } catch (SedaConsumerNotAvailableException e) {
171                exchange.setException(e);
172                callback.done(true);
173                return true;
174            }
175        }
176
177        // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
178        // so we should just signal the callback we are done synchronously
179        callback.done(true);
180        return true;
181    }
182
183    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
184        // use a new copy of the exchange to route async (and use same message id)
185
186        // if handover we need to do special handover to avoid handing over
187        // RestBindingMarshalOnCompletion as it should not be handed over with SEDA
188        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true,
189            synchronization -> !synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion"));
190        // set a new from endpoint to be the seda queue
191        copy.setFromEndpoint(endpoint);
192        return copy;
193    }
194
195    @Override
196    protected void doStart() throws Exception {
197        super.doStart();
198        endpoint.onStarted(this);
199    }
200
201    @Override
202    protected void doStop() throws Exception {
203        endpoint.onStopped(this);
204        super.doStop();
205    }
206
207    /**
208     * Strategy method for adding the exchange to the queue.
209     * <p>
210     * Will perform a blocking "put" if blockWhenFull is true, otherwise it will
211     * simply add which will throw exception if the queue is full
212     * 
213     * @param exchange the exchange to add to the queue
214     * @param copy     whether to create a copy of the exchange to use for adding to the queue
215     */
216    protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException {
217        BlockingQueue<Exchange> queue = null;
218        QueueReference queueReference = endpoint.getQueueReference();
219        if (queueReference != null) {
220            queue = queueReference.getQueue();
221        }
222        if (queue == null) {
223            throw new SedaConsumerNotAvailableException("No queue available on endpoint: " + endpoint, exchange);
224        }
225
226        boolean empty = !queueReference.hasConsumers();
227        if (empty) {
228            if (endpoint.isFailIfNoConsumers()) {
229                throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
230            } else if (endpoint.isDiscardIfNoConsumers()) {
231                log.debug("Discard message as no active consumers on endpoint: " + endpoint);
232                return;
233            }
234        }
235
236        Exchange target = exchange;
237
238        // handover the completion so its the copy which performs that, as we do not wait
239        if (copy) {
240            target = prepareCopy(exchange, true);
241        }
242
243        log.trace("Adding Exchange to queue: {}", target);
244        if (blockWhenFull) {
245            try {
246                queue.put(target);
247            } catch (InterruptedException e) {
248                // ignore
249                log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
250            }
251        } else {
252            queue.add(target);
253        }
254    }
255
256}