001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import javax.xml.bind.annotation.XmlAccessType;
020import javax.xml.bind.annotation.XmlAccessorType;
021import javax.xml.bind.annotation.XmlAttribute;
022import javax.xml.bind.annotation.XmlRootElement;
023import javax.xml.bind.annotation.XmlTransient;
024
025import org.apache.camel.Expression;
026import org.apache.camel.Processor;
027import org.apache.camel.model.language.ExpressionDefinition;
028import org.apache.camel.processor.idempotent.IdempotentConsumer;
029import org.apache.camel.spi.IdempotentRepository;
030import org.apache.camel.spi.Metadata;
031import org.apache.camel.spi.RouteContext;
032import org.apache.camel.util.ObjectHelper;
033
034/**
035 * Filters out duplicate messages
036 */
037@Metadata(label = "eip,routing")
038@XmlRootElement(name = "idempotentConsumer")
039@XmlAccessorType(XmlAccessType.FIELD)
040public class IdempotentConsumerDefinition extends ExpressionNode {
041
042    @XmlAttribute(required = true)
043    private String messageIdRepositoryRef;
044    @XmlAttribute @Metadata(defaultValue = "true")
045    private Boolean eager;
046    @XmlAttribute
047    private Boolean completionEager;
048    @XmlAttribute @Metadata(defaultValue = "true")
049    private Boolean skipDuplicate;
050    @XmlAttribute @Metadata(defaultValue = "true")
051    private Boolean removeOnFailure;
052    @XmlTransient
053    private IdempotentRepository<?> idempotentRepository;
054
055    public IdempotentConsumerDefinition() {
056    }
057
058    public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
059        super(messageIdExpression);
060        this.idempotentRepository = idempotentRepository;
061    }
062
063    @Override
064    public String toString() {
065        return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]";
066    }
067
068    @Override
069    public String getLabel() {
070        return "idempotentConsumer[" + getExpression() + "]";
071    }
072
073    // Fluent API
074    //-------------------------------------------------------------------------
075
076    /**
077     * Sets the reference name of the message id repository
078     *
079     * @param messageIdRepositoryRef the reference name of message id repository
080     * @return builder
081     */
082    public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) {
083        setMessageIdRepositoryRef(messageIdRepositoryRef);
084        return this;
085    }
086
087    /**
088     * Sets the the message id repository for the idempotent consumer
089     *
090     * @param idempotentRepository the repository instance of idempotent
091     * @return builder
092     */
093    public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
094        setMessageIdRepository(idempotentRepository);
095        return this;
096    }
097
098    /**
099     * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
100     * is complete. Eager is default enabled.
101     *
102     * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
103     *              the exchange is complete.
104     * @return builder
105     */
106    public IdempotentConsumerDefinition eager(boolean eager) {
107        setEager(eager);
108        return this;
109    }
110
111    /**
112     * Sets whether to complete the idempotent consumer eager or when the exchange is done.
113     * <p/>
114     * If this option is <tt>true</tt> to complete eager, then the idempotent consumer will trigger its completion
115     * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange
116     * is continued routed after the block ends, then whatever happens there does not affect the state.
117     * <p/>
118     * If this option is <tt>false</tt> (default) to <b>not</b> complete eager, then the idempotent consumer
119     * will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends,
120     * then whatever happens there <b>also</b> affect the state.
121     * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.
122     *
123     * @param completionEager   whether to complete eager or complete when the exchange is done
124     * @return builder
125     */
126    public IdempotentConsumerDefinition completionEager(boolean completionEager) {
127        setCompletionEager(completionEager);
128        return this;
129    }
130
131    /**
132     * Sets whether to remove or keep the key on failure.
133     * <p/>
134     * The default behavior is to remove the key on failure.
135     *
136     * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key
137     *                        if the exchange fails.
138     * @return builder
139     */
140    public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) {
141        setRemoveOnFailure(removeOnFailure);
142        return this;
143    }
144
145    /**
146     * Sets whether to skip duplicates or not.
147     * <p/>
148     * The default behavior is to skip duplicates.
149     * <p/>
150     * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
151     * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set.
152     *
153     * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
154     * @return builder
155     */
156    public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
157        setSkipDuplicate(skipDuplicate);
158        return this;
159    }
160
161    /**
162     * Expression used to calculate the correlation key to use for duplicate check.
163     * The Exchange which has the same correlation key is regarded as a duplicate and will be rejected.
164     */
165    @Override
166    public void setExpression(ExpressionDefinition expression) {
167        // override to include javadoc what the expression is used for
168        super.setExpression(expression);
169    }
170
171    public String getMessageIdRepositoryRef() {
172        return messageIdRepositoryRef;
173    }
174
175    public void setMessageIdRepositoryRef(String messageIdRepositoryRef) {
176        this.messageIdRepositoryRef = messageIdRepositoryRef;
177    }
178
179    public IdempotentRepository<?> getMessageIdRepository() {
180        return idempotentRepository;
181    }
182
183    public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) {
184        this.idempotentRepository = idempotentRepository;
185    }
186
187    public Boolean getEager() {
188        return eager;
189    }
190
191    public void setEager(Boolean eager) {
192        this.eager = eager;
193    }
194
195    public Boolean getSkipDuplicate() {
196        return skipDuplicate;
197    }
198
199    public void setSkipDuplicate(Boolean skipDuplicate) {
200        this.skipDuplicate = skipDuplicate;
201    }
202
203    public Boolean getRemoveOnFailure() {
204        return removeOnFailure;
205    }
206
207    public void setRemoveOnFailure(Boolean removeOnFailure) {
208        this.removeOnFailure = removeOnFailure;
209    }
210
211    public Boolean getCompletionEager() {
212        return completionEager;
213    }
214
215    public void setCompletionEager(Boolean completionEager) {
216        this.completionEager = completionEager;
217    }
218
219    @Override
220    @SuppressWarnings("unchecked")
221    public Processor createProcessor(RouteContext routeContext) throws Exception {
222        Processor childProcessor = this.createChildProcessor(routeContext, true);
223
224        IdempotentRepository<String> idempotentRepository =
225                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
226        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
227
228        Expression expression = getExpression().createExpression(routeContext);
229
230        // these boolean should be true by default
231        boolean eager = getEager() == null || getEager();
232        boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
233        boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
234        // these boolean should be false by default
235        boolean completionEager = getCompletionEager() != null && getCompletionEager();
236
237        return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor);
238    }
239
240    /**
241     * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
242     *
243     * @param routeContext route context
244     * @return the repository
245     */
246    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
247        if (messageIdRepositoryRef != null) {
248            idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
249        }
250        return idempotentRepository;
251    }
252}