001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.BlockingQueue;
022
023import org.apache.camel.Component;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.impl.UriEndpointComponent;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * The <a href="http://camel.apache.org/seda.html">SEDA Component</a> is for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
032 *
033 * @version 
034 */
035public class SedaComponent extends UriEndpointComponent {
036    protected final Logger log = LoggerFactory.getLogger(getClass());
037    protected final int maxConcurrentConsumers = 500;
038    protected int queueSize;
039    protected int concurrentConsumers = 1;
040    private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
041    private BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<Exchange>();
042
043    public SedaComponent() {
044        super(SedaEndpoint.class);
045    }
046
047    public SedaComponent(Class<? extends Endpoint> endpointClass) {
048        super(endpointClass);
049    }
050
051    /**
052     * Sets the default maximum capacity of the SEDA queue (i.e., the number of messages it can hold).
053     */
054    public void setQueueSize(int size) {
055        queueSize = size;
056    }
057    
058    public int getQueueSize() {
059        return queueSize;
060    }
061
062    /**
063     * Sets the default number of concurrent threads processing exchanges.
064     */
065    public void setConcurrentConsumers(int size) {
066        concurrentConsumers = size;
067    }
068    
069    public int getConcurrentConsumers() {
070        return concurrentConsumers;
071    }
072
073    public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
074        return defaultQueueFactory;
075    }
076
077    /**
078     * Sets the default queue factory.
079     */
080    public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
081        this.defaultQueueFactory = defaultQueueFactory;
082    }
083
084    /**
085     * @deprecated use
086     */
087    @Deprecated
088    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) {
089        return getOrCreateQueue(endpoint, size, null);
090    }
091
092    /**
093     * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)}
094     */
095    @Deprecated
096    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) {
097        return getOrCreateQueue(endpoint, size, multipleConsumers, null);
098    }
099
100    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) {
101        String key = getQueueKey(endpoint.getEndpointUri());
102
103        QueueReference ref = getQueues().get(key);
104        if (ref != null) {
105
106            // if the given size is not provided, we just use the existing queue as is
107            if (size != null && !size.equals(ref.getSize())) {
108                // there is already a queue, so make sure the size matches
109                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
110                        + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size);
111            }
112            // add the reference before returning queue
113            ref.addReference(endpoint);
114
115            if (log.isDebugEnabled()) {
116                log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()});
117            }
118            return ref;
119        }
120
121        // create queue
122        BlockingQueue<Exchange> queue;
123        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory;
124        if (size != null && size > 0) {
125            queue = queueFactory.create(size);
126        } else {
127            if (getQueueSize() > 0) {
128                size = getQueueSize();
129                queue = queueFactory.create(getQueueSize());
130            } else {
131                queue = queueFactory.create();
132            }
133        }
134        log.debug("Created queue {} with size {}", key, size);
135
136        // create and add a new reference queue
137        ref = new QueueReference(queue, size, multipleConsumers);
138        ref.addReference(endpoint);
139        getQueues().put(key, ref);
140
141        return ref;
142    }
143
144    public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
145        String key = getQueueKey(endpoint.getEndpointUri());
146
147        QueueReference ref = getQueues().get(key);
148        if (ref == null) {
149            ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers());
150            ref.addReference(endpoint);
151            getQueues().put(key, ref);
152        }
153
154        return ref;
155    }
156
157    public Map<String, QueueReference> getQueues() {
158        return queues;
159    }
160
161    public QueueReference getQueueReference(String key) {
162        return queues.get(key);
163    }
164
165    @Override
166    @SuppressWarnings("unchecked")
167    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
168        int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, concurrentConsumers);
169        boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
170        if (limitConcurrentConsumers && consumers >  maxConcurrentConsumers) {
171            throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
172                    + maxConcurrentConsumers + " was " + consumers);
173        }
174
175        // Resolve queue reference
176        BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
177        SedaEndpoint answer;
178        // Resolve queue factory when no queue specified
179        if (queue == null) {
180            BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
181            // defer creating queue till endpoint is started, so we pass the queue factory
182            answer = createEndpoint(uri, this, queueFactory, consumers);
183        } else {
184            answer = createEndpoint(uri, this, queue, consumers);
185        }
186        answer.configureProperties(parameters);
187        answer.setConcurrentConsumers(consumers);
188        answer.setLimitConcurrentConsumers(limitConcurrentConsumers);
189        return answer;
190    }
191
192    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
193        return new SedaEndpoint(endpointUri, component, queueFactory, concurrentConsumers);
194    }
195
196    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
197        return new SedaEndpoint(endpointUri, component, queue, concurrentConsumers);
198    }
199
200    public String getQueueKey(String uri) {
201        if (uri.contains("?")) {
202            // strip parameters
203            uri = uri.substring(0, uri.indexOf('?'));
204        }
205        return uri;
206    }
207
208    @Override
209    protected void doStop() throws Exception {
210        getQueues().clear();
211        super.doStop();
212    }
213
214    /**
215     * On shutting down the endpoint
216     * 
217     * @param endpoint the endpoint
218     */
219    void onShutdownEndpoint(SedaEndpoint endpoint) {
220        // we need to remove the endpoint from the reference counter
221        String key = getQueueKey(endpoint.getEndpointUri());
222        QueueReference ref = getQueues().get(key);
223        if (ref != null && endpoint.getConsumers().size() == 0) {
224            // only remove the endpoint when the consumers are removed
225            ref.removeReference(endpoint);
226            if (ref.getCount() <= 0) {
227                // reference no longer needed so remove from queues
228                getQueues().remove(key);
229            }
230        }
231    }
232
233}