001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.BlockingQueue;
022
023import org.apache.camel.Component;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.impl.UriEndpointComponent;
027import org.apache.camel.spi.Metadata;
028import org.apache.camel.util.SedaConstants;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * The <a href="http://camel.apache.org/seda.html">SEDA Component</a> is for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
034 *
035 * @version 
036 */
037public class SedaComponent extends UriEndpointComponent {
038    protected final Logger log = LoggerFactory.getLogger(getClass());
039    protected final int maxConcurrentConsumers = SedaConstants.MAX_CONCURRENT_CONSUMERS;
040
041    @Metadata(label = "consumer", defaultValue = "" + SedaConstants.CONCURRENT_CONSUMERS)
042    protected int concurrentConsumers = SedaConstants.CONCURRENT_CONSUMERS;
043    @Metadata(label = "advanced", defaultValue = "" + SedaConstants.QUEUE_SIZE)
044    protected int queueSize = SedaConstants.QUEUE_SIZE;
045    @Metadata(label = "advanced")
046    protected BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<>();
047    @Metadata(label = "producer")
048    private boolean defaultBlockWhenFull;
049    @Metadata(label = "producer")
050    private long defaultOfferTimeout;
051
052    private final Map<String, QueueReference> queues = new HashMap<>();
053
054    public SedaComponent() {
055        super(SedaEndpoint.class);
056    }
057
058    public SedaComponent(Class<? extends Endpoint> endpointClass) {
059        super(endpointClass);
060    }
061
062    /**
063     * Sets the default maximum capacity of the SEDA queue (i.e., the number of messages it can hold).
064     */
065    public void setQueueSize(int size) {
066        queueSize = size;
067    }
068    
069    public int getQueueSize() {
070        return queueSize;
071    }
072
073    /**
074     * Sets the default number of concurrent threads processing exchanges.
075     */
076    public void setConcurrentConsumers(int size) {
077        concurrentConsumers = size;
078    }
079    
080    public int getConcurrentConsumers() {
081        return concurrentConsumers;
082    }
083
084    public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
085        return defaultQueueFactory;
086    }
087
088    /**
089     * Sets the default queue factory.
090     */
091    public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
092        this.defaultQueueFactory = defaultQueueFactory;
093    }
094
095    public boolean isDefaultBlockWhenFull() {
096        return defaultBlockWhenFull;
097    }
098
099    /**
100     * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted.
101     * By default, an exception will be thrown stating that the queue is full.
102     * By enabling this option, the calling thread will instead block and wait until the message can be accepted.
103     */
104    public void setDefaultBlockWhenFull(boolean defaultBlockWhenFull) {
105        this.defaultBlockWhenFull = defaultBlockWhenFull;
106    }
107    
108    
109    public long getDefaultOfferTimeout() {
110        return defaultOfferTimeout;
111    }
112    
113    /**
114     * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted.
115     * By default, an exception will be thrown stating that the queue is full.
116     * By enabling this option, where a configured timeout can be added to the block case.  Utilizing the .offer(timeout) method of the underlining java queue
117     */
118    public void setDefaultOfferTimeout(long defaultOfferTimeout) {
119        this.defaultOfferTimeout = defaultOfferTimeout;
120    }
121
122    /**
123     * @deprecated use
124     */
125    @Deprecated
126    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) {
127        return getOrCreateQueue(endpoint, size, null);
128    }
129
130    /**
131     * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)}
132     */
133    @Deprecated
134    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) {
135        return getOrCreateQueue(endpoint, size, multipleConsumers, null);
136    }
137
138    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) {
139        String key = getQueueKey(endpoint.getEndpointUri());
140
141        QueueReference ref = getQueues().get(key);
142        if (ref != null) {
143
144            // if the given size is not provided, we just use the existing queue as is
145            if (size != null && !size.equals(ref.getSize())) {
146                // there is already a queue, so make sure the size matches
147                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
148                        + (ref.getSize() != null ? ref.getSize() : SedaConstants.QUEUE_SIZE) + " does not match given queue size " + size);
149            }
150            // add the reference before returning queue
151            ref.addReference(endpoint);
152
153            if (log.isDebugEnabled()) {
154                log.debug("Reusing existing queue {} with size {} and reference count {}", key, size, ref.getCount());
155            }
156            return ref;
157        }
158
159        // create queue
160        BlockingQueue<Exchange> queue;
161        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory;
162        if (size != null && size > 0) {
163            queue = queueFactory.create(size);
164        } else {
165            if (getQueueSize() > 0) {
166                size = getQueueSize();
167                queue = queueFactory.create(getQueueSize());
168            } else {
169                queue = queueFactory.create();
170            }
171        }
172        log.debug("Created queue {} with size {}", key, size);
173
174        // create and add a new reference queue
175        ref = new QueueReference(queue, size, multipleConsumers);
176        ref.addReference(endpoint);
177        getQueues().put(key, ref);
178
179        return ref;
180    }
181
182    public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
183        String key = getQueueKey(endpoint.getEndpointUri());
184
185        QueueReference ref = getQueues().get(key);
186        if (ref == null) {
187            ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers());
188            ref.addReference(endpoint);
189            getQueues().put(key, ref);
190        }
191
192        return ref;
193    }
194
195    public Map<String, QueueReference> getQueues() {
196        return queues;
197    }
198
199    public QueueReference getQueueReference(String key) {
200        return queues.get(key);
201    }
202
203    @Override
204    @SuppressWarnings("unchecked")
205    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
206        int consumers = getAndRemoveOrResolveReferenceParameter(parameters, "concurrentConsumers", Integer.class, concurrentConsumers);
207        boolean limitConcurrentConsumers = getAndRemoveOrResolveReferenceParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
208        if (limitConcurrentConsumers && consumers >  maxConcurrentConsumers) {
209            throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
210                    + maxConcurrentConsumers + " was " + consumers);
211        }
212
213        // Resolve queue reference
214        BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
215        SedaEndpoint answer;
216        // Resolve queue factory when no queue specified
217        if (queue == null) {
218            BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
219            // defer creating queue till endpoint is started, so we pass the queue factory
220            answer = createEndpoint(uri, this, queueFactory, consumers);
221        } else {
222            answer = createEndpoint(uri, this, queue, consumers);
223        }
224
225        // if blockWhenFull is set on endpoint, defaultBlockWhenFull is ignored.
226        boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", Boolean.class, defaultBlockWhenFull);
227        // if offerTimeout is set on endpoint, defaultOfferTimeout is ignored.
228        long offerTimeout = getAndRemoveParameter(parameters, "offerTimeout", long.class, defaultOfferTimeout);
229        
230        answer.setOfferTimeout(offerTimeout);
231        answer.setBlockWhenFull(blockWhenFull);
232        answer.configureProperties(parameters);
233        answer.setConcurrentConsumers(consumers);
234        answer.setLimitConcurrentConsumers(limitConcurrentConsumers);
235        return answer;
236    }
237
238    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
239        return new SedaEndpoint(endpointUri, component, queueFactory, concurrentConsumers);
240    }
241
242    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
243        return new SedaEndpoint(endpointUri, component, queue, concurrentConsumers);
244    }
245
246    public String getQueueKey(String uri) {
247        if (uri.contains("?")) {
248            // strip parameters
249            uri = uri.substring(0, uri.indexOf('?'));
250        }
251        return uri;
252    }
253
254    @Override
255    protected void doStop() throws Exception {
256        getQueues().clear();
257        super.doStop();
258    }
259
260    /**
261     * On shutting down the endpoint
262     * 
263     * @param endpoint the endpoint
264     */
265    void onShutdownEndpoint(SedaEndpoint endpoint) {
266        // we need to remove the endpoint from the reference counter
267        String key = getQueueKey(endpoint.getEndpointUri());
268        QueueReference ref = getQueues().get(key);
269        if (ref != null && endpoint.getConsumers().size() == 0) {
270            // only remove the endpoint when the consumers are removed
271            ref.removeReference(endpoint);
272            if (ref.getCount() <= 0) {
273                // reference no longer needed so remove from queues
274                getQueues().remove(key);
275            }
276        }
277    }
278
279}