001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.builder; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.Future; 022import java.util.function.Consumer; 023import java.util.function.Supplier; 024 025import org.apache.camel.CamelContext; 026import org.apache.camel.CamelExecutionException; 027import org.apache.camel.Endpoint; 028import org.apache.camel.Exchange; 029import org.apache.camel.ExchangePattern; 030import org.apache.camel.FluentProducerTemplate; 031import org.apache.camel.Message; 032import org.apache.camel.Processor; 033import org.apache.camel.ProducerTemplate; 034import org.apache.camel.processor.ConvertBodyProcessor; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.ExchangeHelper; 037import org.apache.camel.util.ObjectHelper; 038import org.apache.camel.util.ServiceHelper; 039 040public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { 041 private final CamelContext context; 042 private final ClassValue<ConvertBodyProcessor> resultProcessors; 043 private Map<String, Object> headers; 044 private Object body; 045 private Endpoint endpoint; 046 private Consumer<ProducerTemplate> templateCustomizer; 047 private Supplier<Exchange> exchangeSupplier; 048 private Supplier<Processor> processorSupplier; 049 private volatile ProducerTemplate template; 050 private Endpoint defaultEndpoint; 051 private int maximumCacheSize; 052 private boolean eventNotifierEnabled = true; 053 054 public DefaultFluentProducerTemplate(CamelContext context) { 055 this.context = context; 056 this.headers = null; 057 this.body = null; 058 this.endpoint = null; 059 this.templateCustomizer = null; 060 this.exchangeSupplier = null; 061 this.processorSupplier = () -> this::populateExchange; 062 this.template = null; 063 this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { 064 @Override 065 protected ConvertBodyProcessor computeValue(Class<?> type) { 066 return new ConvertBodyProcessor(type); 067 } 068 }; 069 } 070 071 @Override 072 public CamelContext getCamelContext() { 073 return context; 074 } 075 076 @Override 077 public int getCurrentCacheSize() { 078 if (template == null) { 079 return 0; 080 } 081 return template.getCurrentCacheSize(); 082 } 083 084 @Override 085 public void cleanUp() { 086 if (template != null) { 087 template.cleanUp(); 088 } 089 } 090 091 @Override 092 public void setDefaultEndpointUri(String endpointUri) { 093 setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri)); 094 } 095 096 @Override 097 public Endpoint getDefaultEndpoint() { 098 return defaultEndpoint; 099 } 100 101 @Override 102 public void setDefaultEndpoint(Endpoint defaultEndpoint) { 103 this.defaultEndpoint = defaultEndpoint; 104 } 105 106 @Override 107 public int getMaximumCacheSize() { 108 return maximumCacheSize; 109 } 110 111 @Override 112 public void setMaximumCacheSize(int maximumCacheSize) { 113 this.maximumCacheSize = maximumCacheSize; 114 } 115 116 @Override 117 public boolean isEventNotifierEnabled() { 118 return eventNotifierEnabled; 119 } 120 121 @Override 122 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 123 this.eventNotifierEnabled = eventNotifierEnabled; 124 } 125 126 @Override 127 public FluentProducerTemplate withHeader(String key, Object value) { 128 if (headers == null) { 129 headers = new HashMap<>(); 130 } 131 132 headers.put(key, value); 133 134 return this; 135 } 136 137 @Override 138 public FluentProducerTemplate clearHeaders() { 139 if (headers != null) { 140 headers.clear(); 141 } 142 143 return this; 144 } 145 146 @Override 147 public FluentProducerTemplate withBody(Object body) { 148 this.body = body; 149 150 return this; 151 } 152 153 @Override 154 public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { 155 this.body = type != null 156 ? context.getTypeConverter().convertTo(type, body) 157 : body; 158 159 return this; 160 } 161 162 @Override 163 public FluentProducerTemplate clearBody() { 164 this.body = null; 165 166 return this; 167 } 168 169 @Override 170 public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { 171 this.templateCustomizer = templateCustomizer; 172 return this; 173 } 174 175 @Override 176 public FluentProducerTemplate withExchange(final Exchange exchange) { 177 return withExchange(() -> exchange); 178 } 179 180 @Override 181 public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { 182 this.exchangeSupplier = exchangeSupplier; 183 return this; 184 } 185 186 @Override 187 public FluentProducerTemplate withProcessor(final Processor processor) { 188 return withProcessor(() -> processor); 189 } 190 191 @Override 192 public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { 193 this.processorSupplier = processorSupplier; 194 return this; 195 } 196 197 @Override 198 public FluentProducerTemplate to(String endpointUri) { 199 return to(context.getEndpoint(endpointUri)); 200 } 201 202 @Override 203 public FluentProducerTemplate to(Endpoint endpoint) { 204 this.endpoint = endpoint; 205 return this; 206 } 207 208 // ************************ 209 // REQUEST 210 // ************************ 211 212 @Override 213 public Object request() throws CamelExecutionException { 214 return request(Object.class); 215 } 216 217 @Override 218 @SuppressWarnings("unchecked") 219 public <T> T request(Class<T> type) throws CamelExecutionException { 220 T result; 221 Endpoint target = endpoint != null ? endpoint : defaultEndpoint; 222 // we must have an endpoint to send to 223 if (target == null) { 224 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 225 } 226 227 if (type == Exchange.class) { 228 result = (T)template().request(target, processorSupplier.get()); 229 } else if (type == Message.class) { 230 Exchange exchange = template().request(target, processorSupplier.get()); 231 result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn(); 232 } else { 233 Exchange exchange = template().send( 234 target, 235 ExchangePattern.InOut, 236 processorSupplier.get(), 237 resultProcessors.get(type) 238 ); 239 240 result = context.getTypeConverter().convertTo( 241 type, 242 ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) 243 ); 244 } 245 246 return result; 247 } 248 249 @Override 250 public Future<Object> asyncRequest() { 251 return asyncRequest(Object.class); 252 } 253 254 @Override 255 public <T> Future<T> asyncRequest(Class<T> type) { 256 Endpoint target = endpoint != null ? endpoint : defaultEndpoint; 257 258 // we must have an endpoint to send to 259 if (target == null) { 260 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 261 } 262 263 Future<T> result; 264 if (headers != null) { 265 result = template().asyncRequestBodyAndHeaders(target, body, headers, type); 266 } else { 267 result = template().asyncRequestBody(target, body, type); 268 } 269 270 return result; 271 } 272 273 // ************************ 274 // SEND 275 // ************************ 276 277 @Override 278 public Exchange send() throws CamelExecutionException { 279 Endpoint target = endpoint != null ? endpoint : defaultEndpoint; 280 281 // we must have an endpoint to send to 282 if (target == null) { 283 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 284 } 285 286 return exchangeSupplier != null 287 ? template().send(target, exchangeSupplier.get()) 288 : template().send(target, processorSupplier.get()); 289 } 290 291 @Override 292 public Future<Exchange> asyncSend() { 293 Endpoint target = endpoint != null ? endpoint : defaultEndpoint; 294 295 // we must have an endpoint to send to 296 if (target == null) { 297 throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); 298 } 299 300 return exchangeSupplier != null 301 ? template().asyncSend(target, exchangeSupplier.get()) 302 : template().asyncSend(target, processorSupplier.get()); 303 } 304 305 // ************************ 306 // HELPERS 307 // ************************ 308 309 /** 310 * Create the FluentProducerTemplate by setting the camel context 311 * 312 * @param context the camel context 313 */ 314 public static FluentProducerTemplate on(CamelContext context) { 315 return new DefaultFluentProducerTemplate(context); 316 } 317 318 private ProducerTemplate template() { 319 ObjectHelper.notNull(context, "CamelContext"); 320 321 if (template == null) { 322 template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate(); 323 if (defaultEndpoint != null) { 324 template.setDefaultEndpoint(defaultEndpoint); 325 } 326 template.setEventNotifierEnabled(eventNotifierEnabled); 327 if (templateCustomizer != null) { 328 templateCustomizer.accept(template); 329 } 330 } 331 332 return template; 333 } 334 335 private void populateExchange(Exchange exchange) throws Exception { 336 if (headers != null && !headers.isEmpty()) { 337 exchange.getIn().getHeaders().putAll(headers); 338 } 339 if (body != null) { 340 exchange.getIn().setBody(body); 341 } 342 } 343 344 @Override 345 protected void doStart() throws Exception { 346 if (template == null) { 347 template = template(); 348 } 349 ServiceHelper.startService(template); 350 } 351 352 @Override 353 protected void doStop() throws Exception { 354 ServiceHelper.stopService(template); 355 } 356}