001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import javax.xml.bind.annotation.XmlAccessType;
020import javax.xml.bind.annotation.XmlAccessorType;
021import javax.xml.bind.annotation.XmlAttribute;
022import javax.xml.bind.annotation.XmlRootElement;
023import javax.xml.bind.annotation.XmlTransient;
024
025import org.apache.camel.Expression;
026import org.apache.camel.Processor;
027import org.apache.camel.model.language.ExpressionDefinition;
028import org.apache.camel.processor.idempotent.IdempotentConsumer;
029import org.apache.camel.spi.IdempotentRepository;
030import org.apache.camel.spi.Metadata;
031import org.apache.camel.spi.RouteContext;
032import org.apache.camel.util.ObjectHelper;
033
034/**
035 * Filters out duplicate messages
036 */
037@Metadata(label = "eip,routing")
038@XmlRootElement(name = "idempotentConsumer")
039@XmlAccessorType(XmlAccessType.FIELD)
040public class IdempotentConsumerDefinition extends ExpressionNode {
041
042    @XmlAttribute(required = true)
043    private String messageIdRepositoryRef;
044    @XmlAttribute @Metadata(defaultValue = "true")
045    private Boolean eager;
046    @XmlAttribute
047    private Boolean completionEager;
048    @XmlAttribute @Metadata(defaultValue = "true")
049    private Boolean skipDuplicate;
050    @XmlAttribute @Metadata(defaultValue = "true")
051    private Boolean removeOnFailure;
052    @XmlTransient
053    private IdempotentRepository<?> idempotentRepository;
054
055    public IdempotentConsumerDefinition() {
056    }
057
058    public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
059        super(messageIdExpression);
060        this.idempotentRepository = idempotentRepository;
061    }
062
063    @Override
064    public String toString() {
065        return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]";
066    }
067
068    @Override
069    public String getShortName() {
070        return "idempotentConsumer";
071    }
072
073    @Override
074    public String getLabel() {
075        return "idempotentConsumer[" + getExpression() + "]";
076    }
077
078    // Fluent API
079    //-------------------------------------------------------------------------
080
081    /**
082     * Sets the reference name of the message id repository
083     *
084     * @param messageIdRepositoryRef the reference name of message id repository
085     * @return builder
086     */
087    public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) {
088        setMessageIdRepositoryRef(messageIdRepositoryRef);
089        return this;
090    }
091
092    /**
093     * Sets the message id repository for the idempotent consumer
094     *
095     * @param idempotentRepository the repository instance of idempotent
096     * @return builder
097     */
098    public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
099        setMessageIdRepository(idempotentRepository);
100        return this;
101    }
102
103    /**
104     * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
105     * is complete. Eager is default enabled.
106     *
107     * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
108     *              the exchange is complete.
109     * @return builder
110     */
111    public IdempotentConsumerDefinition eager(boolean eager) {
112        setEager(eager);
113        return this;
114    }
115
116    /**
117     * Sets whether to complete the idempotent consumer eager or when the exchange is done.
118     * <p/>
119     * If this option is <tt>true</tt> to complete eager, then the idempotent consumer will trigger its completion
120     * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange
121     * is continued routed after the block ends, then whatever happens there does not affect the state.
122     * <p/>
123     * If this option is <tt>false</tt> (default) to <b>not</b> complete eager, then the idempotent consumer
124     * will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends,
125     * then whatever happens there <b>also</b> affect the state.
126     * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.
127     *
128     * @param completionEager   whether to complete eager or complete when the exchange is done
129     * @return builder
130     */
131    public IdempotentConsumerDefinition completionEager(boolean completionEager) {
132        setCompletionEager(completionEager);
133        return this;
134    }
135
136    /**
137     * Sets whether to remove or keep the key on failure.
138     * <p/>
139     * The default behavior is to remove the key on failure.
140     *
141     * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key
142     *                        if the exchange fails.
143     * @return builder
144     */
145    public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) {
146        setRemoveOnFailure(removeOnFailure);
147        return this;
148    }
149
150    /**
151     * Sets whether to skip duplicates or not.
152     * <p/>
153     * The default behavior is to skip duplicates.
154     * <p/>
155     * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
156     * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set.
157     *
158     * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
159     * @return builder
160     */
161    public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
162        setSkipDuplicate(skipDuplicate);
163        return this;
164    }
165
166    /**
167     * Expression used to calculate the correlation key to use for duplicate check.
168     * The Exchange which has the same correlation key is regarded as a duplicate and will be rejected.
169     */
170    @Override
171    public void setExpression(ExpressionDefinition expression) {
172        // override to include javadoc what the expression is used for
173        super.setExpression(expression);
174    }
175
176    public String getMessageIdRepositoryRef() {
177        return messageIdRepositoryRef;
178    }
179
180    public void setMessageIdRepositoryRef(String messageIdRepositoryRef) {
181        this.messageIdRepositoryRef = messageIdRepositoryRef;
182    }
183
184    public IdempotentRepository<?> getMessageIdRepository() {
185        return idempotentRepository;
186    }
187
188    public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) {
189        this.idempotentRepository = idempotentRepository;
190    }
191
192    public Boolean getEager() {
193        return eager;
194    }
195
196    public void setEager(Boolean eager) {
197        this.eager = eager;
198    }
199
200    public Boolean getSkipDuplicate() {
201        return skipDuplicate;
202    }
203
204    public void setSkipDuplicate(Boolean skipDuplicate) {
205        this.skipDuplicate = skipDuplicate;
206    }
207
208    public Boolean getRemoveOnFailure() {
209        return removeOnFailure;
210    }
211
212    public void setRemoveOnFailure(Boolean removeOnFailure) {
213        this.removeOnFailure = removeOnFailure;
214    }
215
216    public Boolean getCompletionEager() {
217        return completionEager;
218    }
219
220    public void setCompletionEager(Boolean completionEager) {
221        this.completionEager = completionEager;
222    }
223
224    @Override
225    public Processor createProcessor(RouteContext routeContext) throws Exception {
226        Processor childProcessor = this.createChildProcessor(routeContext, true);
227
228        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
229        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
230
231        Expression expression = getExpression().createExpression(routeContext);
232
233        // these boolean should be true by default
234        boolean eager = getEager() == null || getEager();
235        boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
236        boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
237
238        // these boolean should be false by default
239        boolean completionEager = getCompletionEager() != null && getCompletionEager();
240
241        return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor);
242    }
243
244    /**
245     * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
246     *
247     * @param routeContext route context
248     * @return the repository
249     */
250    @SuppressWarnings("unchecked")
251    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
252        if (messageIdRepositoryRef != null) {
253            idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
254        }
255        return (IdempotentRepository<T>)idempotentRepository;
256    }
257}