001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.ArrayList; 020 import java.util.HashSet; 021 import java.util.List; 022 import java.util.Set; 023 import java.util.concurrent.BlockingQueue; 024 import java.util.concurrent.CopyOnWriteArraySet; 025 import java.util.concurrent.ExecutorService; 026 import java.util.concurrent.LinkedBlockingQueue; 027 028 import org.apache.camel.Component; 029 import org.apache.camel.Consumer; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Message; 032 import org.apache.camel.MultipleConsumersSupport; 033 import org.apache.camel.Processor; 034 import org.apache.camel.Producer; 035 import org.apache.camel.WaitForTaskToComplete; 036 import org.apache.camel.api.management.ManagedAttribute; 037 import org.apache.camel.api.management.ManagedOperation; 038 import org.apache.camel.api.management.ManagedResource; 039 import org.apache.camel.impl.DefaultEndpoint; 040 import org.apache.camel.processor.MulticastProcessor; 041 import org.apache.camel.spi.BrowsableEndpoint; 042 import org.apache.camel.util.EndpointHelper; 043 import org.apache.camel.util.MessageHelper; 044 import org.apache.camel.util.ServiceHelper; 045 import org.apache.camel.util.URISupport; 046 047 /** 048 * An implementation of the <a 049 * href="http://camel.apache.org/queue.html">Queue components</a> for 050 * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext 051 */ 052 @ManagedResource(description = "Managed SedaEndpoint") 053 public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { 054 private volatile BlockingQueue<Exchange> queue; 055 private int size; 056 private int concurrentConsumers = 1; 057 private volatile ExecutorService multicastExecutor; 058 private boolean multipleConsumers; 059 private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; 060 private long timeout = 30000; 061 private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>(); 062 private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>(); 063 private volatile MulticastProcessor consumerMulticastProcessor; 064 private volatile boolean multicastStarted; 065 private boolean blockWhenFull; 066 private int pollTimeout = 1000; 067 068 public SedaEndpoint() { 069 } 070 071 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 072 this(endpointUri, component, queue, 1); 073 } 074 075 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { 076 super(endpointUri, component); 077 this.queue = queue; 078 this.size = queue.remainingCapacity(); 079 this.concurrentConsumers = concurrentConsumers; 080 } 081 082 @Override 083 public SedaComponent getComponent() { 084 return (SedaComponent) super.getComponent(); 085 } 086 087 public Producer createProducer() throws Exception { 088 return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull()); 089 } 090 091 public Consumer createConsumer(Processor processor) throws Exception { 092 return new SedaConsumer(this, processor); 093 } 094 095 public synchronized BlockingQueue<Exchange> getQueue() { 096 if (queue == null) { 097 // prefer to lookup queue from component, so if this endpoint is re-created or re-started 098 // then the existing queue from the component can be used, so new producers and consumers 099 // can use the already existing queue referenced from the component 100 if (getComponent() != null) { 101 queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize()); 102 } else { 103 // fallback and create queue (as this endpoint has no component) 104 queue = createQueue(); 105 } 106 } 107 return queue; 108 } 109 110 protected BlockingQueue<Exchange> createQueue() { 111 if (size > 0) { 112 return new LinkedBlockingQueue<Exchange>(size); 113 } else { 114 return new LinkedBlockingQueue<Exchange>(); 115 } 116 } 117 118 protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception { 119 if (!multicastStarted && consumerMulticastProcessor != null) { 120 // only start it on-demand to avoid starting it during stopping 121 ServiceHelper.startService(consumerMulticastProcessor); 122 multicastStarted = true; 123 } 124 return consumerMulticastProcessor; 125 } 126 127 protected synchronized void updateMulticastProcessor() throws Exception { 128 if (consumerMulticastProcessor != null) { 129 ServiceHelper.stopService(consumerMulticastProcessor); 130 } 131 132 int size = getConsumers().size(); 133 if (size == 0 && multicastExecutor != null) { 134 // stop the multicast executor as its not needed anymore when size is zero 135 getCamelContext().getExecutorServiceManager().shutdown(multicastExecutor); 136 multicastExecutor = null; 137 } 138 if (size > 1) { 139 if (multicastExecutor == null) { 140 // create multicast executor as we need it when we have more than 1 processor 141 multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); 142 } 143 // create list of consumers to multicast to 144 List<Processor> processors = new ArrayList<Processor>(size); 145 for (SedaConsumer consumer : getConsumers()) { 146 processors.add(consumer.getProcessor()); 147 } 148 // create multicast processor 149 multicastStarted = false; 150 consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false); 151 } else { 152 // not needed 153 consumerMulticastProcessor = null; 154 } 155 } 156 157 public void setQueue(BlockingQueue<Exchange> queue) { 158 this.queue = queue; 159 this.size = queue.remainingCapacity(); 160 } 161 162 @ManagedAttribute(description = "Queue max capacity") 163 public int getSize() { 164 return size; 165 } 166 167 public void setSize(int size) { 168 this.size = size; 169 } 170 171 @ManagedAttribute(description = "Current queue size") 172 public int getCurrentQueueSize() { 173 return queue.size(); 174 } 175 176 public void setBlockWhenFull(boolean blockWhenFull) { 177 this.blockWhenFull = blockWhenFull; 178 } 179 180 @ManagedAttribute(description = "Whether the caller will block sending to a full queue") 181 public boolean isBlockWhenFull() { 182 return blockWhenFull; 183 } 184 185 public void setConcurrentConsumers(int concurrentConsumers) { 186 this.concurrentConsumers = concurrentConsumers; 187 } 188 189 @ManagedAttribute(description = "Number of concurrent consumers") 190 public int getConcurrentConsumers() { 191 return concurrentConsumers; 192 } 193 194 public WaitForTaskToComplete getWaitForTaskToComplete() { 195 return waitForTaskToComplete; 196 } 197 198 public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) { 199 this.waitForTaskToComplete = waitForTaskToComplete; 200 } 201 202 @ManagedAttribute 203 public long getTimeout() { 204 return timeout; 205 } 206 207 public void setTimeout(long timeout) { 208 this.timeout = timeout; 209 } 210 211 @ManagedAttribute 212 public boolean isMultipleConsumers() { 213 return multipleConsumers; 214 } 215 216 public void setMultipleConsumers(boolean multipleConsumers) { 217 this.multipleConsumers = multipleConsumers; 218 } 219 220 @ManagedAttribute 221 public int getPollTimeout() { 222 return pollTimeout; 223 } 224 225 public void setPollTimeout(int pollTimeout) { 226 this.pollTimeout = pollTimeout; 227 } 228 229 public boolean isSingleton() { 230 return true; 231 } 232 233 /** 234 * Returns the current pending exchanges 235 */ 236 public List<Exchange> getExchanges() { 237 return new ArrayList<Exchange>(getQueue()); 238 } 239 240 @ManagedAttribute 241 public boolean isMultipleConsumersSupported() { 242 return isMultipleConsumers(); 243 } 244 245 /** 246 * Purges the queue 247 */ 248 @ManagedOperation(description = "Purges the seda queue") 249 public void purgeQueue() { 250 queue.clear(); 251 } 252 253 /** 254 * Returns the current active consumers on this endpoint 255 */ 256 public Set<SedaConsumer> getConsumers() { 257 return new HashSet<SedaConsumer>(consumers); 258 } 259 260 /** 261 * Returns the current active producers on this endpoint 262 */ 263 public Set<SedaProducer> getProducers() { 264 return new HashSet<SedaProducer>(producers); 265 } 266 267 @ManagedOperation(description = "Current number of Exchanges in Queue") 268 public long queueSize() { 269 return getExchanges().size(); 270 } 271 272 @ManagedOperation(description = "Get Exchange from queue by index") 273 public String browseExchange(Integer index) { 274 List<Exchange> exchanges = getExchanges(); 275 if (index >= exchanges.size()) { 276 return null; 277 } 278 Exchange exchange = exchanges.get(index); 279 if (exchange == null) { 280 return null; 281 } 282 // must use java type with JMX such as java.lang.String 283 return exchange.toString(); 284 } 285 286 @ManagedOperation(description = "Get message body from queue by index") 287 public String browseMessageBody(Integer index) { 288 List<Exchange> exchanges = getExchanges(); 289 if (index >= exchanges.size()) { 290 return null; 291 } 292 Exchange exchange = exchanges.get(index); 293 if (exchange == null) { 294 return null; 295 } 296 297 // must use java type with JMX such as java.lang.String 298 String body; 299 if (exchange.hasOut()) { 300 body = exchange.getOut().getBody(String.class); 301 } else { 302 body = exchange.getIn().getBody(String.class); 303 } 304 305 return body; 306 } 307 308 @ManagedOperation(description = "Get message as XML from queue by index") 309 public String browseMessageAsXml(Integer index, Boolean includeBody) { 310 List<Exchange> exchanges = getExchanges(); 311 if (index >= exchanges.size()) { 312 return null; 313 } 314 Exchange exchange = exchanges.get(index); 315 if (exchange == null) { 316 return null; 317 } 318 319 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 320 String xml = MessageHelper.dumpAsXml(msg, includeBody); 321 322 return xml; 323 } 324 325 @ManagedOperation(description = "Gets all the messages as XML from the queue") 326 public String browseAllMessagesAsXml(Boolean includeBody) { 327 return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody); 328 } 329 330 @ManagedOperation(description = "Gets the range of messages as XML from the queue") 331 public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) { 332 return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody); 333 } 334 335 void onStarted(SedaProducer producer) { 336 producers.add(producer); 337 } 338 339 void onStopped(SedaProducer producer) { 340 producers.remove(producer); 341 } 342 343 void onStarted(SedaConsumer consumer) throws Exception { 344 consumers.add(consumer); 345 if (isMultipleConsumers()) { 346 updateMulticastProcessor(); 347 } 348 } 349 350 void onStopped(SedaConsumer consumer) throws Exception { 351 consumers.remove(consumer); 352 if (isMultipleConsumers()) { 353 updateMulticastProcessor(); 354 } 355 } 356 357 @Override 358 protected void doStart() throws Exception { 359 super.doStart(); 360 361 // special for unit testing where we can set a system property to make seda poll faster 362 // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project 363 String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout()); 364 setPollTimeout(Integer.valueOf(override)); 365 } 366 367 @Override 368 protected void doShutdown() throws Exception { 369 // notify component we are shutting down this endpoint 370 if (getComponent() != null) { 371 getComponent().onShutdownEndpoint(this); 372 } 373 // shutdown thread pool if it was in use 374 if (multicastExecutor != null) { 375 getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor); 376 multicastExecutor = null; 377 } 378 379 // clear queue, as we are shutdown, so if re-created then the queue must be updated 380 queue = null; 381 382 super.doShutdown(); 383 } 384 }