001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlElement; 024 import javax.xml.bind.annotation.XmlElementRef; 025 import javax.xml.bind.annotation.XmlElements; 026 import javax.xml.bind.annotation.XmlRootElement; 027 import javax.xml.bind.annotation.XmlTransient; 028 029 import org.apache.camel.Expression; 030 import org.apache.camel.Processor; 031 import org.apache.camel.model.config.BatchResequencerConfig; 032 import org.apache.camel.model.config.ResequencerConfig; 033 import org.apache.camel.model.config.StreamResequencerConfig; 034 import org.apache.camel.model.language.ExpressionDefinition; 035 import org.apache.camel.processor.Resequencer; 036 import org.apache.camel.processor.StreamResequencer; 037 import org.apache.camel.processor.resequencer.ExpressionResultComparator; 038 import org.apache.camel.spi.Required; 039 import org.apache.camel.spi.RouteContext; 040 import org.apache.camel.util.ObjectHelper; 041 042 /** 043 * Represents an XML <resequence/> element 044 * 045 * @version 046 */ 047 @XmlRootElement(name = "resequence") 048 @XmlAccessorType(XmlAccessType.FIELD) 049 public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> { 050 @XmlElements({ 051 @XmlElement(required = false, name = "batch-config", type = BatchResequencerConfig.class), 052 @XmlElement(required = false, name = "stream-config", type = StreamResequencerConfig.class)} 053 ) 054 private ResequencerConfig resequencerConfig; 055 @XmlTransient 056 private BatchResequencerConfig batchConfig; 057 @XmlTransient 058 private StreamResequencerConfig streamConfig; 059 @XmlElementRef 060 @Required 061 private ExpressionDefinition expression; 062 @XmlElementRef 063 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 064 065 public ResequenceDefinition() { 066 } 067 068 @Override 069 public String getShortName() { 070 return "resequence"; 071 } 072 073 public List<ProcessorDefinition<?>> getOutputs() { 074 return outputs; 075 } 076 077 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 078 this.outputs = outputs; 079 } 080 081 @Override 082 public boolean isOutputSupported() { 083 return true; 084 } 085 086 // Fluent API 087 // ------------------------------------------------------------------------- 088 /** 089 * Configures the stream-based resequencing algorithm using the default 090 * configuration. 091 * 092 * @return the builder 093 */ 094 public ResequenceDefinition stream() { 095 return stream(StreamResequencerConfig.getDefault()); 096 } 097 098 /** 099 * Configures the batch-based resequencing algorithm using the default 100 * configuration. 101 * 102 * @return the builder 103 */ 104 public ResequenceDefinition batch() { 105 return batch(BatchResequencerConfig.getDefault()); 106 } 107 108 /** 109 * Configures the stream-based resequencing algorithm using the given 110 * {@link StreamResequencerConfig}. 111 * 112 * @param config the config 113 * @return the builder 114 */ 115 public ResequenceDefinition stream(StreamResequencerConfig config) { 116 this.streamConfig = config; 117 this.batchConfig = null; 118 return this; 119 } 120 121 /** 122 * Configures the batch-based resequencing algorithm using the given 123 * {@link BatchResequencerConfig}. 124 * 125 * @param config the config 126 * @return the builder 127 */ 128 public ResequenceDefinition batch(BatchResequencerConfig config) { 129 this.batchConfig = config; 130 this.streamConfig = null; 131 return this; 132 } 133 134 /** 135 * Sets the timeout 136 * @param timeout timeout in millis 137 * @return the builder 138 */ 139 public ResequenceDefinition timeout(long timeout) { 140 if (streamConfig != null) { 141 streamConfig.setTimeout(timeout); 142 } else { 143 // initialize batch mode as its default mode 144 if (batchConfig == null) { 145 batch(); 146 } 147 batchConfig.setBatchTimeout(timeout); 148 } 149 return this; 150 } 151 152 /** 153 * Sets the in batch size for number of exchanges received 154 * @param batchSize the batch size 155 * @return the builder 156 */ 157 public ResequenceDefinition size(int batchSize) { 158 if (streamConfig != null) { 159 throw new IllegalStateException("size() only supported for batch resequencer"); 160 } 161 // initialize batch mode as its default mode 162 if (batchConfig == null) { 163 batch(); 164 } 165 batchConfig.setBatchSize(batchSize); 166 return this; 167 } 168 169 /** 170 * Sets the capacity for the stream resequencer 171 * 172 * @param capacity the capacity 173 * @return the builder 174 */ 175 public ResequenceDefinition capacity(int capacity) { 176 if (streamConfig == null) { 177 throw new IllegalStateException("capacity() only supported for stream resequencer"); 178 } 179 streamConfig.setCapacity(capacity); 180 return this; 181 182 } 183 184 /** 185 * Enables duplicates for the batch resequencer mode 186 * @return the builder 187 */ 188 public ResequenceDefinition allowDuplicates() { 189 if (streamConfig != null) { 190 throw new IllegalStateException("allowDuplicates() only supported for batch resequencer"); 191 } 192 // initialize batch mode as its default mode 193 if (batchConfig == null) { 194 batch(); 195 } 196 batchConfig.setAllowDuplicates(true); 197 return this; 198 } 199 200 /** 201 * Enables reverse mode for the batch resequencer mode. 202 * <p/> 203 * This means the expression for determine the sequence order will be reversed. 204 * Can be used for Z..A or 9..0 ordering. 205 * 206 * @return the builder 207 */ 208 public ResequenceDefinition reverse() { 209 if (streamConfig != null) { 210 throw new IllegalStateException("reverse() only supported for batch resequencer"); 211 } 212 // initialize batch mode as its default mode 213 if (batchConfig == null) { 214 batch(); 215 } 216 batchConfig.setReverse(true); 217 return this; 218 } 219 220 /** 221 * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored. 222 * 223 * @return builder 224 */ 225 public ResequenceDefinition ignoreInvalidExchanges() { 226 if (streamConfig != null) { 227 streamConfig.setIgnoreInvalidExchanges(true); 228 } else { 229 // initialize batch mode as its default mode 230 if (batchConfig == null) { 231 batch(); 232 } 233 batchConfig.setIgnoreInvalidExchanges(true); 234 } 235 return this; 236 } 237 238 /** 239 * Sets the comparator to use for stream resequencer 240 * 241 * @param comparator the comparator 242 * @return the builder 243 */ 244 public ResequenceDefinition comparator(ExpressionResultComparator comparator) { 245 if (streamConfig == null) { 246 throw new IllegalStateException("comparator() only supported for stream resequencer"); 247 } 248 streamConfig.setComparator(comparator); 249 return this; 250 } 251 252 @Override 253 public String toString() { 254 return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]"; 255 } 256 257 @Override 258 public String getLabel() { 259 return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]"; 260 } 261 262 public ResequencerConfig getResequencerConfig() { 263 return resequencerConfig; 264 } 265 266 public void setResequencerConfig(ResequencerConfig resequencerConfig) { 267 this.resequencerConfig = resequencerConfig; 268 } 269 270 public BatchResequencerConfig getBatchConfig() { 271 if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) { 272 return (BatchResequencerConfig) resequencerConfig; 273 } 274 return batchConfig; 275 } 276 277 public StreamResequencerConfig getStreamConfig() { 278 if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) { 279 return (StreamResequencerConfig) resequencerConfig; 280 } 281 return streamConfig; 282 } 283 284 public void setBatchConfig(BatchResequencerConfig batchConfig) { 285 this.batchConfig = batchConfig; 286 } 287 288 public void setStreamConfig(StreamResequencerConfig streamConfig) { 289 this.streamConfig = streamConfig; 290 } 291 292 public ExpressionDefinition getExpression() { 293 return expression; 294 } 295 296 public void setExpression(ExpressionDefinition expression) { 297 this.expression = expression; 298 } 299 300 @Override 301 public Processor createProcessor(RouteContext routeContext) throws Exception { 302 // if configured from XML then streamConfig has been set with the configuration 303 if (resequencerConfig != null) { 304 if (resequencerConfig instanceof StreamResequencerConfig) { 305 streamConfig = (StreamResequencerConfig) resequencerConfig; 306 } else { 307 batchConfig = (BatchResequencerConfig) resequencerConfig; 308 } 309 } 310 311 if (streamConfig != null) { 312 return createStreamResequencer(routeContext, streamConfig); 313 } else { 314 if (batchConfig == null) { 315 // default as batch mode 316 batch(); 317 } 318 return createBatchResequencer(routeContext, batchConfig); 319 } 320 } 321 322 /** 323 * Creates a batch {@link Resequencer} instance applying the given <code>config</code>. 324 * 325 * @param routeContext route context. 326 * @param config batch resequencer configuration. 327 * @return the configured batch resequencer. 328 * @throws Exception can be thrown 329 */ 330 @SuppressWarnings("deprecation") 331 protected Resequencer createBatchResequencer(RouteContext routeContext, 332 BatchResequencerConfig config) throws Exception { 333 Processor processor = this.createChildProcessor(routeContext, true); 334 Expression expression = getExpression().createExpression(routeContext); 335 336 ObjectHelper.notNull(config, "config", this); 337 ObjectHelper.notNull(expression, "expression", this); 338 339 Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, expression, 340 config.isAllowDuplicates(), config.isReverse()); 341 resequencer.setBatchSize(config.getBatchSize()); 342 resequencer.setBatchTimeout(config.getBatchTimeout()); 343 if (config.getIgnoreInvalidExchanges() != null) { 344 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 345 } 346 return resequencer; 347 } 348 349 /** 350 * Creates a {@link StreamResequencer} instance applying the given <code>config</code>. 351 * 352 * @param routeContext route context. 353 * @param config stream resequencer configuration. 354 * @return the configured stream resequencer. 355 * @throws Exception can be thrwon 356 */ 357 protected StreamResequencer createStreamResequencer(RouteContext routeContext, 358 StreamResequencerConfig config) throws Exception { 359 Processor processor = this.createChildProcessor(routeContext, true); 360 Expression expression = getExpression().createExpression(routeContext); 361 362 ObjectHelper.notNull(config, "config", this); 363 ObjectHelper.notNull(expression, "expression", this); 364 365 ExpressionResultComparator comparator = config.getComparator(); 366 comparator.setExpression(expression); 367 368 StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator); 369 resequencer.setTimeout(config.getTimeout()); 370 resequencer.setCapacity(config.getCapacity()); 371 if (config.getIgnoreInvalidExchanges() != null) { 372 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 373 } 374 return resequencer; 375 } 376 377 }