001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022 023import javax.xml.bind.annotation.XmlAccessType; 024import javax.xml.bind.annotation.XmlAccessorType; 025import javax.xml.bind.annotation.XmlAttribute; 026import javax.xml.bind.annotation.XmlElement; 027import javax.xml.bind.annotation.XmlElementRef; 028import javax.xml.bind.annotation.XmlRootElement; 029import javax.xml.bind.annotation.XmlTransient; 030 031import org.apache.camel.ExchangePattern; 032import org.apache.camel.Expression; 033import org.apache.camel.Processor; 034import org.apache.camel.builder.ExpressionBuilder; 035import org.apache.camel.processor.CamelInternalProcessor; 036import org.apache.camel.processor.SendDynamicProcessor; 037import org.apache.camel.processor.WireTapProcessor; 038import org.apache.camel.spi.Metadata; 039import org.apache.camel.spi.RouteContext; 040import org.apache.camel.util.CamelContextHelper; 041 042/** 043 * Routes a copy of a message (or creates a new message) to a secondary destination while continue routing the original message. 044 */ 045@Metadata(label = "eip,endpoint,routing") 046@XmlRootElement(name = "wireTap") 047@XmlAccessorType(XmlAccessType.FIELD) 048public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends ToDynamicDefinition implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>> { 049 @XmlTransient 050 private Processor newExchangeProcessor; 051 @XmlAttribute(name = "processorRef") 052 private String newExchangeProcessorRef; 053 @XmlElement(name = "body") 054 private ExpressionSubElementDefinition newExchangeExpression; 055 @XmlElementRef 056 private List<SetHeaderDefinition> headers = new ArrayList<>(); 057 @XmlTransient 058 private ExecutorService executorService; 059 @XmlAttribute 060 private String executorServiceRef; 061 @XmlAttribute @Metadata(defaultValue = "true") 062 private Boolean copy; 063 @XmlAttribute @Metadata(defaultValue = "true") 064 private Boolean dynamicUri; 065 @XmlAttribute 066 private String onPrepareRef; 067 @XmlTransient 068 private Processor onPrepare; 069 070 public WireTapDefinition() { 071 } 072 073 @Override 074 public Processor createProcessor(RouteContext routeContext) throws Exception { 075 // executor service is mandatory for wire tap 076 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 077 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true); 078 079 // must use InOnly for WireTap 080 setPattern(ExchangePattern.InOnly); 081 082 // create the send dynamic producer to send to the wire tapped endpoint 083 SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor(routeContext); 084 085 // create error handler we need to use for processing the wire tapped 086 Processor target = wrapInErrorHandler(routeContext, dynamicTo); 087 088 // and wrap in unit of work 089 CamelInternalProcessor internal = new CamelInternalProcessor(target); 090 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 091 092 // is true bt default 093 boolean isCopy = getCopy() == null || getCopy(); 094 095 WireTapProcessor answer = new WireTapProcessor(dynamicTo, internal, getPattern(), threadPool, shutdownThreadPool, isDynamic()); 096 answer.setCopy(isCopy); 097 if (newExchangeProcessorRef != null) { 098 newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class); 099 } 100 if (newExchangeProcessor != null) { 101 answer.addNewExchangeProcessor(newExchangeProcessor); 102 } 103 if (newExchangeExpression != null) { 104 answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext)); 105 } 106 if (headers != null && !headers.isEmpty()) { 107 for (SetHeaderDefinition header : headers) { 108 Processor processor = createProcessor(routeContext, header); 109 answer.addNewExchangeProcessor(processor); 110 } 111 } 112 if (onPrepareRef != null) { 113 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 114 } 115 if (onPrepare != null) { 116 answer.setOnPrepare(onPrepare); 117 } 118 119 return answer; 120 } 121 122 @Override 123 protected Expression createExpression(RouteContext routeContext) { 124 // whether to use dynamic or static uri 125 if (isDynamic()) { 126 return super.createExpression(routeContext); 127 } else { 128 return ExpressionBuilder.constantExpression(getUri()); 129 } 130 } 131 132 private boolean isDynamic() { 133 // its dynamic by default 134 return dynamicUri == null || dynamicUri; 135 } 136 137 public ExchangePattern getPattern() { 138 return ExchangePattern.InOnly; 139 } 140 141 @Override 142 public String toString() { 143 return "WireTap[" + getUri() + "]"; 144 } 145 146 @Override 147 public String getShortName() { 148 return "wireTap"; 149 } 150 151 @Override 152 public String getLabel() { 153 return "wireTap[" + getUri() + "]"; 154 } 155 156 @Override 157 @SuppressWarnings("unchecked") 158 public Type end() { 159 // allow end() to return to previous type so you can continue in the DSL 160 return (Type) super.end(); 161 } 162 163 @Override 164 public void addOutput(ProcessorDefinition<?> output) { 165 // add outputs on parent as this wiretap does not support outputs 166 getParent().addOutput(output); 167 } 168 169 // Fluent API 170 // ------------------------------------------------------------------------- 171 172 /** 173 * Uses a custom thread pool 174 * 175 * @param executorService a custom {@link ExecutorService} to use as thread pool 176 * for sending tapped exchanges 177 * @return the builder 178 */ 179 public WireTapDefinition<Type> executorService(ExecutorService executorService) { 180 setExecutorService(executorService); 181 return this; 182 } 183 184 /** 185 * Uses a custom thread pool 186 * 187 * @param executorServiceRef reference to lookup a custom {@link ExecutorService} 188 * to use as thread pool for sending tapped exchanges 189 * @return the builder 190 */ 191 public WireTapDefinition<Type> executorServiceRef(String executorServiceRef) { 192 setExecutorServiceRef(executorServiceRef); 193 return this; 194 } 195 196 /** 197 * Uses a copy of the original exchange 198 * 199 * @return the builder 200 */ 201 public WireTapDefinition<Type> copy() { 202 setCopy(true); 203 return this; 204 } 205 206 /** 207 * Uses a copy of the original exchange 208 * 209 * @param copy if it is true camel will copy the original exchange, 210 * if it is false camel will not copy the original exchange 211 * @return the builder 212 */ 213 public WireTapDefinition<Type> copy(boolean copy) { 214 setCopy(copy); 215 return this; 216 } 217 218 /** 219 * Whether the uri is dynamic or static. 220 * If the uri is dynamic then the simple language is used to evaluate a dynamic uri to use as the wire-tap destination, 221 * for each incoming message. This works similar to how the <tt>toD</tt> EIP pattern works. 222 * If static then the uri is used as-is as the wire-tap destination. 223 * 224 * @param dynamicUri whether to use dynamic or static uris 225 * @return the builder 226 */ 227 public WireTapDefinition<Type> dynamicUri(boolean dynamicUri) { 228 setDynamicUri(dynamicUri); 229 return this; 230 } 231 232 /** 233 * @deprecated will be removed in Camel 3.0 Instead use {@link #newExchangeBody(org.apache.camel.Expression)} 234 */ 235 @Deprecated 236 public WireTapDefinition<Type> newExchange(Expression expression) { 237 return newExchangeBody(expression); 238 } 239 240 /** 241 * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} 242 * 243 * @param expression expression that creates the new body to send 244 * @return the builder 245 * @see #newExchangeHeader(String, org.apache.camel.Expression) 246 */ 247 public WireTapDefinition<Type> newExchangeBody(Expression expression) { 248 setNewExchangeExpression(new ExpressionSubElementDefinition(expression)); 249 return this; 250 } 251 252 /** 253 * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} 254 * 255 * @param ref reference to the {@link Processor} to lookup in the {@link org.apache.camel.spi.Registry} to 256 * be used for preparing the new exchange to send 257 * @return the builder 258 */ 259 public WireTapDefinition<Type> newExchangeRef(String ref) { 260 setNewExchangeProcessorRef(ref); 261 return this; 262 } 263 264 /** 265 * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly} 266 * 267 * @param processor processor preparing the new exchange to send 268 * @return the builder 269 * @see #newExchangeHeader(String, org.apache.camel.Expression) 270 */ 271 public WireTapDefinition<Type> newExchange(Processor processor) { 272 setNewExchangeProcessor(processor); 273 return this; 274 } 275 276 /** 277 * Sets a header on the <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}. 278 * <p/> 279 * Use this together with the {@link #newExchange(org.apache.camel.Expression)} or {@link #newExchange(org.apache.camel.Processor)} 280 * methods. 281 * 282 * @param headerName the header name 283 * @param expression the expression setting the header value 284 * @return the builder 285 */ 286 public WireTapDefinition<Type> newExchangeHeader(String headerName, Expression expression) { 287 headers.add(new SetHeaderDefinition(headerName, expression)); 288 return this; 289 } 290 291 /** 292 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 293 * This can be used to deep-clone messages that should be send, or any custom logic needed before 294 * the exchange is send. 295 * 296 * @param onPrepare the processor 297 * @return the builder 298 */ 299 public WireTapDefinition<Type> onPrepare(Processor onPrepare) { 300 setOnPrepare(onPrepare); 301 return this; 302 } 303 304 /** 305 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 306 * This can be used to deep-clone messages that should be send, or any custom logic needed before 307 * the exchange is send. 308 * 309 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 310 * @return the builder 311 */ 312 public WireTapDefinition<Type> onPrepareRef(String onPrepareRef) { 313 setOnPrepareRef(onPrepareRef); 314 return this; 315 } 316 317 /** 318 * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used 319 * to cache and reuse producers, when uris are reused. 320 * 321 * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. 322 * @return the builder 323 */ 324 @Override 325 public WireTapDefinition<Type> cacheSize(int cacheSize) { 326 setCacheSize(cacheSize); 327 return this; 328 } 329 330 /** 331 * Ignore the invalidate endpoint exception when try to create a producer with that endpoint 332 * 333 * @return the builder 334 */ 335 @Override 336 public WireTapDefinition<Type> ignoreInvalidEndpoint() { 337 setIgnoreInvalidEndpoint(true); 338 return this; 339 } 340 341 // Properties 342 //------------------------------------------------------------------------- 343 344 @Override 345 public String getUri() { 346 return super.getUri(); 347 } 348 349 /** 350 * The uri of the endpoint to wiretap to. The uri can be dynamic computed using the {@link org.apache.camel.language.simple.SimpleLanguage} expression. 351 */ 352 @Override 353 public void setUri(String uri) { 354 super.setUri(uri); 355 } 356 357 public Processor getNewExchangeProcessor() { 358 return newExchangeProcessor; 359 } 360 361 /** 362 * To use a Processor for creating a new body as the message to use for wire tapping 363 */ 364 public void setNewExchangeProcessor(Processor processor) { 365 this.newExchangeProcessor = processor; 366 } 367 368 public String getNewExchangeProcessorRef() { 369 return newExchangeProcessorRef; 370 } 371 372 /** 373 * Reference to a Processor to use for creating a new body as the message to use for wire tapping 374 */ 375 public void setNewExchangeProcessorRef(String ref) { 376 this.newExchangeProcessorRef = ref; 377 } 378 379 public ExpressionSubElementDefinition getNewExchangeExpression() { 380 return newExchangeExpression; 381 } 382 383 /** 384 * Uses the expression for creating a new body as the message to use for wire tapping 385 */ 386 public void setNewExchangeExpression(ExpressionSubElementDefinition newExchangeExpression) { 387 this.newExchangeExpression = newExchangeExpression; 388 } 389 390 public ExecutorService getExecutorService() { 391 return executorService; 392 } 393 394 public void setExecutorService(ExecutorService executorService) { 395 this.executorService = executorService; 396 } 397 398 public String getExecutorServiceRef() { 399 return executorServiceRef; 400 } 401 402 public void setExecutorServiceRef(String executorServiceRef) { 403 this.executorServiceRef = executorServiceRef; 404 } 405 406 public Boolean getCopy() { 407 return copy; 408 } 409 410 public void setCopy(Boolean copy) { 411 this.copy = copy; 412 } 413 414 public Boolean getDynamicUri() { 415 return dynamicUri; 416 } 417 418 public void setDynamicUri(Boolean dynamicUri) { 419 this.dynamicUri = dynamicUri; 420 } 421 422 public String getOnPrepareRef() { 423 return onPrepareRef; 424 } 425 426 public void setOnPrepareRef(String onPrepareRef) { 427 this.onPrepareRef = onPrepareRef; 428 } 429 430 public Processor getOnPrepare() { 431 return onPrepare; 432 } 433 434 public void setOnPrepare(Processor onPrepare) { 435 this.onPrepare = onPrepare; 436 } 437 438 public List<SetHeaderDefinition> getHeaders() { 439 return headers; 440 } 441 442 public void setHeaders(List<SetHeaderDefinition> headers) { 443 this.headers = headers; 444 } 445 446}