001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.seda; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.BlockingQueue; 022 023import org.apache.camel.Component; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.impl.UriEndpointComponent; 027import org.apache.camel.spi.Metadata; 028import org.apache.camel.util.SedaConstants; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * The <a href="http://camel.apache.org/seda.html">SEDA Component</a> is for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext 034 * 035 * @version 036 */ 037public class SedaComponent extends UriEndpointComponent { 038 protected final Logger log = LoggerFactory.getLogger(getClass()); 039 protected final int maxConcurrentConsumers = SedaConstants.MAX_CONCURRENT_CONSUMERS; 040 041 @Metadata(label = "consumer", defaultValue = "" + SedaConstants.CONCURRENT_CONSUMERS) 042 protected int concurrentConsumers = SedaConstants.CONCURRENT_CONSUMERS; 043 @Metadata(label = "advanced", defaultValue = "" + SedaConstants.QUEUE_SIZE) 044 protected int queueSize = SedaConstants.QUEUE_SIZE; 045 @Metadata(label = "advanced") 046 protected BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<>(); 047 @Metadata(label = "producer") 048 private boolean defaultBlockWhenFull; 049 @Metadata(label = "producer") 050 private long defaultOfferTimeout; 051 052 private final Map<String, QueueReference> queues = new HashMap<>(); 053 054 public SedaComponent() { 055 super(SedaEndpoint.class); 056 } 057 058 public SedaComponent(Class<? extends Endpoint> endpointClass) { 059 super(endpointClass); 060 } 061 062 /** 063 * Sets the default maximum capacity of the SEDA queue (i.e., the number of messages it can hold). 064 */ 065 public void setQueueSize(int size) { 066 queueSize = size; 067 } 068 069 public int getQueueSize() { 070 return queueSize; 071 } 072 073 /** 074 * Sets the default number of concurrent threads processing exchanges. 075 */ 076 public void setConcurrentConsumers(int size) { 077 concurrentConsumers = size; 078 } 079 080 public int getConcurrentConsumers() { 081 return concurrentConsumers; 082 } 083 084 public BlockingQueueFactory<Exchange> getDefaultQueueFactory() { 085 return defaultQueueFactory; 086 } 087 088 /** 089 * Sets the default queue factory. 090 */ 091 public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) { 092 this.defaultQueueFactory = defaultQueueFactory; 093 } 094 095 public boolean isDefaultBlockWhenFull() { 096 return defaultBlockWhenFull; 097 } 098 099 /** 100 * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. 101 * By default, an exception will be thrown stating that the queue is full. 102 * By enabling this option, the calling thread will instead block and wait until the message can be accepted. 103 */ 104 public void setDefaultBlockWhenFull(boolean defaultBlockWhenFull) { 105 this.defaultBlockWhenFull = defaultBlockWhenFull; 106 } 107 108 109 public long getDefaultOfferTimeout() { 110 return defaultOfferTimeout; 111 } 112 113 /** 114 * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. 115 * By default, an exception will be thrown stating that the queue is full. 116 * By enabling this option, where a configured timeout can be added to the block case. Utilizing the .offer(timeout) method of the underlining java queue 117 */ 118 public void setDefaultOfferTimeout(long defaultOfferTimeout) { 119 this.defaultOfferTimeout = defaultOfferTimeout; 120 } 121 122 /** 123 * @deprecated use 124 */ 125 @Deprecated 126 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) { 127 return getOrCreateQueue(endpoint, size, null); 128 } 129 130 /** 131 * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)} 132 */ 133 @Deprecated 134 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) { 135 return getOrCreateQueue(endpoint, size, multipleConsumers, null); 136 } 137 138 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) { 139 String key = getQueueKey(endpoint.getEndpointUri()); 140 141 QueueReference ref = getQueues().get(key); 142 if (ref != null) { 143 144 // if the given size is not provided, we just use the existing queue as is 145 if (size != null && !size.equals(ref.getSize())) { 146 // there is already a queue, so make sure the size matches 147 throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size " 148 + (ref.getSize() != null ? ref.getSize() : SedaConstants.QUEUE_SIZE) + " does not match given queue size " + size); 149 } 150 // add the reference before returning queue 151 ref.addReference(endpoint); 152 153 if (log.isDebugEnabled()) { 154 log.debug("Reusing existing queue {} with size {} and reference count {}", key, size, ref.getCount()); 155 } 156 return ref; 157 } 158 159 // create queue 160 BlockingQueue<Exchange> queue; 161 BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; 162 if (size != null && size > 0) { 163 queue = queueFactory.create(size); 164 } else { 165 if (getQueueSize() > 0) { 166 size = getQueueSize(); 167 queue = queueFactory.create(getQueueSize()); 168 } else { 169 queue = queueFactory.create(); 170 } 171 } 172 log.debug("Created queue {} with size {}", key, size); 173 174 // create and add a new reference queue 175 ref = new QueueReference(queue, size, multipleConsumers); 176 ref.addReference(endpoint); 177 getQueues().put(key, ref); 178 179 return ref; 180 } 181 182 public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) { 183 String key = getQueueKey(endpoint.getEndpointUri()); 184 185 QueueReference ref = getQueues().get(key); 186 if (ref == null) { 187 ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers()); 188 ref.addReference(endpoint); 189 getQueues().put(key, ref); 190 } 191 192 return ref; 193 } 194 195 public Map<String, QueueReference> getQueues() { 196 return queues; 197 } 198 199 public QueueReference getQueueReference(String key) { 200 return queues.get(key); 201 } 202 203 @Override 204 @SuppressWarnings("unchecked") 205 protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { 206 int consumers = getAndRemoveOrResolveReferenceParameter(parameters, "concurrentConsumers", Integer.class, concurrentConsumers); 207 boolean limitConcurrentConsumers = getAndRemoveOrResolveReferenceParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); 208 if (limitConcurrentConsumers && consumers > maxConcurrentConsumers) { 209 throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " 210 + maxConcurrentConsumers + " was " + consumers); 211 } 212 213 // Resolve queue reference 214 BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class); 215 SedaEndpoint answer; 216 // Resolve queue factory when no queue specified 217 if (queue == null) { 218 BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); 219 // defer creating queue till endpoint is started, so we pass the queue factory 220 answer = createEndpoint(uri, this, queueFactory, consumers); 221 } else { 222 answer = createEndpoint(uri, this, queue, consumers); 223 } 224 225 // if blockWhenFull is set on endpoint, defaultBlockWhenFull is ignored. 226 boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", Boolean.class, defaultBlockWhenFull); 227 // if offerTimeout is set on endpoint, defaultOfferTimeout is ignored. 228 long offerTimeout = getAndRemoveParameter(parameters, "offerTimeout", long.class, defaultOfferTimeout); 229 230 answer.setOfferTimeout(offerTimeout); 231 answer.setBlockWhenFull(blockWhenFull); 232 answer.configureProperties(parameters); 233 answer.setConcurrentConsumers(consumers); 234 answer.setLimitConcurrentConsumers(limitConcurrentConsumers); 235 return answer; 236 } 237 238 protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) { 239 return new SedaEndpoint(endpointUri, component, queueFactory, concurrentConsumers); 240 } 241 242 protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { 243 return new SedaEndpoint(endpointUri, component, queue, concurrentConsumers); 244 } 245 246 public String getQueueKey(String uri) { 247 if (uri.contains("?")) { 248 // strip parameters 249 uri = uri.substring(0, uri.indexOf('?')); 250 } 251 return uri; 252 } 253 254 @Override 255 protected void doStop() throws Exception { 256 getQueues().clear(); 257 super.doStop(); 258 } 259 260 /** 261 * On shutting down the endpoint 262 * 263 * @param endpoint the endpoint 264 */ 265 void onShutdownEndpoint(SedaEndpoint endpoint) { 266 // we need to remove the endpoint from the reference counter 267 String key = getQueueKey(endpoint.getEndpointUri()); 268 QueueReference ref = getQueues().get(key); 269 if (ref != null && endpoint.getConsumers().size() == 0) { 270 // only remove the endpoint when the consumers are removed 271 ref.removeReference(endpoint); 272 if (ref.getCount() <= 0) { 273 // reference no longer needed so remove from queues 274 getQueues().remove(key); 275 } 276 } 277 } 278 279}