001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.BlockingQueue; 022 import java.util.concurrent.LinkedBlockingQueue; 023 024 import org.apache.camel.Endpoint; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.impl.DefaultComponent; 027 028 /** 029 * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a> 030 * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext 031 * 032 * @version 033 */ 034 public class SedaComponent extends DefaultComponent { 035 protected final int maxConcurrentConsumers = 500; 036 protected int queueSize; 037 protected int defaultConcurrentConsumers = 1; 038 private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>(); 039 040 public void setQueueSize(int size) { 041 queueSize = size; 042 } 043 044 public int getQueueSize() { 045 return queueSize; 046 } 047 048 public void setConcurrentConsumers(int size) { 049 defaultConcurrentConsumers = size; 050 } 051 052 public int getConcurrentConsumers() { 053 return defaultConcurrentConsumers; 054 } 055 056 public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) { 057 String key = getQueueKey(uri); 058 059 QueueReference ref = getQueues().get(key); 060 if (ref != null) { 061 // add the reference before returning queue 062 ref.addReference(); 063 return ref.getQueue(); 064 } 065 066 // create queue 067 BlockingQueue<Exchange> queue; 068 if (size != null && size > 0) { 069 queue = new LinkedBlockingQueue<Exchange>(size); 070 } else { 071 if (getQueueSize() > 0) { 072 queue = new LinkedBlockingQueue<Exchange>(getQueueSize()); 073 } else { 074 queue = new LinkedBlockingQueue<Exchange>(); 075 } 076 } 077 078 // create and add a new reference queue 079 ref = new QueueReference(queue); 080 ref.addReference(); 081 getQueues().put(key, ref); 082 083 return queue; 084 } 085 086 public Map<String, QueueReference> getQueues() { 087 return queues; 088 } 089 090 @Override 091 protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { 092 int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); 093 boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); 094 if (limitConcurrentConsumers && consumers > maxConcurrentConsumers) { 095 throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " 096 + maxConcurrentConsumers + " was " + consumers); 097 } 098 Integer size = getAndRemoveParameter(parameters, "size", Integer.class); 099 SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers); 100 answer.configureProperties(parameters); 101 return answer; 102 } 103 104 public String getQueueKey(String uri) { 105 if (uri.contains("?")) { 106 // strip parameters 107 uri = uri.substring(0, uri.indexOf('?')); 108 } 109 return uri; 110 } 111 112 @Override 113 protected void doStop() throws Exception { 114 getQueues().clear(); 115 super.doStop(); 116 } 117 118 /** 119 * On shutting down the endpoint 120 * 121 * @param endpoint the endpoint 122 */ 123 void onShutdownEndpoint(SedaEndpoint endpoint) { 124 // we need to remove the endpoint from the reference counter 125 String key = getQueueKey(endpoint.getEndpointUri()); 126 QueueReference ref = getQueues().get(key); 127 if (ref != null) { 128 ref.removeReference(); 129 if (ref.getCount() <= 0) { 130 // reference no longer needed so remove from queues 131 getQueues().remove(key); 132 } 133 } 134 } 135 136 /** 137 * Holder for queue references. 138 * <p/> 139 * This is used to keep track of the usages of the queues, so we know when a queue is no longer 140 * in use, and can safely be discarded. 141 */ 142 public static final class QueueReference { 143 144 private final BlockingQueue<Exchange> queue; 145 private volatile int count; 146 147 private QueueReference(BlockingQueue<Exchange> queue) { 148 this.queue = queue; 149 } 150 151 void addReference() { 152 count++; 153 } 154 155 void removeReference() { 156 count--; 157 } 158 159 /** 160 * Gets the reference counter 161 */ 162 public int getCount() { 163 return count; 164 } 165 166 /** 167 * Gets the queue 168 */ 169 public BlockingQueue<Exchange> getQueue() { 170 return queue; 171 } 172 } 173 }