001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.mock;
018
019import java.io.File;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.CopyOnWriteArraySet;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.TimeUnit;
033
034import org.apache.camel.AsyncCallback;
035import org.apache.camel.CamelContext;
036import org.apache.camel.Component;
037import org.apache.camel.Consumer;
038import org.apache.camel.Endpoint;
039import org.apache.camel.Exchange;
040import org.apache.camel.ExchangePattern;
041import org.apache.camel.Expression;
042import org.apache.camel.Handler;
043import org.apache.camel.Message;
044import org.apache.camel.Predicate;
045import org.apache.camel.Processor;
046import org.apache.camel.Producer;
047import org.apache.camel.builder.ProcessorBuilder;
048import org.apache.camel.impl.DefaultAsyncProducer;
049import org.apache.camel.impl.DefaultEndpoint;
050import org.apache.camel.impl.InterceptSendToEndpoint;
051import org.apache.camel.spi.BrowsableEndpoint;
052import org.apache.camel.spi.Metadata;
053import org.apache.camel.spi.UriEndpoint;
054import org.apache.camel.spi.UriParam;
055import org.apache.camel.spi.UriPath;
056import org.apache.camel.util.CamelContextHelper;
057import org.apache.camel.util.CaseInsensitiveMap;
058import org.apache.camel.util.ExchangeHelper;
059import org.apache.camel.util.ExpressionComparator;
060import org.apache.camel.util.FileUtil;
061import org.apache.camel.util.ObjectHelper;
062import org.apache.camel.util.StopWatch;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * The mock component is used for testing routes and mediation rules using mocks.
068 * <p/>
069 * A Mock endpoint which provides a literate, fluent API for testing routes
070 * using a <a href="http://jmock.org/">JMock style</a> API.
071 * <p/>
072 * The mock endpoint have two set of methods
073 * <ul>
074 *   <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li>
075 *   <li>assertXXX - To assert assertions, after the test has been executed</li>
076 * </ul>
077 * Its <b>important</b> to know the difference between the two set. The former is used to
078 * set expectations before the test is being started (eg before the mock receives messages).
079 * The latter is used after the test has been executed, to verify the expectations; or
080 * other assertions which you can perform after the test has been completed.
081 * <p/>
082 * <b>Beware:</b> If you want to expect a mock does not receive any messages, by calling
083 * {@link #setExpectedMessageCount(int)} with <tt>0</tt>, then take extra care,
084 * as <tt>0</tt> matches when the tests starts, so you need to set a assert period time
085 * to let the test run for a while to make sure there are still no messages arrived; for
086 * that use {@link #setAssertPeriod(long)}.
087 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
088 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
089 * This allows you to not use a fixed assert period, to speedup testing times.
090 * <p/>
091 * <b>Important:</b> If using {@link #expectedMessageCount(int)} and also {@link #expectedBodiesReceived(java.util.List)} or
092 * {@link #expectedHeaderReceived(String, Object)} then the latter overrides the number of expected message based on the
093 * number of values provided in the bodies/headers.
094 *
095 * @version 
096 */
097@UriEndpoint(scheme = "mock", title = "Mock", syntax = "mock:name", producerOnly = true, label = "core,testing", lenientProperties = true)
098public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
099    private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
100    // must be volatile so changes is visible between the thread which performs the assertions
101    // and the threads which process the exchanges when routing messages in Camel
102    protected volatile Processor reporter;
103    
104    private volatile Processor defaultProcessor;
105    private volatile Map<Integer, Processor> processors;
106    private volatile List<Exchange> receivedExchanges;
107    private volatile List<Throwable> failures;
108    private volatile List<Runnable> tests;
109    private volatile CountDownLatch latch;
110    private volatile int expectedMinimumCount;
111    private volatile List<?> expectedBodyValues;
112    private volatile List<Object> actualBodyValues;
113    private volatile Map<String, Object> expectedHeaderValues;
114    private volatile Map<String, Object> actualHeaderValues;
115    private volatile Map<String, Object> expectedPropertyValues;
116    private volatile Map<String, Object> actualPropertyValues;
117
118    private volatile int counter;
119
120    @UriPath(description = "Name of mock endpoint") @Metadata(required = "true")
121    private String name;
122    @UriParam(label = "producer", defaultValue = "-1")
123    private int expectedCount;
124    @UriParam(label = "producer", defaultValue = "0")
125    private long sleepForEmptyTest;
126    @UriParam(label = "producer", defaultValue = "0")
127    private long resultWaitTime;
128    @UriParam(label = "producer", defaultValue = "0")
129    private long resultMinimumWaitTime;
130    @UriParam(label = "producer", defaultValue = "0")
131    private long assertPeriod;
132    @UriParam(label = "producer", defaultValue = "-1")
133    private int retainFirst;
134    @UriParam(label = "producer", defaultValue = "-1")
135    private int retainLast;
136    @UriParam(label = "producer")
137    private int reportGroup;
138    @UriParam(label = "producer,advanced", defaultValue = "true")
139    private boolean copyOnExchange = true;
140
141    public MockEndpoint(String endpointUri, Component component) {
142        super(endpointUri, component);
143        init();
144    }
145
146    @Deprecated
147    public MockEndpoint(String endpointUri) {
148        super(endpointUri);
149        init();
150    }
151
152    public MockEndpoint() {
153        this(null);
154    }
155
156    /**
157     * A helper method to resolve the mock endpoint of the given URI on the given context
158     *
159     * @param context the camel context to try resolve the mock endpoint from
160     * @param uri the uri of the endpoint to resolve
161     * @return the endpoint
162     */
163    public static MockEndpoint resolve(CamelContext context, String uri) {
164        return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
165    }
166
167    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
168        long start = System.currentTimeMillis();
169        long left = unit.toMillis(timeout);
170        long end = start + left;
171        for (MockEndpoint endpoint : endpoints) {
172            if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
173                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
174            }
175            left = end - System.currentTimeMillis();
176            if (left <= 0) {
177                left = 0;
178            }
179        }
180    }
181
182    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
183        assertWait(timeout, unit, endpoints);
184        for (MockEndpoint endpoint : endpoints) {
185            endpoint.assertIsSatisfied();
186        }
187    }
188
189    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
190        for (MockEndpoint endpoint : endpoints) {
191            endpoint.assertIsSatisfied();
192        }
193    }
194
195
196    /**
197     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
198     * in the given context are valid
199     *
200     * @param context the camel context used to find all the available endpoints to be asserted
201     */
202    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
203        ObjectHelper.notNull(context, "camelContext");
204        Collection<Endpoint> endpoints = context.getEndpoints();
205        for (Endpoint endpoint : endpoints) {
206            // if the endpoint was intercepted we should get the delegate
207            if (endpoint instanceof InterceptSendToEndpoint) {
208                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
209            }
210            if (endpoint instanceof MockEndpoint) {
211                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
212                mockEndpoint.assertIsSatisfied();
213            }
214        }
215    }
216
217    /**
218     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
219     * in the given context are valid
220     *
221     * @param context the camel context used to find all the available endpoints to be asserted
222     * @param timeout timeout
223     * @param unit    time unit
224     */
225    public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException {
226        ObjectHelper.notNull(context, "camelContext");
227        ObjectHelper.notNull(unit, "unit");
228        Collection<Endpoint> endpoints = context.getEndpoints();
229        long millis = unit.toMillis(timeout);
230        for (Endpoint endpoint : endpoints) {
231            // if the endpoint was intercepted we should get the delegate
232            if (endpoint instanceof InterceptSendToEndpoint) {
233                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
234            }
235            if (endpoint instanceof MockEndpoint) {
236                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
237                mockEndpoint.setResultWaitTime(millis);
238                mockEndpoint.assertIsSatisfied();
239            }
240        }
241    }
242
243    /**
244     * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered
245     * in the given context.
246     *
247     * @param context the camel context used to find all the available endpoints
248     * @param period the period in millis
249     */
250    public static void setAssertPeriod(CamelContext context, long period) {
251        ObjectHelper.notNull(context, "camelContext");
252        Collection<Endpoint> endpoints = context.getEndpoints();
253        for (Endpoint endpoint : endpoints) {
254            // if the endpoint was intercepted we should get the delegate
255            if (endpoint instanceof InterceptSendToEndpoint) {
256                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
257            }
258            if (endpoint instanceof MockEndpoint) {
259                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
260                mockEndpoint.setAssertPeriod(period);
261            }
262        }
263    }
264
265    /**
266     * Reset all mock endpoints
267     *
268     * @param context the camel context used to find all the available endpoints to reset
269     */
270    public static void resetMocks(CamelContext context) {
271        ObjectHelper.notNull(context, "camelContext");
272        Collection<Endpoint> endpoints = context.getEndpoints();
273        for (Endpoint endpoint : endpoints) {
274            // if the endpoint was intercepted we should get the delegate
275            if (endpoint instanceof InterceptSendToEndpoint) {
276                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
277            }
278            if (endpoint instanceof MockEndpoint) {
279                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
280                mockEndpoint.reset();
281            }
282        }
283    }
284
285    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
286        for (MockEndpoint endpoint : endpoints) {
287            endpoint.setExpectedMessageCount(count);
288        }
289    }
290
291    public List<Exchange> getExchanges() {
292        return getReceivedExchanges();
293    }
294
295    public Consumer createConsumer(Processor processor) throws Exception {
296        throw new UnsupportedOperationException("You cannot consume from this endpoint");
297    }
298
299    public Producer createProducer() throws Exception {
300        return new DefaultAsyncProducer(this) {
301            public boolean process(Exchange exchange, AsyncCallback callback) {
302                onExchange(exchange);
303                callback.done(true);
304                return true;
305            }
306        };
307    }
308
309    public void reset() {
310        init();
311    }
312
313
314    // Testing API
315    // -------------------------------------------------------------------------
316
317    /**
318     * Handles the incoming exchange.
319     * <p/>
320     * This method turns this mock endpoint into a bean which you can use
321     * in the Camel routes, which allows you to inject MockEndpoint as beans
322     * in your routes and use the features of the mock to control the bean.
323     *
324     * @param exchange  the exchange
325     * @throws Exception can be thrown
326     */
327    @Handler
328    public void handle(Exchange exchange) throws Exception {
329        onExchange(exchange);
330    }
331
332    /**
333     * Set the processor that will be invoked when the index
334     * message is received.
335     */
336    public void whenExchangeReceived(int index, Processor processor) {
337        this.processors.put(index, processor);
338    }
339
340    /**
341     * Set the processor that will be invoked when the some message
342     * is received.
343     *
344     * This processor could be overwritten by
345     * {@link #whenExchangeReceived(int, Processor)} method.
346     */
347    public void whenAnyExchangeReceived(Processor processor) {
348        this.defaultProcessor = processor;
349    }
350    
351    /**
352     * Set the expression which value will be set to the message body
353     * @param expression which is use to set the message body 
354     */
355    public void returnReplyBody(Expression expression) {
356        this.defaultProcessor = ProcessorBuilder.setBody(expression);
357    }
358    
359    /**
360     * Set the expression which value will be set to the message header
361     * @param headerName that will be set value
362     * @param expression which is use to set the message header 
363     */
364    public void returnReplyHeader(String headerName, Expression expression) {
365        this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression);
366    }
367    
368
369    /**
370     * Validates that all the available expectations on this endpoint are
371     * satisfied; or throw an exception
372     */
373    public void assertIsSatisfied() throws InterruptedException {
374        assertIsSatisfied(sleepForEmptyTest);
375    }
376
377    /**
378     * Validates that all the available expectations on this endpoint are
379     * satisfied; or throw an exception
380     *
381     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
382     *                should wait for the test to be true
383     */
384    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
385        LOG.info("Asserting: " + this + " is satisfied");
386        doAssertIsSatisfied(timeoutForEmptyEndpoints);
387        if (assertPeriod > 0) {
388            // if an assert period was set then re-assert again to ensure the assertion is still valid
389            Thread.sleep(assertPeriod);
390            LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis");
391            // do not use timeout when we re-assert
392            doAssertIsSatisfied(0);
393        }
394    }
395
396    protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
397        if (expectedCount == 0) {
398            if (timeoutForEmptyEndpoints > 0) {
399                LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
400                Thread.sleep(timeoutForEmptyEndpoints);
401            }
402            assertEquals("Received message count", expectedCount, getReceivedCounter());
403        } else if (expectedCount > 0) {
404            if (expectedCount != getReceivedCounter()) {
405                waitForCompleteLatch();
406            }
407            assertEquals("Received message count", expectedCount, getReceivedCounter());
408        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
409            waitForCompleteLatch();
410        }
411
412        if (expectedMinimumCount >= 0) {
413            int receivedCounter = getReceivedCounter();
414            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
415        }
416
417        for (Runnable test : tests) {
418            test.run();
419        }
420
421        for (Throwable failure : failures) {
422            if (failure != null) {
423                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
424                fail("Failed due to caught exception: " + failure);
425            }
426        }
427    }
428
429    /**
430     * Validates that the assertions fail on this endpoint
431     */
432    public void assertIsNotSatisfied() throws InterruptedException {
433        boolean failed = false;
434        try {
435            assertIsSatisfied();
436            // did not throw expected error... fail!
437            failed = true;
438        } catch (AssertionError e) {
439            LOG.info("Caught expected failure: " + e);
440        }
441        if (failed) {
442            // fail() throws the AssertionError to indicate the test failed. 
443            fail("Expected assertion failure but test succeeded!");
444        }
445    }
446
447    /**
448     * Validates that the assertions fail on this endpoint
449
450     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
451     *        should wait for the test to be true
452     */
453    public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
454        boolean failed = false;
455        try {
456            assertIsSatisfied(timeoutForEmptyEndpoints);
457            // did not throw expected error... fail!
458            failed = true;
459        } catch (AssertionError e) {
460            LOG.info("Caught expected failure: " + e);
461        }
462        if (failed) { 
463            // fail() throws the AssertionError to indicate the test failed. 
464            fail("Expected assertion failure but test succeeded!");
465        }
466    }    
467    
468    /**
469     * Specifies the expected number of message exchanges that should be
470     * received by this endpoint
471     *
472     * If you want to assert that <b>exactly</b> n messages arrives to this mock
473     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
474     *
475     * @param expectedCount the number of message exchanges that should be
476     *                expected by this endpoint
477     * @see #setAssertPeriod(long)
478     */
479    public void expectedMessageCount(int expectedCount) {
480        setExpectedMessageCount(expectedCount);
481    }
482
483    /**
484     * Sets a grace period after which the mock endpoint will re-assert
485     * to ensure the preliminary assertion is still valid.
486     * <p/>
487     * This is used for example to assert that <b>exactly</b> a number of messages 
488     * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then
489     * the assertion is satisfied when 5 or more message arrives. To ensure that
490     * exactly 5 messages arrives, then you would need to wait a little period
491     * to ensure no further message arrives. This is what you can use this
492     * {@link #setAssertPeriod(long)} method for.
493     * <p/>
494     * By default this period is disabled.
495     *
496     * @param period grace period in millis
497     */
498    public void setAssertPeriod(long period) {
499        this.assertPeriod = period;
500    }
501
502    /**
503     * Specifies the minimum number of expected message exchanges that should be
504     * received by this endpoint
505     *
506     * @param expectedCount the number of message exchanges that should be
507     *                expected by this endpoint
508     */
509    public void expectedMinimumMessageCount(int expectedCount) {
510        setMinimumExpectedMessageCount(expectedCount);
511    }
512
513    /**
514     * Sets an expectation that the given header name & value are received by this endpoint
515     * <p/>
516     * You can set multiple expectations for different header names.
517     * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt>
518     * <p/>
519     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
520     * there must be 3 values.
521     * <p/>
522     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
523     */
524    public void expectedHeaderReceived(final String name, final Object value) {
525        if (expectedHeaderValues == null) {
526            expectedHeaderValues = new CaseInsensitiveMap();
527            // we just wants to expects to be called once
528            expects(new Runnable() {
529                public void run() {
530                    for (int i = 0; i < getReceivedExchanges().size(); i++) {
531                        Exchange exchange = getReceivedExchange(i);
532                        for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) {
533                            String key = entry.getKey();
534                            Object expectedValue = entry.getValue();
535
536                            // we accept that an expectedValue of null also means that the header may be absent
537                            if (expectedValue != null) {
538                                assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders());
539                                boolean hasKey = exchange.getIn().getHeaders().containsKey(key);
540                                assertTrue("No header with name " + key + " found for message: " + i, hasKey);
541                            }
542
543                            Object actualValue = exchange.getIn().getHeader(key);
544                            actualValue = extractActualValue(exchange, actualValue, expectedValue);
545
546                            assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue);
547                        }
548                    }
549                }
550            });
551        }
552        expectedHeaderValues.put(name, value);
553    }
554
555    /**
556     * Adds an expectation that the given header values are received by this
557     * endpoint in any order.
558     * <p/>
559     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
560     * there must be 3 values.
561     * <p/>
562     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
563     */
564    public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) {
565        expectedMessageCount(values.size());
566
567        expects(new Runnable() {
568            public void run() {
569                // these are the expected values to find
570                final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<Object>(values);
571
572                for (int i = 0; i < getReceivedExchanges().size(); i++) {
573                    Exchange exchange = getReceivedExchange(i);
574
575                    Object actualValue = exchange.getIn().getHeader(name);
576                    for (Object expectedValue : actualHeaderValues) {
577                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
578                        // remove any found values
579                        actualHeaderValues.remove(actualValue);
580                    }
581                }
582
583                // should be empty, as we should find all the values
584                assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size())
585                        + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty());
586            }
587        });
588    }
589
590    /**
591     * Adds an expectation that the given header values are received by this
592     * endpoint in any order
593     * <p/>
594     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
595     * there must be 3 values.
596     * <p/>
597     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
598     */
599    public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) {
600        List<Object> valueList = new ArrayList<Object>();
601        valueList.addAll(Arrays.asList(values));
602        expectedHeaderValuesReceivedInAnyOrder(name, valueList);
603    }
604
605    /**
606     * Sets an expectation that the given property name & value are received by this endpoint
607     * <p/>
608     * You can set multiple expectations for different property names.
609     * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt>
610     */
611    public void expectedPropertyReceived(final String name, final Object value) {
612        if (expectedPropertyValues == null) {
613            expectedPropertyValues = new ConcurrentHashMap<String, Object>();
614        }
615        if (value != null) {
616            // ConcurrentHashMap cannot store null values
617            expectedPropertyValues.put(name, value);
618        }
619
620        expects(new Runnable() {
621            public void run() {
622                for (int i = 0; i < getReceivedExchanges().size(); i++) {
623                    Exchange exchange = getReceivedExchange(i);
624                    for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) {
625                        String key = entry.getKey();
626                        Object expectedValue = entry.getValue();
627
628                        // we accept that an expectedValue of null also means that the header may be absent
629                        if (expectedValue != null) {
630                            assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty());
631                            boolean hasKey = exchange.getProperties().containsKey(key);
632                            assertTrue("No property with name " + key + " found for message: " + i, hasKey);
633                        }
634
635                        Object actualValue = exchange.getProperty(key);
636                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
637
638                        assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue);
639                    }
640                }
641            }
642        });
643    }
644
645    /**
646     * Adds an expectation that the given body values are received by this
647     * endpoint in the specified order
648     * <p/>
649     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
650     * there must be 3 values.
651     * <p/>
652     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
653     */
654    public void expectedBodiesReceived(final List<?> bodies) {
655        expectedMessageCount(bodies.size());
656        this.expectedBodyValues = bodies;
657        this.actualBodyValues = new ArrayList<Object>();
658
659        expects(new Runnable() {
660            public void run() {
661                for (int i = 0; i < expectedBodyValues.size(); i++) {
662                    Exchange exchange = getReceivedExchange(i);
663                    assertTrue("No exchange received for counter: " + i, exchange != null);
664
665                    Object expectedBody = expectedBodyValues.get(i);
666                    Object actualBody = null;
667                    if (i < actualBodyValues.size()) {
668                        actualBody = actualBodyValues.get(i);
669                    }
670                    actualBody = extractActualValue(exchange, actualBody, expectedBody);
671
672                    assertEquals("Body of message: " + i, expectedBody, actualBody);
673                }
674            }
675        });
676    }
677
678    private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) {
679        if (actualValue == null) {
680            return null;
681        }
682
683        if (actualValue instanceof Expression) {
684            Class clazz = Object.class;
685            if (expectedValue != null) {
686                clazz = expectedValue.getClass();
687            }
688            actualValue = ((Expression)actualValue).evaluate(exchange, clazz);
689        } else if (actualValue instanceof Predicate) {
690            actualValue = ((Predicate)actualValue).matches(exchange);
691        } else if (expectedValue != null) {
692            String from = actualValue.getClass().getName();
693            String to = expectedValue.getClass().getName();
694            actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue);
695            assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null);
696        }
697        return actualValue;
698    }
699
700    /**
701     * Sets an expectation that the given predicates matches the received messages by this endpoint
702     */
703    public void expectedMessagesMatches(Predicate... predicates) {
704        for (int i = 0; i < predicates.length; i++) {
705            final int messageIndex = i;
706            final Predicate predicate = predicates[i];
707            final AssertionClause clause = new AssertionClause(this) {
708                public void run() {
709                    addPredicate(predicate);
710                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
711                }
712            };
713            expects(clause);
714        }
715    }
716
717    /**
718     * Sets an expectation that the given body values are received by this endpoint
719     * <p/>
720     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
721     * there must be 3 bodies.
722     * <p/>
723     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
724     */
725    public void expectedBodiesReceived(Object... bodies) {
726        List<Object> bodyList = new ArrayList<Object>();
727        bodyList.addAll(Arrays.asList(bodies));
728        expectedBodiesReceived(bodyList);
729    }
730
731    /**
732     * Adds an expectation that the given body value are received by this endpoint
733     */
734    public AssertionClause expectedBodyReceived() {
735        expectedMessageCount(1);
736        final AssertionClause clause = new AssertionClause(this) {
737            public void run() {
738                Exchange exchange = getReceivedExchange(0);
739                assertTrue("No exchange received for counter: " + 0, exchange != null);
740
741                Object actualBody = exchange.getIn().getBody();
742                Expression exp = createExpression(getCamelContext());
743                Object expectedBody = exp.evaluate(exchange, Object.class);
744
745                assertEquals("Body of message: " + 0, expectedBody, actualBody);
746            }
747        };
748        expects(clause);
749        return clause;
750    }
751
752    /**
753     * Adds an expectation that the given body values are received by this
754     * endpoint in any order
755     * <p/>
756     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
757     * there must be 3 bodies.
758     * <p/>
759     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
760     */
761    public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) {
762        expectedMessageCount(bodies.size());
763        this.expectedBodyValues = bodies;
764        this.actualBodyValues = new ArrayList<Object>();
765
766        expects(new Runnable() {
767            public void run() {
768                List<Object> actualBodyValuesSet = new ArrayList<Object>(actualBodyValues);
769                for (int i = 0; i < expectedBodyValues.size(); i++) {
770                    Exchange exchange = getReceivedExchange(i);
771                    assertTrue("No exchange received for counter: " + i, exchange != null);
772
773                    Object expectedBody = expectedBodyValues.get(i);
774                    assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody));
775                }
776            }
777        });
778    }
779
780    /**
781     * Adds an expectation that the given body values are received by this
782     * endpoint in any order
783     * <p/>
784     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
785     * there must be 3 bodies.
786     * <p/>
787     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
788     */
789    public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
790        List<Object> bodyList = new ArrayList<Object>();
791        bodyList.addAll(Arrays.asList(bodies));
792        expectedBodiesReceivedInAnyOrder(bodyList);
793    }
794
795    /**
796     * Adds an expectation that a file exists with the given name
797     *
798     * @param name name of file, will cater for / and \ on different OS platforms
799     */
800    public void expectedFileExists(final String name) {
801        expectedFileExists(name, null);
802    }
803
804    /**
805     * Adds an expectation that a file exists with the given name
806     * <p/>
807     * Will wait at most 5 seconds while checking for the existence of the file.
808     *
809     * @param name name of file, will cater for / and \ on different OS platforms
810     * @param content content of file to compare, can be <tt>null</tt> to not compare content
811     */
812    public void expectedFileExists(final String name, final String content) {
813        final File file = new File(FileUtil.normalizePath(name));
814
815        expects(new Runnable() {
816            public void run() {
817                // wait at most 5 seconds for the file to exists
818                final long timeout = System.currentTimeMillis() + 5000;
819
820                boolean stop = false;
821                while (!stop && !file.exists()) {
822                    try {
823                        Thread.sleep(50);
824                    } catch (InterruptedException e) {
825                        // ignore
826                    }
827                    stop = System.currentTimeMillis() > timeout;
828                }
829
830                assertTrue("The file should exists: " + name, file.exists());
831
832                if (content != null) {
833                    String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
834                    assertEquals("Content of file: " + name, content, body);
835                }
836            }
837        });
838    }
839
840    /**
841     * Adds an expectation that messages received should have the given exchange pattern
842     */
843    public void expectedExchangePattern(final ExchangePattern exchangePattern) {
844        expectedMessagesMatches(new Predicate() {
845            public boolean matches(Exchange exchange) {
846                return exchange.getPattern().equals(exchangePattern);
847            }
848        });
849    }
850
851    /**
852     * Adds an expectation that messages received should have ascending values
853     * of the given expression such as a user generated counter value
854     */
855    public void expectsAscending(final Expression expression) {
856        expects(new Runnable() {
857            public void run() {
858                assertMessagesAscending(expression);
859            }
860        });
861    }
862
863    /**
864     * Adds an expectation that messages received should have ascending values
865     * of the given expression such as a user generated counter value
866     */
867    public AssertionClause expectsAscending() {
868        final AssertionClause clause = new AssertionClause(this) {
869            public void run() {
870                assertMessagesAscending(createExpression(getCamelContext()));
871            }
872        };
873        expects(clause);
874        return clause;
875    }
876
877    /**
878     * Adds an expectation that messages received should have descending values
879     * of the given expression such as a user generated counter value
880     */
881    public void expectsDescending(final Expression expression) {
882        expects(new Runnable() {
883            public void run() {
884                assertMessagesDescending(expression);
885            }
886        });
887    }
888
889    /**
890     * Adds an expectation that messages received should have descending values
891     * of the given expression such as a user generated counter value
892     */
893    public AssertionClause expectsDescending() {
894        final AssertionClause clause = new AssertionClause(this) {
895            public void run() {
896                assertMessagesDescending(createExpression(getCamelContext()));
897            }
898        };
899        expects(clause);
900        return clause;
901    }
902
903    /**
904     * Adds an expectation that no duplicate messages should be received using
905     * the expression to determine the message ID
906     *
907     * @param expression the expression used to create a unique message ID for
908     *                message comparison (which could just be the message
909     *                payload if the payload can be tested for uniqueness using
910     *                {@link Object#equals(Object)} and
911     *                {@link Object#hashCode()}
912     */
913    public void expectsNoDuplicates(final Expression expression) {
914        expects(new Runnable() {
915            public void run() {
916                assertNoDuplicates(expression);
917            }
918        });
919    }
920
921    /**
922     * Adds an expectation that no duplicate messages should be received using
923     * the expression to determine the message ID
924     */
925    public AssertionClause expectsNoDuplicates() {
926        final AssertionClause clause = new AssertionClause(this) {
927            public void run() {
928                assertNoDuplicates(createExpression(getCamelContext()));
929            }
930        };
931        expects(clause);
932        return clause;
933    }
934
935    /**
936     * Asserts that the messages have ascending values of the given expression
937     */
938    public void assertMessagesAscending(Expression expression) {
939        assertMessagesSorted(expression, true);
940    }
941
942    /**
943     * Asserts that the messages have descending values of the given expression
944     */
945    public void assertMessagesDescending(Expression expression) {
946        assertMessagesSorted(expression, false);
947    }
948
949    protected void assertMessagesSorted(Expression expression, boolean ascending) {
950        String type = ascending ? "ascending" : "descending";
951        ExpressionComparator comparator = new ExpressionComparator(expression);
952        List<Exchange> list = getReceivedExchanges();
953        for (int i = 1; i < list.size(); i++) {
954            int j = i - 1;
955            Exchange e1 = list.get(j);
956            Exchange e2 = list.get(i);
957            int result = comparator.compare(e1, e2);
958            if (result == 0) {
959                fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
960                    + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
961            } else {
962                if (!ascending) {
963                    result = result * -1;
964                }
965                if (result > 0) {
966                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
967                        + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
968                        + expression + ". Exchanges: " + e1 + " and " + e2);
969                }
970            }
971        }
972    }
973
974    public void assertNoDuplicates(Expression expression) {
975        Map<Object, Exchange> map = new HashMap<Object, Exchange>();
976        List<Exchange> list = getReceivedExchanges();
977        for (int i = 0; i < list.size(); i++) {
978            Exchange e2 = list.get(i);
979            Object key = expression.evaluate(e2, Object.class);
980            Exchange e1 = map.get(key);
981            if (e1 != null) {
982                fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
983            } else {
984                map.put(key, e2);
985            }
986        }
987    }
988
989    /**
990     * Adds the expectation which will be invoked when enough messages are received
991     */
992    public void expects(Runnable runnable) {
993        tests.add(runnable);
994    }
995
996    /**
997     * Adds an assertion to the given message index
998     *
999     * @param messageIndex the number of the message
1000     * @return the assertion clause
1001     */
1002    public AssertionClause message(final int messageIndex) {
1003        final AssertionClause clause = new AssertionClause(this) {
1004            public void run() {
1005                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
1006            }
1007        };
1008        expects(clause);
1009        return clause;
1010    }
1011
1012    /**
1013     * Adds an assertion to all the received messages
1014     *
1015     * @return the assertion clause
1016     */
1017    public AssertionClause allMessages() {
1018        final AssertionClause clause = new AssertionClause(this) {
1019            public void run() {
1020                List<Exchange> list = getReceivedExchanges();
1021                int index = 0;
1022                for (Exchange exchange : list) {
1023                    applyAssertionOn(MockEndpoint.this, index++, exchange);
1024                }
1025            }
1026        };
1027        expects(clause);
1028        return clause;
1029    }
1030
1031    /**
1032     * Asserts that the given index of message is received (starting at zero)
1033     */
1034    public Exchange assertExchangeReceived(int index) {
1035        int count = getReceivedCounter();
1036        assertTrue("Not enough messages received. Was: " + count, count > index);
1037        return getReceivedExchange(index);
1038    }
1039
1040    // Properties
1041    // -------------------------------------------------------------------------
1042
1043    public String getName() {
1044        return name;
1045    }
1046
1047    public void setName(String name) {
1048        this.name = name;
1049    }
1050
1051    public List<Throwable> getFailures() {
1052        return failures;
1053    }
1054
1055    public int getReceivedCounter() {
1056        return counter;
1057    }
1058
1059    public List<Exchange> getReceivedExchanges() {
1060        return receivedExchanges;
1061    }
1062
1063    public int getExpectedCount() {
1064        return expectedCount;
1065    }
1066
1067    public long getSleepForEmptyTest() {
1068        return sleepForEmptyTest;
1069    }
1070
1071    /**
1072     * Allows a sleep to be specified to wait to check that this endpoint really
1073     * is empty when {@link #expectedMessageCount(int)} is called with zero
1074     *
1075     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
1076     *                this endpoint really is empty
1077     */
1078    public void setSleepForEmptyTest(long sleepForEmptyTest) {
1079        this.sleepForEmptyTest = sleepForEmptyTest;
1080    }
1081
1082    public long getResultWaitTime() {
1083        return resultWaitTime;
1084    }
1085
1086    /**
1087     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
1088     * wait on a latch until it is satisfied
1089     */
1090    public void setResultWaitTime(long resultWaitTime) {
1091        this.resultWaitTime = resultWaitTime;
1092    }
1093
1094    /**
1095     * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
1096     * wait on a latch until it is satisfied
1097     */
1098    public void setResultMinimumWaitTime(long resultMinimumWaitTime) {
1099        this.resultMinimumWaitTime = resultMinimumWaitTime;
1100    }
1101
1102    /**
1103     * @deprecated use {@link #setResultMinimumWaitTime(long)}
1104     */
1105    @Deprecated
1106    public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
1107        setResultMinimumWaitTime(resultMinimumWaitTime);
1108    }
1109
1110    /**
1111     * Specifies the expected number of message exchanges that should be
1112     * received by this endpoint.
1113     * <p/>
1114     * <b>Beware:</b> If you want to expect that <tt>0</tt> messages, then take extra care,
1115     * as <tt>0</tt> matches when the tests starts, so you need to set a assert period time
1116     * to let the test run for a while to make sure there are still no messages arrived; for
1117     * that use {@link #setAssertPeriod(long)}.
1118     * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
1119     * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
1120     * This allows you to not use a fixed assert period, to speedup testing times.
1121     * <p/>
1122     * If you want to assert that <b>exactly</b> n'th message arrives to this mock
1123     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
1124     *
1125     * @param expectedCount the number of message exchanges that should be
1126     *                expected by this endpoint
1127     * @see #setAssertPeriod(long)                      
1128     */
1129    public void setExpectedCount(int expectedCount) {
1130        setExpectedMessageCount(expectedCount);
1131    }
1132
1133    /**
1134     * @see #setExpectedCount(int)
1135     */
1136    public void setExpectedMessageCount(int expectedCount) {
1137        this.expectedCount = expectedCount;
1138        if (expectedCount <= 0) {
1139            latch = null;
1140        } else {
1141            latch = new CountDownLatch(expectedCount);
1142        }
1143    }
1144
1145    /**
1146     * Specifies the minimum number of expected message exchanges that should be
1147     * received by this endpoint
1148     *
1149     * @param expectedCount the number of message exchanges that should be
1150     *                expected by this endpoint
1151     */
1152    public void setMinimumExpectedMessageCount(int expectedCount) {
1153        this.expectedMinimumCount = expectedCount;
1154        if (expectedCount <= 0) {
1155            latch = null;
1156        } else {
1157            latch = new CountDownLatch(expectedMinimumCount);
1158        }
1159    }
1160
1161    public Processor getReporter() {
1162        return reporter;
1163    }
1164
1165    /**
1166     * Allows a processor to added to the endpoint to report on progress of the test
1167     */
1168    public void setReporter(Processor reporter) {
1169        this.reporter = reporter;
1170    }
1171
1172    /**
1173     * Specifies to only retain the first n'th number of received {@link Exchange}s.
1174     * <p/>
1175     * This is used when testing with big data, to reduce memory consumption by not storing
1176     * copies of every {@link Exchange} this mock endpoint receives.
1177     * <p/>
1178     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1179     * will still return the actual number of received {@link Exchange}s. For example
1180     * if we have received 5000 {@link Exchange}s, and have configured to only retain the first
1181     * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1182     * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and
1183     * {@link #getReceivedExchanges()} methods.
1184     * <p/>
1185     * When using this method, then some of the other expectation methods is not supported,
1186     * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first
1187     * number of bodies received.
1188     * <p/>
1189     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1190     * to limit both the first and last received.
1191     * 
1192     * @param retainFirst  to limit and only keep the first n'th received {@link Exchange}s, use
1193     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1194     * @see #setRetainLast(int)
1195     */
1196    public void setRetainFirst(int retainFirst) {
1197        this.retainFirst = retainFirst;
1198    }
1199
1200    /**
1201     * Specifies to only retain the last n'th number of received {@link Exchange}s.
1202     * <p/>
1203     * This is used when testing with big data, to reduce memory consumption by not storing
1204     * copies of every {@link Exchange} this mock endpoint receives.
1205     * <p/>
1206     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1207     * will still return the actual number of received {@link Exchange}s. For example
1208     * if we have received 5000 {@link Exchange}s, and have configured to only retain the last
1209     * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1210     * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and
1211     * {@link #getReceivedExchanges()} methods.
1212     * <p/>
1213     * When using this method, then some of the other expectation methods is not supported,
1214     * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first
1215     * number of bodies received.
1216     * <p/>
1217     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1218     * to limit both the first and last received.
1219     *
1220     * @param retainLast  to limit and only keep the last n'th received {@link Exchange}s, use
1221     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1222     * @see #setRetainFirst(int)
1223     */
1224    public void setRetainLast(int retainLast) {
1225        this.retainLast = retainLast;
1226    }
1227
1228    public int isReportGroup() {
1229        return reportGroup;
1230    }
1231
1232    /**
1233     * A number that is used to turn on throughput logging based on groups of the size.
1234     */
1235    public void setReportGroup(int reportGroup) {
1236        this.reportGroup = reportGroup;
1237    }
1238
1239    public boolean isCopyOnExchange() {
1240        return copyOnExchange;
1241    }
1242
1243    /**
1244     * Sets whether to make a deep copy of the incoming {@link Exchange} when received at this mock endpoint.
1245     * <p/>
1246     * Is by default <tt>true</tt>.
1247     */
1248    public void setCopyOnExchange(boolean copyOnExchange) {
1249        this.copyOnExchange = copyOnExchange;
1250    }
1251
1252    // Implementation methods
1253    // -------------------------------------------------------------------------
1254    private void init() {
1255        expectedCount = -1;
1256        counter = 0;
1257        defaultProcessor = null;
1258        processors = new HashMap<Integer, Processor>();
1259        receivedExchanges = new CopyOnWriteArrayList<Exchange>();
1260        failures = new CopyOnWriteArrayList<Throwable>();
1261        tests = new CopyOnWriteArrayList<Runnable>();
1262        latch = null;
1263        sleepForEmptyTest = 0;
1264        resultWaitTime = 0;
1265        resultMinimumWaitTime = 0L;
1266        assertPeriod = 0L;
1267        expectedMinimumCount = -1;
1268        expectedBodyValues = null;
1269        actualBodyValues = new ArrayList<Object>();
1270        expectedHeaderValues = null;
1271        actualHeaderValues = null;
1272        expectedPropertyValues = null;
1273        actualPropertyValues = null;
1274        retainFirst = -1;
1275        retainLast = -1;
1276    }
1277
1278    protected synchronized void onExchange(Exchange exchange) {
1279        try {
1280            if (reporter != null) {
1281                reporter.process(exchange);
1282            }
1283            Exchange copy = exchange;
1284            if (copyOnExchange) {
1285                // copy the exchange so the mock stores the copy and not the actual exchange
1286                copy = ExchangeHelper.createCopy(exchange, true);
1287            }
1288            performAssertions(exchange, copy);
1289        } catch (Throwable e) {
1290            // must catch java.lang.Throwable as AssertionError extends java.lang.Error
1291            failures.add(e);
1292        } finally {
1293            // make sure latch is counted down to avoid test hanging forever
1294            if (latch != null) {
1295                latch.countDown();
1296            }
1297        }
1298    }
1299
1300    /**
1301     * Performs the assertions on the incoming exchange.
1302     *
1303     * @param exchange   the actual exchange
1304     * @param copy       a copy of the exchange (only store this)
1305     * @throws Exception can be thrown if something went wrong
1306     */
1307    protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
1308        Message in = copy.getIn();
1309        Object actualBody = in.getBody();
1310
1311        if (expectedHeaderValues != null) {
1312            if (actualHeaderValues == null) {
1313                actualHeaderValues = new CaseInsensitiveMap();
1314            }
1315            if (in.hasHeaders()) {
1316                actualHeaderValues.putAll(in.getHeaders());
1317            }
1318        }
1319
1320        if (expectedPropertyValues != null) {
1321            if (actualPropertyValues == null) {
1322                actualPropertyValues = new ConcurrentHashMap<String, Object>();
1323            }
1324            actualPropertyValues.putAll(copy.getProperties());
1325        }
1326
1327        if (expectedBodyValues != null) {
1328            int index = actualBodyValues.size();
1329            if (expectedBodyValues.size() > index) {
1330                Object expectedBody = expectedBodyValues.get(index);
1331                if (expectedBody != null) {
1332                    // prefer to convert body early, for example when using files
1333                    // we need to read the content at this time
1334                    Object body = in.getBody(expectedBody.getClass());
1335                    if (body != null) {
1336                        actualBody = body;
1337                    }
1338                }
1339                actualBodyValues.add(actualBody);
1340            }
1341        }
1342
1343        // let counter be 0 index-based in the logs
1344        if (LOG.isDebugEnabled()) {
1345            String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
1346            if (copy.getIn().hasHeaders()) {
1347                msg += " and headers:" + copy.getIn().getHeaders();
1348            }
1349            LOG.debug(msg);
1350        }
1351
1352        // record timestamp when exchange was received
1353        copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());
1354
1355        // add a copy of the received exchange
1356        addReceivedExchange(copy);
1357        // and then increment counter after adding received exchange
1358        ++counter;
1359
1360        Processor processor = processors.get(getReceivedCounter()) != null
1361                ? processors.get(getReceivedCounter()) : defaultProcessor;
1362
1363        if (processor != null) {
1364            try {
1365                // must process the incoming exchange and NOT the copy as the idea
1366                // is the end user can manipulate the exchange
1367                processor.process(exchange);
1368            } catch (Exception e) {
1369                // set exceptions on exchange so we can throw exceptions to simulate errors
1370                exchange.setException(e);
1371            }
1372        }
1373    }
1374
1375    /**
1376     * Adds the received exchange.
1377     * 
1378     * @param copy  a copy of the received exchange
1379     */
1380    protected void addReceivedExchange(Exchange copy) {
1381        if (retainFirst == 0 && retainLast == 0) {
1382            // do not retain any messages at all
1383        } else if (retainFirst < 0 && retainLast < 0) {
1384            // no limitation so keep them all
1385            receivedExchanges.add(copy);
1386        } else {
1387            // okay there is some sort of limitations, so figure out what to retain
1388            if (retainFirst > 0 && counter < retainFirst) {
1389                // store a copy as its within the retain first limitation
1390                receivedExchanges.add(copy);
1391            } else if (retainLast > 0) {
1392                // remove the oldest from the last retained boundary,
1393                int index = receivedExchanges.size() - retainLast;
1394                if (index >= 0) {
1395                    // but must be outside the first range as well
1396                    // otherwise we should not remove the oldest
1397                    if (retainFirst <= 0 || retainFirst <= index) {
1398                        receivedExchanges.remove(index);
1399                    }
1400                }
1401                // store a copy of the last n'th received
1402                receivedExchanges.add(copy);
1403            }
1404        }
1405    }
1406
1407    protected void waitForCompleteLatch() throws InterruptedException {
1408        if (latch == null) {
1409            fail("Should have a latch!");
1410        }
1411
1412        StopWatch watch = new StopWatch();
1413        waitForCompleteLatch(resultWaitTime);
1414        long delta = watch.stop();
1415        LOG.debug("Took {} millis to complete latch", delta);
1416
1417        if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
1418            fail("Expected minimum " + resultMinimumWaitTime
1419                + " millis waiting on the result, but was faster with " + delta + " millis.");
1420        }
1421    }
1422
1423    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
1424        // Wait for a default 10 seconds if resultWaitTime is not set
1425        long waitTime = timeout == 0 ? 10000L : timeout;
1426
1427        // now let's wait for the results
1428        LOG.debug("Waiting on the latch for: " + timeout + " millis");
1429        latch.await(waitTime, TimeUnit.MILLISECONDS);
1430    }
1431
1432    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
1433        if (!ObjectHelper.equal(expectedValue, actualValue)) {
1434            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
1435        }
1436    }
1437
1438    protected void assertTrue(String message, boolean predicate) {
1439        if (!predicate) {
1440            fail(message);
1441        }
1442    }
1443
1444    protected void fail(Object message) {
1445        if (LOG.isDebugEnabled()) {
1446            List<Exchange> list = getReceivedExchanges();
1447            int index = 0;
1448            for (Exchange exchange : list) {
1449                LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange});
1450            }
1451        }
1452        throw new AssertionError(getEndpointUri() + " " + message);
1453    }
1454
1455    public int getExpectedMinimumCount() {
1456        return expectedMinimumCount;
1457    }
1458
1459    public void await() throws InterruptedException {
1460        if (latch != null) {
1461            latch.await();
1462        }
1463    }
1464
1465    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
1466        if (latch != null) {
1467            return latch.await(timeout, unit);
1468        }
1469        return true;
1470    }
1471
1472    public boolean isSingleton() {
1473        return true;
1474    }
1475
1476    public boolean isLenientProperties() {
1477        return true;
1478    }
1479
1480    private Exchange getReceivedExchange(int index) {
1481        if (index <= receivedExchanges.size() - 1) {
1482            return receivedExchanges.get(index);
1483        } else {
1484            return null;
1485        }
1486    }
1487
1488}