001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.builder; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.Optional; 022import java.util.concurrent.Future; 023import java.util.function.Consumer; 024import java.util.function.Supplier; 025 026import org.apache.camel.CamelContext; 027import org.apache.camel.CamelExecutionException; 028import org.apache.camel.Endpoint; 029import org.apache.camel.Exchange; 030import org.apache.camel.ExchangePattern; 031import org.apache.camel.FluentProducerTemplate; 032import org.apache.camel.Message; 033import org.apache.camel.Processor; 034import org.apache.camel.ProducerTemplate; 035import org.apache.camel.processor.ConvertBodyProcessor; 036import org.apache.camel.support.ServiceSupport; 037import org.apache.camel.util.ExchangeHelper; 038import org.apache.camel.util.ObjectHelper; 039import org.apache.camel.util.ServiceHelper; 040 041public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { 042 043 // transient state of headers and body which needs to be thread local scoped to be thread-safe 044 private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>(); 045 private final ThreadLocal<Object> body = new ThreadLocal<>(); 046 047 private final CamelContext context; 048 private final ClassValue<ConvertBodyProcessor> resultProcessors; 049 private Optional<Consumer<ProducerTemplate>> templateCustomizer; 050 private Optional<Supplier<Exchange>> exchangeSupplier; 051 private Optional<Supplier<Processor>> processorSupplier; 052 private Optional<Endpoint> endpoint; 053 private Optional<Endpoint> defaultEndpoint; 054 private int maximumCacheSize; 055 private boolean eventNotifierEnabled; 056 private volatile ProducerTemplate template; 057 058 public DefaultFluentProducerTemplate(CamelContext context) { 059 this.context = context; 060 this.endpoint = Optional.empty(); 061 this.defaultEndpoint = Optional.empty(); 062 this.eventNotifierEnabled = true; 063 this.templateCustomizer = Optional.empty(); 064 this.exchangeSupplier = Optional.empty(); 065 this.processorSupplier = Optional.empty(); 066 this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { 067 @Override 068 protected ConvertBodyProcessor computeValue(Class<?> type) { 069 return new ConvertBodyProcessor(type); 070 } 071 }; 072 } 073 074 @Override 075 public CamelContext getCamelContext() { 076 return context; 077 } 078 079 @Override 080 public int getCurrentCacheSize() { 081 if (template == null) { 082 return 0; 083 } 084 return template.getCurrentCacheSize(); 085 } 086 087 @Override 088 public void cleanUp() { 089 if (template != null) { 090 template.cleanUp(); 091 } 092 } 093 094 @Override 095 public void setDefaultEndpointUri(String endpointUri) { 096 setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri)); 097 } 098 099 @Override 100 public Endpoint getDefaultEndpoint() { 101 return defaultEndpoint.orElse(null); 102 } 103 104 @Override 105 public void setDefaultEndpoint(Endpoint defaultEndpoint) { 106 this.defaultEndpoint = Optional.ofNullable(defaultEndpoint); 107 } 108 109 @Override 110 public int getMaximumCacheSize() { 111 return maximumCacheSize; 112 } 113 114 @Override 115 public void setMaximumCacheSize(int maximumCacheSize) { 116 this.maximumCacheSize = maximumCacheSize; 117 } 118 119 @Override 120 public boolean isEventNotifierEnabled() { 121 return eventNotifierEnabled; 122 } 123 124 @Override 125 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 126 this.eventNotifierEnabled = eventNotifierEnabled; 127 } 128 129 @Override 130 public FluentProducerTemplate clearAll() { 131 clearBody(); 132 clearHeaders(); 133 134 return this; 135 } 136 137 @Override 138 public FluentProducerTemplate withHeader(String key, Object value) { 139 Map<String, Object> map = headers.get(); 140 if (map == null) { 141 map = new HashMap<>(); 142 headers.set(map); 143 } 144 145 map.put(key, value); 146 147 return this; 148 } 149 150 @Override 151 public FluentProducerTemplate clearHeaders() { 152 headers.remove(); 153 154 return this; 155 } 156 157 @Override 158 public FluentProducerTemplate withBody(Object body) { 159 this.body.set(body); 160 161 return this; 162 } 163 164 @Override 165 public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { 166 Object b = type != null 167 ? context.getTypeConverter().convertTo(type, body) 168 : body; 169 170 this.body.set(b); 171 172 return this; 173 } 174 175 @Override 176 public FluentProducerTemplate clearBody() { 177 body.remove(); 178 179 return this; 180 } 181 182 @Override 183 public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { 184 this.templateCustomizer = Optional.of(templateCustomizer); 185 return this; 186 } 187 188 @Override 189 public FluentProducerTemplate withExchange(final Exchange exchange) { 190 return withExchange(() -> exchange); 191 } 192 193 @Override 194 public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { 195 this.exchangeSupplier = Optional.of(exchangeSupplier); 196 return this; 197 } 198 199 @Override 200 public FluentProducerTemplate withProcessor(final Processor processor) { 201 return withProcessor(() -> processor); 202 } 203 204 @Override 205 public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { 206 this.processorSupplier = Optional.of(processorSupplier); 207 return this; 208 } 209 210 @Override 211 public FluentProducerTemplate to(String endpointUri) { 212 return to(context.getEndpoint(endpointUri)); 213 } 214 215 @Override 216 public FluentProducerTemplate to(Endpoint endpoint) { 217 this.endpoint = Optional.of(endpoint); 218 return this; 219 } 220 221 // ************************ 222 // REQUEST 223 // ************************ 224 225 @Override 226 public Object request() throws CamelExecutionException { 227 return request(Object.class); 228 } 229 230 @Override 231 @SuppressWarnings("unchecked") 232 public <T> T request(Class<T> type) throws CamelExecutionException { 233 if (exchangeSupplier.isPresent()) { 234 throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead."); 235 } 236 237 // Determine the target endpoint 238 final Endpoint target = target(); 239 240 // Create the default processor if not provided. 241 final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor()); 242 243 T result; 244 if (type == Exchange.class) { 245 result = (T)template().request(target, processorSupplier.get()); 246 } else if (type == Message.class) { 247 Exchange exchange = template().request(target, processorSupplier.get()); 248 result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn(); 249 } else { 250 Exchange exchange = template().send( 251 target, 252 ExchangePattern.InOut, 253 processorSupplier.get(), 254 resultProcessors.get(type) 255 ); 256 257 result = context.getTypeConverter().convertTo( 258 type, 259 ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) 260 ); 261 } 262 263 return result; 264 } 265 266 @Override 267 public Future<Object> asyncRequest() { 268 return asyncRequest(Object.class); 269 } 270 271 @Override 272 public <T> Future<T> asyncRequest(Class<T> type) { 273 // Determine the target endpoint 274 final Endpoint target = target(); 275 276 Future<T> result; 277 if (ObjectHelper.isNotEmpty(headers.get())) { 278 // Make a copy of the headers and body so that async processing won't 279 // be invalidated by subsequent reuse of the template 280 final Map<String, Object> headersCopy = new HashMap<>(headers.get()); 281 final Object bodyCopy = body.get(); 282 283 result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); 284 } else { 285 // Make a copy of the and body so that async processing won't be 286 // invalidated by subsequent reuse of the template 287 final Object bodyCopy = body.get(); 288 289 result = template().asyncRequestBody(target, bodyCopy, type); 290 } 291 292 return result; 293 } 294 295 // ************************ 296 // SEND 297 // ************************ 298 299 @Override 300 public Exchange send() throws CamelExecutionException { 301 // Determine the target endpoint 302 final Endpoint target = target(); 303 304 return exchangeSupplier.isPresent() 305 ? template().send(target, exchangeSupplier.get().get()) 306 : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get()); 307 } 308 309 @Override 310 public Future<Exchange> asyncSend() { 311 // Determine the target endpoint 312 final Endpoint target = target(); 313 314 return exchangeSupplier.isPresent() 315 ? template().asyncSend(target, exchangeSupplier.get().get()) 316 : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get()); 317 } 318 319 // ************************ 320 // HELPERS 321 // ************************ 322 323 /** 324 * Create the FluentProducerTemplate by setting the camel context 325 * 326 * @param context the camel context 327 */ 328 public static FluentProducerTemplate on(CamelContext context) { 329 return new DefaultFluentProducerTemplate(context); 330 } 331 332 private ProducerTemplate template() { 333 ObjectHelper.notNull(context, "CamelContext"); 334 335 if (template == null) { 336 template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate(); 337 defaultEndpoint.ifPresent(template::setDefaultEndpoint); 338 template.setEventNotifierEnabled(eventNotifierEnabled); 339 templateCustomizer.ifPresent(tc -> tc.accept(template)); 340 } 341 342 return template; 343 } 344 345 private Processor defaultProcessor() { 346 return exchange -> { 347 ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll); 348 ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody); 349 }; 350 } 351 352 private Processor defaultAsyncProcessor() { 353 final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null; 354 final Object bodyCopy = this.body.get(); 355 356 return exchange -> { 357 ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll); 358 ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody); 359 }; 360 } 361 362 private Endpoint target() { 363 if (endpoint.isPresent()) { 364 return endpoint.get(); 365 } 366 if (defaultEndpoint.isPresent()) { 367 return defaultEndpoint.get(); 368 } 369 370 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 371 } 372 373 @Override 374 protected void doStart() throws Exception { 375 if (template == null) { 376 template = template(); 377 } 378 ServiceHelper.startService(template); 379 } 380 381 @Override 382 protected void doStop() throws Exception { 383 clearAll(); 384 ServiceHelper.stopService(template); 385 } 386}