001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.builder;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.Optional;
022import java.util.concurrent.Future;
023import java.util.function.Consumer;
024import java.util.function.Supplier;
025
026import org.apache.camel.CamelContext;
027import org.apache.camel.CamelExecutionException;
028import org.apache.camel.Endpoint;
029import org.apache.camel.Exchange;
030import org.apache.camel.ExchangePattern;
031import org.apache.camel.FluentProducerTemplate;
032import org.apache.camel.Message;
033import org.apache.camel.Processor;
034import org.apache.camel.ProducerTemplate;
035import org.apache.camel.processor.ConvertBodyProcessor;
036import org.apache.camel.support.ServiceSupport;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.ServiceHelper;
040
041public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
042
043    // transient state of headers and body which needs to be thread local scoped to be thread-safe
044    private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
045    private final ThreadLocal<Object> body = new ThreadLocal<>();
046
047    private final CamelContext context;
048    private final ClassValue<ConvertBodyProcessor> resultProcessors;
049    private Optional<Consumer<ProducerTemplate>> templateCustomizer;
050    private Optional<Supplier<Exchange>> exchangeSupplier;
051    private Optional<Supplier<Processor>> processorSupplier;
052    private Optional<Endpoint> endpoint;
053    private Optional<Endpoint> defaultEndpoint;
054    private int maximumCacheSize;
055    private boolean eventNotifierEnabled;
056    private volatile ProducerTemplate template;
057
058    public DefaultFluentProducerTemplate(CamelContext context) {
059        this.context = context;
060        this.endpoint = Optional.empty();
061        this.defaultEndpoint = Optional.empty();
062        this.eventNotifierEnabled = true;
063        this.templateCustomizer = Optional.empty();
064        this.exchangeSupplier = Optional.empty();
065        this.processorSupplier = Optional.empty();
066        this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
067            @Override
068            protected ConvertBodyProcessor computeValue(Class<?> type) {
069                return new ConvertBodyProcessor(type);
070            }
071        };
072    }
073
074    @Override
075    public CamelContext getCamelContext() {
076        return context;
077    }
078
079    @Override
080    public int getCurrentCacheSize() {
081        if (template == null) {
082            return 0;
083        }
084        return template.getCurrentCacheSize();
085    }
086
087    @Override
088    public void cleanUp() {
089        if (template != null) {
090            template.cleanUp();
091        }
092    }
093
094    @Override
095    public void setDefaultEndpointUri(String endpointUri) {
096        setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
097    }
098
099    @Override
100    public Endpoint getDefaultEndpoint() {
101        return defaultEndpoint.orElse(null);
102    }
103
104    @Override
105    public void setDefaultEndpoint(Endpoint defaultEndpoint) {
106        this.defaultEndpoint = Optional.ofNullable(defaultEndpoint);
107    }
108
109    @Override
110    public int getMaximumCacheSize() {
111        return maximumCacheSize;
112    }
113
114    @Override
115    public void setMaximumCacheSize(int maximumCacheSize) {
116        this.maximumCacheSize = maximumCacheSize;
117    }
118
119    @Override
120    public boolean isEventNotifierEnabled() {
121        return eventNotifierEnabled;
122    }
123
124    @Override
125    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
126        this.eventNotifierEnabled = eventNotifierEnabled;
127    }
128
129    @Override
130    public FluentProducerTemplate clearAll() {
131        clearBody();
132        clearHeaders();
133
134        return this;
135    }
136
137    @Override
138    public FluentProducerTemplate withHeader(String key, Object value) {
139        Map<String, Object> map = headers.get();
140        if (map == null) {
141            map = new HashMap<>();
142            headers.set(map);
143        }
144
145        map.put(key, value);
146
147        return this;
148    }
149
150    @Override
151    public FluentProducerTemplate clearHeaders() {
152        headers.remove();
153
154        return this;
155    }
156
157    @Override
158    public FluentProducerTemplate withBody(Object body) {
159        this.body.set(body);
160
161        return this;
162    }
163
164    @Override
165    public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
166        Object b = type != null
167            ? context.getTypeConverter().convertTo(type, body)
168            : body;
169
170        this.body.set(b);
171
172        return this;
173    }
174
175    @Override
176    public FluentProducerTemplate clearBody() {
177        body.remove();
178
179        return this;
180    }
181
182    @Override
183    public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
184        this.templateCustomizer = Optional.of(templateCustomizer);
185        return this;
186    }
187
188    @Override
189    public FluentProducerTemplate withExchange(final Exchange exchange) {
190        return withExchange(() -> exchange);
191    }
192
193    @Override
194    public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
195        this.exchangeSupplier = Optional.of(exchangeSupplier);
196        return this;
197    }
198
199    @Override
200    public FluentProducerTemplate withProcessor(final Processor processor) {
201        return withProcessor(() -> processor);
202    }
203
204    @Override
205    public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
206        this.processorSupplier = Optional.of(processorSupplier);
207        return this;
208    }
209
210    @Override
211    public FluentProducerTemplate to(String endpointUri) {
212        return to(context.getEndpoint(endpointUri));
213    }
214
215    @Override
216    public FluentProducerTemplate to(Endpoint endpoint) {
217        this.endpoint = Optional.of(endpoint);
218        return this;
219    }
220
221    // ************************
222    // REQUEST
223    // ************************
224
225    @Override
226    public Object request() throws CamelExecutionException {
227        return request(Object.class);
228    }
229
230    @Override
231    @SuppressWarnings("unchecked")
232    public <T> T request(Class<T> type) throws CamelExecutionException {
233        // Determine the target endpoint
234        final Endpoint target = target();
235
236        // Create the default processor if not provided.
237        final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
238
239        T result;
240        if (type == Exchange.class) {
241            result = (T)template().request(target, processorSupplier.get());
242        } else if (type == Message.class) {
243            Exchange exchange = template().request(target, processorSupplier.get());
244            result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn();
245        } else {
246            Exchange exchange = template().send(
247                target,
248                ExchangePattern.InOut,
249                processorSupplier.get(),
250                resultProcessors.get(type)
251            );
252
253            result = context.getTypeConverter().convertTo(
254                type,
255                ExchangeHelper.extractResultBody(exchange, exchange.getPattern())
256            );
257        }
258
259        return result;
260    }
261
262    @Override
263    public Future<Object> asyncRequest() {
264        return asyncRequest(Object.class);
265    }
266
267    @Override
268    public <T> Future<T> asyncRequest(Class<T> type) {
269        // Determine the target endpoint
270        final Endpoint target = target();
271
272        Future<T> result;
273        if (ObjectHelper.isNotEmpty(headers.get())) {
274            // Make a copy of the headers and body so that async processing won't
275            // be invalidated by subsequent reuse of the template
276            final Map<String, Object> headersCopy = new HashMap<>(headers.get());
277            final Object bodyCopy = body.get();
278
279            result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
280        } else {
281            // Make a copy of the and body so that async processing won't be
282            // invalidated by subsequent reuse of the template
283            final Object bodyCopy = body.get();
284
285            result = template().asyncRequestBody(target, bodyCopy, type);
286        }
287
288        return result;
289    }
290
291    // ************************
292    // SEND
293    // ************************
294
295    @Override
296    public Exchange send() throws CamelExecutionException {
297        // Determine the target endpoint
298        final Endpoint target = target();
299
300        return exchangeSupplier.isPresent()
301            ? template().send(target, exchangeSupplier.get().get())
302            : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
303    }
304
305    @Override
306    public Future<Exchange> asyncSend() {
307        // Determine the target endpoint
308        final Endpoint target = target();
309
310        return exchangeSupplier.isPresent()
311            ? template().asyncSend(target, exchangeSupplier.get().get())
312            : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
313    }
314
315    // ************************
316    // HELPERS
317    // ************************
318
319    /**
320     * Create the FluentProducerTemplate by setting the camel context
321     *
322     * @param context the camel context
323     */
324    public static FluentProducerTemplate on(CamelContext context) {
325        return new DefaultFluentProducerTemplate(context);
326    }
327
328    private ProducerTemplate template() {
329        ObjectHelper.notNull(context, "CamelContext");
330
331        if (template == null) {
332            template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate();
333            defaultEndpoint.ifPresent(template::setDefaultEndpoint);
334            template.setEventNotifierEnabled(eventNotifierEnabled);
335            templateCustomizer.ifPresent(tc -> tc.accept(template));
336        }
337
338        return template;
339    }
340
341    private Processor defaultProcessor() {
342        return exchange -> {
343            ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll);
344            ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody);
345        };
346    }
347
348    private Processor defaultAsyncProcessor() {
349        final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null;
350        final Object bodyCopy = this.body.get();
351
352        return exchange -> {
353            ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
354            ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody);
355        };
356    }
357
358    private Endpoint target() {
359        if (endpoint.isPresent()) {
360            return endpoint.get();
361        }
362        if (defaultEndpoint.isPresent()) {
363            return defaultEndpoint.get();
364        }
365
366        throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
367    }
368
369    @Override
370    protected void doStart() throws Exception {
371        if (template == null) {
372            template = template();
373        }
374        ServiceHelper.startService(template);
375    }
376
377    @Override
378    protected void doStop() throws Exception {
379        clearAll();
380        ServiceHelper.stopService(template);
381    }
382}