001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.HashMap;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.Producer;
029    import org.apache.camel.ProducerCallback;
030    import org.apache.camel.Traceable;
031    import org.apache.camel.impl.InterceptSendToEndpoint;
032    import org.apache.camel.impl.ProducerCache;
033    import org.apache.camel.support.ServiceSupport;
034    import org.apache.camel.util.AsyncProcessorHelper;
035    import org.apache.camel.util.ObjectHelper;
036    import org.apache.camel.util.ServiceHelper;
037    import org.apache.camel.util.URISupport;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * Processor for forwarding exchanges to an endpoint destination.
043     *
044     * @version 
045     */
046    public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable {
047        protected final transient Logger log = LoggerFactory.getLogger(getClass());
048        protected final CamelContext camelContext;
049        protected ProducerCache producerCache;
050        protected Endpoint destination;
051        protected ExchangePattern pattern;
052    
053        public SendProcessor(Endpoint destination) {
054            ObjectHelper.notNull(destination, "destination");
055            this.destination = destination;
056            this.camelContext = destination.getCamelContext();
057            ObjectHelper.notNull(this.camelContext, "camelContext");
058        }
059    
060        public SendProcessor(Endpoint destination, ExchangePattern pattern) {
061            this(destination);
062            this.pattern = pattern;
063        }
064    
065        @Override
066        public String toString() {
067            return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")";
068        }
069    
070        public void setDestination(Endpoint destination) {
071            this.destination = destination;
072            // destination changed so purge the cache
073            if (producerCache != null) {
074                producerCache.purge();
075            }
076        }
077    
078        public String getTraceLabel() {
079            return URISupport.sanitizeUri(destination.getEndpointUri());
080        }
081    
082        public void process(final Exchange exchange) throws Exception {
083            if (!isStarted()) {
084                throw new IllegalStateException("SendProcessor has not been started: " + this);
085            }
086    
087            // we should preserve existing MEP so remember old MEP
088            // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
089            final ExchangePattern existingPattern = exchange.getPattern();
090    
091            // send the exchange to the destination using a producer
092            producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
093                public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
094                    exchange = configureExchange(exchange, pattern);
095                    log.debug(">>>> {} {}", destination, exchange);
096                    try {
097                        producer.process(exchange);
098                    } finally {
099                        // restore previous MEP
100                        exchange.setPattern(existingPattern);
101                    }
102                    return exchange;
103                }
104            });
105        }
106    
107        public boolean process(Exchange exchange, final AsyncCallback callback) {
108            if (!isStarted()) {
109                throw new IllegalStateException("SendProcessor has not been started: " + this);
110            }
111    
112            // we should preserve existing MEP so remember old MEP
113            // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
114            final ExchangePattern existingPattern = exchange.getPattern();
115    
116            // send the exchange to the destination using a producer
117            return producerCache.doInAsyncProducer(destination, exchange, pattern, callback, new AsyncProducerCallback() {
118                public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
119                                                 ExchangePattern pattern, final AsyncCallback callback) {
120                    final Exchange target = configureExchange(exchange, pattern);
121                    log.debug(">>>> {} {}", destination, exchange);
122                    return AsyncProcessorHelper.process(asyncProducer, target, new AsyncCallback() {
123                        public void done(boolean doneSync) {
124                            // restore previous MEP
125                            target.setPattern(existingPattern);
126                            // signal we are done
127                            callback.done(doneSync);
128                        }
129                    });
130                }
131            });
132        }
133    
134        public Endpoint getDestination() {
135            return destination;
136        }
137    
138        public ExchangePattern getPattern() {
139            return pattern;
140        }
141    
142        protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
143            if (pattern != null) {
144                exchange.setPattern(pattern);
145            }
146            // set property which endpoint we send to
147            exchange.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
148            return exchange;
149        }
150    
151        protected void doStart() throws Exception {
152            if (producerCache == null) {
153                // use a single producer cache as we need to only hold reference for one destination
154                // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory
155                // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed,
156                // eg stopping the producer when we stop etc.
157                producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1));
158                // do not add as service as we do not want to manage the producer cache
159            }
160            ServiceHelper.startService(producerCache);
161    
162            // the destination could since have been intercepted by a interceptSendToEndpoint so we got to
163            // lookup this before we can use the destination
164            Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey());
165            if (lookup instanceof InterceptSendToEndpoint) {
166                if (log.isDebugEnabled()) {
167                    log.debug("Intercepted sending to {} -> {}",
168                            URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri()));
169                }
170                destination = lookup;
171            }
172            // warm up the producer by starting it so we can fail fast if there was a problem
173            // however must start endpoint first
174            ServiceHelper.startService(destination);
175            producerCache.startProducer(destination);
176        }
177    
178        protected void doStop() throws Exception {
179            ServiceHelper.stopService(producerCache);
180        }
181    
182        protected void doShutdown() throws Exception {
183            ServiceHelper.stopAndShutdownService(producerCache);
184        }
185    }