/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.seda;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;

/**
 * Producer that places a correlated copy of each {@link Exchange} on the queue of a
 * {@link SedaEndpoint}.
 * <p>
 * Depending on the {@link WaitForTaskToComplete} setting (which can be overridden per exchange via
 * the {@link Exchange#ASYNC_WAIT} property) the calling thread either returns right after the copy
 * has been queued, or blocks until the copy has completed (optionally bounded by a timeout) and the
 * result has been copied back onto the original exchange.
 */
public class SedaProducer extends DefaultAsyncProducer {
    /**
     * @deprecated Better make use of the {@link SedaEndpoint#getQueue()} API which delivers the accurate reference to the queue currently being used.
     */
    @Deprecated
    protected final BlockingQueue<Exchange> queue;
    private final SedaEndpoint endpoint;
    private final WaitForTaskToComplete waitForTaskToComplete;
    private final long timeout;
    private final boolean blockWhenFull;

    /**
     * Note: the given {@code queue} argument is not used; the queue is obtained from the endpoint.
     *
     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
     */
    @Deprecated
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
        this(endpoint, waitForTaskToComplete, timeout, false);
    }

    /**
     * Note: the given {@code queue} argument is not used; the queue is obtained from the endpoint.
     *
     * @deprecated Use {@link #SedaProducer(SedaEndpoint, WaitForTaskToComplete, long, boolean) the other constructor}.
     */
    @Deprecated
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        this(endpoint, waitForTaskToComplete, timeout, blockWhenFull);
    }

    /**
     * Creates a producer for the given endpoint.
     *
     * @param endpoint              the seda endpoint whose queue receives the exchanges
     * @param waitForTaskToComplete default waiting strategy, used when the exchange does not carry
     *                              an {@link Exchange#ASYNC_WAIT} property
     * @param timeout               time in millis to wait for a queued exchange to complete;
     *                              a value of zero or less means wait without a time limit
     * @param blockWhenFull         whether to block ("put") instead of failing ("add") when adding
     *                              to the queue
     */
    public SedaProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        super(endpoint);
        this.queue = endpoint.getQueue();
        this.endpoint = endpoint;
        this.waitForTaskToComplete = waitForTaskToComplete;
        this.timeout = timeout;
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Queues a copy of the exchange and, when configured to do so, waits for that copy to complete.
     * <p>
     * The callback is always signalled synchronously, hence this method returns {@code true}
     * whenever it returns normally. If the calling thread is interrupted while waiting, its
     * interrupt status is restored before this method returns.
     *
     * @param exchange the exchange to send
     * @param callback the callback, invoked with {@code true} (done synchronously)
     * @return always {@code true}
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        WaitForTaskToComplete wait = waitForTaskToComplete;
        // a per-exchange property takes precedence over the configured strategy
        if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
            wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
        }

        if (wait == WaitForTaskToComplete.Always
            || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {

            // do not handover the completion as we wait for the copy to complete, and copy its result back when it is done
            Exchange copy = prepareCopy(exchange, false);

            // latch that waits until we are complete
            final CountDownLatch latch = new CountDownLatch(1);

            // we should wait for the reply so install an on completion so we know when it is complete
            copy.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange response) {
                    // check for timeout, which then already would have invoked the latch
                    if (latch.getCount() == 0) {
                        if (log.isTraceEnabled()) {
                            log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                        }
                        return;
                    } else {
                        if (log.isTraceEnabled()) {
                            log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                        }
                        try {
                            ExchangeHelper.copyResults(exchange, response);
                        } finally {
                            // always ensure latch is triggered
                            latch.countDown();
                        }
                    }
                }

                @Override
                public boolean allowHandover() {
                    // do not allow handover as we want the seda producer to have its completion triggered
                    // at this point in the routing (at this leg), instead of at the very last (this ensures the timeout is honored)
                    return false;
                }

                @Override
                public String toString() {
                    return "onDone at endpoint: " + endpoint;
                }
            });

            log.trace("Adding Exchange to queue: {}", copy);
            addToQueue(copy);

            if (timeout > 0) {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri());
                }
                // lets see if we can get the task done before the timeout
                boolean done = false;
                try {
                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // stop waiting (handled as not done below), but keep the interrupt status
                    // so code further up the call stack can still react to it
                    Thread.currentThread().interrupt();
                }
                if (!done) {
                    exchange.setException(new ExchangeTimedOutException(exchange, timeout));
                    // remove timed out Exchange from queue
                    endpoint.getQueue().remove(copy);
                    // count down to indicate timeout
                    latch.countDown();
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri());
                }
                // no timeout then wait until it is done
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // stop waiting, but keep the interrupt status so code further up
                    // the call stack can still react to it
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            // no wait, eg its a InOnly then just add to queue and return
            // handover the completion so its the copy which performs that, as we do not wait
            Exchange copy = prepareCopy(exchange, true);
            log.trace("Adding Exchange to queue: {}", copy);
            addToQueue(copy);
        }

        // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
        // so we should just signal the callback we are done synchronously
        callback.done(true);
        return true;
    }

    /**
     * Creates the correlated copy of the exchange that is placed on the queue.
     *
     * @param exchange the original exchange
     * @param handover whether the completions of the original exchange are handed over to the copy
     * @return the copy, with this producer's endpoint set as its from endpoint
     */
    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
        // use a new copy of the exchange to route async
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
        // set a new from endpoint to be the seda queue
        copy.setFromEndpoint(endpoint);
        return copy;
    }

    /**
     * Starts this producer and then notifies the endpoint that it has been started.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        endpoint.onStarted(this);
    }

    /**
     * Notifies the endpoint that this producer is stopping and then stops it.
     */
    @Override
    protected void doStop() throws Exception {
        endpoint.onStopped(this);
        super.doStop();
    }

    /**
     * Strategy method for adding the exchange to the queue.
     * <p>
     * Will perform a blocking "put" if blockWhenFull is true, otherwise it will
     * simply add which will throw exception if the queue is full.
     * <p>
     * If the blocking "put" is interrupted the exchange is not added to the queue; the
     * interrupt status of the calling thread is restored so that callers (and any subsequent
     * blocking wait) can detect the interruption.
     *
     * @param exchange the exchange to add to the queue
     */
    protected void addToQueue(Exchange exchange) {
        // always ask the endpoint for the queue currently in use
        BlockingQueue<Exchange> currentQueue = endpoint.getQueue();
        if (blockWhenFull) {
            try {
                currentQueue.put(exchange);
            } catch (InterruptedException e) {
                log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
                // do not swallow the interruption
                Thread.currentThread().interrupt();
            }
        } else {
            currentQueue.add(exchange);
        }
    }

}