001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.BlockingQueue;
022    import java.util.concurrent.LinkedBlockingQueue;
023    
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.impl.DefaultComponent;
027    
028    /**
029     * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a>
030     * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
031     *
032     * @version 
033     */
034    public class SedaComponent extends DefaultComponent {
035        protected final int maxConcurrentConsumers = 500;
036        protected int queueSize;
037        protected int defaultConcurrentConsumers = 1;
038        private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
039        
040        public void setQueueSize(int size) {
041            queueSize = size;
042        }
043        
044        public int getQueueSize() {
045            return queueSize;
046        }
047        
048        public void setConcurrentConsumers(int size) {
049            defaultConcurrentConsumers = size;
050        }
051        
052        public int getConcurrentConsumers() {
053            return defaultConcurrentConsumers;
054        }
055    
056        public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) {
057            String key = getQueueKey(uri);
058    
059            QueueReference ref = getQueues().get(key);
060            if (ref != null) {
061                // add the reference before returning queue
062                ref.addReference();
063                return ref.getQueue();
064            }
065    
066            // create queue
067            BlockingQueue<Exchange> queue;
068            if (size != null && size > 0) {
069                queue = new LinkedBlockingQueue<Exchange>(size);
070            } else {
071                if (getQueueSize() > 0) {
072                    queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
073                } else {
074                    queue = new LinkedBlockingQueue<Exchange>();
075                }
076            }
077    
078            // create and add a new reference queue
079            ref = new QueueReference(queue);
080            ref.addReference();
081            getQueues().put(key, ref);
082    
083            return queue;
084        }
085    
086        public Map<String, QueueReference> getQueues() {
087            return queues;
088        }
089    
090        @Override
091        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
092            int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
093            boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
094            if (limitConcurrentConsumers && consumers >  maxConcurrentConsumers) {
095                throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
096                        + maxConcurrentConsumers + " was " + consumers);
097            }
098            Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
099            SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers);
100            answer.configureProperties(parameters);
101            return answer;
102        }
103    
104        public String getQueueKey(String uri) {
105            if (uri.contains("?")) {
106                // strip parameters
107                uri = uri.substring(0, uri.indexOf('?'));
108            }
109            return uri;
110        }
111    
112        @Override
113        protected void doStop() throws Exception {
114            getQueues().clear();
115            super.doStop();
116        }
117    
118        /**
119         * On shutting down the endpoint
120         * 
121         * @param endpoint the endpoint
122         */
123        void onShutdownEndpoint(SedaEndpoint endpoint) {
124            // we need to remove the endpoint from the reference counter
125            String key = getQueueKey(endpoint.getEndpointUri());
126            QueueReference ref = getQueues().get(key);
127            if (ref != null) {
128                ref.removeReference();
129                if (ref.getCount() <= 0) {
130                    // reference no longer needed so remove from queues
131                    getQueues().remove(key);
132                }
133            }
134        }
135    
136        /**
137         * Holder for queue references.
138         * <p/>
139         * This is used to keep track of the usages of the queues, so we know when a queue is no longer
140         * in use, and can safely be discarded.
141         */
142        public static final class QueueReference {
143            
144            private final BlockingQueue<Exchange> queue;
145            private volatile int count;
146    
147            private QueueReference(BlockingQueue<Exchange> queue) {
148                this.queue = queue;
149            }
150            
151            void addReference() {
152                count++;
153            }
154            
155            void removeReference() {
156                count--;
157            }
158    
159            /**
160             * Gets the reference counter
161             */
162            public int getCount() {
163                return count;
164            }
165    
166            /**
167             * Gets the queue
168             */
169            public BlockingQueue<Exchange> getQueue() {
170                return queue;
171            }
172        }
173    }