001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.idempotent; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicLong; 022 023import org.apache.camel.AsyncCallback; 024import org.apache.camel.AsyncProcessor; 025import org.apache.camel.CamelContext; 026import org.apache.camel.CamelContextAware; 027import org.apache.camel.Exchange; 028import org.apache.camel.Expression; 029import org.apache.camel.Navigate; 030import org.apache.camel.Processor; 031import org.apache.camel.spi.ExchangeIdempotentRepository; 032import org.apache.camel.spi.IdAware; 033import org.apache.camel.spi.IdempotentRepository; 034import org.apache.camel.spi.Synchronization; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.AsyncProcessorConverterHelper; 037import org.apache.camel.util.AsyncProcessorHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * An implementation of the <a 044 * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern. 045 * <p/> 046 * This implementation supports idempotent repositories implemented as 047 * <ul> 048 * <li>IdempotentRepository</li> 049 * <li>ExchangeIdempotentRepository</li> 050 * </ul> 051 * 052 * @see org.apache.camel.spi.IdempotentRepository 053 * @see org.apache.camel.spi.ExchangeIdempotentRepository 054 */ 055public class IdempotentConsumer extends ServiceSupport implements CamelContextAware, AsyncProcessor, Navigate<Processor>, IdAware { 056 private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class); 057 private CamelContext camelContext; 058 private String id; 059 private final Expression messageIdExpression; 060 private final AsyncProcessor processor; 061 private final IdempotentRepository<String> idempotentRepository; 062 private final boolean eager; 063 private final boolean completionEager; 064 private final boolean skipDuplicate; 065 private final boolean removeOnFailure; 066 private final AtomicLong duplicateMessageCount = new AtomicLong(); 067 068 public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository, 069 boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) { 070 this.messageIdExpression = messageIdExpression; 071 this.idempotentRepository = idempotentRepository; 072 this.eager = eager; 073 this.completionEager = completionEager; 074 this.skipDuplicate = skipDuplicate; 075 this.removeOnFailure = removeOnFailure; 076 this.processor = AsyncProcessorConverterHelper.convert(processor); 077 } 078 079 @Override 080 public String toString() { 081 return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]"; 082 } 083 084 @Override 085 public CamelContext getCamelContext() { 086 return camelContext; 087 } 088 089 @Override 090 public void setCamelContext(CamelContext camelContext) { 091 this.camelContext = camelContext; 092 } 093 094 public String getId() { 095 return id; 096 } 097 098 public void setId(String id) { 099 this.id = id; 100 } 101 102 public void process(Exchange exchange) throws Exception { 103 AsyncProcessorHelper.process(this, exchange); 104 } 105 106 public boolean process(final Exchange exchange, final AsyncCallback callback) { 107 final AsyncCallback target; 108 109 final String messageId; 110 try { 111 messageId = messageIdExpression.evaluate(exchange, String.class); 112 if (messageId == null) { 113 exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); 114 callback.done(true); 115 return true; 116 } 117 } catch (Exception e) { 118 exchange.setException(e); 119 callback.done(true); 120 return true; 121 } 122 123 try { 124 boolean newKey; 125 if (eager) { 126 // add the key to the repository 127 if (idempotentRepository instanceof ExchangeIdempotentRepository) { 128 newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); 129 } else { 130 newKey = idempotentRepository.add(messageId); 131 } 132 } else { 133 // check if we already have the key 134 if (idempotentRepository instanceof ExchangeIdempotentRepository) { 135 newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); 136 } else { 137 newKey = !idempotentRepository.contains(messageId); 138 } 139 } 140 141 if (!newKey) { 142 // mark the exchange as duplicate 143 exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); 144 145 // we already have this key so its a duplicate message 146 onDuplicate(exchange, messageId); 147 148 if (skipDuplicate) { 149 // if we should skip duplicate then we are done 150 LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); 151 callback.done(true); 152 return true; 153 } 154 } 155 156 final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure); 157 target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager); 158 if (!completionEager) { 159 // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed 160 exchange.addOnCompletion(onCompletion); 161 } 162 } catch (Exception e) { 163 exchange.setException(e); 164 callback.done(true); 165 return true; 166 } 167 168 // process the exchange 169 return processor.process(exchange, target); 170 } 171 172 public List<Processor> next() { 173 if (!hasNext()) { 174 return null; 175 } 176 List<Processor> answer = new ArrayList<Processor>(1); 177 answer.add(processor); 178 return answer; 179 } 180 181 public boolean hasNext() { 182 return processor != null; 183 } 184 185 // Properties 186 // ------------------------------------------------------------------------- 187 public Expression getMessageIdExpression() { 188 return messageIdExpression; 189 } 190 191 public IdempotentRepository<String> getIdempotentRepository() { 192 return idempotentRepository; 193 } 194 195 public Processor getProcessor() { 196 return processor; 197 } 198 199 public long getDuplicateMessageCount() { 200 return duplicateMessageCount.get(); 201 } 202 203 // Implementation methods 204 // ------------------------------------------------------------------------- 205 206 protected void doStart() throws Exception { 207 ServiceHelper.startServices(processor, idempotentRepository); 208 if (!camelContext.hasService(idempotentRepository)) { 209 camelContext.addService(idempotentRepository); 210 } 211 } 212 213 protected void doStop() throws Exception { 214 ServiceHelper.stopServices(processor, idempotentRepository); 215 } 216 217 @Override 218 protected void doShutdown() throws Exception { 219 ServiceHelper.stopAndShutdownServices(processor, idempotentRepository); 220 camelContext.removeService(idempotentRepository); 221 } 222 223 public boolean isEager() { 224 return eager; 225 } 226 227 public boolean isCompletionEager() { 228 return completionEager; 229 } 230 231 public boolean isSkipDuplicate() { 232 return skipDuplicate; 233 } 234 235 public boolean isRemoveOnFailure() { 236 return removeOnFailure; 237 } 238 239 /** 240 * Resets the duplicate message counter to <code>0L</code>. 241 */ 242 public void resetDuplicateMessageCount() { 243 duplicateMessageCount.set(0L); 244 } 245 246 private void onDuplicate(Exchange exchange, String messageId) { 247 duplicateMessageCount.incrementAndGet(); 248 249 onDuplicateMessage(exchange, messageId); 250 } 251 252 /** 253 * Clear the idempotent repository 254 */ 255 public void clear() { 256 idempotentRepository.clear(); 257 } 258 259 /** 260 * A strategy method to allow derived classes to overload the behaviour of 261 * processing a duplicate message 262 * 263 * @param exchange the exchange 264 * @param messageId the message ID of this exchange 265 */ 266 protected void onDuplicateMessage(Exchange exchange, String messageId) { 267 // noop 268 } 269 270 /** 271 * {@link org.apache.camel.AsyncCallback} that is invoked when the idempotent consumer block ends 272 */ 273 private static class IdempotentConsumerCallback implements AsyncCallback { 274 private final Exchange exchange; 275 private final Synchronization onCompletion; 276 private final AsyncCallback callback; 277 private final boolean completionEager; 278 279 IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean completionEager) { 280 this.exchange = exchange; 281 this.onCompletion = onCompletion; 282 this.callback = callback; 283 this.completionEager = completionEager; 284 } 285 286 @Override 287 public void done(boolean doneSync) { 288 try { 289 if (completionEager) { 290 if (exchange.isFailed()) { 291 onCompletion.onFailure(exchange); 292 } else { 293 onCompletion.onComplete(exchange); 294 } 295 } 296 // if completion is not eager then the onCompletion is invoked as part of the UoW of the Exchange 297 } finally { 298 callback.done(doneSync); 299 } 300 } 301 302 @Override 303 public String toString() { 304 return "IdempotentConsumerCallback"; 305 } 306 } 307}