001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.seda; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.CountDownLatch; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.camel.AsyncCallback; 024import org.apache.camel.Exchange; 025import org.apache.camel.ExchangeTimedOutException; 026import org.apache.camel.WaitForTaskToComplete; 027import org.apache.camel.impl.DefaultAsyncProducer; 028import org.apache.camel.support.SynchronizationAdapter; 029import org.apache.camel.util.ExchangeHelper; 030 031/** 032 * @version 033 */ 034public class SedaProducer extends DefaultAsyncProducer { 035 036 /** 037 * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used. 038 */ 039 @Deprecated 040 protected final BlockingQueue<Exchange> queue; 041 private final SedaEndpoint endpoint; 042 private final WaitForTaskToComplete waitForTaskToComplete; 043 private final long timeout; 044 private final boolean blockWhenFull; 045 private final long offerTimeout; 046 047 /** 048 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 049 */ 050 @Deprecated 051 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) { 052 this(endpoint, waitForTaskToComplete, timeout, false, 0); 053 } 054 055 /** 056 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 057 */ 058 @Deprecated 059 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull, long offerTimeout) { 060 this(endpoint, waitForTaskToComplete, timeout, blockWhenFull, offerTimeout); 061 } 062 063 public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull, long offerTimeout) { 064 super(endpoint); 065 this.queue = endpoint.getQueue(); 066 this.endpoint = endpoint; 067 this.waitForTaskToComplete = waitForTaskToComplete; 068 this.timeout = timeout; 069 this.blockWhenFull = blockWhenFull; 070 this.offerTimeout = offerTimeout; 071 } 072 073 @Override 074 public boolean process(final Exchange exchange, final AsyncCallback callback) { 075 WaitForTaskToComplete wait = waitForTaskToComplete; 076 if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) { 077 wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); 078 } 079 080 if (wait == WaitForTaskToComplete.Always 081 || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) { 082 083 // do not handover the completion as we wait for the copy to complete, and copy its result back when it done 084 Exchange copy = prepareCopy(exchange, false); 085 086 // latch that waits until we are complete 087 final CountDownLatch latch = new CountDownLatch(1); 088 089 // we should wait for the reply so install a on completion so we know when its complete 090 copy.addOnCompletion(new SynchronizationAdapter() { 091 @Override 092 public void onDone(Exchange response) { 093 // check for timeout, which then already would have invoked the latch 094 if (latch.getCount() == 0) { 095 if (log.isTraceEnabled()) { 096 log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 097 } 098 return; 099 } else { 100 if (log.isTraceEnabled()) { 101 log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 102 } 103 try { 104 ExchangeHelper.copyResults(exchange, response); 105 } finally { 106 // always ensure latch is triggered 107 latch.countDown(); 108 } 109 } 110 } 111 112 @Override 113 public boolean allowHandover() { 114 // do not allow handover as we want to seda producer to have its completion triggered 115 // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored) 116 return false; 117 } 118 119 @Override 120 public String toString() { 121 return "onDone at endpoint: " + endpoint; 122 } 123 }); 124 125 log.trace("Adding Exchange to queue: {}", copy); 126 try { 127 // do not copy as we already did the copy 128 addToQueue(copy, false); 129 } catch (SedaConsumerNotAvailableException e) { 130 exchange.setException(e); 131 callback.done(true); 132 return true; 133 } 134 135 if (timeout > 0) { 136 if (log.isTraceEnabled()) { 137 log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri()); 138 } 139 // lets see if we can get the task done before the timeout 140 boolean done = false; 141 try { 142 done = latch.await(timeout, TimeUnit.MILLISECONDS); 143 } catch (InterruptedException e) { 144 // ignore 145 } 146 if (!done) { 147 exchange.setException(new ExchangeTimedOutException(exchange, timeout)); 148 // remove timed out Exchange from queue 149 endpoint.getQueue().remove(copy); 150 // count down to indicate timeout 151 latch.countDown(); 152 } 153 } else { 154 if (log.isTraceEnabled()) { 155 log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri()); 156 } 157 // no timeout then wait until its done 158 try { 159 latch.await(); 160 } catch (InterruptedException e) { 161 // ignore 162 } 163 } 164 } else { 165 // no wait, eg its a InOnly then just add to queue and return 166 try { 167 addToQueue(exchange, true); 168 } catch (SedaConsumerNotAvailableException e) { 169 exchange.setException(e); 170 callback.done(true); 171 return true; 172 } 173 } 174 175 // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done 176 // so we should just signal the callback we are done synchronously 177 callback.done(true); 178 return true; 179 } 180 181 protected Exchange prepareCopy(Exchange exchange, boolean handover) { 182 // use a new copy of the exchange to route async (and use same message id) 183 184 // if handover we need to do special handover to avoid handing over 185 // RestBindingMarshalOnCompletion as it should not be handed over with SEDA 186 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true, 187 synchronization -> !synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion")); 188 // set a new from endpoint to be the seda queue 189 copy.setFromEndpoint(endpoint); 190 return copy; 191 } 192 193 @Override 194 protected void doStart() throws Exception { 195 super.doStart(); 196 endpoint.onStarted(this); 197 } 198 199 @Override 200 protected void doStop() throws Exception { 201 endpoint.onStopped(this); 202 super.doStop(); 203 } 204 205 /** 206 * Strategy method for adding the exchange to the queue. 207 * <p> 208 * Will perform a blocking "put" if blockWhenFull is true, otherwise it will 209 * simply add which will throw exception if the queue is full 210 * 211 * @param exchange the exchange to add to the queue 212 * @param copy whether to create a copy of the exchange to use for adding to the queue 213 */ 214 protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException { 215 boolean offerTime; 216 BlockingQueue<Exchange> queue = null; 217 QueueReference queueReference = endpoint.getQueueReference(); 218 if (queueReference != null) { 219 queue = queueReference.getQueue(); 220 } 221 if (queue == null) { 222 throw new SedaConsumerNotAvailableException("No queue available on endpoint: " + endpoint, exchange); 223 } 224 225 boolean empty = !queueReference.hasConsumers(); 226 if (empty) { 227 if (endpoint.isFailIfNoConsumers()) { 228 throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); 229 } else if (endpoint.isDiscardIfNoConsumers()) { 230 log.debug("Discard message as no active consumers on endpoint: {}", endpoint); 231 return; 232 } 233 } 234 235 Exchange target = exchange; 236 237 // handover the completion so its the copy which performs that, as we do not wait 238 if (copy) { 239 target = prepareCopy(exchange, true); 240 } 241 242 log.trace("Adding Exchange to queue: {}", target); 243 if (blockWhenFull && offerTimeout == 0) { 244 try { 245 queue.put(target); 246 } catch (InterruptedException e) { 247 // ignore 248 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); 249 } 250 } else if (blockWhenFull && offerTimeout > 0) { 251 try { 252 offerTime = queue.offer(target, offerTimeout, TimeUnit.MILLISECONDS); 253 if (!offerTime) { 254 throw new IllegalStateException("Fails to insert element into queue, " 255 + "after timeout of" + offerTimeout + "milliseconds"); 256 } 257 } catch (InterruptedException e) { 258 // ignore 259 log.debug("Offer interrupted, are we stopping? {}", isStopping() || isStopped()); 260 } 261 } else { 262 queue.add(target); 263 } 264 } 265 266}