001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import javax.xml.bind.annotation.XmlAccessType; 023import javax.xml.bind.annotation.XmlAccessorType; 024import javax.xml.bind.annotation.XmlElement; 025import javax.xml.bind.annotation.XmlElementRef; 026import javax.xml.bind.annotation.XmlElements; 027import javax.xml.bind.annotation.XmlRootElement; 028import javax.xml.bind.annotation.XmlTransient; 029 030import org.apache.camel.Expression; 031import org.apache.camel.Processor; 032import org.apache.camel.model.config.BatchResequencerConfig; 033import org.apache.camel.model.config.ResequencerConfig; 034import org.apache.camel.model.config.StreamResequencerConfig; 035import org.apache.camel.model.language.ExpressionDefinition; 036import org.apache.camel.processor.CamelInternalProcessor; 037import org.apache.camel.processor.Resequencer; 038import org.apache.camel.processor.StreamResequencer; 039import org.apache.camel.processor.resequencer.ExpressionResultComparator; 040import org.apache.camel.spi.Metadata; 041import org.apache.camel.spi.RouteContext; 042import org.apache.camel.util.CamelContextHelper; 043import org.apache.camel.util.ObjectHelper; 044 045/** 046 * Resequences (re-order) messages based on an expression 047 * 048 * @version 049 */ 050@Metadata(label = "eip,routing") 051@XmlRootElement(name = "resequence") 052@XmlAccessorType(XmlAccessType.FIELD) 053public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> { 054 @Metadata(required = "false") 055 @XmlElements({ 056 @XmlElement(name = "batch-config", type = BatchResequencerConfig.class), 057 @XmlElement(name = "stream-config", type = StreamResequencerConfig.class)} 058 ) 059 private ResequencerConfig resequencerConfig; 060 @XmlTransient 061 private BatchResequencerConfig batchConfig; 062 @XmlTransient 063 private StreamResequencerConfig streamConfig; 064 @XmlElementRef @Metadata(required = "true") 065 private ExpressionDefinition expression; 066 @XmlElementRef 067 private List<ProcessorDefinition<?>> outputs = new ArrayList<>(); 068 069 public ResequenceDefinition() { 070 } 071 072 public ResequenceDefinition(Expression expression) { 073 if (expression != null) { 074 setExpression(ExpressionNodeHelper.toExpressionDefinition(expression)); 075 } 076 } 077 078 public List<ProcessorDefinition<?>> getOutputs() { 079 return outputs; 080 } 081 082 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 083 this.outputs = outputs; 084 } 085 086 @Override 087 public boolean isOutputSupported() { 088 return true; 089 } 090 091 // Fluent API 092 // ------------------------------------------------------------------------- 093 /** 094 * Configures the stream-based resequencing algorithm using the default 095 * configuration. 096 * 097 * @return the builder 098 */ 099 public ResequenceDefinition stream() { 100 return stream(StreamResequencerConfig.getDefault()); 101 } 102 103 /** 104 * Configures the batch-based resequencing algorithm using the default 105 * configuration. 106 * 107 * @return the builder 108 */ 109 public ResequenceDefinition batch() { 110 return batch(BatchResequencerConfig.getDefault()); 111 } 112 113 /** 114 * Configures the stream-based resequencing algorithm using the given 115 * {@link StreamResequencerConfig}. 116 * 117 * @param config the config 118 * @return the builder 119 */ 120 public ResequenceDefinition stream(StreamResequencerConfig config) { 121 this.streamConfig = config; 122 this.batchConfig = null; 123 return this; 124 } 125 126 /** 127 * Configures the batch-based resequencing algorithm using the given 128 * {@link BatchResequencerConfig}. 129 * 130 * @param config the config 131 * @return the builder 132 */ 133 public ResequenceDefinition batch(BatchResequencerConfig config) { 134 this.batchConfig = config; 135 this.streamConfig = null; 136 return this; 137 } 138 139 /** 140 * Sets the timeout 141 * @param timeout timeout in millis 142 * @return the builder 143 */ 144 public ResequenceDefinition timeout(long timeout) { 145 if (streamConfig != null) { 146 streamConfig.setTimeout(timeout); 147 } else { 148 // initialize batch mode as its default mode 149 if (batchConfig == null) { 150 batch(); 151 } 152 batchConfig.setBatchTimeout(timeout); 153 } 154 return this; 155 } 156 157 /** 158 * Sets the interval in milli seconds the stream resequencer will at most wait 159 * while waiting for condition of being able to deliver. 160 * 161 * @param deliveryAttemptInterval interval in millis 162 * @return the builder 163 */ 164 public ResequenceDefinition deliveryAttemptInterval(long deliveryAttemptInterval) { 165 if (streamConfig == null) { 166 throw new IllegalStateException("deliveryAttemptInterval() only supported for stream resequencer"); 167 } 168 streamConfig.setDeliveryAttemptInterval(deliveryAttemptInterval); 169 return this; 170 } 171 172 /** 173 * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed 174 * @return the builder 175 */ 176 public ResequenceDefinition rejectOld() { 177 if (streamConfig == null) { 178 throw new IllegalStateException("rejectOld() only supported for stream resequencer"); 179 } 180 streamConfig.setRejectOld(true); 181 return this; 182 } 183 184 /** 185 * Sets the in batch size for number of exchanges received 186 * @param batchSize the batch size 187 * @return the builder 188 */ 189 public ResequenceDefinition size(int batchSize) { 190 if (streamConfig != null) { 191 throw new IllegalStateException("size() only supported for batch resequencer"); 192 } 193 // initialize batch mode as its default mode 194 if (batchConfig == null) { 195 batch(); 196 } 197 batchConfig.setBatchSize(batchSize); 198 return this; 199 } 200 201 /** 202 * Sets the capacity for the stream resequencer 203 * 204 * @param capacity the capacity 205 * @return the builder 206 */ 207 public ResequenceDefinition capacity(int capacity) { 208 if (streamConfig == null) { 209 throw new IllegalStateException("capacity() only supported for stream resequencer"); 210 } 211 streamConfig.setCapacity(capacity); 212 return this; 213 214 } 215 216 /** 217 * Enables duplicates for the batch resequencer mode 218 * @return the builder 219 */ 220 public ResequenceDefinition allowDuplicates() { 221 if (streamConfig != null) { 222 throw new IllegalStateException("allowDuplicates() only supported for batch resequencer"); 223 } 224 // initialize batch mode as its default mode 225 if (batchConfig == null) { 226 batch(); 227 } 228 batchConfig.setAllowDuplicates(true); 229 return this; 230 } 231 232 /** 233 * Enables reverse mode for the batch resequencer mode. 234 * <p/> 235 * This means the expression for determine the sequence order will be reversed. 236 * Can be used for Z..A or 9..0 ordering. 237 * 238 * @return the builder 239 */ 240 public ResequenceDefinition reverse() { 241 if (streamConfig != null) { 242 throw new IllegalStateException("reverse() only supported for batch resequencer"); 243 } 244 // initialize batch mode as its default mode 245 if (batchConfig == null) { 246 batch(); 247 } 248 batchConfig.setReverse(true); 249 return this; 250 } 251 252 /** 253 * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored. 254 * 255 * @return builder 256 */ 257 public ResequenceDefinition ignoreInvalidExchanges() { 258 if (streamConfig != null) { 259 streamConfig.setIgnoreInvalidExchanges(true); 260 } else { 261 // initialize batch mode as its default mode 262 if (batchConfig == null) { 263 batch(); 264 } 265 batchConfig.setIgnoreInvalidExchanges(true); 266 } 267 return this; 268 } 269 270 /** 271 * Sets the comparator to use for stream resequencer 272 * 273 * @param comparator the comparator 274 * @return the builder 275 */ 276 public ResequenceDefinition comparator(ExpressionResultComparator comparator) { 277 if (streamConfig == null) { 278 throw new IllegalStateException("comparator() only supported for stream resequencer"); 279 } 280 streamConfig.setComparator(comparator); 281 return this; 282 } 283 284 @Override 285 public String toString() { 286 return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]"; 287 } 288 289 @Override 290 public String getShortName() { 291 return "resequence"; 292 } 293 294 @Override 295 public String getLabel() { 296 return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]"; 297 } 298 299 public ResequencerConfig getResequencerConfig() { 300 return resequencerConfig; 301 } 302 303 /** 304 * To configure the resequencer in using either batch or stream configuration. Will by default use batch configuration. 305 */ 306 public void setResequencerConfig(ResequencerConfig resequencerConfig) { 307 this.resequencerConfig = resequencerConfig; 308 } 309 310 public BatchResequencerConfig getBatchConfig() { 311 if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) { 312 return (BatchResequencerConfig) resequencerConfig; 313 } 314 return batchConfig; 315 } 316 317 public StreamResequencerConfig getStreamConfig() { 318 if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) { 319 return (StreamResequencerConfig) resequencerConfig; 320 } 321 return streamConfig; 322 } 323 324 public void setBatchConfig(BatchResequencerConfig batchConfig) { 325 this.batchConfig = batchConfig; 326 } 327 328 public void setStreamConfig(StreamResequencerConfig streamConfig) { 329 this.streamConfig = streamConfig; 330 } 331 332 public ExpressionDefinition getExpression() { 333 return expression; 334 } 335 336 /** 337 * Expression to use for re-ordering the messages, such as a header with a sequence number 338 */ 339 public void setExpression(ExpressionDefinition expression) { 340 this.expression = expression; 341 } 342 343 @Override 344 public Processor createProcessor(RouteContext routeContext) throws Exception { 345 // if configured from XML then streamConfig has been set with the configuration 346 if (resequencerConfig != null) { 347 if (resequencerConfig instanceof StreamResequencerConfig) { 348 streamConfig = (StreamResequencerConfig) resequencerConfig; 349 } else { 350 batchConfig = (BatchResequencerConfig) resequencerConfig; 351 } 352 } 353 354 if (streamConfig != null) { 355 return createStreamResequencer(routeContext, streamConfig); 356 } else { 357 if (batchConfig == null) { 358 // default as batch mode 359 batch(); 360 } 361 return createBatchResequencer(routeContext, batchConfig); 362 } 363 } 364 365 /** 366 * Creates a batch {@link Resequencer} instance applying the given <code>config</code>. 367 * 368 * @param routeContext route context. 369 * @param config batch resequencer configuration. 370 * @return the configured batch resequencer. 371 * @throws Exception can be thrown 372 */ 373 @SuppressWarnings("deprecation") 374 protected Resequencer createBatchResequencer(RouteContext routeContext, 375 BatchResequencerConfig config) throws Exception { 376 Processor processor = this.createChildProcessor(routeContext, true); 377 Expression expression = getExpression().createExpression(routeContext); 378 379 // and wrap in unit of work 380 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 381 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 382 383 ObjectHelper.notNull(config, "config", this); 384 ObjectHelper.notNull(expression, "expression", this); 385 386 boolean isReverse = config.getReverse() != null && config.getReverse(); 387 boolean isAllowDuplicates = config.getAllowDuplicates() != null && config.getAllowDuplicates(); 388 389 Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, isAllowDuplicates, isReverse); 390 resequencer.setBatchSize(config.getBatchSize()); 391 resequencer.setBatchTimeout(config.getBatchTimeout()); 392 resequencer.setReverse(isReverse); 393 resequencer.setAllowDuplicates(isAllowDuplicates); 394 if (config.getIgnoreInvalidExchanges() != null) { 395 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 396 } 397 return resequencer; 398 } 399 400 /** 401 * Creates a {@link StreamResequencer} instance applying the given <code>config</code>. 402 * 403 * @param routeContext route context. 404 * @param config stream resequencer configuration. 405 * @return the configured stream resequencer. 406 * @throws Exception can be thrwon 407 */ 408 protected StreamResequencer createStreamResequencer(RouteContext routeContext, 409 StreamResequencerConfig config) throws Exception { 410 Processor processor = this.createChildProcessor(routeContext, true); 411 Expression expression = getExpression().createExpression(routeContext); 412 413 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 414 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 415 416 ObjectHelper.notNull(config, "config", this); 417 ObjectHelper.notNull(expression, "expression", this); 418 419 ExpressionResultComparator comparator; 420 if (config.getComparatorRef() != null) { 421 comparator = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), config.getComparatorRef(), ExpressionResultComparator.class); 422 } else { 423 comparator = config.getComparator(); 424 } 425 comparator.setExpression(expression); 426 427 StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator, expression); 428 resequencer.setTimeout(config.getTimeout()); 429 if (config.getDeliveryAttemptInterval() != null) { 430 resequencer.setDeliveryAttemptInterval(config.getDeliveryAttemptInterval()); 431 } 432 resequencer.setCapacity(config.getCapacity()); 433 resequencer.setRejectOld(config.getRejectOld()); 434 if (config.getIgnoreInvalidExchanges() != null) { 435 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 436 } 437 return resequencer; 438 } 439 440}