001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.BlockingQueue;
022
023import org.apache.camel.Component;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.impl.UriEndpointComponent;
027import org.apache.camel.spi.Metadata;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * The <a href="http://camel.apache.org/seda.html">SEDA Component</a> is for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
033 *
034 * @version 
035 */
036public class SedaComponent extends UriEndpointComponent {
037    protected final Logger log = LoggerFactory.getLogger(getClass());
038    protected final int maxConcurrentConsumers = 500;
039    protected int queueSize;
040    @Metadata(defaultValue = "1")
041    protected int concurrentConsumers = 1;
042    private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
043    private BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<Exchange>();
044
045    public SedaComponent() {
046        super(SedaEndpoint.class);
047    }
048
049    public SedaComponent(Class<? extends Endpoint> endpointClass) {
050        super(endpointClass);
051    }
052
053    /**
054     * Sets the default maximum capacity of the SEDA queue (i.e., the number of messages it can hold).
055     */
056    public void setQueueSize(int size) {
057        queueSize = size;
058    }
059    
060    public int getQueueSize() {
061        return queueSize;
062    }
063
064    /**
065     * Sets the default number of concurrent threads processing exchanges.
066     */
067    public void setConcurrentConsumers(int size) {
068        concurrentConsumers = size;
069    }
070    
071    public int getConcurrentConsumers() {
072        return concurrentConsumers;
073    }
074
075    public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
076        return defaultQueueFactory;
077    }
078
079    /**
080     * Sets the default queue factory.
081     */
082    public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
083        this.defaultQueueFactory = defaultQueueFactory;
084    }
085
086    /**
087     * @deprecated use
088     */
089    @Deprecated
090    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) {
091        return getOrCreateQueue(endpoint, size, null);
092    }
093
094    /**
095     * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)}
096     */
097    @Deprecated
098    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) {
099        return getOrCreateQueue(endpoint, size, multipleConsumers, null);
100    }
101
102    public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) {
103        String key = getQueueKey(endpoint.getEndpointUri());
104
105        QueueReference ref = getQueues().get(key);
106        if (ref != null) {
107
108            // if the given size is not provided, we just use the existing queue as is
109            if (size != null && !size.equals(ref.getSize())) {
110                // there is already a queue, so make sure the size matches
111                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
112                        + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size);
113            }
114            // add the reference before returning queue
115            ref.addReference(endpoint);
116
117            if (log.isDebugEnabled()) {
118                log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()});
119            }
120            return ref;
121        }
122
123        // create queue
124        BlockingQueue<Exchange> queue;
125        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory;
126        if (size != null && size > 0) {
127            queue = queueFactory.create(size);
128        } else {
129            if (getQueueSize() > 0) {
130                size = getQueueSize();
131                queue = queueFactory.create(getQueueSize());
132            } else {
133                queue = queueFactory.create();
134            }
135        }
136        log.debug("Created queue {} with size {}", key, size);
137
138        // create and add a new reference queue
139        ref = new QueueReference(queue, size, multipleConsumers);
140        ref.addReference(endpoint);
141        getQueues().put(key, ref);
142
143        return ref;
144    }
145
146    public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
147        String key = getQueueKey(endpoint.getEndpointUri());
148
149        QueueReference ref = getQueues().get(key);
150        if (ref == null) {
151            ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers());
152            ref.addReference(endpoint);
153            getQueues().put(key, ref);
154        }
155
156        return ref;
157    }
158
159    public Map<String, QueueReference> getQueues() {
160        return queues;
161    }
162
163    public QueueReference getQueueReference(String key) {
164        return queues.get(key);
165    }
166
167    @Override
168    @SuppressWarnings("unchecked")
169    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
170        int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, concurrentConsumers);
171        boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
172        if (limitConcurrentConsumers && consumers >  maxConcurrentConsumers) {
173            throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
174                    + maxConcurrentConsumers + " was " + consumers);
175        }
176
177        // Resolve queue reference
178        BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
179        SedaEndpoint answer;
180        // Resolve queue factory when no queue specified
181        if (queue == null) {
182            BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
183            // defer creating queue till endpoint is started, so we pass the queue factory
184            answer = createEndpoint(uri, this, queueFactory, consumers);
185        } else {
186            answer = createEndpoint(uri, this, queue, consumers);
187        }
188        answer.configureProperties(parameters);
189        answer.setConcurrentConsumers(consumers);
190        answer.setLimitConcurrentConsumers(limitConcurrentConsumers);
191        return answer;
192    }
193
194    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
195        return new SedaEndpoint(endpointUri, component, queueFactory, concurrentConsumers);
196    }
197
198    protected SedaEndpoint createEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
199        return new SedaEndpoint(endpointUri, component, queue, concurrentConsumers);
200    }
201
202    public String getQueueKey(String uri) {
203        if (uri.contains("?")) {
204            // strip parameters
205            uri = uri.substring(0, uri.indexOf('?'));
206        }
207        return uri;
208    }
209
210    @Override
211    protected void doStop() throws Exception {
212        getQueues().clear();
213        super.doStop();
214    }
215
216    /**
217     * On shutting down the endpoint
218     * 
219     * @param endpoint the endpoint
220     */
221    void onShutdownEndpoint(SedaEndpoint endpoint) {
222        // we need to remove the endpoint from the reference counter
223        String key = getQueueKey(endpoint.getEndpointUri());
224        QueueReference ref = getQueues().get(key);
225        if (ref != null && endpoint.getConsumers().size() == 0) {
226            // only remove the endpoint when the consumers are removed
227            ref.removeReference(endpoint);
228            if (ref.getCount() <= 0) {
229                // reference no longer needed so remove from queues
230                getQueues().remove(key);
231            }
232        }
233    }
234
235}