001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import javax.xml.bind.annotation.XmlAccessType;
023import javax.xml.bind.annotation.XmlAccessorType;
024import javax.xml.bind.annotation.XmlElement;
025import javax.xml.bind.annotation.XmlElementRef;
026import javax.xml.bind.annotation.XmlElements;
027import javax.xml.bind.annotation.XmlRootElement;
028import javax.xml.bind.annotation.XmlTransient;
029
030import org.apache.camel.Expression;
031import org.apache.camel.Processor;
032import org.apache.camel.model.config.BatchResequencerConfig;
033import org.apache.camel.model.config.ResequencerConfig;
034import org.apache.camel.model.config.StreamResequencerConfig;
035import org.apache.camel.model.language.ExpressionDefinition;
036import org.apache.camel.processor.CamelInternalProcessor;
037import org.apache.camel.processor.Resequencer;
038import org.apache.camel.processor.StreamResequencer;
039import org.apache.camel.processor.resequencer.ExpressionResultComparator;
040import org.apache.camel.spi.Metadata;
041import org.apache.camel.spi.RouteContext;
042import org.apache.camel.util.CamelContextHelper;
043import org.apache.camel.util.ObjectHelper;
044
045/**
046 * Resequences (re-order) messages based on an expression
047 *
048 * @version 
049 */
050@Metadata(label = "eip,routing")
051@XmlRootElement(name = "resequence")
052@XmlAccessorType(XmlAccessType.FIELD)
053public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> {
054    @Metadata(required = "false")
055    @XmlElements({
056        @XmlElement(name = "batch-config", type = BatchResequencerConfig.class),
057        @XmlElement(name = "stream-config", type = StreamResequencerConfig.class)}
058        )
059    private ResequencerConfig resequencerConfig;
060    @XmlTransient
061    private BatchResequencerConfig batchConfig;
062    @XmlTransient
063    private StreamResequencerConfig streamConfig;
064    @XmlElementRef @Metadata(required = "true")
065    private ExpressionDefinition expression;
066    @XmlElementRef
067    private List<ProcessorDefinition<?>> outputs = new ArrayList<>();
068
069    public ResequenceDefinition() {
070    }
071
072    public ResequenceDefinition(Expression expression) {
073        if (expression != null) {
074            setExpression(ExpressionNodeHelper.toExpressionDefinition(expression));
075        }
076    }
077
078    public List<ProcessorDefinition<?>> getOutputs() {
079        return outputs;
080    }
081
082    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
083        this.outputs = outputs;
084    }
085
086    @Override
087    public boolean isOutputSupported() {
088        return true;
089    }
090
091    // Fluent API
092    // -------------------------------------------------------------------------
093    /**
094     * Configures the stream-based resequencing algorithm using the default
095     * configuration.
096     *
097     * @return the builder
098     */
099    public ResequenceDefinition stream() {
100        return stream(StreamResequencerConfig.getDefault());
101    }
102
103    /**
104     * Configures the batch-based resequencing algorithm using the default
105     * configuration.
106     *
107     * @return the builder
108     */
109    public ResequenceDefinition batch() {
110        return batch(BatchResequencerConfig.getDefault());
111    }
112
113    /**
114     * Configures the stream-based resequencing algorithm using the given
115     * {@link StreamResequencerConfig}.
116     *
117     * @param config  the config
118     * @return the builder
119     */
120    public ResequenceDefinition stream(StreamResequencerConfig config) {
121        this.streamConfig = config;
122        this.batchConfig = null;
123        return this;
124    }
125
126    /**
127     * Configures the batch-based resequencing algorithm using the given
128     * {@link BatchResequencerConfig}.
129     *
130     * @param config  the config
131     * @return the builder
132     */
133    public ResequenceDefinition batch(BatchResequencerConfig config) {
134        this.batchConfig = config;
135        this.streamConfig = null;
136        return this;
137    }
138
139    /**
140     * Sets the timeout
141     * @param timeout  timeout in millis
142     * @return the builder
143     */
144    public ResequenceDefinition timeout(long timeout) {
145        if (streamConfig != null) {
146            streamConfig.setTimeout(timeout);
147        } else {
148            // initialize batch mode as its default mode
149            if (batchConfig == null) {
150                batch();
151            }
152            batchConfig.setBatchTimeout(timeout);
153        }
154        return this;
155    }
156
157    /**
158     * Sets the interval in milli seconds the stream resequencer will at most wait
159     * while waiting for condition of being able to deliver.
160     *
161     * @param deliveryAttemptInterval  interval in millis
162     * @return the builder
163     */
164    public ResequenceDefinition deliveryAttemptInterval(long deliveryAttemptInterval) {
165        if (streamConfig == null) {
166            throw new IllegalStateException("deliveryAttemptInterval() only supported for stream resequencer");
167        }
168        streamConfig.setDeliveryAttemptInterval(deliveryAttemptInterval);
169        return this;
170    }
171
172    /**
173     * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed
174     * @return the builder
175     */
176    public ResequenceDefinition rejectOld() {
177        if (streamConfig == null) {
178            throw new IllegalStateException("rejectOld() only supported for stream resequencer");
179        }
180        streamConfig.setRejectOld(true);
181        return this;
182    }
183
184    /**
185     * Sets the in batch size for number of exchanges received
186     * @param batchSize  the batch size
187     * @return the builder
188     */
189    public ResequenceDefinition size(int batchSize) {
190        if (streamConfig != null) {
191            throw new IllegalStateException("size() only supported for batch resequencer");
192        }
193        // initialize batch mode as its default mode
194        if (batchConfig == null) {
195            batch();
196        }
197        batchConfig.setBatchSize(batchSize);
198        return this;
199    }
200
201    /**
202     * Sets the capacity for the stream resequencer
203     *
204     * @param capacity  the capacity
205     * @return the builder
206     */
207    public ResequenceDefinition capacity(int capacity) {
208        if (streamConfig == null) {
209            throw new IllegalStateException("capacity() only supported for stream resequencer");
210        }
211        streamConfig.setCapacity(capacity);
212        return this;
213
214    }
215
216    /**
217     * Enables duplicates for the batch resequencer mode
218     * @return the builder
219     */
220    public ResequenceDefinition allowDuplicates() {
221        if (streamConfig != null) {
222            throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
223        }
224        // initialize batch mode as its default mode
225        if (batchConfig == null) {
226            batch();
227        }
228        batchConfig.setAllowDuplicates(true);
229        return this;
230    }
231
232    /**
233     * Enables reverse mode for the batch resequencer mode.
234     * <p/>
235     * This means the expression for determine the sequence order will be reversed.
236     * Can be used for Z..A or 9..0 ordering.
237     *
238     * @return the builder
239     */
240    public ResequenceDefinition reverse() {
241        if (streamConfig != null) {
242            throw new IllegalStateException("reverse() only supported for batch resequencer");
243        }
244        // initialize batch mode as its default mode
245        if (batchConfig == null) {
246            batch();
247        }
248        batchConfig.setReverse(true);
249        return this;
250    }
251
252    /**
253     * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored.
254     *
255     * @return builder
256     */
257    public ResequenceDefinition ignoreInvalidExchanges() {
258        if (streamConfig != null) {
259            streamConfig.setIgnoreInvalidExchanges(true);
260        } else {
261            // initialize batch mode as its default mode
262            if (batchConfig == null) {
263                batch();
264            }
265            batchConfig.setIgnoreInvalidExchanges(true);
266        }
267        return this;
268    }
269
270    /**
271     * Sets the comparator to use for stream resequencer
272     *
273     * @param comparator  the comparator
274     * @return the builder
275     */
276    public ResequenceDefinition comparator(ExpressionResultComparator comparator) {
277        if (streamConfig == null) {
278            throw new IllegalStateException("comparator() only supported for stream resequencer");
279        }
280        streamConfig.setComparator(comparator);
281        return this;
282    }
283
284    @Override
285    public String toString() {
286        return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]";
287    }
288    
289    @Override
290    public String getShortName() {
291        return "resequence";
292    }
293
294    @Override
295    public String getLabel() {
296        return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]";
297    }
298
299    public ResequencerConfig getResequencerConfig() {
300        return resequencerConfig;
301    }
302
303    /**
304     * To configure the resequencer in using either batch or stream configuration. Will by default use batch configuration.
305     */
306    public void setResequencerConfig(ResequencerConfig resequencerConfig) {
307        this.resequencerConfig = resequencerConfig;
308    }
309
310    public BatchResequencerConfig getBatchConfig() {
311        if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) {
312            return (BatchResequencerConfig) resequencerConfig;
313        }
314        return batchConfig;
315    }
316
317    public StreamResequencerConfig getStreamConfig() {
318        if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) {
319            return (StreamResequencerConfig) resequencerConfig;
320        }
321        return streamConfig;
322    }
323
324    public void setBatchConfig(BatchResequencerConfig batchConfig) {
325        this.batchConfig = batchConfig;
326    }
327
328    public void setStreamConfig(StreamResequencerConfig streamConfig) {
329        this.streamConfig = streamConfig;
330    }
331
332    public ExpressionDefinition getExpression() {
333        return expression;
334    }
335
336    /**
337     * Expression to use for re-ordering the messages, such as a header with a sequence number
338     */
339    public void setExpression(ExpressionDefinition expression) {
340        this.expression = expression;
341    }
342
343    @Override
344    public Processor createProcessor(RouteContext routeContext) throws Exception {
345        // if configured from XML then streamConfig has been set with the configuration
346        if (resequencerConfig != null) {
347            if (resequencerConfig instanceof StreamResequencerConfig) {
348                streamConfig = (StreamResequencerConfig) resequencerConfig;
349            } else {
350                batchConfig = (BatchResequencerConfig) resequencerConfig;
351            }
352        }
353
354        if (streamConfig != null) {
355            return createStreamResequencer(routeContext, streamConfig);
356        } else {
357            if (batchConfig == null) {
358                // default as batch mode
359                batch();
360            }
361            return createBatchResequencer(routeContext, batchConfig);
362        }
363    }
364
365    /**
366     * Creates a batch {@link Resequencer} instance applying the given <code>config</code>.
367     * 
368     * @param routeContext route context.
369     * @param config batch resequencer configuration.
370     * @return the configured batch resequencer.
371     * @throws Exception can be thrown
372     */
373    @SuppressWarnings("deprecation")
374    protected Resequencer createBatchResequencer(RouteContext routeContext,
375                                                 BatchResequencerConfig config) throws Exception {
376        Processor processor = this.createChildProcessor(routeContext, true);
377        Expression expression = getExpression().createExpression(routeContext);
378
379        // and wrap in unit of work
380        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
381        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
382
383        ObjectHelper.notNull(config, "config", this);
384        ObjectHelper.notNull(expression, "expression", this);
385
386        boolean isReverse = config.getReverse() != null && config.getReverse();
387        boolean isAllowDuplicates = config.getAllowDuplicates() != null && config.getAllowDuplicates();
388
389        Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, isAllowDuplicates, isReverse);
390        resequencer.setBatchSize(config.getBatchSize());
391        resequencer.setBatchTimeout(config.getBatchTimeout());
392        resequencer.setReverse(isReverse);
393        resequencer.setAllowDuplicates(isAllowDuplicates);
394        if (config.getIgnoreInvalidExchanges() != null) {
395            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
396        }
397        return resequencer;
398    }
399
400    /**
401     * Creates a {@link StreamResequencer} instance applying the given <code>config</code>.
402     * 
403     * @param routeContext route context.
404     * @param config stream resequencer configuration.
405     * @return the configured stream resequencer.
406     * @throws Exception can be thrwon
407     */
408    protected StreamResequencer createStreamResequencer(RouteContext routeContext,
409                                                        StreamResequencerConfig config) throws Exception {
410        Processor processor = this.createChildProcessor(routeContext, true);
411        Expression expression = getExpression().createExpression(routeContext);
412
413        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
414        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
415
416        ObjectHelper.notNull(config, "config", this);
417        ObjectHelper.notNull(expression, "expression", this);
418
419        ExpressionResultComparator comparator;
420        if (config.getComparatorRef() != null) {
421            comparator = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), config.getComparatorRef(), ExpressionResultComparator.class);
422        } else {
423            comparator = config.getComparator();
424        }
425        comparator.setExpression(expression);
426
427        StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator, expression);
428        resequencer.setTimeout(config.getTimeout());
429        if (config.getDeliveryAttemptInterval() != null) {
430            resequencer.setDeliveryAttemptInterval(config.getDeliveryAttemptInterval());
431        }
432        resequencer.setCapacity(config.getCapacity());
433        resequencer.setRejectOld(config.getRejectOld());
434        if (config.getIgnoreInvalidExchanges() != null) {
435            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
436        }
437        return resequencer;
438    }
439
440}