001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import javax.xml.bind.annotation.XmlAccessType; 020import javax.xml.bind.annotation.XmlAccessorType; 021import javax.xml.bind.annotation.XmlAttribute; 022import javax.xml.bind.annotation.XmlRootElement; 023import javax.xml.bind.annotation.XmlTransient; 024 025import org.apache.camel.Expression; 026import org.apache.camel.Processor; 027import org.apache.camel.model.language.ExpressionDefinition; 028import org.apache.camel.processor.idempotent.IdempotentConsumer; 029import org.apache.camel.spi.IdempotentRepository; 030import org.apache.camel.spi.Metadata; 031import org.apache.camel.spi.RouteContext; 032import org.apache.camel.util.ObjectHelper; 033 034/** 035 * Filters out duplicate messages 036 */ 037@Metadata(label = "eip,routing") 038@XmlRootElement(name = "idempotentConsumer") 039@XmlAccessorType(XmlAccessType.FIELD) 040public class IdempotentConsumerDefinition extends ExpressionNode { 041 042 @XmlAttribute(required = true) 043 private String messageIdRepositoryRef; 044 @XmlAttribute @Metadata(defaultValue = "true") 045 private Boolean eager; 046 @XmlAttribute 047 private Boolean completionEager; 048 @XmlAttribute @Metadata(defaultValue = "true") 049 private Boolean skipDuplicate; 050 @XmlAttribute @Metadata(defaultValue = "true") 051 private Boolean removeOnFailure; 052 @XmlTransient 053 private IdempotentRepository<?> idempotentRepository; 054 055 public IdempotentConsumerDefinition() { 056 } 057 058 public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) { 059 super(messageIdExpression); 060 this.idempotentRepository = idempotentRepository; 061 } 062 063 @Override 064 public String toString() { 065 return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]"; 066 } 067 068 @Override 069 public String getShortName() { 070 return "idempotentConsumer"; 071 } 072 073 @Override 074 public String getLabel() { 075 return "idempotentConsumer[" + getExpression() + "]"; 076 } 077 078 // Fluent API 079 //------------------------------------------------------------------------- 080 081 /** 082 * Sets the reference name of the message id repository 083 * 084 * @param messageIdRepositoryRef the reference name of message id repository 085 * @return builder 086 */ 087 public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) { 088 setMessageIdRepositoryRef(messageIdRepositoryRef); 089 return this; 090 } 091 092 /** 093 * Sets the message id repository for the idempotent consumer 094 * 095 * @param idempotentRepository the repository instance of idempotent 096 * @return builder 097 */ 098 public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) { 099 setMessageIdRepository(idempotentRepository); 100 return this; 101 } 102 103 /** 104 * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange 105 * is complete. Eager is default enabled. 106 * 107 * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until 108 * the exchange is complete. 109 * @return builder 110 */ 111 public IdempotentConsumerDefinition eager(boolean eager) { 112 setEager(eager); 113 return this; 114 } 115 116 /** 117 * Sets whether to complete the idempotent consumer eager or when the exchange is done. 118 * <p/> 119 * If this option is <tt>true</tt> to complete eager, then the idempotent consumer will trigger its completion 120 * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange 121 * is continued routed after the block ends, then whatever happens there does not affect the state. 122 * <p/> 123 * If this option is <tt>false</tt> (default) to <b>not</b> complete eager, then the idempotent consumer 124 * will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends, 125 * then whatever happens there <b>also</b> affect the state. 126 * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback. 127 * 128 * @param completionEager whether to complete eager or complete when the exchange is done 129 * @return builder 130 */ 131 public IdempotentConsumerDefinition completionEager(boolean completionEager) { 132 setCompletionEager(completionEager); 133 return this; 134 } 135 136 /** 137 * Sets whether to remove or keep the key on failure. 138 * <p/> 139 * The default behavior is to remove the key on failure. 140 * 141 * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key 142 * if the exchange fails. 143 * @return builder 144 */ 145 public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) { 146 setRemoveOnFailure(removeOnFailure); 147 return this; 148 } 149 150 /** 151 * Sets whether to skip duplicates or not. 152 * <p/> 153 * The default behavior is to skip duplicates. 154 * <p/> 155 * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set 156 * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set. 157 * 158 * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates. 159 * @return builder 160 */ 161 public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) { 162 setSkipDuplicate(skipDuplicate); 163 return this; 164 } 165 166 /** 167 * Expression used to calculate the correlation key to use for duplicate check. 168 * The Exchange which has the same correlation key is regarded as a duplicate and will be rejected. 169 */ 170 @Override 171 public void setExpression(ExpressionDefinition expression) { 172 // override to include javadoc what the expression is used for 173 super.setExpression(expression); 174 } 175 176 public String getMessageIdRepositoryRef() { 177 return messageIdRepositoryRef; 178 } 179 180 public void setMessageIdRepositoryRef(String messageIdRepositoryRef) { 181 this.messageIdRepositoryRef = messageIdRepositoryRef; 182 } 183 184 public IdempotentRepository<?> getMessageIdRepository() { 185 return idempotentRepository; 186 } 187 188 public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) { 189 this.idempotentRepository = idempotentRepository; 190 } 191 192 public Boolean getEager() { 193 return eager; 194 } 195 196 public void setEager(Boolean eager) { 197 this.eager = eager; 198 } 199 200 public Boolean getSkipDuplicate() { 201 return skipDuplicate; 202 } 203 204 public void setSkipDuplicate(Boolean skipDuplicate) { 205 this.skipDuplicate = skipDuplicate; 206 } 207 208 public Boolean getRemoveOnFailure() { 209 return removeOnFailure; 210 } 211 212 public void setRemoveOnFailure(Boolean removeOnFailure) { 213 this.removeOnFailure = removeOnFailure; 214 } 215 216 public Boolean getCompletionEager() { 217 return completionEager; 218 } 219 220 public void setCompletionEager(Boolean completionEager) { 221 this.completionEager = completionEager; 222 } 223 224 @Override 225 public Processor createProcessor(RouteContext routeContext) throws Exception { 226 Processor childProcessor = this.createChildProcessor(routeContext, true); 227 228 IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext); 229 ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); 230 231 Expression expression = getExpression().createExpression(routeContext); 232 233 // these boolean should be true by default 234 boolean eager = getEager() == null || getEager(); 235 boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); 236 boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); 237 238 // these boolean should be false by default 239 boolean completionEager = getCompletionEager() != null && getCompletionEager(); 240 241 return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor); 242 } 243 244 /** 245 * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use 246 * 247 * @param routeContext route context 248 * @return the repository 249 */ 250 @SuppressWarnings("unchecked") 251 protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) { 252 if (messageIdRepositoryRef != null) { 253 idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class); 254 } 255 return (IdempotentRepository<T>)idempotentRepository; 256 } 257}