001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.dataset;
018
019import java.util.concurrent.atomic.AtomicInteger;
020
021import org.apache.camel.Component;
022import org.apache.camel.Consumer;
023import org.apache.camel.Exchange;
024import org.apache.camel.Message;
025import org.apache.camel.Processor;
026import org.apache.camel.Producer;
027import org.apache.camel.Service;
028import org.apache.camel.component.mock.MockEndpoint;
029import org.apache.camel.processor.ThroughputLogger;
030import org.apache.camel.spi.Metadata;
031import org.apache.camel.spi.UriEndpoint;
032import org.apache.camel.spi.UriParam;
033import org.apache.camel.spi.UriPath;
034import org.apache.camel.util.CamelLogger;
035import org.apache.camel.util.ExchangeHelper;
036import org.apache.camel.util.ObjectHelper;
037import org.apache.camel.util.URISupport;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * The dataset component provides a mechanism to easily perform load & soak testing of your system.
043 *
044 * It works by allowing you to create DataSet instances both as a source of messages and as a way to assert that the data set is received.
045 * Camel will use the throughput logger when sending dataset's.
046 */
047@UriEndpoint(scheme = "dataset", title = "Dataset", syntax = "dataset:name", consumerClass = DataSetConsumer.class, label = "core,testing", lenientProperties = true)
048public class DataSetEndpoint extends MockEndpoint implements Service {
049    private final transient Logger log;
050    private final AtomicInteger receivedCounter = new AtomicInteger();
051    @UriPath(name = "name", description = "Name of DataSet to lookup in the registry") @Metadata(required = "true")
052    private volatile DataSet dataSet;
053    @UriParam(label = "consumer", defaultValue = "0")
054    private int minRate;
055    @UriParam(label = "consumer", defaultValue = "3")
056    private long produceDelay = 3;
057    @UriParam(label = "producer", defaultValue = "0")
058    private long consumeDelay;
059    @UriParam(label = "consumer", defaultValue = "0")
060    private long preloadSize;
061    @UriParam(label = "consumer", defaultValue = "1000")
062    private long initialDelay = 1000;
063    @UriParam(enums = "strict,lenient,off", defaultValue = "lenient")
064    private String dataSetIndex = "lenient";
065
066    @Deprecated
067    public DataSetEndpoint() {
068        this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
069        // optimize as we dont need to copy the exchange
070        setCopyOnExchange(false);
071    }
072
073    public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
074        super(endpointUri, component);
075        this.dataSet = dataSet;
076        this.log = LoggerFactory.getLogger(endpointUri);
077        // optimize as we dont need to copy the exchange
078        setCopyOnExchange(false);
079    }
080
081    public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
082        if (!ObjectHelper.equal(expected, actual)) {
083            throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders());
084        }
085    }
086
087    @Override
088    public Consumer createConsumer(Processor processor) throws Exception {
089        Consumer answer = new DataSetConsumer(this, processor);
090        configureConsumer(answer);
091
092        // expectedMessageCount((int) size);
093
094        return answer;
095    }
096
097    @Override
098    public Producer createProducer() throws Exception {
099        Producer answer = super.createProducer();
100
101        long size = getDataSet().getSize();
102        expectedMessageCount((int) size);
103
104        return answer;
105    }
106
107    @Override
108    public void reset() {
109        super.reset();
110        receivedCounter.set(0);
111    }
112
113    @Override
114    public int getReceivedCounter() {
115        return receivedCounter.get();
116    }
117
118    /**
119     * Creates a message exchange for the given index in the {@link DataSet}
120     */
121    public Exchange createExchange(long messageIndex) throws Exception {
122        Exchange exchange = createExchange();
123
124        getDataSet().populateMessage(exchange, messageIndex);
125
126        if (!getDataSetIndex().equals("off")) {
127            Message in = exchange.getIn();
128            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
129        }
130
131        return exchange;
132    }
133
134    @Override
135    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
136        super.waitForCompleteLatch(timeout);
137
138        if (minRate > 0) {
139            int count = getReceivedCounter();
140            do {
141                // wait as long as we get a decent message rate
142                super.waitForCompleteLatch(1000L);
143                count = getReceivedCounter() - count;
144            } while (count >= minRate);
145        }
146    }
147
148    // Properties
149    //-------------------------------------------------------------------------
150
151    public DataSet getDataSet() {
152        return dataSet;
153    }
154
155    public void setDataSet(DataSet dataSet) {
156        this.dataSet = dataSet;
157    }
158
159    public int getMinRate() {
160        return minRate;
161    }
162
163    /**
164     * Wait until the DataSet contains at least this number of messages
165     */
166    public void setMinRate(int minRate) {
167        this.minRate = minRate;
168    }
169
170    public long getPreloadSize() {
171        return preloadSize;
172    }
173
174    /**
175     * Sets how many messages should be preloaded (sent) before the route completes its initialization
176     */
177    public void setPreloadSize(long preloadSize) {
178        this.preloadSize = preloadSize;
179    }
180
181    public long getConsumeDelay() {
182        return consumeDelay;
183    }
184
185    /**
186     * Allows a delay to be specified which causes a delay when a message is consumed by the producer (to simulate slow processing)
187     */
188    public void setConsumeDelay(long consumeDelay) {
189        this.consumeDelay = consumeDelay;
190    }
191
192    public long getProduceDelay() {
193        return produceDelay;
194    }
195
196    /**
197     * Allows a delay to be specified which causes a delay when a message is sent by the consumer (to simulate slow processing)
198     */
199    public void setProduceDelay(long produceDelay) {
200        this.produceDelay = produceDelay;
201    }
202
203    public long getInitialDelay() {
204        return initialDelay;
205    }
206
207    /**
208     * Time period in millis to wait before starting sending messages.
209     */
210    public void setInitialDelay(long initialDelay) {
211        this.initialDelay = initialDelay;
212    }
213
214    /**
215     * Controls the behaviour of the CamelDataSetIndex header.
216     * For Consumers:
217     * - off => the header will not be set
218     * - strict/lenient => the header will be set
219     * For Producers:
220     * - off => the header value will not be verified, and will not be set if it is not present
221     * = strict => the header value must be present and will be verified
222     * = lenient => the header value will be verified if it is present, and will be set if it is not present
223     */
224    public void setDataSetIndex(String dataSetIndex) {
225        switch (dataSetIndex) {
226        case "off":
227        case "lenient":
228        case "strict":
229            this.dataSetIndex = dataSetIndex;
230            break;
231        default:
232            throw new IllegalArgumentException("Invalid value specified for the dataSetIndex URI parameter:" + dataSetIndex
233                    + "Supported values are strict, lenient and off ");
234        }
235    }
236
237    public String getDataSetIndex() {
238        return dataSetIndex;
239    }
240
241    // Implementation methods
242    //-------------------------------------------------------------------------
243
244    @Override
245    protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
246        int receivedCount = receivedCounter.incrementAndGet();
247        long index = receivedCount - 1;
248        Exchange expected = createExchange(index);
249
250        // now let's assert that they are the same
251        if (log.isDebugEnabled()) {
252            if (copy.getIn().getHeader(Exchange.DATASET_INDEX) != null) {
253                log.debug("Received message: {} (DataSet index={}) = {}",
254                        new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy});
255            } else {
256                log.debug("Received message: {} = {}",
257                        new Object[]{index, copy});
258            }
259        }
260
261        assertMessageExpected(index, expected, copy);
262
263        if (consumeDelay > 0) {
264            Thread.sleep(consumeDelay);
265        }
266    }
267
268    protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
269        switch (getDataSetIndex()) {
270        case "off":
271            break;
272        case "strict":
273            long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
274            assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
275            break;
276        case "lenient":
277        default:
278            // Validate the header value if it is present
279            Long dataSetIndexHeaderValue = actual.getIn().getHeader(Exchange.DATASET_INDEX, Long.class);
280            if (dataSetIndexHeaderValue != null) {
281                assertEquals("Header: " + Exchange.DATASET_INDEX, index, dataSetIndexHeaderValue, actual);
282            } else {
283                // set the header if it isn't there
284                actual.getIn().setHeader(Exchange.DATASET_INDEX, index);
285            }
286            break;
287        }
288
289        getDataSet().assertMessageExpected(this, expected, actual, index);
290    }
291
292    protected ThroughputLogger createReporter() {
293        // must sanitize uri to avoid logging sensitive information
294        String uri = URISupport.sanitizeUri(getEndpointUri());
295        CamelLogger logger = new CamelLogger(uri);
296        ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount());
297        answer.setAction("Received");
298        return answer;
299    }
300
301    @Override
302    protected void doStart() throws Exception {
303        super.doStart();
304
305        if (reporter == null) {
306            reporter = createReporter();
307        }
308
309        log.info(this + " expecting " + getExpectedCount() + " messages");
310    }
311
312}