/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.WireTapProcessor;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CamelContextHelper;

/**
 * Routes a copy of a message (or creates a new message) to a secondary destination while continuing to route the original message.
 */
@Metadata(label = "eip,endpoint,routing")
@XmlRootElement(name = "wireTap")
@XmlAccessorType(XmlAccessType.FIELD)
public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends ToDynamicDefinition implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>> {
    @XmlTransient
    private Processor newExchangeProcessor;
    @XmlAttribute(name = "processorRef")
    private String newExchangeProcessorRef;
    @XmlElement(name = "body")
    private ExpressionSubElementDefinition newExchangeExpression;
    @XmlElementRef
    private List<SetHeaderDefinition> headers = new ArrayList<SetHeaderDefinition>();
    @XmlTransient
    private ExecutorService executorService;
    @XmlAttribute
    private String executorServiceRef;
    @XmlAttribute @Metadata(defaultValue = "true")
    private Boolean copy;
    @XmlAttribute
    private String onPrepareRef;
    @XmlTransient
    private Processor onPrepare;

    public WireTapDefinition() {
    }

    /**
     * Creates the {@link WireTapProcessor} for this definition.
     * <p/>
     * The exchange pattern is forced to {@link ExchangePattern#InOnly} before the dynamic send processor
     * is created by the super class. The send processor is then wrapped in the error handler and in a
     * unit of work, and finally the optional new-exchange processors, body expression, headers and
     * on-prepare processor configured on this definition are applied to the returned processor.
     *
     * @param routeContext the route context used for creating processors and looking up references
     * @return the wire tap processor
     * @throws Exception is thrown if the processor could not be created, e.g. a mandatory lookup failed
     */
    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        // executor service is mandatory for wire tap
        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);

        // must use InOnly for WireTap (set before delegating to super so the created processor sees it)
        setPattern(ExchangePattern.InOnly);

        // create the send dynamic producer to send to the wire tapped endpoint
        SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor(routeContext);

        // create error handler we need to use for processing the wire tapped
        Processor target = wrapInErrorHandler(routeContext, dynamicTo);

        // and wrap in unit of work
        CamelInternalProcessor internal = new CamelInternalProcessor(target);
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));

        // is true by default (i.e. when the copy option has not been configured)
        boolean isCopy = getCopy() == null || getCopy();

        WireTapProcessor answer = new WireTapProcessor(dynamicTo, internal, getPattern(), threadPool, shutdownThreadPool);
        answer.setCopy(isCopy);

        // a processor reference takes precedence over a directly configured processor
        if (newExchangeProcessorRef != null) {
            newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class);
        }
        if (newExchangeProcessor != null) {
            answer.addNewExchangeProcessor(newExchangeProcessor);
        }
        if (newExchangeExpression != null) {
            answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext));
        }
        // headers may have been set to null via setHeaders
        if (headers != null && !headers.isEmpty()) {
            for (SetHeaderDefinition header : headers) {
                Processor processor = createProcessor(routeContext, header);
                answer.addNewExchangeProcessor(processor);
            }
        }
        // an on-prepare reference takes precedence over a directly configured processor
        if (onPrepareRef != null) {
            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
        }
        if (onPrepare != null) {
            answer.setOnPrepare(onPrepare);
        }

        return answer;
    }

    /**
     * The exchange pattern of a wire tap, which is always {@link ExchangePattern#InOnly}.
     */
    public ExchangePattern getPattern() {
        return ExchangePattern.InOnly;
    }

    @Override
    public String toString() {
        return "WireTap[" + getUri() + "]";
    }

    @Override
    public String getLabel() {
        return "wireTap[" + getUri() + "]";
    }

    @Override
    @SuppressWarnings("unchecked")
    public Type end() {
        // allow end() to return to previous type so you can continue in the DSL
        return (Type) super.end();
    }

    @Override
    public void addOutput(ProcessorDefinition<?> output) {
        // add outputs on parent as this wiretap does not support outputs
        getParent().addOutput(output);
    }

    // Fluent API
    // -------------------------------------------------------------------------

    /**
     * Uses a custom thread pool
     *
     * @param executorService a custom {@link ExecutorService} to use as thread pool
     *                        for sending tapped exchanges
     * @return the builder
     */
    public WireTapDefinition<Type> executorService(ExecutorService executorService) {
        setExecutorService(executorService);
        return this;
    }

    /**
     * Uses a custom thread pool
     *
     * @param executorServiceRef reference to lookup a custom {@link ExecutorService}
     *                           to use as thread pool for sending tapped exchanges
     * @return the builder
     */
    public WireTapDefinition<Type> executorServiceRef(String executorServiceRef) {
        setExecutorServiceRef(executorServiceRef);
        return this;
    }

    /**
     * Uses a copy of the original exchange
     *
     * @return the builder
     */
    public WireTapDefinition<Type> copy() {
        setCopy(true);
        return this;
    }

    /**
     * Uses a copy of the original exchange
     *
     * @param copy if it is true camel will copy the original exchange,
     *             if it is false camel will not copy the original exchange
     * @return the builder
     */
    public WireTapDefinition<Type> copy(boolean copy) {
        setCopy(copy);
        return this;
    }

    /**
     * @deprecated will be removed in Camel 3.0. Instead use {@link #newExchangeBody(org.apache.camel.Expression)}
     */
    @Deprecated
    public WireTapDefinition<Type> newExchange(Expression expression) {
        return newExchangeBody(expression);
    }

    /**
     * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}
     *
     * @param expression expression that creates the new body to send
     * @return the builder
     * @see #newExchangeHeader(String, org.apache.camel.Expression)
     */
    public WireTapDefinition<Type> newExchangeBody(Expression expression) {
        setNewExchangeExpression(new ExpressionSubElementDefinition(expression));
        return this;
    }

    /**
     * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}
     *
     * @param ref reference to the {@link Processor} to lookup in the {@link org.apache.camel.spi.Registry} to
     *            be used for preparing the new exchange to send
     * @return the builder
     */
    public WireTapDefinition<Type> newExchangeRef(String ref) {
        setNewExchangeProcessorRef(ref);
        return this;
    }

    /**
     * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}
     *
     * @param processor processor preparing the new exchange to send
     * @return the builder
     * @see #newExchangeHeader(String, org.apache.camel.Expression)
     */
    public WireTapDefinition<Type> newExchange(Processor processor) {
        setNewExchangeProcessor(processor);
        return this;
    }

    /**
     * Sets a header on the <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}.
     * <p/>
     * Use this together with the {@link #newExchangeBody(org.apache.camel.Expression)} or {@link #newExchange(org.apache.camel.Processor)}
     * methods.
     * <p/>
     * If the list of headers has been set to <tt>null</tt> using {@link #setHeaders(java.util.List)},
     * then a new list is created to hold the header.
     *
     * @param headerName  the header name
     * @param expression  the expression setting the header value
     * @return the builder
     */
    public WireTapDefinition<Type> newExchangeHeader(String headerName, Expression expression) {
        // setHeaders accepts null, so guard against it instead of failing with a NullPointerException
        if (headers == null) {
            headers = new ArrayList<SetHeaderDefinition>();
        }
        headers.add(new SetHeaderDefinition(headerName, expression));
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepare the processor
     * @return the builder
     */
    public WireTapDefinition<Type> onPrepare(Processor onPrepare) {
        setOnPrepare(onPrepare);
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
     * @return the builder
     */
    public WireTapDefinition<Type> onPrepareRef(String onPrepareRef) {
        setOnPrepareRef(onPrepareRef);
        return this;
    }

    /**
     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
     * to cache and reuse producers, when uris are reused.
     *
     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
     * @return the builder
     */
    @Override
    public WireTapDefinition<Type> cacheSize(int cacheSize) {
        setCacheSize(cacheSize);
        return this;
    }

    /**
     * Ignores the invalid endpoint exception when trying to create a producer with that endpoint
     *
     * @return the builder
     */
    @Override
    public WireTapDefinition<Type> ignoreInvalidEndpoint() {
        setIgnoreInvalidEndpoint(true);
        return this;
    }

    // Properties
    //-------------------------------------------------------------------------

    @Override
    public String getUri() {
        return super.getUri();
    }

    /**
     * The uri of the endpoint to wiretap to. The uri can be dynamically computed using the {@link org.apache.camel.language.simple.SimpleLanguage} expression.
     */
    @Override
    public void setUri(String uri) {
        super.setUri(uri);
    }

    public Processor getNewExchangeProcessor() {
        return newExchangeProcessor;
    }

    /**
     * To use a Processor for creating a new body as the message to use for wire tapping
     */
    public void setNewExchangeProcessor(Processor processor) {
        this.newExchangeProcessor = processor;
    }

    public String getNewExchangeProcessorRef() {
        return newExchangeProcessorRef;
    }

    /**
     * Reference to a Processor to use for creating a new body as the message to use for wire tapping
     */
    public void setNewExchangeProcessorRef(String ref) {
        this.newExchangeProcessorRef = ref;
    }

    public ExpressionSubElementDefinition getNewExchangeExpression() {
        return newExchangeExpression;
    }

    /**
     * Uses the expression for creating a new body as the message to use for wire tapping
     */
    public void setNewExchangeExpression(ExpressionSubElementDefinition newExchangeExpression) {
        this.newExchangeExpression = newExchangeExpression;
    }

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public String getExecutorServiceRef() {
        return executorServiceRef;
    }

    public void setExecutorServiceRef(String executorServiceRef) {
        this.executorServiceRef = executorServiceRef;
    }

    public Boolean getCopy() {
        return copy;
    }

    public void setCopy(Boolean copy) {
        this.copy = copy;
    }

    public String getOnPrepareRef() {
        return onPrepareRef;
    }

    public void setOnPrepareRef(String onPrepareRef) {
        this.onPrepareRef = onPrepareRef;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public List<SetHeaderDefinition> getHeaders() {
        return headers;
    }

    public void setHeaders(List<SetHeaderDefinition> headers) {
        this.headers = headers;
    }

}