001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.direct;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023
024import org.apache.camel.Component;
025import org.apache.camel.Consumer;
026import org.apache.camel.Processor;
027import org.apache.camel.Producer;
028import org.apache.camel.impl.DefaultEndpoint;
029import org.apache.camel.spi.Metadata;
030import org.apache.camel.spi.UriEndpoint;
031import org.apache.camel.spi.UriParam;
032import org.apache.camel.spi.UriPath;
033import org.apache.camel.util.StopWatch;
034import org.apache.camel.util.StringHelper;
035
036/**
037 * The direct component provides direct, synchronous call to another endpoint from the same CamelContext.
038 *
039 * This endpoint can be used to connect existing routes in the same CamelContext.
040 */
041@UriEndpoint(firstVersion = "1.0.0", scheme = "direct", title = "Direct", syntax = "direct:name", consumerClass = DirectConsumer.class, label = "core,endpoint")
042public class DirectEndpoint extends DefaultEndpoint {
043
044    private final Map<String, DirectConsumer> consumers;
045    private final List<DirectProducer> producers = new ArrayList<>();
046
047    @UriPath(description = "Name of direct endpoint") @Metadata(required = "true")
048    private String name;
049
050    @UriParam(label = "producer", defaultValue = "true")
051    private boolean block = true;
052    @UriParam(label = "producer", defaultValue = "30000")
053    private long timeout = 30000L;
054    @UriParam(label = "producer")
055    private boolean failIfNoConsumers = true;
056
057    public DirectEndpoint() {
058        this.consumers = new HashMap<>();
059    }
060
061    public DirectEndpoint(String endpointUri, Component component) {
062        this(endpointUri, component, new HashMap<>());
063    }
064
065    public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
066        super(uri, component);
067        this.consumers = consumers;
068    }
069
070    public Producer createProducer() throws Exception {
071        return new DirectProducer(this);
072    }
073
074    public Consumer createConsumer(Processor processor) throws Exception {
075        Consumer answer = new DirectConsumer(this, processor);
076        configureConsumer(answer);
077        return answer;
078    }
079
080    public boolean isSingleton() {
081        return true;
082    }
083
084    public void addConsumer(DirectConsumer consumer) {
085        String key = getKey();
086        synchronized (consumers) {
087            if (consumers.putIfAbsent(key, consumer) != null) {
088                throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
089            }
090            consumers.notifyAll();
091        }
092    }
093
094    public void removeConsumer(DirectConsumer consumer) {
095        String key = getKey();
096        synchronized (consumers) {
097            consumers.remove(key, consumer);
098            consumers.notifyAll();
099        }
100    }
101
102    public void addProducer(DirectProducer producer) {
103        synchronized (consumers) {
104            producers.add(producer);
105        }
106    }
107
108    public void removeProducer(DirectProducer producer) {
109        synchronized (consumers) {
110            producers.remove(producer);
111        }
112    }
113
114    protected DirectConsumer getConsumer() throws InterruptedException {
115        String key = getKey();
116        synchronized (consumers) {
117            DirectConsumer answer = consumers.get(key);
118            if (answer == null && block) {
119                StopWatch watch = new StopWatch();
120                for (;;) {
121                    answer = consumers.get(key);
122                    if (answer != null) {
123                        break;
124                    }
125                    long rem = timeout - watch.taken();
126                    if (rem <= 0) {
127                        break;
128                    }
129                    consumers.wait(rem);
130                }
131            }
132//            if (answer != null && answer.getEndpoint() != this) {
133//                throw new IllegalStateException();
134//            }
135            return answer;
136        }
137    }
138
139    public boolean isBlock() {
140        return block;
141    }
142
143    /**
144     * If sending a message to a direct endpoint which has no active consumer,
145     * then we can tell the producer to block and wait for the consumer to become active.
146     */
147    public void setBlock(boolean block) {
148        this.block = block;
149    }
150
151    public long getTimeout() {
152        return timeout;
153    }
154
155    /**
156     * The timeout value to use if block is enabled.
157     *
158     * @param timeout the timeout value
159     */
160    public void setTimeout(long timeout) {
161        this.timeout = timeout;
162    }
163
164    public boolean isFailIfNoConsumers() {
165        return failIfNoConsumers;
166    }
167
168    /**
169     * Whether the producer should fail by throwing an exception, when sending to a DIRECT endpoint with no active consumers.
170     */
171    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
172        this.failIfNoConsumers = failIfNoConsumers;
173    }
174
175    protected String getKey() {
176        String uri = getEndpointUri();
177        if (uri.indexOf('?') != -1) {
178            return StringHelper.before(uri, "?");
179        } else {
180            return uri;
181        }
182    }
183}