001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.List; 022import java.util.concurrent.Callable; 023import java.util.concurrent.ExecutorService; 024 025import org.apache.camel.AsyncCallback; 026import org.apache.camel.AsyncProcessor; 027import org.apache.camel.CamelContext; 028import org.apache.camel.CamelContextAware; 029import org.apache.camel.Exchange; 030import org.apache.camel.ExchangePattern; 031import org.apache.camel.Expression; 032import org.apache.camel.Message; 033import org.apache.camel.Processor; 034import org.apache.camel.StreamCache; 035import org.apache.camel.Traceable; 036import org.apache.camel.impl.DefaultExchange; 037import org.apache.camel.spi.EndpointUtilizationStatistics; 038import org.apache.camel.spi.IdAware; 039import org.apache.camel.support.ServiceSupport; 040import org.apache.camel.util.AsyncProcessorHelper; 041import org.apache.camel.util.ExchangeHelper; 042import org.apache.camel.util.ObjectHelper; 043import org.apache.camel.util.ServiceHelper; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * Processor for wire tapping exchanges to an endpoint destination. 049 * 050 * @version 051 */ 052public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware { 053 private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class); 054 private String id; 055 private CamelContext camelContext; 056 private final SendDynamicProcessor dynamicProcessor; 057 private final String uri; 058 private final Processor processor; 059 private final ExchangePattern exchangePattern; 060 private final ExecutorService executorService; 061 private volatile boolean shutdownExecutorService; 062 063 // expression or processor used for populating a new exchange to send 064 // as opposed to traditional wiretap that sends a copy of the original exchange 065 private Expression newExchangeExpression; 066 private List<Processor> newExchangeProcessors; 067 private boolean copy; 068 private Processor onPrepare; 069 070 public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern, 071 ExecutorService executorService, boolean shutdownExecutorService) { 072 this.dynamicProcessor = dynamicProcessor; 073 this.uri = dynamicProcessor.getUri(); 074 this.processor = processor; 075 this.exchangePattern = exchangePattern; 076 ObjectHelper.notNull(executorService, "executorService"); 077 this.executorService = executorService; 078 this.shutdownExecutorService = shutdownExecutorService; 079 } 080 081 @Override 082 public String toString() { 083 return "WireTap[" + uri + "]"; 084 } 085 086 @Override 087 public String getTraceLabel() { 088 return "wireTap(" + uri + ")"; 089 } 090 091 public String getId() { 092 return id; 093 } 094 095 public void setId(String id) { 096 this.id = id; 097 } 098 099 public CamelContext getCamelContext() { 100 return camelContext; 101 } 102 103 public void setCamelContext(CamelContext camelContext) { 104 this.camelContext = camelContext; 105 } 106 107 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 108 return dynamicProcessor.getEndpointUtilizationStatistics(); 109 } 110 111 public void process(Exchange exchange) throws Exception { 112 AsyncProcessorHelper.process(this, exchange); 113 } 114 115 public boolean process(final Exchange exchange, final AsyncCallback callback) { 116 if (!isStarted()) { 117 throw new IllegalStateException("WireTapProcessor has not been started: " + this); 118 } 119 120 // must configure the wire tap beforehand 121 Exchange target; 122 try { 123 target = configureExchange(exchange, exchangePattern); 124 } catch (Exception e) { 125 exchange.setException(e); 126 callback.done(true); 127 return true; 128 } 129 130 final Exchange wireTapExchange = target; 131 132 // send the exchange to the destination using an executor service 133 try { 134 executorService.submit(new Callable<Exchange>() { 135 public Exchange call() throws Exception { 136 try { 137 LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); 138 processor.process(wireTapExchange); 139 } catch (Throwable e) { 140 LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e); 141 } 142 return wireTapExchange; 143 } 144 }); 145 } catch (Throwable e) { 146 // in case the thread pool rejects or cannot submit the task then we need to catch 147 // so camel error handler can react 148 exchange.setException(e); 149 } 150 151 // continue routing this synchronously 152 callback.done(true); 153 return true; 154 } 155 156 157 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) throws IOException { 158 Exchange answer; 159 if (copy) { 160 // use a copy of the original exchange 161 answer = configureCopyExchange(exchange); 162 } else { 163 // use a new exchange 164 answer = configureNewExchange(exchange); 165 } 166 167 // prepare the exchange 168 if (newExchangeExpression != null) { 169 Object body = newExchangeExpression.evaluate(answer, Object.class); 170 if (body != null) { 171 answer.getIn().setBody(body); 172 } 173 } 174 175 if (newExchangeProcessors != null) { 176 for (Processor processor : newExchangeProcessors) { 177 try { 178 processor.process(answer); 179 } catch (Exception e) { 180 throw ObjectHelper.wrapRuntimeCamelException(e); 181 } 182 } 183 } 184 185 // if the body is a stream cache we must use a copy of the stream in the wire tapped exchange 186 Message msg = answer.hasOut() ? answer.getOut() : answer.getIn(); 187 if (msg.getBody() instanceof StreamCache) { 188 // in parallel processing case, the stream must be copied, therefore get the stream 189 StreamCache cache = (StreamCache) msg.getBody(); 190 StreamCache copied = cache.copy(answer); 191 if (copied != null) { 192 msg.setBody(copied); 193 } 194 } 195 196 // invoke on prepare on the exchange if specified 197 if (onPrepare != null) { 198 try { 199 onPrepare.process(answer); 200 } catch (Exception e) { 201 throw ObjectHelper.wrapRuntimeCamelException(e); 202 } 203 } 204 205 return answer; 206 } 207 208 private Exchange configureCopyExchange(Exchange exchange) { 209 // must use a copy as we dont want it to cause side effects of the original exchange 210 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 211 // set MEP to InOnly as this wire tap is a fire and forget 212 copy.setPattern(ExchangePattern.InOnly); 213 return copy; 214 } 215 216 private Exchange configureNewExchange(Exchange exchange) { 217 return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); 218 } 219 220 public List<Processor> getNewExchangeProcessors() { 221 return newExchangeProcessors; 222 } 223 224 public void setNewExchangeProcessors(List<Processor> newExchangeProcessors) { 225 this.newExchangeProcessors = newExchangeProcessors; 226 } 227 228 public Expression getNewExchangeExpression() { 229 return newExchangeExpression; 230 } 231 232 public void setNewExchangeExpression(Expression newExchangeExpression) { 233 this.newExchangeExpression = newExchangeExpression; 234 } 235 236 public void addNewExchangeProcessor(Processor processor) { 237 if (newExchangeProcessors == null) { 238 newExchangeProcessors = new ArrayList<Processor>(); 239 } 240 newExchangeProcessors.add(processor); 241 } 242 243 public boolean isCopy() { 244 return copy; 245 } 246 247 public void setCopy(boolean copy) { 248 this.copy = copy; 249 } 250 251 public Processor getOnPrepare() { 252 return onPrepare; 253 } 254 255 public void setOnPrepare(Processor onPrepare) { 256 this.onPrepare = onPrepare; 257 } 258 259 public String getUri() { 260 return uri; 261 } 262 263 public int getCacheSize() { 264 return dynamicProcessor.getCacheSize(); 265 } 266 267 public boolean isIgnoreInvalidEndpoint() { 268 return dynamicProcessor.isIgnoreInvalidEndpoint(); 269 } 270 271 @Override 272 protected void doStart() throws Exception { 273 ServiceHelper.startService(processor); 274 } 275 276 @Override 277 protected void doStop() throws Exception { 278 ServiceHelper.stopService(processor); 279 } 280 281 @Override 282 protected void doShutdown() throws Exception { 283 ServiceHelper.stopAndShutdownService(processor); 284 if (shutdownExecutorService) { 285 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 286 } 287 } 288}