001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.seda; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.CountDownLatch; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.camel.AsyncCallback; 024import org.apache.camel.Exchange; 025import org.apache.camel.ExchangeTimedOutException; 026import org.apache.camel.WaitForTaskToComplete; 027import org.apache.camel.impl.DefaultAsyncProducer; 028import org.apache.camel.support.SynchronizationAdapter; 029import org.apache.camel.util.ExchangeHelper; 030 031/** 032 * @version 033 */ 034public class SedaProducer extends DefaultAsyncProducer { 035 036 /** 037 * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used. 038 */ 039 @Deprecated 040 protected final BlockingQueue<Exchange> queue; 041 private final SedaEndpoint endpoint; 042 private final WaitForTaskToComplete waitForTaskToComplete; 043 private final long timeout; 044 private final boolean blockWhenFull; 045 046 /** 047 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 048 */ 049 @Deprecated 050 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) { 051 this(endpoint, waitForTaskToComplete, timeout, false); 052 } 053 054 /** 055 * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}. 056 */ 057 @Deprecated 058 public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) { 059 this(endpoint, waitForTaskToComplete, timeout, blockWhenFull); 060 } 061 062 public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) { 063 super(endpoint); 064 this.queue = endpoint.getQueue(); 065 this.endpoint = endpoint; 066 this.waitForTaskToComplete = waitForTaskToComplete; 067 this.timeout = timeout; 068 this.blockWhenFull = blockWhenFull; 069 } 070 071 @Override 072 public boolean process(final Exchange exchange, final AsyncCallback callback) { 073 WaitForTaskToComplete wait = waitForTaskToComplete; 074 if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) { 075 wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); 076 } 077 078 if (wait == WaitForTaskToComplete.Always 079 || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) { 080 081 // do not handover the completion as we wait for the copy to complete, and copy its result back when it done 082 Exchange copy = prepareCopy(exchange, false); 083 084 // latch that waits until we are complete 085 final CountDownLatch latch = new CountDownLatch(1); 086 087 // we should wait for the reply so install a on completion so we know when its complete 088 copy.addOnCompletion(new SynchronizationAdapter() { 089 @Override 090 public void onDone(Exchange response) { 091 // check for timeout, which then already would have invoked the latch 092 if (latch.getCount() == 0) { 093 if (log.isTraceEnabled()) { 094 log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 095 } 096 return; 097 } else { 098 if (log.isTraceEnabled()) { 099 log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn()); 100 } 101 try { 102 ExchangeHelper.copyResults(exchange, response); 103 } finally { 104 // always ensure latch is triggered 105 latch.countDown(); 106 } 107 } 108 } 109 110 @Override 111 public boolean allowHandover() { 112 // do not allow handover as we want to seda producer to have its completion triggered 113 // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored) 114 return false; 115 } 116 117 @Override 118 public String toString() { 119 return "onDone at endpoint: " + endpoint; 120 } 121 }); 122 123 log.trace("Adding Exchange to queue: {}", copy); 124 try { 125 // do not copy as we already did the copy 126 addToQueue(copy, false); 127 } catch (SedaConsumerNotAvailableException e) { 128 exchange.setException(e); 129 callback.done(true); 130 return true; 131 } 132 133 if (timeout > 0) { 134 if (log.isTraceEnabled()) { 135 log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri()); 136 } 137 // lets see if we can get the task done before the timeout 138 boolean done = false; 139 try { 140 done = latch.await(timeout, TimeUnit.MILLISECONDS); 141 } catch (InterruptedException e) { 142 // ignore 143 } 144 if (!done) { 145 exchange.setException(new ExchangeTimedOutException(exchange, timeout)); 146 // remove timed out Exchange from queue 147 endpoint.getQueue().remove(copy); 148 // count down to indicate timeout 149 latch.countDown(); 150 } 151 } else { 152 if (log.isTraceEnabled()) { 153 log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri()); 154 } 155 // no timeout then wait until its done 156 try { 157 latch.await(); 158 } catch (InterruptedException e) { 159 // ignore 160 } 161 } 162 } else { 163 // no wait, eg its a InOnly then just add to queue and return 164 try { 165 addToQueue(exchange, true); 166 } catch (SedaConsumerNotAvailableException e) { 167 exchange.setException(e); 168 callback.done(true); 169 return true; 170 } 171 } 172 173 // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done 174 // so we should just signal the callback we are done synchronously 175 callback.done(true); 176 return true; 177 } 178 179 protected Exchange prepareCopy(Exchange exchange, boolean handover) { 180 // use a new copy of the exchange to route async (and use same message id) 181 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true); 182 // set a new from endpoint to be the seda queue 183 copy.setFromEndpoint(endpoint); 184 return copy; 185 } 186 187 @Override 188 protected void doStart() throws Exception { 189 super.doStart(); 190 endpoint.onStarted(this); 191 } 192 193 @Override 194 protected void doStop() throws Exception { 195 endpoint.onStopped(this); 196 super.doStop(); 197 } 198 199 /** 200 * Strategy method for adding the exchange to the queue. 201 * <p> 202 * Will perform a blocking "put" if blockWhenFull is true, otherwise it will 203 * simply add which will throw exception if the queue is full 204 * 205 * @param exchange the exchange to add to the queue 206 * @param copy whether to create a copy of the exchange to use for adding to the queue 207 */ 208 protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException { 209 BlockingQueue<Exchange> queue = null; 210 QueueReference queueReference = endpoint.getQueueReference(); 211 if (queueReference != null) { 212 queue = queueReference.getQueue(); 213 } 214 if (queue == null) { 215 throw new SedaConsumerNotAvailableException("No queue available on endpoint: " + endpoint, exchange); 216 } 217 218 boolean empty = !queueReference.hasConsumers(); 219 if (empty) { 220 if (endpoint.isFailIfNoConsumers()) { 221 throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); 222 } else if (endpoint.isDiscardIfNoConsumers()) { 223 log.debug("Discard message as no active consumers on endpoint: " + endpoint); 224 return; 225 } 226 } 227 228 Exchange target = exchange; 229 230 // handover the completion so its the copy which performs that, as we do not wait 231 if (copy) { 232 target = prepareCopy(exchange, true); 233 } 234 235 log.trace("Adding Exchange to queue: {}", target); 236 if (blockWhenFull) { 237 try { 238 queue.put(target); 239 } catch (InterruptedException e) { 240 // ignore 241 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); 242 } 243 } else { 244 queue.add(target); 245 } 246 } 247 248}