/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.seda;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.SynchronizationVetoable;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;

/**
 * Producer that places exchanges on the queue of a {@link SedaEndpoint}.
 * <p>
 * Depending on the configured (or per-exchange) {@link WaitForTaskToComplete} mode the producer
 * either adds a copy of the exchange to the queue and returns, or adds a copy and blocks the
 * calling thread until that copy has completed (optionally bounded by a timeout), copying the
 * result back onto the original exchange.
 *
 * @version
 */
public class SedaProducer extends DefaultAsyncProducer {

    /**
     * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used.
     */
    @Deprecated
    protected final BlockingQueue<Exchange> queue;
    private final SedaEndpoint endpoint;
    private final WaitForTaskToComplete waitForTaskToComplete;
    private final long timeout;
    private final boolean blockWhenFull;

    /**
     * Creates a producer that does not block when the queue is full. The {@code queue} argument is
     * not used; the queue is obtained from the endpoint instead.
     *
     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
     */
    @Deprecated
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
        this(endpoint, waitForTaskToComplete, timeout, false);
    }

    /**
     * Creates a producer. The {@code queue} argument is not used; the queue is obtained from the
     * endpoint instead.
     *
     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
     */
    @Deprecated
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        this(endpoint, waitForTaskToComplete, timeout, blockWhenFull);
    }

    /**
     * Creates a producer for the given endpoint.
     *
     * @param endpoint              the seda endpoint whose queue the exchanges are added to
     * @param waitForTaskToComplete default wait mode, used when the exchange does not carry an
     *                              {@link Exchange#ASYNC_WAIT} property
     * @param timeout               maximum time in milliseconds to wait for completion; a value of
     *                              zero or less waits without a time limit
     * @param blockWhenFull         whether to use a blocking {@code put} instead of {@code add}
     *                              when adding to the queue
     */
    public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        super(endpoint);
        this.queue = endpoint.getQueue();
        this.endpoint = endpoint;
        this.waitForTaskToComplete = waitForTaskToComplete;
        this.timeout = timeout;
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Adds the exchange (or a copy of it) to the seda queue and, when the effective wait mode
     * requires it, blocks until the queued copy has completed or the timeout elapsed.
     * <p>
     * The callback is always invoked synchronously, so this method always returns {@code true}.
     * If the waiting thread is interrupted, the wait is abandoned and the thread's interrupt
     * status is restored so callers can observe the interruption.
     *
     * @param exchange the exchange to send
     * @param callback the callback to signal when done; always called with {@code true}
     * @return always {@code true}
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        // a wait mode set on the exchange takes precedence over the configured one
        WaitForTaskToComplete wait = waitForTaskToComplete;
        if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
            wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
        }

        if (wait == WaitForTaskToComplete.Always
            || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {

            // do not handover the completion as we wait for the copy to complete, and copy its result back when it is done
            Exchange copy = prepareCopy(exchange, false);

            // latch that waits until we are complete
            final CountDownLatch latch = new CountDownLatch(1);

            // we should wait for the reply so install an on completion so we know when it is complete
            copy.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange response) {
                    // check for timeout, which then already would have invoked the latch
                    if (latch.getCount() == 0) {
                        if (log.isTraceEnabled()) {
                            log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                        }
                        return;
                    } else {
                        if (log.isTraceEnabled()) {
                            log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                        }
                        try {
                            ExchangeHelper.copyResults(exchange, response);
                        } finally {
                            // always ensure latch is triggered
                            latch.countDown();
                        }
                    }
                }

                @Override
                public boolean allowHandover() {
                    // do not allow handover as we want the seda producer to have its completion triggered
                    // at this point in the routing (at this leg), instead of at the very last (this ensures timeout is honored)
                    return false;
                }

                @Override
                public String toString() {
                    return "onDone at endpoint: " + endpoint;
                }
            });

            log.trace("Adding Exchange to queue: {}", copy);
            try {
                // do not copy as we already did the copy
                addToQueue(copy, false);
            } catch (SedaConsumerNotAvailableException e) {
                exchange.setException(e);
                callback.done(true);
                return true;
            }

            if (timeout > 0) {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri());
                }
                // lets see if we can get the task done before the timeout
                boolean done = false;
                try {
                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // stop waiting (handled as a timeout below) but keep the interrupt status
                    // so code further up the call stack can react to the interruption
                    Thread.currentThread().interrupt();
                }
                if (!done) {
                    exchange.setException(new ExchangeTimedOutException(exchange, timeout));
                    // remove timed out Exchange from queue
                    endpoint.getQueue().remove(copy);
                    // count down to indicate timeout
                    latch.countDown();
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri());
                }
                // no timeout then wait until its done
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // stop waiting but keep the interrupt status so code further up
                    // the call stack can react to the interruption
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            // no wait, eg its a InOnly then just add to queue and return
            try {
                addToQueue(exchange, true);
            } catch (SedaConsumerNotAvailableException e) {
                exchange.setException(e);
                callback.done(true);
                return true;
            }
        }

        // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
        // so we should just signal the callback we are done synchronously
        callback.done(true);
        return true;
    }

    /**
     * Creates the correlated copy of the exchange that is routed through the seda queue.
     *
     * @param exchange the original exchange
     * @param handover whether the completions of the original exchange should be handed over to the copy
     * @return the copy, with this producer's endpoint set as its from endpoint
     */
    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
        // use a new copy of the exchange to route async (and use same message id)

        // if handover we need to do special handover to avoid handing over
        // RestBindingMarshalOnCompletion as it should not be handed over with SEDA
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true,
            synchronization -> !synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion"));
        // set a new from endpoint to be the seda queue
        copy.setFromEndpoint(endpoint);
        return copy;
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        // let the endpoint know this producer has started
        endpoint.onStarted(this);
    }

    @Override
    protected void doStop() throws Exception {
        // let the endpoint know this producer has stopped, before stopping the producer itself
        endpoint.onStopped(this);
        super.doStop();
    }

    /**
     * Strategy method for adding the exchange to the queue.
     * <p>
     * Will perform a blocking "put" if blockWhenFull is true, otherwise it will
     * simply add which will throw exception if the queue is full.
     * <p>
     * When the endpoint has no consumers the exchange is either rejected (fail if no consumers)
     * or silently discarded (discard if no consumers), depending on the endpoint configuration.
     * If a blocking put is interrupted, the exchange is not added and the thread's interrupt
     * status is restored.
     *
     * @param exchange the exchange to add to the queue
     * @param copy     whether to create a copy of the exchange to use for adding to the queue
     * @throws SedaConsumerNotAvailableException if the endpoint has no queue, or has no consumers
     *                                           and is configured to fail in that case
     */
    protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException {
        // always look up the queue currently in use by the endpoint
        BlockingQueue<Exchange> targetQueue = null;
        QueueReference queueReference = endpoint.getQueueReference();
        if (queueReference != null) {
            targetQueue = queueReference.getQueue();
        }
        if (targetQueue == null) {
            throw new SedaConsumerNotAvailableException("No queue available on endpoint: " + endpoint, exchange);
        }

        boolean empty = !queueReference.hasConsumers();
        if (empty) {
            if (endpoint.isFailIfNoConsumers()) {
                throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
            } else if (endpoint.isDiscardIfNoConsumers()) {
                log.debug("Discard message as no active consumers on endpoint: {}", endpoint);
                return;
            }
        }

        Exchange target = exchange;

        // handover the completion so its the copy which performs that, as we do not wait
        if (copy) {
            target = prepareCopy(exchange, true);
        }

        log.trace("Adding Exchange to queue: {}", target);
        if (blockWhenFull) {
            try {
                targetQueue.put(target);
            } catch (InterruptedException e) {
                log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
                // keep the interrupt status so code further up the call stack can react to it
                Thread.currentThread().interrupt();
            }
        } else {
            targetQueue.add(target);
        }
    }

}