001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.Map; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.CamelContext; 024 import org.apache.camel.Consumer; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.EndpointConfiguration; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.ExchangePattern; 029 import org.apache.camel.PollingConsumer; 030 import org.apache.camel.Processor; 031 import org.apache.camel.Producer; 032 import org.apache.camel.util.AsyncProcessorConverterHelper; 033 import org.apache.camel.util.AsyncProcessorHelper; 034 import org.apache.camel.util.ServiceHelper; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 import static org.apache.camel.processor.PipelineHelper.continueProcessing; 039 040 /** 041 * This is an endpoint when sending to it, is intercepted and is routed in a detour 042 * 043 * @version 044 */ 045 public class InterceptSendToEndpoint implements Endpoint { 046 047 private static final transient Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpoint.class); 048 049 private final Endpoint delegate; 050 private Producer producer; 051 private Processor detour; 052 private boolean skip; 053 054 /** 055 * Intercepts sending to the given endpoint 056 * 057 * @param destination the original endpoint 058 * @param skip <tt>true</tt> to skip sending after the detour to the original endpoint 059 */ 060 public InterceptSendToEndpoint(final Endpoint destination, boolean skip) { 061 this.delegate = destination; 062 this.skip = skip; 063 } 064 065 public void setDetour(Processor detour) { 066 this.detour = detour; 067 } 068 069 public Endpoint getDelegate() { 070 return delegate; 071 } 072 073 public String getEndpointUri() { 074 return delegate.getEndpointUri(); 075 } 076 077 public EndpointConfiguration getEndpointConfiguration() { 078 return delegate.getEndpointConfiguration(); 079 } 080 081 public String getEndpointKey() { 082 return delegate.getEndpointKey(); 083 } 084 085 public Exchange createExchange() { 086 return delegate.createExchange(); 087 } 088 089 public Exchange createExchange(ExchangePattern pattern) { 090 return delegate.createExchange(pattern); 091 } 092 093 public Exchange createExchange(Exchange exchange) { 094 return delegate.createExchange(exchange); 095 } 096 097 public CamelContext getCamelContext() { 098 return delegate.getCamelContext(); 099 } 100 101 public Producer createProducer() throws Exception { 102 producer = delegate.createProducer(); 103 return new DefaultAsyncProducer(delegate) { 104 105 public Endpoint getEndpoint() { 106 return producer.getEndpoint(); 107 } 108 109 public Exchange createExchange() { 110 return producer.createExchange(); 111 } 112 113 public Exchange createExchange(ExchangePattern pattern) { 114 return producer.createExchange(pattern); 115 } 116 117 public Exchange createExchange(Exchange exchange) { 118 return producer.createExchange(exchange); 119 } 120 121 @Override 122 public boolean process(Exchange exchange, AsyncCallback callback) { 123 // process the detour so we do the detour routing 124 if (LOG.isDebugEnabled()) { 125 LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, exchange}); 126 } 127 // add header with the real endpoint uri 128 exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); 129 130 // detour the exchange using synchronous processing 131 try { 132 detour.process(exchange); 133 } catch (Exception e) { 134 exchange.setException(e); 135 callback.done(true); 136 return true; 137 } 138 139 // Decide whether to continue or not; similar logic to the Pipeline 140 // check for error if so we should break out 141 if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) { 142 callback.done(true); 143 return true; 144 } 145 146 // determine if we should skip or not 147 boolean shouldSkip = skip; 148 149 // if then interceptor had a when predicate, then we should only skip if it matched 150 Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); 151 if (whenMatches != null) { 152 shouldSkip = skip && whenMatches; 153 } 154 155 if (!shouldSkip) { 156 if (exchange.hasOut()) { 157 // replace OUT with IN as detour changed something 158 exchange.setIn(exchange.getOut()); 159 exchange.setOut(null); 160 } 161 162 // route to original destination leveraging the asynchronous routing engine 163 AsyncProcessor async = AsyncProcessorConverterHelper.convert(producer); 164 return AsyncProcessorHelper.process(async, exchange, callback); 165 } else { 166 if (LOG.isDebugEnabled()) { 167 LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange); 168 } 169 callback.done(true); 170 return true; 171 } 172 } 173 174 public boolean isSingleton() { 175 return producer.isSingleton(); 176 } 177 178 public void start() throws Exception { 179 ServiceHelper.startService(detour); 180 // here we also need to start the producer 181 ServiceHelper.startService(producer); 182 } 183 184 public void stop() throws Exception { 185 // do not stop detour as it should only be stopped when the interceptor stops 186 // we should stop the producer here 187 ServiceHelper.stopService(producer); 188 } 189 }; 190 } 191 192 public Consumer createConsumer(Processor processor) throws Exception { 193 return delegate.createConsumer(processor); 194 } 195 196 public PollingConsumer createPollingConsumer() throws Exception { 197 return delegate.createPollingConsumer(); 198 } 199 200 public void configureProperties(Map<String, Object> options) { 201 delegate.configureProperties(options); 202 } 203 204 public void setCamelContext(CamelContext context) { 205 delegate.setCamelContext(context); 206 } 207 208 public boolean isLenientProperties() { 209 return delegate.isLenientProperties(); 210 } 211 212 public boolean isSingleton() { 213 return delegate.isSingleton(); 214 } 215 216 public void start() throws Exception { 217 ServiceHelper.startServices(detour, delegate); 218 } 219 220 public void stop() throws Exception { 221 ServiceHelper.stopServices(delegate, detour); 222 } 223 224 @Override 225 public String toString() { 226 return delegate.toString(); 227 } 228 }