001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.builder; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.Optional; 022import java.util.concurrent.Future; 023import java.util.function.Consumer; 024import java.util.function.Supplier; 025 026import org.apache.camel.CamelContext; 027import org.apache.camel.CamelExecutionException; 028import org.apache.camel.Endpoint; 029import org.apache.camel.Exchange; 030import org.apache.camel.ExchangePattern; 031import org.apache.camel.FluentProducerTemplate; 032import org.apache.camel.Message; 033import org.apache.camel.Processor; 034import org.apache.camel.ProducerTemplate; 035import org.apache.camel.processor.ConvertBodyProcessor; 036import org.apache.camel.support.ServiceSupport; 037import org.apache.camel.util.ExchangeHelper; 038import org.apache.camel.util.ObjectHelper; 039import org.apache.camel.util.ServiceHelper; 040 041public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { 042 043 // transient state of headers and body which needs to be thread local scoped to be thread-safe 044 private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>(); 045 private final ThreadLocal<Object> body = new ThreadLocal<>(); 046 047 private final CamelContext context; 048 private final ClassValue<ConvertBodyProcessor> resultProcessors; 049 private Optional<Consumer<ProducerTemplate>> templateCustomizer; 050 private Optional<Supplier<Exchange>> exchangeSupplier; 051 private Optional<Supplier<Processor>> processorSupplier; 052 private Optional<Endpoint> endpoint; 053 private Optional<Endpoint> defaultEndpoint; 054 private int maximumCacheSize; 055 private boolean eventNotifierEnabled; 056 private volatile ProducerTemplate template; 057 058 public DefaultFluentProducerTemplate(CamelContext context) { 059 this.context = context; 060 this.endpoint = Optional.empty(); 061 this.defaultEndpoint = Optional.empty(); 062 this.eventNotifierEnabled = true; 063 this.templateCustomizer = Optional.empty(); 064 this.exchangeSupplier = Optional.empty(); 065 this.processorSupplier = Optional.empty(); 066 this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { 067 @Override 068 protected ConvertBodyProcessor computeValue(Class<?> type) { 069 return new ConvertBodyProcessor(type); 070 } 071 }; 072 } 073 074 @Override 075 public CamelContext getCamelContext() { 076 return context; 077 } 078 079 @Override 080 public int getCurrentCacheSize() { 081 if (template == null) { 082 return 0; 083 } 084 return template.getCurrentCacheSize(); 085 } 086 087 @Override 088 public void cleanUp() { 089 if (template != null) { 090 template.cleanUp(); 091 } 092 } 093 094 @Override 095 public void setDefaultEndpointUri(String endpointUri) { 096 setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri)); 097 } 098 099 @Override 100 public Endpoint getDefaultEndpoint() { 101 return defaultEndpoint.orElse(null); 102 } 103 104 @Override 105 public void setDefaultEndpoint(Endpoint defaultEndpoint) { 106 this.defaultEndpoint = Optional.ofNullable(defaultEndpoint); 107 } 108 109 @Override 110 public int getMaximumCacheSize() { 111 return maximumCacheSize; 112 } 113 114 @Override 115 public void setMaximumCacheSize(int maximumCacheSize) { 116 this.maximumCacheSize = maximumCacheSize; 117 } 118 119 @Override 120 public boolean isEventNotifierEnabled() { 121 return eventNotifierEnabled; 122 } 123 124 @Override 125 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 126 this.eventNotifierEnabled = eventNotifierEnabled; 127 } 128 129 @Override 130 public FluentProducerTemplate clearAll() { 131 clearBody(); 132 clearHeaders(); 133 134 return this; 135 } 136 137 @Override 138 public FluentProducerTemplate withHeader(String key, Object value) { 139 Map<String, Object> map = headers.get(); 140 if (map == null) { 141 map = new HashMap<>(); 142 headers.set(map); 143 } 144 145 map.put(key, value); 146 147 return this; 148 } 149 150 @Override 151 public FluentProducerTemplate clearHeaders() { 152 headers.remove(); 153 154 return this; 155 } 156 157 @Override 158 public FluentProducerTemplate withBody(Object body) { 159 this.body.set(body); 160 161 return this; 162 } 163 164 @Override 165 public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { 166 Object b = type != null 167 ? context.getTypeConverter().convertTo(type, body) 168 : body; 169 170 this.body.set(b); 171 172 return this; 173 } 174 175 @Override 176 public FluentProducerTemplate clearBody() { 177 body.remove(); 178 179 return this; 180 } 181 182 @Override 183 public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { 184 this.templateCustomizer = Optional.of(templateCustomizer); 185 return this; 186 } 187 188 @Override 189 public FluentProducerTemplate withExchange(final Exchange exchange) { 190 return withExchange(() -> exchange); 191 } 192 193 @Override 194 public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { 195 this.exchangeSupplier = Optional.of(exchangeSupplier); 196 return this; 197 } 198 199 @Override 200 public FluentProducerTemplate withProcessor(final Processor processor) { 201 return withProcessor(() -> processor); 202 } 203 204 @Override 205 public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { 206 this.processorSupplier = Optional.of(processorSupplier); 207 return this; 208 } 209 210 @Override 211 public FluentProducerTemplate to(String endpointUri) { 212 return to(context.getEndpoint(endpointUri)); 213 } 214 215 @Override 216 public FluentProducerTemplate to(Endpoint endpoint) { 217 this.endpoint = Optional.of(endpoint); 218 return this; 219 } 220 221 // ************************ 222 // REQUEST 223 // ************************ 224 225 @Override 226 public Object request() throws CamelExecutionException { 227 return request(Object.class); 228 } 229 230 @Override 231 @SuppressWarnings("unchecked") 232 public <T> T request(Class<T> type) throws CamelExecutionException { 233 // Determine the target endpoint 234 final Endpoint target = target(); 235 236 // Create the default processor if not provided. 237 final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor()); 238 239 T result; 240 if (type == Exchange.class) { 241 result = (T)template().request(target, processorSupplier.get()); 242 } else if (type == Message.class) { 243 Exchange exchange = template().request(target, processorSupplier.get()); 244 result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn(); 245 } else { 246 Exchange exchange = template().send( 247 target, 248 ExchangePattern.InOut, 249 processorSupplier.get(), 250 resultProcessors.get(type) 251 ); 252 253 result = context.getTypeConverter().convertTo( 254 type, 255 ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) 256 ); 257 } 258 259 return result; 260 } 261 262 @Override 263 public Future<Object> asyncRequest() { 264 return asyncRequest(Object.class); 265 } 266 267 @Override 268 public <T> Future<T> asyncRequest(Class<T> type) { 269 // Determine the target endpoint 270 final Endpoint target = target(); 271 272 Future<T> result; 273 if (ObjectHelper.isNotEmpty(headers.get())) { 274 // Make a copy of the headers and body so that async processing won't 275 // be invalidated by subsequent reuse of the template 276 final Map<String, Object> headersCopy = new HashMap<>(headers.get()); 277 final Object bodyCopy = body.get(); 278 279 result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); 280 } else { 281 // Make a copy of the and body so that async processing won't be 282 // invalidated by subsequent reuse of the template 283 final Object bodyCopy = body.get(); 284 285 result = template().asyncRequestBody(target, bodyCopy, type); 286 } 287 288 return result; 289 } 290 291 // ************************ 292 // SEND 293 // ************************ 294 295 @Override 296 public Exchange send() throws CamelExecutionException { 297 // Determine the target endpoint 298 final Endpoint target = target(); 299 300 return exchangeSupplier.isPresent() 301 ? template().send(target, exchangeSupplier.get().get()) 302 : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get()); 303 } 304 305 @Override 306 public Future<Exchange> asyncSend() { 307 // Determine the target endpoint 308 final Endpoint target = target(); 309 310 return exchangeSupplier.isPresent() 311 ? template().asyncSend(target, exchangeSupplier.get().get()) 312 : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get()); 313 } 314 315 // ************************ 316 // HELPERS 317 // ************************ 318 319 /** 320 * Create the FluentProducerTemplate by setting the camel context 321 * 322 * @param context the camel context 323 */ 324 public static FluentProducerTemplate on(CamelContext context) { 325 return new DefaultFluentProducerTemplate(context); 326 } 327 328 private ProducerTemplate template() { 329 ObjectHelper.notNull(context, "CamelContext"); 330 331 if (template == null) { 332 template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate(); 333 defaultEndpoint.ifPresent(template::setDefaultEndpoint); 334 template.setEventNotifierEnabled(eventNotifierEnabled); 335 templateCustomizer.ifPresent(tc -> tc.accept(template)); 336 } 337 338 return template; 339 } 340 341 private Processor defaultProcessor() { 342 return exchange -> { 343 ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll); 344 ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody); 345 }; 346 } 347 348 private Processor defaultAsyncProcessor() { 349 final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null; 350 final Object bodyCopy = this.body.get(); 351 352 return exchange -> { 353 ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll); 354 ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody); 355 }; 356 } 357 358 private Endpoint target() { 359 if (endpoint.isPresent()) { 360 return endpoint.get(); 361 } 362 if (defaultEndpoint.isPresent()) { 363 return defaultEndpoint.get(); 364 } 365 366 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 367 } 368 369 @Override 370 protected void doStart() throws Exception { 371 if (template == null) { 372 template = template(); 373 } 374 ServiceHelper.startService(template); 375 } 376 377 @Override 378 protected void doStop() throws Exception { 379 clearAll(); 380 ServiceHelper.stopService(template); 381 } 382}