001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.seda; 018 019import java.util.ArrayList; 020import java.util.HashSet; 021import java.util.List; 022import java.util.Set; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.CopyOnWriteArraySet; 025import java.util.concurrent.ExecutorService; 026 027import org.apache.camel.Component; 028import org.apache.camel.Consumer; 029import org.apache.camel.Exchange; 030import org.apache.camel.MultipleConsumersSupport; 031import org.apache.camel.PollingConsumer; 032import org.apache.camel.Processor; 033import org.apache.camel.Producer; 034import org.apache.camel.WaitForTaskToComplete; 035import org.apache.camel.api.management.ManagedAttribute; 036import org.apache.camel.api.management.ManagedOperation; 037import org.apache.camel.api.management.ManagedResource; 038import org.apache.camel.impl.DefaultEndpoint; 039import org.apache.camel.processor.MulticastProcessor; 040import org.apache.camel.spi.BrowsableEndpoint; 041import org.apache.camel.spi.Metadata; 042import org.apache.camel.spi.UriEndpoint; 043import org.apache.camel.spi.UriParam; 044import org.apache.camel.spi.UriPath; 045import org.apache.camel.util.ServiceHelper; 046import org.apache.camel.util.URISupport; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * The seda component provides asynchronous call to another endpoint from any CamelContext in the same JVM. 052 */ 053@ManagedResource(description = "Managed SedaEndpoint") 054@UriEndpoint(scheme = "seda", title = "SEDA", syntax = "seda:name", consumerClass = SedaConsumer.class, label = "core,endpoint") 055public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { 056 private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class); 057 private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>(); 058 private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>(); 059 private volatile MulticastProcessor consumerMulticastProcessor; 060 private volatile boolean multicastStarted; 061 private volatile ExecutorService multicastExecutor; 062 063 @UriPath(description = "Name of queue") @Metadata(required = "true") 064 private String name; 065 @UriParam(label = "advanced", description = "Define the queue instance which will be used by the endpoint") 066 private BlockingQueue queue; 067 @UriParam(defaultValue = "" + Integer.MAX_VALUE) 068 private int size = Integer.MAX_VALUE; 069 070 @UriParam(label = "consumer", defaultValue = "1") 071 private int concurrentConsumers = 1; 072 @UriParam(label = "consumer,advanced", defaultValue = "true") 073 private boolean limitConcurrentConsumers = true; 074 @UriParam(label = "consumer,advanced") 075 private boolean multipleConsumers; 076 @UriParam(label = "consumer,advanced") 077 private boolean purgeWhenStopping; 078 @UriParam(label = "consumer,advanced", defaultValue = "1000") 079 private int pollTimeout = 1000; 080 081 @UriParam(label = "producer", defaultValue = "IfReplyExpected") 082 private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; 083 @UriParam(label = "producer", defaultValue = "30000") 084 private long timeout = 30000; 085 @UriParam(label = "producer") 086 private boolean blockWhenFull; 087 @UriParam(label = "producer") 088 private boolean failIfNoConsumers; 089 @UriParam(label = "producer") 090 private boolean discardIfNoConsumers; 091 092 private BlockingQueueFactory<Exchange> queueFactory; 093 094 public SedaEndpoint() { 095 queueFactory = new LinkedBlockingQueueFactory<Exchange>(); 096 } 097 098 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 099 this(endpointUri, component, queue, 1); 100 } 101 102 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { 103 this(endpointUri, component, concurrentConsumers); 104 this.queue = queue; 105 if (queue != null) { 106 this.size = queue.remainingCapacity(); 107 } 108 queueFactory = new LinkedBlockingQueueFactory<Exchange>(); 109 getComponent().registerQueue(this, queue); 110 } 111 112 public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) { 113 this(endpointUri, component, concurrentConsumers); 114 this.queueFactory = queueFactory; 115 } 116 117 private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) { 118 super(endpointUri, component); 119 this.concurrentConsumers = concurrentConsumers; 120 } 121 122 @Override 123 public SedaComponent getComponent() { 124 return (SedaComponent) super.getComponent(); 125 } 126 127 public Producer createProducer() throws Exception { 128 return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull()); 129 } 130 131 public Consumer createConsumer(Processor processor) throws Exception { 132 if (getComponent() != null) { 133 // all consumers must match having the same multipleConsumers options 134 String key = getComponent().getQueueKey(getEndpointUri()); 135 QueueReference ref = getComponent().getQueueReference(key); 136 if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) { 137 // there is already a multiple consumers, so make sure they matches 138 throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers " 139 + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers); 140 } 141 } 142 143 Consumer answer = createNewConsumer(processor); 144 configureConsumer(answer); 145 return answer; 146 } 147 148 protected SedaConsumer createNewConsumer(Processor processor) { 149 return new SedaConsumer(this, processor); 150 } 151 152 @Override 153 public PollingConsumer createPollingConsumer() throws Exception { 154 SedaPollingConsumer answer = new SedaPollingConsumer(this); 155 configureConsumer(answer); 156 return answer; 157 } 158 159 public synchronized BlockingQueue<Exchange> getQueue() { 160 if (queue == null) { 161 // prefer to lookup queue from component, so if this endpoint is re-created or re-started 162 // then the existing queue from the component can be used, so new producers and consumers 163 // can use the already existing queue referenced from the component 164 if (getComponent() != null) { 165 // use null to indicate default size (= use what the existing queue has been configured with) 166 Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); 167 QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory); 168 queue = ref.getQueue(); 169 String key = getComponent().getQueueKey(getEndpointUri()); 170 LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE}); 171 // and set the size we are using 172 if (ref.getSize() != null) { 173 setSize(ref.getSize()); 174 } 175 } else { 176 // fallback and create queue (as this endpoint has no component) 177 queue = createQueue(); 178 LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()}); 179 } 180 } 181 return queue; 182 } 183 184 protected BlockingQueue<Exchange> createQueue() { 185 if (size > 0) { 186 return queueFactory.create(size); 187 } else { 188 return queueFactory.create(); 189 } 190 } 191 192 /** 193 * Get's the {@link QueueReference} for the this endpoint. 194 * @return the reference, or <tt>null</tt> if no queue reference exists. 195 */ 196 public synchronized QueueReference getQueueReference() { 197 String key = getComponent().getQueueKey(getEndpointUri()); 198 QueueReference ref = getComponent().getQueueReference(key); 199 return ref; 200 } 201 202 protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception { 203 if (!multicastStarted && consumerMulticastProcessor != null) { 204 // only start it on-demand to avoid starting it during stopping 205 ServiceHelper.startService(consumerMulticastProcessor); 206 multicastStarted = true; 207 } 208 return consumerMulticastProcessor; 209 } 210 211 protected synchronized void updateMulticastProcessor() throws Exception { 212 // only needed if we support multiple consumers 213 if (!isMultipleConsumersSupported()) { 214 return; 215 } 216 217 // stop old before we create a new 218 if (consumerMulticastProcessor != null) { 219 ServiceHelper.stopService(consumerMulticastProcessor); 220 consumerMulticastProcessor = null; 221 } 222 223 int size = getConsumers().size(); 224 if (size >= 1) { 225 if (multicastExecutor == null) { 226 // create multicast executor as we need it when we have more than 1 processor 227 multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); 228 } 229 // create list of consumers to multicast to 230 List<Processor> processors = new ArrayList<Processor>(size); 231 for (SedaConsumer consumer : getConsumers()) { 232 processors.add(consumer.getProcessor()); 233 } 234 // create multicast processor 235 multicastStarted = false; 236 consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, 237 true, multicastExecutor, false, false, false, 238 0, null, false, false); 239 } 240 } 241 242 /** 243 * Define the queue instance which will be used by the endpoint. 244 * <p/> 245 * This option is only for rare use-cases where you want to use a custom queue instance. 246 */ 247 public void setQueue(BlockingQueue<Exchange> queue) { 248 this.queue = queue; 249 this.size = queue.remainingCapacity(); 250 } 251 252 @ManagedAttribute(description = "Queue max capacity") 253 public int getSize() { 254 return size; 255 } 256 257 /** 258 * The maximum capacity of the SEDA queue (i.e., the number of messages it can hold). 259 */ 260 public void setSize(int size) { 261 this.size = size; 262 } 263 264 @ManagedAttribute(description = "Current queue size") 265 public int getCurrentQueueSize() { 266 return queue.size(); 267 } 268 269 /** 270 * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. 271 * By default, an exception will be thrown stating that the queue is full. 272 * By enabling this option, the calling thread will instead block and wait until the message can be accepted. 273 */ 274 public void setBlockWhenFull(boolean blockWhenFull) { 275 this.blockWhenFull = blockWhenFull; 276 } 277 278 @ManagedAttribute(description = "Whether the caller will block sending to a full queue") 279 public boolean isBlockWhenFull() { 280 return blockWhenFull; 281 } 282 283 /** 284 * Number of concurrent threads processing exchanges. 285 */ 286 public void setConcurrentConsumers(int concurrentConsumers) { 287 this.concurrentConsumers = concurrentConsumers; 288 } 289 290 @ManagedAttribute(description = "Number of concurrent consumers") 291 public int getConcurrentConsumers() { 292 return concurrentConsumers; 293 } 294 295 @ManagedAttribute 296 public boolean isLimitConcurrentConsumers() { 297 return limitConcurrentConsumers; 298 } 299 300 /** 301 * Whether to limit the number of concurrentConsumers to the maximum of 500. 302 * By default, an exception will be thrown if an endpoint is configured with a greater number. You can disable that check by turning this option off. 303 */ 304 public void setLimitConcurrentConsumers(boolean limitConcurrentConsumers) { 305 this.limitConcurrentConsumers = limitConcurrentConsumers; 306 } 307 308 public WaitForTaskToComplete getWaitForTaskToComplete() { 309 return waitForTaskToComplete; 310 } 311 312 /** 313 * Option to specify whether the caller should wait for the async task to complete or not before continuing. 314 * The following three options are supported: Always, Never or IfReplyExpected. 315 * The first two values are self-explanatory. 316 * The last value, IfReplyExpected, will only wait if the message is Request Reply based. 317 * The default option is IfReplyExpected. 318 */ 319 public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) { 320 this.waitForTaskToComplete = waitForTaskToComplete; 321 } 322 323 @ManagedAttribute 324 public long getTimeout() { 325 return timeout; 326 } 327 328 /** 329 * Timeout (in milliseconds) before a SEDA producer will stop waiting for an asynchronous task to complete. 330 * You can disable timeout by using 0 or a negative value. 331 */ 332 public void setTimeout(long timeout) { 333 this.timeout = timeout; 334 } 335 336 @ManagedAttribute 337 public boolean isFailIfNoConsumers() { 338 return failIfNoConsumers; 339 } 340 341 /** 342 * Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. 343 * <p/> 344 * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same time. 345 */ 346 public void setFailIfNoConsumers(boolean failIfNoConsumers) { 347 this.failIfNoConsumers = failIfNoConsumers; 348 } 349 350 @ManagedAttribute 351 public boolean isDiscardIfNoConsumers() { 352 return discardIfNoConsumers; 353 } 354 355 /** 356 * Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. 357 * <p/> 358 * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same time. 359 */ 360 public void setDiscardIfNoConsumers(boolean discardIfNoConsumers) { 361 this.discardIfNoConsumers = discardIfNoConsumers; 362 } 363 364 @ManagedAttribute 365 public boolean isMultipleConsumers() { 366 return multipleConsumers; 367 } 368 369 /** 370 * Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging. 371 * That is, you can send a message to the SEDA queue and have each consumer receive a copy of the message. 372 * When enabled, this option should be specified on every consumer endpoint. 373 */ 374 public void setMultipleConsumers(boolean multipleConsumers) { 375 this.multipleConsumers = multipleConsumers; 376 } 377 378 @ManagedAttribute 379 public int getPollTimeout() { 380 return pollTimeout; 381 } 382 383 /** 384 * The timeout used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue running. 385 * Setting a lower value allows the consumer to react more quickly upon shutdown. 386 */ 387 public void setPollTimeout(int pollTimeout) { 388 this.pollTimeout = pollTimeout; 389 } 390 391 @ManagedAttribute 392 public boolean isPurgeWhenStopping() { 393 return purgeWhenStopping; 394 } 395 396 /** 397 * Whether to purge the task queue when stopping the consumer/route. 398 * This allows to stop faster, as any pending messages on the queue is discarded. 399 */ 400 public void setPurgeWhenStopping(boolean purgeWhenStopping) { 401 this.purgeWhenStopping = purgeWhenStopping; 402 } 403 404 public boolean isSingleton() { 405 return true; 406 } 407 408 /** 409 * Returns the current pending exchanges 410 */ 411 public List<Exchange> getExchanges() { 412 return new ArrayList<Exchange>(getQueue()); 413 } 414 415 @ManagedAttribute 416 public boolean isMultipleConsumersSupported() { 417 return isMultipleConsumers(); 418 } 419 420 /** 421 * Purges the queue 422 */ 423 @ManagedOperation(description = "Purges the seda queue") 424 public void purgeQueue() { 425 LOG.debug("Purging queue with {} exchanges", queue.size()); 426 queue.clear(); 427 } 428 429 /** 430 * Returns the current active consumers on this endpoint 431 */ 432 public Set<SedaConsumer> getConsumers() { 433 return new HashSet<SedaConsumer>(consumers); 434 } 435 436 /** 437 * Returns the current active producers on this endpoint 438 */ 439 public Set<SedaProducer> getProducers() { 440 return new HashSet<SedaProducer>(producers); 441 } 442 443 void onStarted(SedaProducer producer) { 444 producers.add(producer); 445 } 446 447 void onStopped(SedaProducer producer) { 448 producers.remove(producer); 449 } 450 451 void onStarted(SedaConsumer consumer) throws Exception { 452 consumers.add(consumer); 453 if (isMultipleConsumers()) { 454 updateMulticastProcessor(); 455 } 456 } 457 458 void onStopped(SedaConsumer consumer) throws Exception { 459 consumers.remove(consumer); 460 if (isMultipleConsumers()) { 461 updateMulticastProcessor(); 462 } 463 } 464 465 public boolean hasConsumers() { 466 return this.consumers.size() > 0; 467 } 468 469 @Override 470 protected void doStart() throws Exception { 471 super.doStart(); 472 473 // force creating queue when starting 474 if (queue == null) { 475 queue = getQueue(); 476 } 477 478 // special for unit testing where we can set a system property to make seda poll faster 479 // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project 480 String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout()); 481 setPollTimeout(Integer.valueOf(override)); 482 } 483 484 @Override 485 public void stop() throws Exception { 486 if (getConsumers().isEmpty()) { 487 super.stop(); 488 } else { 489 LOG.debug("There is still active consumers."); 490 } 491 } 492 493 @Override 494 public void shutdown() throws Exception { 495 if (shutdown.get()) { 496 LOG.trace("Service already shut down"); 497 return; 498 } 499 500 // notify component we are shutting down this endpoint 501 if (getComponent() != null) { 502 getComponent().onShutdownEndpoint(this); 503 } 504 505 if (getConsumers().isEmpty()) { 506 super.shutdown(); 507 } else { 508 LOG.debug("There is still active consumers."); 509 } 510 } 511 512 @Override 513 protected void doShutdown() throws Exception { 514 // shutdown thread pool if it was in use 515 if (multicastExecutor != null) { 516 getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor); 517 multicastExecutor = null; 518 } 519 520 // clear queue, as we are shutdown, so if re-created then the queue must be updated 521 queue = null; 522 } 523 524}