001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.idempotent; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicLong; 022 023import org.apache.camel.AsyncCallback; 024import org.apache.camel.AsyncProcessor; 025import org.apache.camel.CamelContext; 026import org.apache.camel.CamelContextAware; 027import org.apache.camel.Exchange; 028import org.apache.camel.Expression; 029import org.apache.camel.Navigate; 030import org.apache.camel.Processor; 031import org.apache.camel.spi.ExchangeIdempotentRepository; 032import org.apache.camel.spi.IdAware; 033import org.apache.camel.spi.IdempotentRepository; 034import org.apache.camel.spi.Synchronization; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.AsyncProcessorConverterHelper; 037import org.apache.camel.util.AsyncProcessorHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * An implementation of the <a 044 * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern. 045 * <p/> 046 * This implementation supports idempotent repositories implemented as 047 * <ul> 048 * <li>IdempotentRepository</li> 049 * <li>ExchangeIdempotentRepository</li> 050 * </ul> 051 * 052 * @see org.apache.camel.spi.IdempotentRepository 053 * @see org.apache.camel.spi.ExchangeIdempotentRepository 054 */ 055public class IdempotentConsumer extends ServiceSupport implements CamelContextAware, AsyncProcessor, Navigate<Processor>, IdAware { 056 private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class); 057 private CamelContext camelContext; 058 private String id; 059 private final Expression messageIdExpression; 060 private final AsyncProcessor processor; 061 private final IdempotentRepository<String> idempotentRepository; 062 private final boolean eager; 063 private final boolean completionEager; 064 private final boolean skipDuplicate; 065 private final boolean removeOnFailure; 066 private final AtomicLong duplicateMessageCount = new AtomicLong(); 067 068 public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository, 069 boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) { 070 this.messageIdExpression = messageIdExpression; 071 this.idempotentRepository = idempotentRepository; 072 this.eager = eager; 073 this.completionEager = completionEager; 074 this.skipDuplicate = skipDuplicate; 075 this.removeOnFailure = removeOnFailure; 076 this.processor = AsyncProcessorConverterHelper.convert(processor); 077 } 078 079 @Override 080 public String toString() { 081 return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]"; 082 } 083 084 @Override 085 public CamelContext getCamelContext() { 086 return camelContext; 087 } 088 089 @Override 090 public void setCamelContext(CamelContext camelContext) { 091 this.camelContext = camelContext; 092 } 093 094 public String getId() { 095 return id; 096 } 097 098 public void setId(String id) { 099 this.id = id; 100 } 101 102 public void process(Exchange exchange) throws Exception { 103 AsyncProcessorHelper.process(this, exchange); 104 } 105 106 public boolean process(final Exchange exchange, final AsyncCallback callback) { 107 final AsyncCallback target; 108 109 final String messageId; 110 try { 111 messageId = messageIdExpression.evaluate(exchange, String.class); 112 if (messageId == null) { 113 exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); 114 callback.done(true); 115 return true; 116 } 117 } catch (Exception e) { 118 exchange.setException(e); 119 callback.done(true); 120 return true; 121 } 122 123 try { 124 boolean newKey; 125 if (eager) { 126 // add the key to the repository 127 if (idempotentRepository instanceof ExchangeIdempotentRepository) { 128 newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); 129 } else { 130 newKey = idempotentRepository.add(messageId); 131 } 132 } else { 133 // check if we already have the key 134 if (idempotentRepository instanceof ExchangeIdempotentRepository) { 135 newKey = !((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); 136 } else { 137 newKey = !idempotentRepository.contains(messageId); 138 } 139 } 140 141 if (!newKey) { 142 // mark the exchange as duplicate 143 exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); 144 145 // we already have this key so its a duplicate message 146 onDuplicate(exchange, messageId); 147 148 if (skipDuplicate) { 149 // if we should skip duplicate then we are done 150 LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); 151 callback.done(true); 152 return true; 153 } 154 } 155 156 final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure); 157 target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager); 158 if (!completionEager) { 159 // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed 160 exchange.addOnCompletion(onCompletion); 161 } 162 } catch (Exception e) { 163 exchange.setException(e); 164 callback.done(true); 165 return true; 166 } 167 168 // process the exchange 169 return processor.process(exchange, target); 170 } 171 172 public List<Processor> next() { 173 if (!hasNext()) { 174 return null; 175 } 176 List<Processor> answer = new ArrayList<Processor>(1); 177 answer.add(processor); 178 return answer; 179 } 180 181 public boolean hasNext() { 182 return processor != null; 183 } 184 185 // Properties 186 // ------------------------------------------------------------------------- 187 public Expression getMessageIdExpression() { 188 return messageIdExpression; 189 } 190 191 public IdempotentRepository<String> getIdempotentRepository() { 192 return idempotentRepository; 193 } 194 195 public Processor getProcessor() { 196 return processor; 197 } 198 199 public long getDuplicateMessageCount() { 200 return duplicateMessageCount.get(); 201 } 202 203 // Implementation methods 204 // ------------------------------------------------------------------------- 205 206 protected void doStart() throws Exception { 207 // must add before start so it will have CamelContext injected first 208 if (!camelContext.hasService(idempotentRepository)) { 209 camelContext.addService(idempotentRepository); 210 } 211 ServiceHelper.startServices(processor, idempotentRepository); 212 } 213 214 protected void doStop() throws Exception { 215 ServiceHelper.stopServices(processor, idempotentRepository); 216 } 217 218 @Override 219 protected void doShutdown() throws Exception { 220 ServiceHelper.stopAndShutdownServices(processor, idempotentRepository); 221 camelContext.removeService(idempotentRepository); 222 } 223 224 public boolean isEager() { 225 return eager; 226 } 227 228 public boolean isCompletionEager() { 229 return completionEager; 230 } 231 232 public boolean isSkipDuplicate() { 233 return skipDuplicate; 234 } 235 236 public boolean isRemoveOnFailure() { 237 return removeOnFailure; 238 } 239 240 /** 241 * Resets the duplicate message counter to <code>0L</code>. 242 */ 243 public void resetDuplicateMessageCount() { 244 duplicateMessageCount.set(0L); 245 } 246 247 private void onDuplicate(Exchange exchange, String messageId) { 248 duplicateMessageCount.incrementAndGet(); 249 250 onDuplicateMessage(exchange, messageId); 251 } 252 253 /** 254 * Clear the idempotent repository 255 */ 256 public void clear() { 257 idempotentRepository.clear(); 258 } 259 260 /** 261 * A strategy method to allow derived classes to overload the behaviour of 262 * processing a duplicate message 263 * 264 * @param exchange the exchange 265 * @param messageId the message ID of this exchange 266 */ 267 protected void onDuplicateMessage(Exchange exchange, String messageId) { 268 // noop 269 } 270 271 /** 272 * {@link org.apache.camel.AsyncCallback} that is invoked when the idempotent consumer block ends 273 */ 274 private static class IdempotentConsumerCallback implements AsyncCallback { 275 private final Exchange exchange; 276 private final Synchronization onCompletion; 277 private final AsyncCallback callback; 278 private final boolean completionEager; 279 280 IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean completionEager) { 281 this.exchange = exchange; 282 this.onCompletion = onCompletion; 283 this.callback = callback; 284 this.completionEager = completionEager; 285 } 286 287 @Override 288 public void done(boolean doneSync) { 289 try { 290 if (completionEager) { 291 if (exchange.isFailed()) { 292 onCompletion.onFailure(exchange); 293 } else { 294 onCompletion.onComplete(exchange); 295 } 296 } 297 // if completion is not eager then the onCompletion is invoked as part of the UoW of the Exchange 298 } finally { 299 callback.done(doneSync); 300 } 301 } 302 303 @Override 304 public String toString() { 305 return "IdempotentConsumerCallback"; 306 } 307 } 308}