001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.Consumer;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.EndpointConfiguration;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.ExchangePattern;
029    import org.apache.camel.PollingConsumer;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Producer;
032    import org.apache.camel.util.AsyncProcessorConverterHelper;
033    import org.apache.camel.util.AsyncProcessorHelper;
034    import org.apache.camel.util.ServiceHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    import static org.apache.camel.processor.PipelineHelper.continueProcessing;
039    
040    /**
041     * This is an endpoint when sending to it, is intercepted and is routed in a detour
042     *
043     * @version 
044     */
045    public class InterceptSendToEndpoint implements Endpoint {
046    
047        private static final transient Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpoint.class);
048    
049        private final Endpoint delegate;
050        private Producer producer;
051        private Processor detour;
052        private boolean skip;
053    
054        /**
055         * Intercepts sending to the given endpoint
056         *
057         * @param destination  the original endpoint
058         * @param skip <tt>true</tt> to skip sending after the detour to the original endpoint
059         */
060        public InterceptSendToEndpoint(final Endpoint destination, boolean skip) {
061            this.delegate = destination;
062            this.skip = skip;
063        }
064    
065        public void setDetour(Processor detour) {
066            this.detour = detour;
067        }
068    
069        public Endpoint getDelegate() {
070            return delegate;
071        }
072    
073        public String getEndpointUri() {
074            return delegate.getEndpointUri();
075        }
076    
077        public EndpointConfiguration getEndpointConfiguration() {
078            return delegate.getEndpointConfiguration();
079        }
080    
081        public String getEndpointKey() {
082            return delegate.getEndpointKey();
083        }
084    
085        public Exchange createExchange() {
086            return delegate.createExchange();
087        }
088    
089        public Exchange createExchange(ExchangePattern pattern) {
090            return delegate.createExchange(pattern);
091        }
092    
093        public Exchange createExchange(Exchange exchange) {
094            return delegate.createExchange(exchange);
095        }
096    
097        public CamelContext getCamelContext() {
098            return delegate.getCamelContext();
099        }
100    
101        public Producer createProducer() throws Exception {
102            producer = delegate.createProducer();
103            return new DefaultAsyncProducer(delegate) {
104    
105                public Endpoint getEndpoint() {
106                    return producer.getEndpoint();
107                }
108    
109                public Exchange createExchange() {
110                    return producer.createExchange();
111                }
112    
113                public Exchange createExchange(ExchangePattern pattern) {
114                    return producer.createExchange(pattern);
115                }
116    
117                public Exchange createExchange(Exchange exchange) {
118                    return producer.createExchange(exchange);
119                }
120    
121                @Override
122                public boolean process(Exchange exchange, AsyncCallback callback) {
123                    // process the detour so we do the detour routing
124                    if (LOG.isDebugEnabled()) {
125                        LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, exchange});
126                    }
127                    // add header with the real endpoint uri
128                    exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri());
129    
130                    // detour the exchange using synchronous processing
131                    try {
132                        detour.process(exchange);
133                    } catch (Exception e) {
134                        exchange.setException(e);
135                        callback.done(true);
136                        return true;
137                    }
138    
139                    // Decide whether to continue or not; similar logic to the Pipeline
140                    // check for error if so we should break out
141                    if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) {
142                        callback.done(true);
143                        return true;
144                    }
145    
146                    // determine if we should skip or not
147                    boolean shouldSkip = skip;
148    
149                    // if then interceptor had a when predicate, then we should only skip if it matched
150                    Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
151                    if (whenMatches != null) {
152                        shouldSkip = skip && whenMatches;
153                    }
154    
155                    if (!shouldSkip) {
156                        if (exchange.hasOut()) {
157                            // replace OUT with IN as detour changed something
158                            exchange.setIn(exchange.getOut());
159                            exchange.setOut(null);
160                        }
161    
162                        // route to original destination leveraging the asynchronous routing engine
163                        AsyncProcessor async = AsyncProcessorConverterHelper.convert(producer);
164                        return AsyncProcessorHelper.process(async, exchange, callback);
165                    } else {
166                        if (LOG.isDebugEnabled()) {
167                            LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);
168                        }
169                        callback.done(true);
170                        return true;
171                    }
172                }
173    
174                public boolean isSingleton() {
175                    return producer.isSingleton();
176                }
177    
178                public void start() throws Exception {
179                    ServiceHelper.startService(detour);
180                    // here we also need to start the producer
181                    ServiceHelper.startService(producer);
182                }
183    
184                public void stop() throws Exception {
185                    // do not stop detour as it should only be stopped when the interceptor stops
186                    // we should stop the producer here
187                    ServiceHelper.stopService(producer);
188                }
189            };
190        }
191    
192        public Consumer createConsumer(Processor processor) throws Exception {
193            return delegate.createConsumer(processor);
194        }
195    
196        public PollingConsumer createPollingConsumer() throws Exception {
197            return delegate.createPollingConsumer();
198        }
199    
200        public void configureProperties(Map<String, Object> options) {
201            delegate.configureProperties(options);
202        }
203    
204        public void setCamelContext(CamelContext context) {
205            delegate.setCamelContext(context);
206        }
207    
208        public boolean isLenientProperties() {
209            return delegate.isLenientProperties();
210        }
211    
212        public boolean isSingleton() {
213            return delegate.isSingleton();
214        }
215    
216        public void start() throws Exception {
217            ServiceHelper.startServices(detour, delegate);
218        }
219    
220        public void stop() throws Exception {
221            ServiceHelper.stopServices(delegate, detour);
222        }
223    
224        @Override
225        public String toString() {
226            return delegate.toString();
227        }
228    }