/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor for wire tapping exchanges to an endpoint destination.
 * <p/>
 * The tapped exchange is prepared on the calling thread (either as a correlated copy of the
 * original exchange or as a brand new exchange) and is then handed to the configured
 * {@link ExecutorService}, which invokes the target {@link Processor}. Routing of the original
 * exchange continues immediately; failures while processing the tapped exchange are logged at
 * WARN level and otherwise ignored.
 *
 * @version
 */
public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class);
    private String id;
    private CamelContext camelContext;
    private final SendDynamicProcessor dynamicProcessor;
    private final String uri;
    private final Processor processor;
    private final ExchangePattern exchangePattern;
    private final ExecutorService executorService;
    private volatile boolean shutdownExecutorService;

    // expression or processor used for populating a new exchange to send
    // as opposed to traditional wiretap that sends a copy of the original exchange
    private Expression newExchangeExpression;
    private List<Processor> newExchangeProcessors;
    private boolean copy;
    private Processor onPrepare;

    /**
     * Creates the wire tap.
     *
     * @param dynamicProcessor        supplies the uri used in labels, and the cache size,
     *                                ignore-invalid-endpoint flag and endpoint statistics exposed by this class
     * @param processor               the processor the tapped exchange is sent to
     * @param exchangePattern         the pattern handed to {@link #configureExchange(Exchange, ExchangePattern)}
     * @param executorService         the executor running the tapped processing, must not be <tt>null</tt>
     * @param shutdownExecutorService whether the executor is shut down when this processor is shut down
     */
    public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern,
                            ExecutorService executorService, boolean shutdownExecutorService) {
        this.dynamicProcessor = dynamicProcessor;
        this.uri = dynamicProcessor.getUri();
        this.processor = processor;
        this.exchangePattern = exchangePattern;
        ObjectHelper.notNull(executorService, "executorService");
        this.executorService = executorService;
        this.shutdownExecutorService = shutdownExecutorService;
    }

    @Override
    public String toString() {
        return "WireTap[" + uri + "]";
    }

    @Override
    public String getTraceLabel() {
        return "wireTap(" + uri + ")";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Gets the endpoint utilization statistics, as reported by the dynamic processor.
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        return dynamicProcessor.getEndpointUtilizationStatistics();
    }

    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Prepares the tapped exchange and submits it to the executor service, then completes the
     * callback synchronously.
     * <p/>
     * If preparing the tapped exchange fails, or the executor service rejects the task, the
     * exception is stored on the original exchange and the callback is still completed.
     *
     * @param exchange the original exchange
     * @param callback the callback, always invoked with <tt>true</tt> (done synchronously)
     * @return always <tt>true</tt>
     * @throws IllegalStateException if this processor has not been started
     */
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        if (!isStarted()) {
            throw new IllegalStateException("WireTapProcessor has not been started: " + this);
        }

        // must configure the wire tap beforehand
        Exchange target;
        try {
            target = configureExchange(exchange, exchangePattern);
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        final Exchange wireTapExchange = target;

        try {
            // send the exchange to the destination using an executor service
            executorService.submit(new Callable<Exchange>() {
                public Exchange call() throws Exception {
                    try {
                        LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
                        processor.process(wireTapExchange);
                    } catch (Throwable e) {
                        LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
                    }
                    return wireTapExchange;
                }
            });
        } catch (RejectedExecutionException e) {
            // the thread pool could not accept the task (saturated or shut down); report it on
            // the exchange rather than letting it escape without the callback being completed
            exchange.setException(e);
        }

        // continue routing this synchronously
        callback.done(true);
        return true;
    }

    /**
     * Builds the exchange that is sent to the wire tap destination.
     * <p/>
     * Depending on {@link #isCopy()} the result starts as a correlated copy of the original or as
     * a new empty exchange. The new-exchange expression (if any) then sets the body, the
     * new-exchange processors (if any) are run in order, a {@link StreamCache} body is replaced by
     * its own copy, and finally the on-prepare processor (if any) is invoked.
     *
     * @param exchange the original exchange
     * @param pattern  the exchange pattern; not used by this implementation
     * @return the exchange to send to the wire tap destination
     * @throws IOException declared for copying a stream cached body
     */
    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) throws IOException {
        Exchange answer;
        if (copy) {
            // use a copy of the original exchange
            answer = configureCopyExchange(exchange);
        } else {
            // use a new exchange
            answer = configureNewExchange(exchange);
        }

        // prepare the exchange
        if (newExchangeExpression != null) {
            Object body = newExchangeExpression.evaluate(answer, Object.class);
            if (body != null) {
                answer.getIn().setBody(body);
            }
        }

        if (newExchangeProcessors != null) {
            for (Processor prepare : newExchangeProcessors) {
                try {
                    prepare.process(answer);
                } catch (Exception e) {
                    throw ObjectHelper.wrapRuntimeCamelException(e);
                }
            }
        }

        // if the body is a stream cache we must use a copy of the stream in the wire tapped exchange
        Message msg = answer.hasOut() ? answer.getOut() : answer.getIn();
        if (msg.getBody() instanceof StreamCache) {
            // in parallel processing case, the stream must be copied, therefore get the stream
            StreamCache cache = (StreamCache) msg.getBody();
            StreamCache copied = cache.copy(answer);
            if (copied != null) {
                msg.setBody(copied);
            }
        }

        // invoke on prepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(answer);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        }

        return answer;
    }

    /**
     * Creates a correlated copy of the given exchange with its pattern forced to InOnly.
     */
    private Exchange configureCopyExchange(Exchange exchange) {
        // must use a copy as we dont want it to cause side effects of the original exchange
        Exchange correlated = ExchangeHelper.createCorrelatedCopy(exchange, false);
        // set MEP to InOnly as this wire tap is a fire and forget
        correlated.setPattern(ExchangePattern.InOnly);
        return correlated;
    }

    /**
     * Creates a new InOnly exchange bound to the from endpoint of the given exchange.
     */
    private Exchange configureNewExchange(Exchange exchange) {
        return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
    }

    public List<Processor> getNewExchangeProcessors() {
        return newExchangeProcessors;
    }

    public void setNewExchangeProcessors(List<Processor> newExchangeProcessors) {
        this.newExchangeProcessors = newExchangeProcessors;
    }

    public Expression getNewExchangeExpression() {
        return newExchangeExpression;
    }

    public void setNewExchangeExpression(Expression newExchangeExpression) {
        this.newExchangeExpression = newExchangeExpression;
    }

    /**
     * Appends a processor to run against the tapped exchange, creating the list on first use.
     */
    public void addNewExchangeProcessor(Processor processor) {
        if (newExchangeProcessors == null) {
            newExchangeProcessors = new ArrayList<Processor>();
        }
        newExchangeProcessors.add(processor);
    }

    public boolean isCopy() {
        return copy;
    }

    public void setCopy(boolean copy) {
        this.copy = copy;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public String getUri() {
        return uri;
    }

    /**
     * Gets the cache size, as reported by the dynamic processor.
     */
    public int getCacheSize() {
        return dynamicProcessor.getCacheSize();
    }

    /**
     * Whether invalid endpoints are ignored, as reported by the dynamic processor.
     */
    public boolean isIgnoreInvalidEndpoint() {
        return dynamicProcessor.isIgnoreInvalidEndpoint();
    }

    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startService(processor);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(processor);
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(processor);
        if (shutdownExecutorService) {
            CamelContext context = getCamelContext();
            if (context != null) {
                context.getExecutorServiceManager().shutdownNow(executorService);
            } else {
                // no context was injected, so shut the executor down directly instead of failing
                executorService.shutdownNow();
            }
        }
    }
}