001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.builder;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.Optional;
022import java.util.concurrent.Future;
023import java.util.function.Consumer;
024import java.util.function.Supplier;
025
026import org.apache.camel.CamelContext;
027import org.apache.camel.CamelExecutionException;
028import org.apache.camel.Endpoint;
029import org.apache.camel.Exchange;
030import org.apache.camel.ExchangePattern;
031import org.apache.camel.FluentProducerTemplate;
032import org.apache.camel.Message;
033import org.apache.camel.Processor;
034import org.apache.camel.ProducerTemplate;
035import org.apache.camel.processor.ConvertBodyProcessor;
036import org.apache.camel.support.ServiceSupport;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.ServiceHelper;
040
041public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
042
043    // transient state of headers and body which needs to be thread local scoped to be thread-safe
044    private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
045    private final ThreadLocal<Object> body = new ThreadLocal<>();
046
047    private final CamelContext context;
048    private final ClassValue<ConvertBodyProcessor> resultProcessors;
049    private Optional<Consumer<ProducerTemplate>> templateCustomizer;
050    private Optional<Supplier<Exchange>> exchangeSupplier;
051    private Optional<Supplier<Processor>> processorSupplier;
052    private Optional<Endpoint> endpoint;
053    private Optional<Endpoint> defaultEndpoint;
054    private int maximumCacheSize;
055    private boolean eventNotifierEnabled;
056    private volatile ProducerTemplate template;
057
058    public DefaultFluentProducerTemplate(CamelContext context) {
059        this.context = context;
060        this.endpoint = Optional.empty();
061        this.defaultEndpoint = Optional.empty();
062        this.eventNotifierEnabled = true;
063        this.templateCustomizer = Optional.empty();
064        this.exchangeSupplier = Optional.empty();
065        this.processorSupplier = Optional.empty();
066        this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
067            @Override
068            protected ConvertBodyProcessor computeValue(Class<?> type) {
069                return new ConvertBodyProcessor(type);
070            }
071        };
072    }
073
074    @Override
075    public CamelContext getCamelContext() {
076        return context;
077    }
078
079    @Override
080    public int getCurrentCacheSize() {
081        if (template == null) {
082            return 0;
083        }
084        return template.getCurrentCacheSize();
085    }
086
087    @Override
088    public void cleanUp() {
089        if (template != null) {
090            template.cleanUp();
091        }
092    }
093
094    @Override
095    public void setDefaultEndpointUri(String endpointUri) {
096        setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
097    }
098
099    @Override
100    public Endpoint getDefaultEndpoint() {
101        return defaultEndpoint.orElse(null);
102    }
103
104    @Override
105    public void setDefaultEndpoint(Endpoint defaultEndpoint) {
106        this.defaultEndpoint = Optional.ofNullable(defaultEndpoint);
107    }
108
109    @Override
110    public int getMaximumCacheSize() {
111        return maximumCacheSize;
112    }
113
114    @Override
115    public void setMaximumCacheSize(int maximumCacheSize) {
116        this.maximumCacheSize = maximumCacheSize;
117    }
118
119    @Override
120    public boolean isEventNotifierEnabled() {
121        return eventNotifierEnabled;
122    }
123
124    @Override
125    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
126        this.eventNotifierEnabled = eventNotifierEnabled;
127    }
128
129    @Override
130    public FluentProducerTemplate clearAll() {
131        clearBody();
132        clearHeaders();
133
134        return this;
135    }
136
137    @Override
138    public FluentProducerTemplate withHeader(String key, Object value) {
139        Map<String, Object> map = headers.get();
140        if (map == null) {
141            map = new HashMap<>();
142            headers.set(map);
143        }
144
145        map.put(key, value);
146
147        return this;
148    }
149
150    @Override
151    public FluentProducerTemplate clearHeaders() {
152        headers.remove();
153
154        return this;
155    }
156
157    @Override
158    public FluentProducerTemplate withBody(Object body) {
159        this.body.set(body);
160
161        return this;
162    }
163
164    @Override
165    public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
166        Object b = type != null
167            ? context.getTypeConverter().convertTo(type, body)
168            : body;
169
170        this.body.set(b);
171
172        return this;
173    }
174
175    @Override
176    public FluentProducerTemplate clearBody() {
177        body.remove();
178
179        return this;
180    }
181
182    @Override
183    public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
184        this.templateCustomizer = Optional.of(templateCustomizer);
185        return this;
186    }
187
188    @Override
189    public FluentProducerTemplate withExchange(final Exchange exchange) {
190        return withExchange(() -> exchange);
191    }
192
193    @Override
194    public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
195        this.exchangeSupplier = Optional.of(exchangeSupplier);
196        return this;
197    }
198
199    @Override
200    public FluentProducerTemplate withProcessor(final Processor processor) {
201        return withProcessor(() -> processor);
202    }
203
204    @Override
205    public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
206        this.processorSupplier = Optional.of(processorSupplier);
207        return this;
208    }
209
210    @Override
211    public FluentProducerTemplate to(String endpointUri) {
212        return to(context.getEndpoint(endpointUri));
213    }
214
215    @Override
216    public FluentProducerTemplate to(Endpoint endpoint) {
217        this.endpoint = Optional.of(endpoint);
218        return this;
219    }
220
221    // ************************
222    // REQUEST
223    // ************************
224
225    @Override
226    public Object request() throws CamelExecutionException {
227        return request(Object.class);
228    }
229
230    @Override
231    @SuppressWarnings("unchecked")
232    public <T> T request(Class<T> type) throws CamelExecutionException {
233        if (exchangeSupplier.isPresent()) {
234            throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead.");
235        }
236
237        // Determine the target endpoint
238        final Endpoint target = target();
239
240        // Create the default processor if not provided.
241        final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
242
243        T result;
244        if (type == Exchange.class) {
245            result = (T)template().request(target, processorSupplier.get());
246        } else if (type == Message.class) {
247            Exchange exchange = template().request(target, processorSupplier.get());
248            result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn();
249        } else {
250            Exchange exchange = template().send(
251                target,
252                ExchangePattern.InOut,
253                processorSupplier.get(),
254                resultProcessors.get(type)
255            );
256
257            result = context.getTypeConverter().convertTo(
258                type,
259                ExchangeHelper.extractResultBody(exchange, exchange.getPattern())
260            );
261        }
262
263        return result;
264    }
265
266    @Override
267    public Future<Object> asyncRequest() {
268        return asyncRequest(Object.class);
269    }
270
271    @Override
272    public <T> Future<T> asyncRequest(Class<T> type) {
273        // Determine the target endpoint
274        final Endpoint target = target();
275
276        Future<T> result;
277        if (ObjectHelper.isNotEmpty(headers.get())) {
278            // Make a copy of the headers and body so that async processing won't
279            // be invalidated by subsequent reuse of the template
280            final Map<String, Object> headersCopy = new HashMap<>(headers.get());
281            final Object bodyCopy = body.get();
282
283            result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
284        } else {
285            // Make a copy of the and body so that async processing won't be
286            // invalidated by subsequent reuse of the template
287            final Object bodyCopy = body.get();
288
289            result = template().asyncRequestBody(target, bodyCopy, type);
290        }
291
292        return result;
293    }
294
295    // ************************
296    // SEND
297    // ************************
298
299    @Override
300    public Exchange send() throws CamelExecutionException {
301        // Determine the target endpoint
302        final Endpoint target = target();
303
304        return exchangeSupplier.isPresent()
305            ? template().send(target, exchangeSupplier.get().get())
306            : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
307    }
308
309    @Override
310    public Future<Exchange> asyncSend() {
311        // Determine the target endpoint
312        final Endpoint target = target();
313
314        return exchangeSupplier.isPresent()
315            ? template().asyncSend(target, exchangeSupplier.get().get())
316            : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
317    }
318
319    // ************************
320    // HELPERS
321    // ************************
322
323    /**
324     * Create the FluentProducerTemplate by setting the camel context
325     *
326     * @param context the camel context
327     */
328    public static FluentProducerTemplate on(CamelContext context) {
329        return new DefaultFluentProducerTemplate(context);
330    }
331
332    private ProducerTemplate template() {
333        ObjectHelper.notNull(context, "CamelContext");
334
335        if (template == null) {
336            template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate();
337            defaultEndpoint.ifPresent(template::setDefaultEndpoint);
338            template.setEventNotifierEnabled(eventNotifierEnabled);
339            templateCustomizer.ifPresent(tc -> tc.accept(template));
340        }
341
342        return template;
343    }
344
345    private Processor defaultProcessor() {
346        return exchange -> {
347            ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll);
348            ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody);
349        };
350    }
351
352    private Processor defaultAsyncProcessor() {
353        final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null;
354        final Object bodyCopy = this.body.get();
355
356        return exchange -> {
357            ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
358            ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody);
359        };
360    }
361
362    private Endpoint target() {
363        if (endpoint.isPresent()) {
364            return endpoint.get();
365        }
366        if (defaultEndpoint.isPresent()) {
367            return defaultEndpoint.get();
368        }
369
370        throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
371    }
372
373    @Override
374    protected void doStart() throws Exception {
375        if (template == null) {
376            template = template();
377        }
378        ServiceHelper.startService(template);
379    }
380
381    @Override
382    protected void doStop() throws Exception {
383        clearAll();
384        ServiceHelper.stopService(template);
385    }
386}