001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.ArrayList;
020    import java.util.HashSet;
021    import java.util.List;
022    import java.util.Set;
023    import java.util.concurrent.BlockingQueue;
024    import java.util.concurrent.CopyOnWriteArraySet;
025    import java.util.concurrent.ExecutorService;
026    import java.util.concurrent.LinkedBlockingQueue;
027    
028    import org.apache.camel.Component;
029    import org.apache.camel.Consumer;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Message;
032    import org.apache.camel.MultipleConsumersSupport;
033    import org.apache.camel.Processor;
034    import org.apache.camel.Producer;
035    import org.apache.camel.WaitForTaskToComplete;
036    import org.apache.camel.api.management.ManagedAttribute;
037    import org.apache.camel.api.management.ManagedOperation;
038    import org.apache.camel.api.management.ManagedResource;
039    import org.apache.camel.impl.DefaultEndpoint;
040    import org.apache.camel.processor.MulticastProcessor;
041    import org.apache.camel.spi.BrowsableEndpoint;
042    import org.apache.camel.util.EndpointHelper;
043    import org.apache.camel.util.MessageHelper;
044    import org.apache.camel.util.ServiceHelper;
045    import org.apache.camel.util.URISupport;
046    
047    /**
048     * An implementation of the <a
049     * href="http://camel.apache.org/queue.html">Queue components</a> for
050     * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
051     */
052    @ManagedResource(description = "Managed SedaEndpoint")
053    public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
054        private volatile BlockingQueue<Exchange> queue;
055        private int size;
056        private int concurrentConsumers = 1;
057        private volatile ExecutorService multicastExecutor;
058        private boolean multipleConsumers;
059        private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
060        private long timeout = 30000;
061        private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
062        private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
063        private volatile MulticastProcessor consumerMulticastProcessor;
064        private volatile boolean multicastStarted;
065        private boolean blockWhenFull;
066        private int pollTimeout = 1000;
067    
068        public SedaEndpoint() {
069        }
070    
071        public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
072            this(endpointUri, component, queue, 1);
073        }
074    
075        public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
076            super(endpointUri, component);
077            this.queue = queue;
078            this.size = queue.remainingCapacity();
079            this.concurrentConsumers = concurrentConsumers;
080        }
081    
082        @Override
083        public SedaComponent getComponent() {
084            return (SedaComponent) super.getComponent();
085        }
086    
087        public Producer createProducer() throws Exception {
088            return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
089        }
090    
091        public Consumer createConsumer(Processor processor) throws Exception {
092            return new SedaConsumer(this, processor);
093        }
094    
095        public synchronized BlockingQueue<Exchange> getQueue() {
096            if (queue == null) {
097                // prefer to lookup queue from component, so if this endpoint is re-created or re-started
098                // then the existing queue from the component can be used, so new producers and consumers
099                // can use the already existing queue referenced from the component
100                if (getComponent() != null) {
101                    queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize());
102                } else {
103                    // fallback and create queue (as this endpoint has no component)
104                    queue = createQueue();
105                }
106            }
107            return queue;
108        }
109    
110        protected BlockingQueue<Exchange> createQueue() {
111            if (size > 0) {
112                return new LinkedBlockingQueue<Exchange>(size);
113            } else {
114                return new LinkedBlockingQueue<Exchange>();
115            }
116        }
117    
118        protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
119            if (!multicastStarted && consumerMulticastProcessor != null) {
120                // only start it on-demand to avoid starting it during stopping
121                ServiceHelper.startService(consumerMulticastProcessor);
122                multicastStarted = true;
123            }
124            return consumerMulticastProcessor;
125        }
126    
127        protected synchronized void updateMulticastProcessor() throws Exception {
128            if (consumerMulticastProcessor != null) {
129                ServiceHelper.stopService(consumerMulticastProcessor);
130            }
131    
132            int size = getConsumers().size();
133            if (size == 0 && multicastExecutor != null) {
134                // stop the multicast executor as its not needed anymore when size is zero
135                getCamelContext().getExecutorServiceManager().shutdown(multicastExecutor);
136                multicastExecutor = null;
137            }
138            if (size > 1) {
139                if (multicastExecutor == null) {
140                    // create multicast executor as we need it when we have more than 1 processor
141                    multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)");
142                }
143                // create list of consumers to multicast to
144                List<Processor> processors = new ArrayList<Processor>(size);
145                for (SedaConsumer consumer : getConsumers()) {
146                    processors.add(consumer.getProcessor());
147                }
148                // create multicast processor
149                multicastStarted = false;
150                consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false);
151            } else {
152                // not needed
153                consumerMulticastProcessor = null;
154            }
155        }
156    
157        public void setQueue(BlockingQueue<Exchange> queue) {
158            this.queue = queue;
159            this.size = queue.remainingCapacity();
160        }
161    
162        @ManagedAttribute(description = "Queue max capacity")
163        public int getSize() {
164            return size;
165        }
166    
167        public void setSize(int size) {
168            this.size = size;
169        }
170    
171        @ManagedAttribute(description = "Current queue size")
172        public int getCurrentQueueSize() {
173            return queue.size();
174        }
175    
176        public void setBlockWhenFull(boolean blockWhenFull) {
177            this.blockWhenFull = blockWhenFull;
178        }
179    
180        @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
181        public boolean isBlockWhenFull() {
182            return blockWhenFull;
183        }
184    
185        public void setConcurrentConsumers(int concurrentConsumers) {
186            this.concurrentConsumers = concurrentConsumers;
187        }
188    
189        @ManagedAttribute(description = "Number of concurrent consumers")
190        public int getConcurrentConsumers() {
191            return concurrentConsumers;
192        }
193    
194        public WaitForTaskToComplete getWaitForTaskToComplete() {
195            return waitForTaskToComplete;
196        }
197    
198        public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
199            this.waitForTaskToComplete = waitForTaskToComplete;
200        }
201    
202        @ManagedAttribute
203        public long getTimeout() {
204            return timeout;
205        }
206    
207        public void setTimeout(long timeout) {
208            this.timeout = timeout;
209        }
210    
211        @ManagedAttribute
212        public boolean isMultipleConsumers() {
213            return multipleConsumers;
214        }
215    
216        public void setMultipleConsumers(boolean multipleConsumers) {
217            this.multipleConsumers = multipleConsumers;
218        }
219    
220        @ManagedAttribute
221        public int getPollTimeout() {
222            return pollTimeout;
223        }
224    
225        public void setPollTimeout(int pollTimeout) {
226            this.pollTimeout = pollTimeout;
227        }
228    
229        public boolean isSingleton() {
230            return true;
231        }
232    
233        /**
234         * Returns the current pending exchanges
235         */
236        public List<Exchange> getExchanges() {
237            return new ArrayList<Exchange>(getQueue());
238        }
239    
240        @ManagedAttribute
241        public boolean isMultipleConsumersSupported() {
242            return isMultipleConsumers();
243        }
244    
245        /**
246         * Purges the queue
247         */
248        @ManagedOperation(description = "Purges the seda queue")
249        public void purgeQueue() {
250            queue.clear();
251        }
252    
253        /**
254         * Returns the current active consumers on this endpoint
255         */
256        public Set<SedaConsumer> getConsumers() {
257            return new HashSet<SedaConsumer>(consumers);
258        }
259    
260        /**
261         * Returns the current active producers on this endpoint
262         */
263        public Set<SedaProducer> getProducers() {
264            return new HashSet<SedaProducer>(producers);
265        }
266    
267        @ManagedOperation(description = "Current number of Exchanges in Queue")
268        public long queueSize() {
269            return getExchanges().size();
270        }
271    
272        @ManagedOperation(description = "Get Exchange from queue by index")
273        public String browseExchange(Integer index) {
274            List<Exchange> exchanges = getExchanges();
275            if (index >= exchanges.size()) {
276                return null;
277            }
278            Exchange exchange = exchanges.get(index);
279            if (exchange == null) {
280                return null;
281            }
282            // must use java type with JMX such as java.lang.String
283            return exchange.toString();
284        }
285    
286        @ManagedOperation(description = "Get message body from queue by index")
287        public String browseMessageBody(Integer index) {
288            List<Exchange> exchanges = getExchanges();
289            if (index >= exchanges.size()) {
290                return null;
291            }
292            Exchange exchange = exchanges.get(index);
293            if (exchange == null) {
294                return null;
295            }
296    
297            // must use java type with JMX such as java.lang.String
298            String body;
299            if (exchange.hasOut()) {
300                body = exchange.getOut().getBody(String.class);
301            } else {
302                body = exchange.getIn().getBody(String.class);
303            }
304    
305            return body;
306        }
307    
308        @ManagedOperation(description = "Get message as XML from queue by index")
309        public String browseMessageAsXml(Integer index, Boolean includeBody) {
310            List<Exchange> exchanges = getExchanges();
311            if (index >= exchanges.size()) {
312                return null;
313            }
314            Exchange exchange = exchanges.get(index);
315            if (exchange == null) {
316                return null;
317            }
318    
319            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
320            String xml = MessageHelper.dumpAsXml(msg, includeBody);
321    
322            return xml;
323        }
324    
325        @ManagedOperation(description = "Gets all the messages as XML from the queue")
326        public String browseAllMessagesAsXml(Boolean includeBody) {
327            return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody);
328        }
329    
330        @ManagedOperation(description = "Gets the range of messages as XML from the queue")
331        public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
332            return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
333        }
334    
335        void onStarted(SedaProducer producer) {
336            producers.add(producer);
337        }
338    
339        void onStopped(SedaProducer producer) {
340            producers.remove(producer);
341        }
342    
343        void onStarted(SedaConsumer consumer) throws Exception {
344            consumers.add(consumer);
345            if (isMultipleConsumers()) {
346                updateMulticastProcessor();
347            }
348        }
349    
350        void onStopped(SedaConsumer consumer) throws Exception {
351            consumers.remove(consumer);
352            if (isMultipleConsumers()) {
353                updateMulticastProcessor();
354            }
355        }
356    
357        @Override
358        protected void doStart() throws Exception {
359            super.doStart();
360    
361            // special for unit testing where we can set a system property to make seda poll faster
362            // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
363            String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());
364            setPollTimeout(Integer.valueOf(override));
365        }
366    
367        @Override
368        protected void doShutdown() throws Exception {
369            // notify component we are shutting down this endpoint
370            if (getComponent() != null) {
371                getComponent().onShutdownEndpoint(this);
372            }
373            // shutdown thread pool if it was in use
374            if (multicastExecutor != null) {
375                getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
376                multicastExecutor = null;
377            }
378    
379            // clear queue, as we are shutdown, so if re-created then the queue must be updated
380            queue = null;
381    
382            super.doShutdown();
383        }
384    }