001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlElement;
024    import javax.xml.bind.annotation.XmlElementRef;
025    import javax.xml.bind.annotation.XmlElements;
026    import javax.xml.bind.annotation.XmlRootElement;
027    import javax.xml.bind.annotation.XmlTransient;
028    
029    import org.apache.camel.Expression;
030    import org.apache.camel.Processor;
031    import org.apache.camel.model.config.BatchResequencerConfig;
032    import org.apache.camel.model.config.ResequencerConfig;
033    import org.apache.camel.model.config.StreamResequencerConfig;
034    import org.apache.camel.model.language.ExpressionDefinition;
035    import org.apache.camel.processor.Resequencer;
036    import org.apache.camel.processor.StreamResequencer;
037    import org.apache.camel.processor.resequencer.ExpressionResultComparator;
038    import org.apache.camel.spi.Required;
039    import org.apache.camel.spi.RouteContext;
040    import org.apache.camel.util.ObjectHelper;
041    
042    /**
043     * Represents an XML <resequence/> element
044     *
045     * @version 
046     */
047    @XmlRootElement(name = "resequence")
048    @XmlAccessorType(XmlAccessType.FIELD)
049    public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> {
050        @XmlElements({
051        @XmlElement(required = false, name = "batch-config", type = BatchResequencerConfig.class),
052        @XmlElement(required = false, name = "stream-config", type = StreamResequencerConfig.class)}
053        )
054        private ResequencerConfig resequencerConfig;
055        @XmlTransient
056        private BatchResequencerConfig batchConfig;
057        @XmlTransient
058        private StreamResequencerConfig streamConfig;
059        @XmlElementRef
060        @Required
061        private ExpressionDefinition expression;
062        @XmlElementRef
063        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
064    
065        public ResequenceDefinition() {
066        }
067    
068        @Override
069        public String getShortName() {
070            return "resequence";
071        }
072    
073        public List<ProcessorDefinition<?>> getOutputs() {
074            return outputs;
075        }
076    
077        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
078            this.outputs = outputs;
079        }
080    
081        @Override
082        public boolean isOutputSupported() {
083            return true;
084        }
085    
086        // Fluent API
087        // -------------------------------------------------------------------------
088        /**
089         * Configures the stream-based resequencing algorithm using the default
090         * configuration.
091         *
092         * @return the builder
093         */
094        public ResequenceDefinition stream() {
095            return stream(StreamResequencerConfig.getDefault());
096        }
097    
098        /**
099         * Configures the batch-based resequencing algorithm using the default
100         * configuration.
101         *
102         * @return the builder
103         */
104        public ResequenceDefinition batch() {
105            return batch(BatchResequencerConfig.getDefault());
106        }
107    
108        /**
109         * Configures the stream-based resequencing algorithm using the given
110         * {@link StreamResequencerConfig}.
111         *
112         * @param config  the config
113         * @return the builder
114         */
115        public ResequenceDefinition stream(StreamResequencerConfig config) {
116            this.streamConfig = config;
117            this.batchConfig = null;
118            return this;
119        }
120    
121        /**
122         * Configures the batch-based resequencing algorithm using the given
123         * {@link BatchResequencerConfig}.
124         *
125         * @param config  the config
126         * @return the builder
127         */
128        public ResequenceDefinition batch(BatchResequencerConfig config) {
129            this.batchConfig = config;
130            this.streamConfig = null;
131            return this;
132        }
133    
134        /**
135         * Sets the timeout
136         * @param timeout  timeout in millis
137         * @return the builder
138         */
139        public ResequenceDefinition timeout(long timeout) {
140            if (streamConfig != null) {
141                streamConfig.setTimeout(timeout);
142            } else {
143                // initialize batch mode as its default mode
144                if (batchConfig == null) {
145                    batch();
146                }
147                batchConfig.setBatchTimeout(timeout);
148            }
149            return this;
150        }
151    
152        /**
153         * Sets the in batch size for number of exchanges received
154         * @param batchSize  the batch size
155         * @return the builder
156         */
157        public ResequenceDefinition size(int batchSize) {
158            if (streamConfig != null) {
159                throw new IllegalStateException("size() only supported for batch resequencer");
160            }
161            // initialize batch mode as its default mode
162            if (batchConfig == null) {
163                batch();
164            }
165            batchConfig.setBatchSize(batchSize);
166            return this;
167        }
168    
169        /**
170         * Sets the capacity for the stream resequencer
171         *
172         * @param capacity  the capacity
173         * @return the builder
174         */
175        public ResequenceDefinition capacity(int capacity) {
176            if (streamConfig == null) {
177                throw new IllegalStateException("capacity() only supported for stream resequencer");
178            }
179            streamConfig.setCapacity(capacity);
180            return this;
181    
182        }
183    
184        /**
185         * Enables duplicates for the batch resequencer mode
186         * @return the builder
187         */
188        public ResequenceDefinition allowDuplicates() {
189            if (streamConfig != null) {
190                throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
191            }
192            // initialize batch mode as its default mode
193            if (batchConfig == null) {
194                batch();
195            }
196            batchConfig.setAllowDuplicates(true);
197            return this;
198        }
199    
200        /**
201         * Enables reverse mode for the batch resequencer mode.
202         * <p/>
203         * This means the expression for determine the sequence order will be reversed.
204         * Can be used for Z..A or 9..0 ordering.
205         *
206         * @return the builder
207         */
208        public ResequenceDefinition reverse() {
209            if (streamConfig != null) {
210                throw new IllegalStateException("reverse() only supported for batch resequencer");
211            }
212            // initialize batch mode as its default mode
213            if (batchConfig == null) {
214                batch();
215            }
216            batchConfig.setReverse(true);
217            return this;
218        }
219    
220        /**
221         * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored.
222         *
223         * @return builder
224         */
225        public ResequenceDefinition ignoreInvalidExchanges() {
226            if (streamConfig != null) {
227                streamConfig.setIgnoreInvalidExchanges(true);
228            } else {
229                // initialize batch mode as its default mode
230                if (batchConfig == null) {
231                    batch();
232                }
233                batchConfig.setIgnoreInvalidExchanges(true);
234            }
235            return this;
236        }
237    
238        /**
239         * Sets the comparator to use for stream resequencer
240         *
241         * @param comparator  the comparator
242         * @return the builder
243         */
244        public ResequenceDefinition comparator(ExpressionResultComparator comparator) {
245            if (streamConfig == null) {
246                throw new IllegalStateException("comparator() only supported for stream resequencer");
247            }
248            streamConfig.setComparator(comparator);
249            return this;
250        }
251    
252        @Override
253        public String toString() {
254            return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]";
255        }
256        
257        @Override
258        public String getLabel() {
259            return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]";
260        }
261    
262        public ResequencerConfig getResequencerConfig() {
263            return resequencerConfig;
264        }
265    
266        public void setResequencerConfig(ResequencerConfig resequencerConfig) {
267            this.resequencerConfig = resequencerConfig;
268        }
269    
270        public BatchResequencerConfig getBatchConfig() {
271            if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) {
272                return (BatchResequencerConfig) resequencerConfig;
273            }
274            return batchConfig;
275        }
276    
277        public StreamResequencerConfig getStreamConfig() {
278            if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) {
279                return (StreamResequencerConfig) resequencerConfig;
280            }
281            return streamConfig;
282        }
283    
284        public void setBatchConfig(BatchResequencerConfig batchConfig) {
285            this.batchConfig = batchConfig;
286        }
287    
288        public void setStreamConfig(StreamResequencerConfig streamConfig) {
289            this.streamConfig = streamConfig;
290        }
291    
292        public ExpressionDefinition getExpression() {
293            return expression;
294        }
295    
296        public void setExpression(ExpressionDefinition expression) {
297            this.expression = expression;
298        }
299    
300        @Override
301        public Processor createProcessor(RouteContext routeContext) throws Exception {
302            // if configured from XML then streamConfig has been set with the configuration
303            if (resequencerConfig != null) {
304                if (resequencerConfig instanceof StreamResequencerConfig) {
305                    streamConfig = (StreamResequencerConfig) resequencerConfig;
306                } else {
307                    batchConfig = (BatchResequencerConfig) resequencerConfig;
308                }
309            }
310    
311            if (streamConfig != null) {
312                return createStreamResequencer(routeContext, streamConfig);
313            } else {
314                if (batchConfig == null) {
315                    // default as batch mode
316                    batch();
317                }
318                return createBatchResequencer(routeContext, batchConfig);
319            }
320        }
321    
322        /**
323         * Creates a batch {@link Resequencer} instance applying the given <code>config</code>.
324         * 
325         * @param routeContext route context.
326         * @param config batch resequencer configuration.
327         * @return the configured batch resequencer.
328         * @throws Exception can be thrown
329         */
330        @SuppressWarnings("deprecation")
331        protected Resequencer createBatchResequencer(RouteContext routeContext,
332                                                     BatchResequencerConfig config) throws Exception {
333            Processor processor = this.createChildProcessor(routeContext, true);
334            Expression expression = getExpression().createExpression(routeContext);
335    
336            ObjectHelper.notNull(config, "config", this);
337            ObjectHelper.notNull(expression, "expression", this);
338    
339            Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, expression,
340                    config.isAllowDuplicates(), config.isReverse());
341            resequencer.setBatchSize(config.getBatchSize());
342            resequencer.setBatchTimeout(config.getBatchTimeout());
343            if (config.getIgnoreInvalidExchanges() != null) {
344                resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
345            }
346            return resequencer;
347        }
348    
349        /**
350         * Creates a {@link StreamResequencer} instance applying the given <code>config</code>.
351         * 
352         * @param routeContext route context.
353         * @param config stream resequencer configuration.
354         * @return the configured stream resequencer.
355         * @throws Exception can be thrwon
356         */
357        protected StreamResequencer createStreamResequencer(RouteContext routeContext,
358                                                            StreamResequencerConfig config) throws Exception {
359            Processor processor = this.createChildProcessor(routeContext, true);
360            Expression expression = getExpression().createExpression(routeContext);
361    
362            ObjectHelper.notNull(config, "config", this);
363            ObjectHelper.notNull(expression, "expression", this);
364    
365            ExpressionResultComparator comparator = config.getComparator();
366            comparator.setExpression(expression);
367    
368            StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator);
369            resequencer.setTimeout(config.getTimeout());
370            resequencer.setCapacity(config.getCapacity());
371            if (config.getIgnoreInvalidExchanges() != null) {
372                resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
373            }
374            return resequencer;
375        }
376    
377    }