001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.HashMap; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.AsyncProducerCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.ExchangePattern; 028 import org.apache.camel.Producer; 029 import org.apache.camel.ProducerCallback; 030 import org.apache.camel.Traceable; 031 import org.apache.camel.impl.InterceptSendToEndpoint; 032 import org.apache.camel.impl.ProducerCache; 033 import org.apache.camel.support.ServiceSupport; 034 import org.apache.camel.util.AsyncProcessorHelper; 035 import org.apache.camel.util.ObjectHelper; 036 import org.apache.camel.util.ServiceHelper; 037 import org.apache.camel.util.URISupport; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * Processor for forwarding exchanges to an endpoint destination. 043 * 044 * @version 045 */ 046 public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable { 047 protected final transient Logger log = LoggerFactory.getLogger(getClass()); 048 protected final CamelContext camelContext; 049 protected ProducerCache producerCache; 050 protected Endpoint destination; 051 protected ExchangePattern pattern; 052 053 public SendProcessor(Endpoint destination) { 054 ObjectHelper.notNull(destination, "destination"); 055 this.destination = destination; 056 this.camelContext = destination.getCamelContext(); 057 ObjectHelper.notNull(this.camelContext, "camelContext"); 058 } 059 060 public SendProcessor(Endpoint destination, ExchangePattern pattern) { 061 this(destination); 062 this.pattern = pattern; 063 } 064 065 @Override 066 public String toString() { 067 return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")"; 068 } 069 070 public void setDestination(Endpoint destination) { 071 this.destination = destination; 072 // destination changed so purge the cache 073 if (producerCache != null) { 074 producerCache.purge(); 075 } 076 } 077 078 public String getTraceLabel() { 079 return URISupport.sanitizeUri(destination.getEndpointUri()); 080 } 081 082 public void process(final Exchange exchange) throws Exception { 083 if (!isStarted()) { 084 throw new IllegalStateException("SendProcessor has not been started: " + this); 085 } 086 087 // we should preserve existing MEP so remember old MEP 088 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 089 final ExchangePattern existingPattern = exchange.getPattern(); 090 091 // send the exchange to the destination using a producer 092 producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() { 093 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { 094 exchange = configureExchange(exchange, pattern); 095 log.debug(">>>> {} {}", destination, exchange); 096 try { 097 producer.process(exchange); 098 } finally { 099 // restore previous MEP 100 exchange.setPattern(existingPattern); 101 } 102 return exchange; 103 } 104 }); 105 } 106 107 public boolean process(Exchange exchange, final AsyncCallback callback) { 108 if (!isStarted()) { 109 throw new IllegalStateException("SendProcessor has not been started: " + this); 110 } 111 112 // we should preserve existing MEP so remember old MEP 113 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 114 final ExchangePattern existingPattern = exchange.getPattern(); 115 116 // send the exchange to the destination using a producer 117 return producerCache.doInAsyncProducer(destination, exchange, pattern, callback, new AsyncProducerCallback() { 118 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 119 ExchangePattern pattern, final AsyncCallback callback) { 120 final Exchange target = configureExchange(exchange, pattern); 121 log.debug(">>>> {} {}", destination, exchange); 122 return AsyncProcessorHelper.process(asyncProducer, target, new AsyncCallback() { 123 public void done(boolean doneSync) { 124 // restore previous MEP 125 target.setPattern(existingPattern); 126 // signal we are done 127 callback.done(doneSync); 128 } 129 }); 130 } 131 }); 132 } 133 134 public Endpoint getDestination() { 135 return destination; 136 } 137 138 public ExchangePattern getPattern() { 139 return pattern; 140 } 141 142 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) { 143 if (pattern != null) { 144 exchange.setPattern(pattern); 145 } 146 // set property which endpoint we send to 147 exchange.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); 148 return exchange; 149 } 150 151 protected void doStart() throws Exception { 152 if (producerCache == null) { 153 // use a single producer cache as we need to only hold reference for one destination 154 // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory 155 // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed, 156 // eg stopping the producer when we stop etc. 157 producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1)); 158 // do not add as service as we do not want to manage the producer cache 159 } 160 ServiceHelper.startService(producerCache); 161 162 // the destination could since have been intercepted by a interceptSendToEndpoint so we got to 163 // lookup this before we can use the destination 164 Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey()); 165 if (lookup instanceof InterceptSendToEndpoint) { 166 if (log.isDebugEnabled()) { 167 log.debug("Intercepted sending to {} -> {}", 168 URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri())); 169 } 170 destination = lookup; 171 } 172 // warm up the producer by starting it so we can fail fast if there was a problem 173 // however must start endpoint first 174 ServiceHelper.startService(destination); 175 producerCache.startProducer(destination); 176 } 177 178 protected void doStop() throws Exception { 179 ServiceHelper.stopService(producerCache); 180 } 181 182 protected void doShutdown() throws Exception { 183 ServiceHelper.stopAndShutdownService(producerCache); 184 } 185 }