001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import javax.xml.bind.annotation.XmlAccessType; 020import javax.xml.bind.annotation.XmlAccessorType; 021import javax.xml.bind.annotation.XmlAttribute; 022import javax.xml.bind.annotation.XmlRootElement; 023import javax.xml.bind.annotation.XmlTransient; 024 025import org.apache.camel.Expression; 026import org.apache.camel.Processor; 027import org.apache.camel.model.language.ExpressionDefinition; 028import org.apache.camel.processor.idempotent.IdempotentConsumer; 029import org.apache.camel.spi.IdempotentRepository; 030import org.apache.camel.spi.Metadata; 031import org.apache.camel.spi.RouteContext; 032import org.apache.camel.util.ObjectHelper; 033 034/** 035 * Filters out duplicate messages 036 */ 037@Metadata(label = "eip,routing") 038@XmlRootElement(name = "idempotentConsumer") 039@XmlAccessorType(XmlAccessType.FIELD) 040public class IdempotentConsumerDefinition extends ExpressionNode { 041 042 @XmlAttribute(required = true) 043 private String messageIdRepositoryRef; 044 @XmlAttribute @Metadata(defaultValue = "true") 045 private Boolean eager; 046 @XmlAttribute 047 private Boolean completionEager; 048 @XmlAttribute @Metadata(defaultValue = "true") 049 private Boolean skipDuplicate; 050 @XmlAttribute @Metadata(defaultValue = "true") 051 private Boolean removeOnFailure; 052 @XmlTransient 053 private IdempotentRepository<?> idempotentRepository; 054 055 public IdempotentConsumerDefinition() { 056 } 057 058 public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) { 059 super(messageIdExpression); 060 this.idempotentRepository = idempotentRepository; 061 } 062 063 @Override 064 public String toString() { 065 return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]"; 066 } 067 068 @Override 069 public String getLabel() { 070 return "idempotentConsumer[" + getExpression() + "]"; 071 } 072 073 // Fluent API 074 //------------------------------------------------------------------------- 075 076 /** 077 * Sets the reference name of the message id repository 078 * 079 * @param messageIdRepositoryRef the reference name of message id repository 080 * @return builder 081 */ 082 public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) { 083 setMessageIdRepositoryRef(messageIdRepositoryRef); 084 return this; 085 } 086 087 /** 088 * Sets the the message id repository for the idempotent consumer 089 * 090 * @param idempotentRepository the repository instance of idempotent 091 * @return builder 092 */ 093 public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) { 094 setMessageIdRepository(idempotentRepository); 095 return this; 096 } 097 098 /** 099 * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange 100 * is complete. Eager is default enabled. 101 * 102 * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until 103 * the exchange is complete. 104 * @return builder 105 */ 106 public IdempotentConsumerDefinition eager(boolean eager) { 107 setEager(eager); 108 return this; 109 } 110 111 /** 112 * Sets whether to complete the idempotent consumer eager or when the exchange is done. 113 * <p/> 114 * If this option is <tt>true</tt> to complete eager, then the idempotent consumer will trigger its completion 115 * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange 116 * is continued routed after the block ends, then whatever happens there does not affect the state. 117 * <p/> 118 * If this option is <tt>false</tt> (default) to <b>not</b> complete eager, then the idempotent consumer 119 * will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends, 120 * then whatever happens there <b>also</b> affect the state. 121 * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback. 122 * 123 * @param completionEager whether to complete eager or complete when the exchange is done 124 * @return builder 125 */ 126 public IdempotentConsumerDefinition completionEager(boolean completionEager) { 127 setCompletionEager(completionEager); 128 return this; 129 } 130 131 /** 132 * Sets whether to remove or keep the key on failure. 133 * <p/> 134 * The default behavior is to remove the key on failure. 135 * 136 * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key 137 * if the exchange fails. 138 * @return builder 139 */ 140 public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) { 141 setRemoveOnFailure(removeOnFailure); 142 return this; 143 } 144 145 /** 146 * Sets whether to skip duplicates or not. 147 * <p/> 148 * The default behavior is to skip duplicates. 149 * <p/> 150 * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set 151 * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set. 152 * 153 * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates. 154 * @return builder 155 */ 156 public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) { 157 setSkipDuplicate(skipDuplicate); 158 return this; 159 } 160 161 /** 162 * Expression used to calculate the correlation key to use for duplicate check. 163 * The Exchange which has the same correlation key is regarded as a duplicate and will be rejected. 164 */ 165 @Override 166 public void setExpression(ExpressionDefinition expression) { 167 // override to include javadoc what the expression is used for 168 super.setExpression(expression); 169 } 170 171 public String getMessageIdRepositoryRef() { 172 return messageIdRepositoryRef; 173 } 174 175 public void setMessageIdRepositoryRef(String messageIdRepositoryRef) { 176 this.messageIdRepositoryRef = messageIdRepositoryRef; 177 } 178 179 public IdempotentRepository<?> getMessageIdRepository() { 180 return idempotentRepository; 181 } 182 183 public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) { 184 this.idempotentRepository = idempotentRepository; 185 } 186 187 public Boolean getEager() { 188 return eager; 189 } 190 191 public void setEager(Boolean eager) { 192 this.eager = eager; 193 } 194 195 public Boolean getSkipDuplicate() { 196 return skipDuplicate; 197 } 198 199 public void setSkipDuplicate(Boolean skipDuplicate) { 200 this.skipDuplicate = skipDuplicate; 201 } 202 203 public Boolean getRemoveOnFailure() { 204 return removeOnFailure; 205 } 206 207 public void setRemoveOnFailure(Boolean removeOnFailure) { 208 this.removeOnFailure = removeOnFailure; 209 } 210 211 public Boolean getCompletionEager() { 212 return completionEager; 213 } 214 215 public void setCompletionEager(Boolean completionEager) { 216 this.completionEager = completionEager; 217 } 218 219 @Override 220 public Processor createProcessor(RouteContext routeContext) throws Exception { 221 Processor childProcessor = this.createChildProcessor(routeContext, true); 222 223 IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext); 224 ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); 225 226 Expression expression = getExpression().createExpression(routeContext); 227 228 // these boolean should be true by default 229 boolean eager = getEager() == null || getEager(); 230 boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); 231 boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); 232 233 // these boolean should be false by default 234 boolean completionEager = getCompletionEager() != null && getCompletionEager(); 235 236 return new IdempotentConsumer(expression, idempotentRepository, eager, completionEager, duplicate, remove, childProcessor); 237 } 238 239 /** 240 * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use 241 * 242 * @param routeContext route context 243 * @return the repository 244 */ 245 @SuppressWarnings("unchecked") 246 protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) { 247 if (messageIdRepositoryRef != null) { 248 idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class); 249 } 250 return (IdempotentRepository<T>)idempotentRepository; 251 } 252}