001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicLong;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.CamelContext;
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Exchange;
028import org.apache.camel.Expression;
029import org.apache.camel.Navigate;
030import org.apache.camel.Processor;
031import org.apache.camel.spi.ExchangeIdempotentRepository;
032import org.apache.camel.spi.IdAware;
033import org.apache.camel.spi.IdempotentRepository;
034import org.apache.camel.spi.Synchronization;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.AsyncProcessorConverterHelper;
037import org.apache.camel.util.AsyncProcessorHelper;
038import org.apache.camel.util.ServiceHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * An implementation of the <a
044 * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
045 * <p/>
046 * This implementation supports idempotent repositories implemented as
047 * <ul>
048 *     <li>IdempotentRepository</li>
049 *     <li>ExchangeIdempotentRepository</li>
050 * </ul>
051 *
052 * @see org.apache.camel.spi.IdempotentRepository
053 * @see org.apache.camel.spi.ExchangeIdempotentRepository
054 */
055public class IdempotentConsumer extends ServiceSupport implements CamelContextAware, AsyncProcessor, Navigate<Processor>, IdAware {
056    private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class);
057    private CamelContext camelContext;
058    private String id;
059    private final Expression messageIdExpression;
060    private final AsyncProcessor processor;
061    private final IdempotentRepository<String> idempotentRepository;
062    private final boolean eager;
063    private final boolean completionEager;
064    private final boolean skipDuplicate;
065    private final boolean removeOnFailure;
066    private final AtomicLong duplicateMessageCount = new AtomicLong();
067
068    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
069                              boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
070        this.messageIdExpression = messageIdExpression;
071        this.idempotentRepository = idempotentRepository;
072        this.eager = eager;
073        this.completionEager = completionEager;
074        this.skipDuplicate = skipDuplicate;
075        this.removeOnFailure = removeOnFailure;
076        this.processor = AsyncProcessorConverterHelper.convert(processor);
077    }
078
079    @Override
080    public String toString() {
081        return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]";
082    }
083
084    @Override
085    public CamelContext getCamelContext() {
086        return camelContext;
087    }
088
089    @Override
090    public void setCamelContext(CamelContext camelContext) {
091        this.camelContext = camelContext;
092    }
093
094    public String getId() {
095        return id;
096    }
097
098    public void setId(String id) {
099        this.id = id;
100    }
101
102    public void process(Exchange exchange) throws Exception {
103        AsyncProcessorHelper.process(this, exchange);
104    }
105
106    public boolean process(final Exchange exchange, final AsyncCallback callback) {
107        final AsyncCallback target;
108
109        final String messageId;
110        try {
111            messageId = messageIdExpression.evaluate(exchange, String.class);
112            if (messageId == null) {
113                exchange.setException(new NoMessageIdException(exchange, messageIdExpression));
114                callback.done(true);
115                return true;
116            }
117        } catch (Exception e) {
118            exchange.setException(e);
119            callback.done(true);
120            return true;
121        }
122
123        try {
124            boolean newKey;
125            if (eager) {
126                // add the key to the repository
127                if (idempotentRepository instanceof ExchangeIdempotentRepository) {
128                    newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId);
129                } else {
130                    newKey = idempotentRepository.add(messageId);
131                }
132            } else {
133                // check if we already have the key
134                if (idempotentRepository instanceof ExchangeIdempotentRepository) {
135                    newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId);
136                } else {
137                    newKey = !idempotentRepository.contains(messageId);
138                }
139            }
140
141            if (!newKey) {
142                // mark the exchange as duplicate
143                exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
144
145                // we already have this key so its a duplicate message
146                onDuplicate(exchange, messageId);
147
148                if (skipDuplicate) {
149                    // if we should skip duplicate then we are done
150                    LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange);
151                    callback.done(true);
152                    return true;
153                }
154            }
155
156            final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure);
157            target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager);
158            if (!completionEager) {
159                // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed
160                exchange.addOnCompletion(onCompletion);
161            }
162        } catch (Exception e) {
163            exchange.setException(e);
164            callback.done(true);
165            return true;
166        }
167
168        // process the exchange
169        return processor.process(exchange, target);
170    }
171
172    public List<Processor> next() {
173        if (!hasNext()) {
174            return null;
175        }
176        List<Processor> answer = new ArrayList<Processor>(1);
177        answer.add(processor);
178        return answer;
179    }
180
181    public boolean hasNext() {
182        return processor != null;
183    }
184
185    // Properties
186    // -------------------------------------------------------------------------
187    public Expression getMessageIdExpression() {
188        return messageIdExpression;
189    }
190
191    public IdempotentRepository<String> getIdempotentRepository() {
192        return idempotentRepository;
193    }
194
195    public Processor getProcessor() {
196        return processor;
197    }
198
199    public long getDuplicateMessageCount() {
200        return duplicateMessageCount.get();
201    }
202
203    // Implementation methods
204    // -------------------------------------------------------------------------
205
206    protected void doStart() throws Exception {
207        ServiceHelper.startServices(processor, idempotentRepository);
208        if (!camelContext.hasService(idempotentRepository)) {
209            camelContext.addService(idempotentRepository);
210        }
211    }
212
213    protected void doStop() throws Exception {
214        ServiceHelper.stopServices(processor, idempotentRepository);
215    }
216
217    @Override
218    protected void doShutdown() throws Exception {
219        ServiceHelper.stopAndShutdownServices(processor, idempotentRepository);
220        camelContext.removeService(idempotentRepository);
221    }
222
223    public boolean isEager() {
224        return eager;
225    }
226
227    public boolean isCompletionEager() {
228        return completionEager;
229    }
230
231    public boolean isSkipDuplicate() {
232        return skipDuplicate;
233    }
234
235    public boolean isRemoveOnFailure() {
236        return removeOnFailure;
237    }
238
239    /**
240     * Resets the duplicate message counter to <code>0L</code>.
241     */
242    public void resetDuplicateMessageCount() {
243        duplicateMessageCount.set(0L);
244    }
245
246    private void onDuplicate(Exchange exchange, String messageId) {
247        duplicateMessageCount.incrementAndGet();
248
249        onDuplicateMessage(exchange, messageId);
250    }
251
252    /**
253     * Clear the idempotent repository
254     */
255    public void clear() {
256        idempotentRepository.clear();
257    }
258
259    /**
260     * A strategy method to allow derived classes to overload the behaviour of
261     * processing a duplicate message
262     *
263     * @param exchange  the exchange
264     * @param messageId the message ID of this exchange
265     */
266    protected void onDuplicateMessage(Exchange exchange, String messageId) {
267        // noop
268    }
269
270    /**
271     * {@link org.apache.camel.AsyncCallback} that is invoked when the idempotent consumer block ends
272     */
273    private static class IdempotentConsumerCallback implements AsyncCallback {
274        private final Exchange exchange;
275        private final Synchronization onCompletion;
276        private final AsyncCallback callback;
277        private final boolean completionEager;
278
279        IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean completionEager) {
280            this.exchange = exchange;
281            this.onCompletion = onCompletion;
282            this.callback = callback;
283            this.completionEager = completionEager;
284        }
285
286        @Override
287        public void done(boolean doneSync) {
288            try {
289                if (completionEager) {
290                    if (exchange.isFailed()) {
291                        onCompletion.onFailure(exchange);
292                    } else {
293                        onCompletion.onComplete(exchange);
294                    }
295                }
296                // if completion is not eager then the onCompletion is invoked as part of the UoW of the Exchange
297            } finally {
298                callback.done(doneSync);
299            }
300        }
301
302        @Override
303        public String toString() {
304            return "IdempotentConsumerCallback";
305        }
306    }
307}