001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model.config;
018
019import javax.xml.bind.annotation.XmlAccessType;
020import javax.xml.bind.annotation.XmlAccessorType;
021import javax.xml.bind.annotation.XmlAttribute;
022import javax.xml.bind.annotation.XmlRootElement;
023import javax.xml.bind.annotation.XmlTransient;
024
025import org.apache.camel.processor.resequencer.DefaultExchangeComparator;
026import org.apache.camel.processor.resequencer.ExpressionResultComparator;
027import org.apache.camel.spi.Metadata;
028
029/**
030 * Configures stream-processing resequence eip.
031 */
032@Metadata(label = "eip,routing,resequence")
033@XmlRootElement(name = "stream-config")
034@XmlAccessorType(XmlAccessType.FIELD)
035public class StreamResequencerConfig extends ResequencerConfig {
036    @XmlAttribute @Metadata(defaultValue = "100")
037    private Integer capacity;
038    @XmlAttribute @Metadata(defaultValue = "1000")
039    private Long timeout;
040    @XmlAttribute
041    private Boolean ignoreInvalidExchanges;
042    @XmlTransient
043    private ExpressionResultComparator comparator;
044    @XmlAttribute
045    private String comparatorRef;
046    @XmlAttribute
047    private Boolean rejectOld;
048
049    /**
050     * Creates a new {@link StreamResequencerConfig} instance using default
051     * values for <code>capacity</code> (1000) and <code>timeout</code>
052     * (1000L). Elements of the sequence are compared using the
053     * {@link DefaultExchangeComparator}.
054     */
055    public StreamResequencerConfig() {
056        this(1000, 1000L);
057    }
058
059    /**
060     * Creates a new {@link StreamResequencerConfig} instance using the given
061     * values for <code>capacity</code> and <code>timeout</code>. Elements
062     * of the sequence are compared using the {@link DefaultExchangeComparator}.
063     * 
064     * @param capacity   capacity of the resequencer's inbound queue.
065     * @param timeout    minimum time to wait for missing elements (messages).
066     */
067    public StreamResequencerConfig(int capacity, long timeout) {
068        this(capacity, timeout, new DefaultExchangeComparator());
069    }
070
071    /**
072     * Creates a new {@link StreamResequencerConfig} instance using the given
073     * values for <code>capacity</code> and <code>timeout</code>. Elements
074     * of the sequence are compared with the given
075     * {@link ExpressionResultComparator}.
076     * 
077     * @param capacity   capacity of the resequencer's inbound queue.
078     * @param timeout    minimum time to wait for missing elements (messages).
079     * @param comparator comparator for sequence comparision
080     */
081    public StreamResequencerConfig(int capacity, long timeout, ExpressionResultComparator comparator) {
082        this.capacity = capacity;
083        this.timeout = timeout;
084        this.comparator = comparator;
085    }
086
087    /**
088     * Creates a new {@link StreamResequencerConfig} instance using the given
089     * values for <code>capacity</code> and <code>timeout</code>. Elements
090     * of the sequence are compared using the {@link DefaultExchangeComparator}.
091     *
092     * @param capacity   capacity of the resequencer's inbound queue.
093     * @param timeout    minimum time to wait for missing elements (messages).
094     * @param rejectOld  if true, throws an exception when messages older than the last delivered message are processed
095     */
096    public StreamResequencerConfig(int capacity, long timeout, Boolean rejectOld) {
097        this(capacity, timeout, rejectOld, new DefaultExchangeComparator());
098    }
099
100    /**
101     * Creates a new {@link StreamResequencerConfig} instance using the given
102     * values for <code>capacity</code> and <code>timeout</code>. Elements
103     * of the sequence are compared with the given {@link ExpressionResultComparator}.
104     *
105     * @param capacity   capacity of the resequencer's inbound queue.
106     * @param timeout    minimum time to wait for missing elements (messages).
107     * @param rejectOld  if true, throws an exception when messages older than the last delivered message are processed
108     * @param comparator comparator for sequence comparision
109     */
110    public StreamResequencerConfig(int capacity, long timeout, Boolean rejectOld, ExpressionResultComparator comparator) {
111        this.capacity = capacity;
112        this.timeout = timeout;
113        this.rejectOld = rejectOld;
114        this.comparator = comparator;
115    }
116
117    /**
118     * Returns a new {@link StreamResequencerConfig} instance using default
119     * values for <code>capacity</code> (1000) and <code>timeout</code>
120     * (1000L). Elements of the sequence are compared using the
121     * {@link DefaultExchangeComparator}.
122     * 
123     * @return a default {@link StreamResequencerConfig}.
124     */
125    public static StreamResequencerConfig getDefault() {
126        return new StreamResequencerConfig();
127    }
128    
129    public int getCapacity() {
130        return capacity;
131    }
132
133    /**
134     * Sets the capacity of the resequencer's inbound queue.
135     */
136    public void setCapacity(int capacity) {
137        this.capacity = capacity;
138    }
139
140    public long getTimeout() {
141        return timeout;
142    }
143
144    /**
145     * Sets minimum time to wait for missing elements (messages).
146     */
147    public void setTimeout(long timeout) {
148        this.timeout = timeout;
149    }
150
151    public Boolean getIgnoreInvalidExchanges() {
152        return ignoreInvalidExchanges;
153    }
154
155    /**
156     * Whether to ignore invalid exchanges
157     */
158    public void setIgnoreInvalidExchanges(Boolean ignoreInvalidExchanges) {
159        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
160    }
161
162    public ExpressionResultComparator getComparator() {
163        return comparator;
164    }
165
166    /**
167     * To use a custom comparator
168     */
169    public void setComparator(ExpressionResultComparator comparator) {
170        this.comparator = comparator;
171    }
172
173    public String getComparatorRef() {
174        return comparatorRef;
175    }
176
177    /**
178     * To use a custom comparator
179     */
180    public void setComparatorRef(String comparatorRef) {
181        this.comparatorRef = comparatorRef;
182    }
183
184    /**
185     * If true, throws an exception when messages older than the last delivered message are processed
186     */
187    public void setRejectOld(boolean value) {
188        this.rejectOld = value;
189    }
190
191    public Boolean getRejectOld() {
192        return rejectOld;
193    }
194
195}