001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.mock;
018
019import java.io.File;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.CopyOnWriteArraySet;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.camel.AsyncCallback;
034import org.apache.camel.CamelContext;
035import org.apache.camel.Component;
036import org.apache.camel.Consumer;
037import org.apache.camel.Endpoint;
038import org.apache.camel.Exchange;
039import org.apache.camel.ExchangePattern;
040import org.apache.camel.Expression;
041import org.apache.camel.Handler;
042import org.apache.camel.Message;
043import org.apache.camel.Predicate;
044import org.apache.camel.Processor;
045import org.apache.camel.Producer;
046import org.apache.camel.builder.ProcessorBuilder;
047import org.apache.camel.impl.DefaultAsyncProducer;
048import org.apache.camel.impl.DefaultEndpoint;
049import org.apache.camel.impl.InterceptSendToEndpoint;
050import org.apache.camel.spi.BrowsableEndpoint;
051import org.apache.camel.spi.Metadata;
052import org.apache.camel.spi.UriEndpoint;
053import org.apache.camel.spi.UriParam;
054import org.apache.camel.spi.UriPath;
055import org.apache.camel.util.CamelContextHelper;
056import org.apache.camel.util.ExchangeHelper;
057import org.apache.camel.util.ExpressionComparator;
058import org.apache.camel.util.FileUtil;
059import org.apache.camel.util.ObjectHelper;
060import org.apache.camel.util.StopWatch;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * The mock component is used for testing routes and mediation rules using mocks.
066 * <p/>
067 * A Mock endpoint which provides a literate, fluent API for testing routes
068 * using a <a href="http://jmock.org/">JMock style</a> API.
069 * <p/>
 * The mock endpoint has two sets of methods
071 * <ul>
072 *   <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li>
073 *   <li>assertXXX - To assert assertions, after the test has been executed</li>
074 * </ul>
 * It's <b>important</b> to know the difference between the two sets. The former is used to
 * set expectations before the test is started (e.g. before the mock receives messages).
 * The latter is used after the test has been executed, to verify the expectations; or
 * other assertions which you can perform after the test has been completed.
079 * <p/>
 * <b>Beware:</b> If you want to expect that a mock does not receive any messages, by calling
 * {@link #setExpectedMessageCount(int)} with <tt>0</tt>, then take extra care,
 * as <tt>0</tt> matches when the test starts, so you need to set an assert period
 * to let the test run for a while to make sure no messages have arrived; for
 * that use {@link #setAssertPeriod(long)}.
085 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
086 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
087 * This allows you to not use a fixed assert period, to speedup testing times.
088 * <p/>
 * <b>Important:</b> If using {@link #expectedMessageCount(int)} and also {@link #expectedBodiesReceived(java.util.List)} or
 * {@link #expectedHeaderReceived(String, Object)} then the latter overrides the number of expected messages based on the
 * number of values provided in the bodies/headers.
092 *
093 * @version 
094 */
095@UriEndpoint(firstVersion = "1.0.0", scheme = "mock", title = "Mock", syntax = "mock:name", producerOnly = true, label = "core,testing", lenientProperties = true)
096public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
    // shared logger for all mock endpoints
    private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
    // must be volatile so changes are visible between the thread which performs the assertions
    // and the threads which process the exchanges when routing messages in Camel
    protected volatile Processor reporter;
    
    // processor assigned by whenAnyExchangeReceived / returnReplyBody / returnReplyHeader
    private volatile Processor defaultProcessor;
    // processors keyed by message index, filled by whenExchangeReceived
    private volatile Map<Integer, Processor> processors;
    private volatile List<Exchange> receivedExchanges;
    // throwables checked at the end of doAssertIsSatisfied; any non-null entry fails the assertion
    private volatile List<Throwable> failures;
    // registered expectation checks, run one by one in doAssertIsSatisfied
    private volatile List<Runnable> tests;
    private volatile CountDownLatch latch;
    // minimum number of messages; checked in doAssertIsSatisfied when it is >= 0
    private volatile int expectedMinimumCount;
    private volatile List<?> expectedBodyValues;
    private volatile List<Object> actualBodyValues;
    // created lazily on the first call of expectedHeaderReceived
    private volatile Map<String, Object> expectedHeaderValues;
    private volatile Map<String, Object> actualHeaderValues;
    // created lazily on the first call of expectedPropertyReceived
    private volatile Map<String, Object> expectedPropertyValues;
    private volatile Map<String, Object> actualPropertyValues;

    private volatile int counter;

    @UriPath(description = "Name of mock endpoint") @Metadata(required = "true")
    private String name;
    // exact number of expected messages: 0 means "expect none", a positive value is asserted exactly,
    // and -1 is treated as "not set" by expectedHeaderReceived
    @UriParam(label = "producer", defaultValue = "-1")
    private int expectedCount;
    // millis passed by assertIsSatisfied() as the sleep time used when zero messages are expected
    @UriParam(label = "producer", defaultValue = "0")
    private long sleepForEmptyTest;
    // set in millis by the static assertIsSatisfied(CamelContext, long, TimeUnit)
    // NOTE(review): presumably the maximum time to wait on the latch - confirm in waitForCompleteLatch
    @UriParam(label = "producer", defaultValue = "0")
    private long resultWaitTime;
    @UriParam(label = "producer", defaultValue = "0")
    private long resultMinimumWaitTime;
    // when > 0, assertIsSatisfied(long) sleeps this many millis and then asserts a second time
    @UriParam(label = "producer", defaultValue = "0")
    private long assertPeriod;
    @UriParam(label = "producer", defaultValue = "-1")
    private int retainFirst;
    @UriParam(label = "producer", defaultValue = "-1")
    private int retainLast;
    @UriParam(label = "producer")
    private int reportGroup;
    @UriParam(label = "producer,advanced", defaultValue = "true")
    private boolean copyOnExchange = true;
138
139    public MockEndpoint(String endpointUri, Component component) {
140        super(endpointUri, component);
141        init();
142    }
143
144    @Deprecated
145    public MockEndpoint(String endpointUri) {
146        super(endpointUri);
147        init();
148    }
149
150    public MockEndpoint() {
151        this(null);
152    }
153
154    /**
155     * A helper method to resolve the mock endpoint of the given URI on the given context
156     *
157     * @param context the camel context to try resolve the mock endpoint from
158     * @param uri the uri of the endpoint to resolve
159     * @return the endpoint
160     */
161    public static MockEndpoint resolve(CamelContext context, String uri) {
162        return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
163    }
164
165    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
166        long start = System.currentTimeMillis();
167        long left = unit.toMillis(timeout);
168        long end = start + left;
169        for (MockEndpoint endpoint : endpoints) {
170            if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
171                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
172            }
173            left = end - System.currentTimeMillis();
174            if (left <= 0) {
175                left = 0;
176            }
177        }
178    }
179
180    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
181        assertWait(timeout, unit, endpoints);
182        for (MockEndpoint endpoint : endpoints) {
183            endpoint.assertIsSatisfied();
184        }
185    }
186
187    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
188        for (MockEndpoint endpoint : endpoints) {
189            endpoint.assertIsSatisfied();
190        }
191    }
192
193
194    /**
195     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
196     * in the given context are valid
197     *
198     * @param context the camel context used to find all the available endpoints to be asserted
199     */
200    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
201        ObjectHelper.notNull(context, "camelContext");
202        Collection<Endpoint> endpoints = context.getEndpoints();
203        for (Endpoint endpoint : endpoints) {
204            // if the endpoint was intercepted we should get the delegate
205            if (endpoint instanceof InterceptSendToEndpoint) {
206                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
207            }
208            if (endpoint instanceof MockEndpoint) {
209                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
210                mockEndpoint.assertIsSatisfied();
211            }
212        }
213    }
214
215    /**
216     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
217     * in the given context are valid
218     *
219     * @param context the camel context used to find all the available endpoints to be asserted
220     * @param timeout timeout
221     * @param unit    time unit
222     */
223    public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException {
224        ObjectHelper.notNull(context, "camelContext");
225        ObjectHelper.notNull(unit, "unit");
226        Collection<Endpoint> endpoints = context.getEndpoints();
227        long millis = unit.toMillis(timeout);
228        for (Endpoint endpoint : endpoints) {
229            // if the endpoint was intercepted we should get the delegate
230            if (endpoint instanceof InterceptSendToEndpoint) {
231                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
232            }
233            if (endpoint instanceof MockEndpoint) {
234                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
235                mockEndpoint.setResultWaitTime(millis);
236                mockEndpoint.assertIsSatisfied();
237            }
238        }
239    }
240
241    /**
242     * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered
243     * in the given context.
244     *
245     * @param context the camel context used to find all the available endpoints
246     * @param period the period in millis
247     */
248    public static void setAssertPeriod(CamelContext context, long period) {
249        ObjectHelper.notNull(context, "camelContext");
250        Collection<Endpoint> endpoints = context.getEndpoints();
251        for (Endpoint endpoint : endpoints) {
252            // if the endpoint was intercepted we should get the delegate
253            if (endpoint instanceof InterceptSendToEndpoint) {
254                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
255            }
256            if (endpoint instanceof MockEndpoint) {
257                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
258                mockEndpoint.setAssertPeriod(period);
259            }
260        }
261    }
262
263    /**
264     * Reset all mock endpoints
265     *
266     * @param context the camel context used to find all the available endpoints to reset
267     */
268    public static void resetMocks(CamelContext context) {
269        ObjectHelper.notNull(context, "camelContext");
270        Collection<Endpoint> endpoints = context.getEndpoints();
271        for (Endpoint endpoint : endpoints) {
272            // if the endpoint was intercepted we should get the delegate
273            if (endpoint instanceof InterceptSendToEndpoint) {
274                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
275            }
276            if (endpoint instanceof MockEndpoint) {
277                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
278                mockEndpoint.reset();
279            }
280        }
281    }
282
283    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
284        for (MockEndpoint endpoint : endpoints) {
285            endpoint.setExpectedMessageCount(count);
286        }
287    }
288
289    public List<Exchange> getExchanges() {
290        return getReceivedExchanges();
291    }
292
293    public Consumer createConsumer(Processor processor) throws Exception {
294        throw new UnsupportedOperationException("You cannot consume from this endpoint");
295    }
296
297    public Producer createProducer() throws Exception {
298        return new DefaultAsyncProducer(this) {
299            public boolean process(Exchange exchange, AsyncCallback callback) {
300                onExchange(exchange);
301                callback.done(true);
302                return true;
303            }
304        };
305    }
306
307    public void reset() {
308        init();
309    }
310
311
312    // Testing API
313    // -------------------------------------------------------------------------
314
315    /**
316     * Handles the incoming exchange.
317     * <p/>
318     * This method turns this mock endpoint into a bean which you can use
319     * in the Camel routes, which allows you to inject MockEndpoint as beans
320     * in your routes and use the features of the mock to control the bean.
321     *
322     * @param exchange  the exchange
323     * @throws Exception can be thrown
324     */
325    @Handler
326    public void handle(Exchange exchange) throws Exception {
327        onExchange(exchange);
328    }
329
330    /**
331     * Set the processor that will be invoked when the index
332     * message is received.
333     */
334    public void whenExchangeReceived(int index, Processor processor) {
335        this.processors.put(index, processor);
336    }
337
338    /**
339     * Set the processor that will be invoked when the some message
340     * is received.
341     *
342     * This processor could be overwritten by
343     * {@link #whenExchangeReceived(int, Processor)} method.
344     */
345    public void whenAnyExchangeReceived(Processor processor) {
346        this.defaultProcessor = processor;
347    }
348    
349    /**
350     * Set the expression which value will be set to the message body
351     * @param expression which is use to set the message body 
352     */
353    public void returnReplyBody(Expression expression) {
354        this.defaultProcessor = ProcessorBuilder.setBody(expression);
355    }
356    
357    /**
358     * Set the expression which value will be set to the message header
359     * @param headerName that will be set value
360     * @param expression which is use to set the message header 
361     */
362    public void returnReplyHeader(String headerName, Expression expression) {
363        this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression);
364    }
365    
366
367    /**
368     * Validates that all the available expectations on this endpoint are
369     * satisfied; or throw an exception
370     */
371    public void assertIsSatisfied() throws InterruptedException {
372        assertIsSatisfied(sleepForEmptyTest);
373    }
374
375    /**
376     * Validates that all the available expectations on this endpoint are
377     * satisfied; or throw an exception
378     *
379     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
380     *                should wait for the test to be true
381     */
382    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
383        LOG.info("Asserting: {} is satisfied", this);
384        doAssertIsSatisfied(timeoutForEmptyEndpoints);
385        if (assertPeriod > 0) {
386            // if an assert period was set then re-assert again to ensure the assertion is still valid
387            Thread.sleep(assertPeriod);
388            LOG.info("Re-asserting: {} is satisfied after {} millis", this, assertPeriod);
389            // do not use timeout when we re-assert
390            doAssertIsSatisfied(0);
391        }
392    }
393
394    protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
395        if (expectedCount == 0) {
396            if (timeoutForEmptyEndpoints > 0) {
397                LOG.debug("Sleeping for: {} millis to check there really are no messages received", timeoutForEmptyEndpoints);
398                Thread.sleep(timeoutForEmptyEndpoints);
399            }
400            assertEquals("Received message count", expectedCount, getReceivedCounter());
401        } else if (expectedCount > 0) {
402            if (expectedCount != getReceivedCounter()) {
403                waitForCompleteLatch();
404            }
405            assertEquals("Received message count", expectedCount, getReceivedCounter());
406        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
407            waitForCompleteLatch();
408        }
409
410        if (expectedMinimumCount >= 0) {
411            int receivedCounter = getReceivedCounter();
412            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
413        }
414
415        for (Runnable test : tests) {
416            test.run();
417        }
418
419        for (Throwable failure : failures) {
420            if (failure != null) {
421                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
422                fail("Failed due to caught exception: " + failure);
423            }
424        }
425    }
426
427    /**
428     * Validates that the assertions fail on this endpoint
429     */
430    public void assertIsNotSatisfied() throws InterruptedException {
431        boolean failed = false;
432        try {
433            assertIsSatisfied();
434            // did not throw expected error... fail!
435            failed = true;
436        } catch (AssertionError e) {
437            if (LOG.isDebugEnabled()) {
438                // log incl stacktrace
439                LOG.debug("Caught expected failure: " + e.getMessage(), e);
440            } else {
441                LOG.info("Caught expected failure: " + e.getMessage());
442            }
443        }
444        if (failed) {
445            // fail() throws the AssertionError to indicate the test failed. 
446            fail("Expected assertion failure but test succeeded!");
447        }
448    }
449
450    /**
451     * Validates that the assertions fail on this endpoint
452
453     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
454     *        should wait for the test to be true
455     */
456    public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
457        boolean failed = false;
458        try {
459            assertIsSatisfied(timeoutForEmptyEndpoints);
460            // did not throw expected error... fail!
461            failed = true;
462        } catch (AssertionError e) {
463            if (LOG.isDebugEnabled()) {
464                // log incl stacktrace
465                LOG.debug("Caught expected failure: " + e.getMessage(), e);
466            } else {
467                LOG.info("Caught expected failure: " + e.getMessage());
468            }
469        }
470        if (failed) { 
471            // fail() throws the AssertionError to indicate the test failed. 
472            fail("Expected assertion failure but test succeeded!");
473        }
474    }    
475    
476    /**
477     * Specifies the expected number of message exchanges that should be
478     * received by this endpoint
479     *
480     * If you want to assert that <b>exactly</b> n messages arrives to this mock
481     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
482     *
483     * @param expectedCount the number of message exchanges that should be
484     *                expected by this endpoint
485     * @see #setAssertPeriod(long)
486     */
487    public void expectedMessageCount(int expectedCount) {
488        setExpectedMessageCount(expectedCount);
489    }
490
491    /**
492     * Sets a grace period after which the mock endpoint will re-assert
493     * to ensure the preliminary assertion is still valid.
494     * <p/>
495     * This is used for example to assert that <b>exactly</b> a number of messages 
496     * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then
497     * the assertion is satisfied when 5 or more message arrives. To ensure that
498     * exactly 5 messages arrives, then you would need to wait a little period
499     * to ensure no further message arrives. This is what you can use this
500     * {@link #setAssertPeriod(long)} method for.
501     * <p/>
502     * By default this period is disabled.
503     *
504     * @param period grace period in millis
505     */
506    public void setAssertPeriod(long period) {
507        this.assertPeriod = period;
508    }
509
510    /**
511     * Specifies the minimum number of expected message exchanges that should be
512     * received by this endpoint
513     *
514     * @param expectedCount the number of message exchanges that should be
515     *                expected by this endpoint
516     */
517    public void expectedMinimumMessageCount(int expectedCount) {
518        setMinimumExpectedMessageCount(expectedCount);
519    }
520
521    /**
522     * Sets an expectation that the given header name & value are received by this endpoint
523     * <p/>
524     * You can set multiple expectations for different header names.
525     * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt>
526     * <p/>
527     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
528     */
529    public void expectedHeaderReceived(final String name, final Object value) {
530        if (expectedCount == -1) {
531            expectedMessageCount(1);
532        }
533        if (expectedHeaderValues == null) {
534            expectedHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
535            // we just wants to expects to be called once
536            expects(new Runnable() {
537                public void run() {
538                    for (int i = 0; i < getReceivedExchanges().size(); i++) {
539                        Exchange exchange = getReceivedExchange(i);
540                        for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) {
541                            String key = entry.getKey();
542                            Object expectedValue = entry.getValue();
543
544                            // we accept that an expectedValue of null also means that the header may be absent
545                            if (expectedValue != null) {
546                                assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders());
547                                boolean hasKey = exchange.getIn().getHeaders().containsKey(key);
548                                assertTrue("No header with name " + key + " found for message: " + i, hasKey);
549                            }
550
551                            Object actualValue = exchange.getIn().getHeader(key);
552                            actualValue = extractActualValue(exchange, actualValue, expectedValue);
553
554                            assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue);
555                        }
556                    }
557                }
558            });
559        }
560        expectedHeaderValues.put(name, value);
561    }
562
563    /**
564     * Adds an expectation that the given header values are received by this
565     * endpoint in any order.
566     * <p/>
567     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
568     * there must be 3 values.
569     * <p/>
570     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
571     */
572    public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) {
573        expectedMessageCount(values.size());
574
575        expects(new Runnable() {
576            public void run() {
577                // these are the expected values to find
578                final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<>(values);
579
580                for (int i = 0; i < getReceivedExchanges().size(); i++) {
581                    Exchange exchange = getReceivedExchange(i);
582
583                    Object actualValue = exchange.getIn().getHeader(name);
584                    for (Object expectedValue : actualHeaderValues) {
585                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
586                        // remove any found values
587                        actualHeaderValues.remove(actualValue);
588                    }
589                }
590
591                // should be empty, as we should find all the values
592                assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size())
593                        + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty());
594            }
595        });
596    }
597
598    /**
599     * Adds an expectation that the given header values are received by this
600     * endpoint in any order
601     * <p/>
602     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
603     * there must be 3 values.
604     * <p/>
605     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
606     */
607    public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) {
608        List<Object> valueList = new ArrayList<>();
609        valueList.addAll(Arrays.asList(values));
610        expectedHeaderValuesReceivedInAnyOrder(name, valueList);
611    }
612
613    /**
614     * Sets an expectation that the given property name & value are received by this endpoint
615     * <p/>
616     * You can set multiple expectations for different property names.
617     * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt>
618     */
619    public void expectedPropertyReceived(final String name, final Object value) {
620        if (expectedPropertyValues == null) {
621            expectedPropertyValues = new HashMap<>();
622        }
623        expectedPropertyValues.put(name, value);
624
625        expects(new Runnable() {
626            public void run() {
627                for (int i = 0; i < getReceivedExchanges().size(); i++) {
628                    Exchange exchange = getReceivedExchange(i);
629                    for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) {
630                        String key = entry.getKey();
631                        Object expectedValue = entry.getValue();
632
633                        // we accept that an expectedValue of null also means that the property may be absent
634                        if (expectedValue != null) {
635                            assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty());
636                            boolean hasKey = exchange.getProperties().containsKey(key);
637                            assertTrue("No property with name " + key + " found for message: " + i, hasKey);
638                        }
639
640                        Object actualValue = exchange.getProperty(key);
641                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
642
643                        assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue);
644                    }
645                }
646            }
647        });
648    }
649
650    /**
651     * Adds an expectation that the given property values are received by this
652     * endpoint in any order.
653     * <p/>
654     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
655     * there must be 3 values.
656     * <p/>
657     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
658     */
659    public void expectedPropertyValuesReceivedInAnyOrder(final String name, final List<?> values) {
660        expectedMessageCount(values.size());
661
662        expects(new Runnable() {
663            public void run() {
664                // these are the expected values to find
665                final Set<Object> actualPropertyValues = new CopyOnWriteArraySet<>(values);
666
667                for (int i = 0; i < getReceivedExchanges().size(); i++) {
668                    Exchange exchange = getReceivedExchange(i);
669
670                    Object actualValue = exchange.getProperty(name);
671                    for (Object expectedValue : actualPropertyValues) {
672                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
673                        // remove any found values
674                        actualPropertyValues.remove(actualValue);
675                    }
676                }
677
678                // should be empty, as we should find all the values
679                assertTrue("Expected " + values.size() + " properties with key[" + name + "], received " + (values.size() - actualPropertyValues.size())
680                        + " properties. Expected property values: " + actualPropertyValues, actualPropertyValues.isEmpty());
681            }
682        });
683    }
684
685    /**
686     * Adds an expectation that the given property values are received by this
687     * endpoint in any order
688     * <p/>
689     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
690     * there must be 3 values.
691     * <p/>
692     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
693     */
694    public void expectedPropertyValuesReceivedInAnyOrder(String name, Object... values) {
695        List<Object> valueList = new ArrayList<>();
696        valueList.addAll(Arrays.asList(values));
697        expectedPropertyValuesReceivedInAnyOrder(name, valueList);
698    }    
699
700    /**
701     * Adds an expectation that the given body values are received by this
702     * endpoint in the specified order
703     * <p/>
704     * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then
705     * there must be 3 values.
706     * <p/>
707     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
708     */
709    public void expectedBodiesReceived(final List<?> bodies) {
710        expectedMessageCount(bodies.size());
711        this.expectedBodyValues = bodies;
712        this.actualBodyValues = new ArrayList<>();
713
714        expects(new Runnable() {
715            public void run() {
716                for (int i = 0; i < expectedBodyValues.size(); i++) {
717                    Exchange exchange = getReceivedExchange(i);
718                    assertTrue("No exchange received for counter: " + i, exchange != null);
719
720                    Object expectedBody = expectedBodyValues.get(i);
721                    Object actualBody = null;
722                    if (i < actualBodyValues.size()) {
723                        actualBody = actualBodyValues.get(i);
724                    }
725                    actualBody = extractActualValue(exchange, actualBody, expectedBody);
726
727                    assertEquals("Body of message: " + i, expectedBody, actualBody);
728                }
729            }
730        });
731    }
732
733    private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) {
734        if (actualValue == null) {
735            return null;
736        }
737
738        if (actualValue instanceof Expression) {
739            Class clazz = Object.class;
740            if (expectedValue != null) {
741                clazz = expectedValue.getClass();
742            }
743            actualValue = ((Expression)actualValue).evaluate(exchange, clazz);
744        } else if (actualValue instanceof Predicate) {
745            actualValue = ((Predicate)actualValue).matches(exchange);
746        } else if (expectedValue != null) {
747            String from = actualValue.getClass().getName();
748            String to = expectedValue.getClass().getName();
749            actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue);
750            assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null);
751        }
752        return actualValue;
753    }
754
755    /**
     * Sets an expectation that the given predicates match the messages received by this endpoint
757     */
758    public void expectedMessagesMatches(Predicate... predicates) {
759        for (int i = 0; i < predicates.length; i++) {
760            final int messageIndex = i;
761            final Predicate predicate = predicates[i];
762            final AssertionClause clause = new AssertionClause(this) {
763                public void run() {
764                    addPredicate(predicate);
765                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
766                }
767            };
768            expects(clause);
769        }
770    }
771
772    /**
773     * Sets an expectation that the given body values are received by this endpoint
774     * <p/>
775     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
776     * there must be 3 bodies.
777     * <p/>
778     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
779     */
780    public void expectedBodiesReceived(Object... bodies) {
781        List<Object> bodyList = new ArrayList<>();
782        bodyList.addAll(Arrays.asList(bodies));
783        expectedBodiesReceived(bodyList);
784    }
785
786    /**
     * Adds an expectation that the given body value is received by this endpoint
788     */
789    public AssertionClause expectedBodyReceived() {
790        expectedMessageCount(1);
791        final AssertionClause clause = new AssertionClause(this) {
792            public void run() {
793                Exchange exchange = getReceivedExchange(0);
794                assertTrue("No exchange received for counter: " + 0, exchange != null);
795
796                Object actualBody = exchange.getIn().getBody();
797                Expression exp = createExpression(getCamelContext());
798                Object expectedBody = exp.evaluate(exchange, Object.class);
799
800                assertEquals("Body of message: " + 0, expectedBody, actualBody);
801            }
802        };
803        expects(clause);
804        return clause;
805    }
806
807    /**
808     * Adds an expectation that the given body values are received by this
809     * endpoint in any order
810     * <p/>
811     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
812     * there must be 3 bodies.
813     * <p/>
814     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
815     */
816    public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) {
817        expectedMessageCount(bodies.size());
818        this.expectedBodyValues = bodies;
819        this.actualBodyValues = new ArrayList<>();
820
821        expects(new Runnable() {
822            public void run() {
823                List<Object> actualBodyValuesSet = new ArrayList<>(actualBodyValues);
824                for (int i = 0; i < expectedBodyValues.size(); i++) {
825                    Exchange exchange = getReceivedExchange(i);
826                    assertTrue("No exchange received for counter: " + i, exchange != null);
827
828                    Object expectedBody = expectedBodyValues.get(i);
829                    assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody));
830                }
831            }
832        });
833    }
834
835    /**
836     * Adds an expectation that the given body values are received by this
837     * endpoint in any order
838     * <p/>
839     * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then
840     * there must be 3 bodies.
841     * <p/>
842     * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)}
843     */
844    public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
845        List<Object> bodyList = new ArrayList<>();
846        bodyList.addAll(Arrays.asList(bodies));
847        expectedBodiesReceivedInAnyOrder(bodyList);
848    }
849
850    /**
851     * Adds an expectation that a file exists with the given name
852     *
853     * @param name name of file, will cater for / and \ on different OS platforms
854     */
    public void expectedFileExists(final String name) {
        // null content means only the existence of the file is asserted, not what it contains
        expectedFileExists(name, null);
    }
858
859    /**
860     * Adds an expectation that a file exists with the given name
861     * <p/>
862     * Will wait at most 5 seconds while checking for the existence of the file.
863     *
864     * @param name name of file, will cater for / and \ on different OS platforms
865     * @param content content of file to compare, can be <tt>null</tt> to not compare content
866     */
867    public void expectedFileExists(final String name, final String content) {
868        final File file = new File(FileUtil.normalizePath(name));
869
870        expects(new Runnable() {
871            public void run() {
872                // wait at most 5 seconds for the file to exists
873                final long timeout = System.currentTimeMillis() + 5000;
874
875                boolean stop = false;
876                while (!stop && !file.exists()) {
877                    try {
878                        Thread.sleep(50);
879                    } catch (InterruptedException e) {
880                        // ignore
881                    }
882                    stop = System.currentTimeMillis() > timeout;
883                }
884
885                assertTrue("The file should exists: " + name, file.exists());
886
887                if (content != null) {
888                    String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
889                    assertEquals("Content of file: " + name, content, body);
890                }
891            }
892        });
893    }
894
895    /**
896     * Adds an expectation that messages received should have the given exchange pattern
897     */
898    public void expectedExchangePattern(final ExchangePattern exchangePattern) {
899        expectedMessagesMatches(new Predicate() {
900            public boolean matches(Exchange exchange) {
901                return exchange.getPattern().equals(exchangePattern);
902            }
903        });
904    }
905
906    /**
907     * Adds an expectation that messages received should have ascending values
908     * of the given expression such as a user generated counter value
909     */
910    public void expectsAscending(final Expression expression) {
911        expects(new Runnable() {
912            public void run() {
913                assertMessagesAscending(expression);
914            }
915        });
916    }
917
918    /**
919     * Adds an expectation that messages received should have ascending values
920     * of the given expression such as a user generated counter value
921     */
922    public AssertionClause expectsAscending() {
923        final AssertionClause clause = new AssertionClause(this) {
924            public void run() {
925                assertMessagesAscending(createExpression(getCamelContext()));
926            }
927        };
928        expects(clause);
929        return clause;
930    }
931
932    /**
933     * Adds an expectation that messages received should have descending values
934     * of the given expression such as a user generated counter value
935     */
936    public void expectsDescending(final Expression expression) {
937        expects(new Runnable() {
938            public void run() {
939                assertMessagesDescending(expression);
940            }
941        });
942    }
943
944    /**
945     * Adds an expectation that messages received should have descending values
946     * of the given expression such as a user generated counter value
947     */
948    public AssertionClause expectsDescending() {
949        final AssertionClause clause = new AssertionClause(this) {
950            public void run() {
951                assertMessagesDescending(createExpression(getCamelContext()));
952            }
953        };
954        expects(clause);
955        return clause;
956    }
957
958    /**
959     * Adds an expectation that no duplicate messages should be received using
960     * the expression to determine the message ID
961     *
962     * @param expression the expression used to create a unique message ID for
963     *                message comparison (which could just be the message
964     *                payload if the payload can be tested for uniqueness using
965     *                {@link Object#equals(Object)} and
966     *                {@link Object#hashCode()}
967     */
968    public void expectsNoDuplicates(final Expression expression) {
969        expects(new Runnable() {
970            public void run() {
971                assertNoDuplicates(expression);
972            }
973        });
974    }
975
976    /**
977     * Adds an expectation that no duplicate messages should be received using
978     * the expression to determine the message ID
979     */
980    public AssertionClause expectsNoDuplicates() {
981        final AssertionClause clause = new AssertionClause(this) {
982            public void run() {
983                assertNoDuplicates(createExpression(getCamelContext()));
984            }
985        };
986        expects(clause);
987        return clause;
988    }
989
990    /**
991     * Asserts that the messages have ascending values of the given expression
992     */
    public void assertMessagesAscending(Expression expression) {
        // true selects the ascending check; note that equal neighbouring values fail as well
        assertMessagesSorted(expression, true);
    }
996
997    /**
998     * Asserts that the messages have descending values of the given expression
999     */
    public void assertMessagesDescending(Expression expression) {
        // false selects the descending check; note that equal neighbouring values fail as well
        assertMessagesSorted(expression, false);
    }
1003
1004    protected void assertMessagesSorted(Expression expression, boolean ascending) {
1005        String type = ascending ? "ascending" : "descending";
1006        ExpressionComparator comparator = new ExpressionComparator(expression);
1007        List<Exchange> list = getReceivedExchanges();
1008        for (int i = 1; i < list.size(); i++) {
1009            int j = i - 1;
1010            Exchange e1 = list.get(j);
1011            Exchange e2 = list.get(i);
1012            int result = comparator.compare(e1, e2);
1013            if (result == 0) {
1014                fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
1015                    + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
1016            } else {
1017                if (!ascending) {
1018                    result = result * -1;
1019                }
1020                if (result > 0) {
1021                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
1022                        + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
1023                        + expression + ". Exchanges: " + e1 + " and " + e2);
1024                }
1025            }
1026        }
1027    }
1028
1029    public void assertNoDuplicates(Expression expression) {
1030        Map<Object, Exchange> map = new HashMap<>();
1031        List<Exchange> list = getReceivedExchanges();
1032        for (int i = 0; i < list.size(); i++) {
1033            Exchange e2 = list.get(i);
1034            Object key = expression.evaluate(e2, Object.class);
1035            Exchange e1 = map.get(key);
1036            if (e1 != null) {
1037                fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
1038            } else {
1039                map.put(key, e2);
1040            }
1041        }
1042    }
1043
1044    /**
1045     * Adds the expectation which will be invoked when enough messages are received
1046     */
    public void expects(Runnable runnable) {
        // only registers the expectation; nothing is executed here
        tests.add(runnable);
    }
1050
1051    /**
1052     * Adds an assertion to the given message index
1053     *
1054     * @param messageIndex the number of the message
1055     * @return the assertion clause
1056     */
1057    public AssertionClause message(final int messageIndex) {
1058        final AssertionClause clause = new AssertionClause(this) {
1059            public void run() {
1060                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
1061            }
1062        };
1063        expects(clause);
1064        return clause;
1065    }
1066
1067    /**
1068     * Adds an assertion to all the received messages
1069     *
1070     * @return the assertion clause
1071     */
1072    public AssertionClause allMessages() {
1073        final AssertionClause clause = new AssertionClause(this) {
1074            public void run() {
1075                List<Exchange> list = getReceivedExchanges();
1076                int index = 0;
1077                for (Exchange exchange : list) {
1078                    applyAssertionOn(MockEndpoint.this, index++, exchange);
1079                }
1080            }
1081        };
1082        expects(clause);
1083        return clause;
1084    }
1085
1086    /**
1087     * Asserts that the given index of message is received (starting at zero)
1088     */
1089    public Exchange assertExchangeReceived(int index) {
1090        int count = getReceivedCounter();
1091        assertTrue("Not enough messages received. Was: " + count, count > index);
1092        return getReceivedExchange(index);
1093    }
1094
1095    // Properties
1096    // -------------------------------------------------------------------------
1097
    /**
     * Returns the name as set by {@link #setName(String)}
     */
    public String getName() {
        return name;
    }
1101
    /**
     * Sets the name that is returned by {@link #getName()}
     */
    public void setName(String name) {
        this.name = name;
    }
1105
    /**
     * Returns the failures recorded so far; every {@link Throwable} caught while an incoming
     * exchange is handled by {@link #onExchange(Exchange)} is added to this list.
     * <p/>
     * The internal list itself is returned, not a copy.
     */
    public List<Throwable> getFailures() {
        return failures;
    }
1109
    /**
     * Returns the number of exchanges counted so far. The counter is incremented once for every
     * exchange handled by {@link #performAssertions(Exchange, Exchange)}, whether or not the
     * exchange is retained in {@link #getReceivedExchanges()}.
     */
    public int getReceivedCounter() {
        return counter;
    }
1113
    /**
     * Returns the retained received exchanges, as stored by {@link #addReceivedExchange(Exchange)}.
     * <p/>
     * The internal list itself is returned, not a copy.
     */
    public List<Exchange> getReceivedExchanges() {
        return receivedExchanges;
    }
1117
    /**
     * Returns the expected number of messages as set by {@link #setExpectedMessageCount(int)};
     * the value is <tt>-1</tt> after this endpoint has been (re)initialized.
     */
    public int getExpectedCount() {
        return expectedCount;
    }
1121
    /**
     * Returns the sleep time in millis as set by {@link #setSleepForEmptyTest(long)}
     */
    public long getSleepForEmptyTest() {
        return sleepForEmptyTest;
    }
1125
1126    /**
1127     * Allows a sleep to be specified to wait to check that this endpoint really
1128     * is empty when {@link #expectedMessageCount(int)} is called with zero
1129     *
1130     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
1131     *                this endpoint really is empty
1132     */
    public void setSleepForEmptyTest(long sleepForEmptyTest) {
        // only stores the value. NOTE(review): the code that sleeps is outside this section
        this.sleepForEmptyTest = sleepForEmptyTest;
    }
1136
    /**
     * Returns the maximum time in millis to wait on the latch, as set by
     * {@link #setResultWaitTime(long)}. A value of <tt>0</tt> makes
     * {@link #waitForCompleteLatch()} wait for a default of 10 seconds.
     */
    public long getResultWaitTime() {
        return resultWaitTime;
    }
1140
1141    /**
1142     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
1143     * wait on a latch until it is satisfied
1144     */
    public void setResultWaitTime(long resultWaitTime) {
        // 0 is treated as "not set": waitForCompleteLatch(long) then waits 10 seconds
        this.resultWaitTime = resultWaitTime;
    }
1148
1149    /**
1150     * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
1151     * wait on a latch until it is satisfied
1152     */
    public void setResultMinimumWaitTime(long resultMinimumWaitTime) {
        // checked in waitForCompleteLatch(): completing faster than this fails; values <= 0 disable the check
        this.resultMinimumWaitTime = resultMinimumWaitTime;
    }
1156
1157    /**
1158     * @deprecated use {@link #setResultMinimumWaitTime(long)}
1159     */
    @Deprecated
    public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
        // deprecated alias that forwards to the replacement setter
        setResultMinimumWaitTime(resultMinimumWaitTime);
    }
1164
1165    /**
1166     * Specifies the expected number of message exchanges that should be
1167     * received by this endpoint.
1168     * <p/>
     * <b>Beware:</b> If you want to expect <tt>0</tt> messages, then take extra care,
     * as <tt>0</tt> matches when the test starts, so you need to set an assert period time
     * to let the test run for a while to make sure that no messages have arrived; for
     * that use {@link #setAssertPeriod(long)}.
1173     * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
1174     * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
1175     * This allows you to not use a fixed assert period, to speedup testing times.
1176     * <p/>
     * If you want to assert that <b>exactly</b> n messages arrive at this mock
     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
1179     *
1180     * @param expectedCount the number of message exchanges that should be
1181     *                expected by this endpoint
1182     * @see #setAssertPeriod(long)                      
1183     */
    public void setExpectedCount(int expectedCount) {
        // same as setExpectedMessageCount, which stores the count and (re)creates the latch
        setExpectedMessageCount(expectedCount);
    }
1187
1188    /**
1189     * @see #setExpectedCount(int)
1190     */
1191    public void setExpectedMessageCount(int expectedCount) {
1192        this.expectedCount = expectedCount;
1193        if (expectedCount <= 0) {
1194            latch = null;
1195        } else {
1196            latch = new CountDownLatch(expectedCount);
1197        }
1198    }
1199
1200    /**
1201     * Specifies the minimum number of expected message exchanges that should be
1202     * received by this endpoint
1203     *
1204     * @param expectedCount the number of message exchanges that should be
1205     *                expected by this endpoint
1206     */
1207    public void setMinimumExpectedMessageCount(int expectedCount) {
1208        this.expectedMinimumCount = expectedCount;
1209        if (expectedCount <= 0) {
1210            latch = null;
1211        } else {
1212            latch = new CountDownLatch(expectedMinimumCount);
1213        }
1214    }
1215
    /**
     * Returns the reporter as set by {@link #setReporter(Processor)}, may be <tt>null</tt>
     */
    public Processor getReporter() {
        return reporter;
    }
1219
1220    /**
     * Allows a processor to be added to the endpoint to report on progress of the test
1222     */
    public void setReporter(Processor reporter) {
        // onExchange invokes the reporter with each incoming exchange before the assertions are performed
        this.reporter = reporter;
    }
1226
1227    /**
1228     * Specifies to only retain the first n'th number of received {@link Exchange}s.
1229     * <p/>
1230     * This is used when testing with big data, to reduce memory consumption by not storing
1231     * copies of every {@link Exchange} this mock endpoint receives.
1232     * <p/>
1233     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1234     * will still return the actual number of received {@link Exchange}s. For example
1235     * if we have received 5000 {@link Exchange}s, and have configured to only retain the first
1236     * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1237     * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and
1238     * {@link #getReceivedExchanges()} methods.
1239     * <p/>
     * When using this method, then some of the other expectation methods are not supported,
     * for example the {@link #expectedBodiesReceived(Object...)} sets an expectation on the first
     * number of bodies received.
1243     * <p/>
1244     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1245     * to limit both the first and last received.
1246     * 
1247     * @param retainFirst  to limit and only keep the first n'th received {@link Exchange}s, use
1248     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1249     * @see #setRetainLast(int)
1250     */
    public void setRetainFirst(int retainFirst) {
        // read by addReceivedExchange each time an exchange is about to be stored
        this.retainFirst = retainFirst;
    }
1254
1255    /**
1256     * Specifies to only retain the last n'th number of received {@link Exchange}s.
1257     * <p/>
1258     * This is used when testing with big data, to reduce memory consumption by not storing
1259     * copies of every {@link Exchange} this mock endpoint receives.
1260     * <p/>
1261     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
1262     * will still return the actual number of received {@link Exchange}s. For example
1263     * if we have received 5000 {@link Exchange}s, and have configured to only retain the last
1264     * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
1265     * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and
1266     * {@link #getReceivedExchanges()} methods.
1267     * <p/>
     * When using this method, then some of the other expectation methods are not supported,
     * for example the {@link #expectedBodiesReceived(Object...)} sets an expectation on the first
     * number of bodies received.
1271     * <p/>
1272     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
1273     * to limit both the first and last received.
1274     *
1275     * @param retainLast  to limit and only keep the last n'th received {@link Exchange}s, use
1276     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
1277     * @see #setRetainFirst(int)
1278     */
    public void setRetainLast(int retainLast) {
        // read by addReceivedExchange each time an exchange is about to be stored
        this.retainLast = retainLast;
    }
1282
    /**
     * Returns the group size as set by {@link #setReportGroup(int)}.
     * <p/>
     * Despite the <tt>is</tt> prefix this returns a number and not a boolean.
     */
    public int isReportGroup() {
        return reportGroup;
    }
1286
1287    /**
1288     * A number that is used to turn on throughput logging based on groups of the size.
1289     */
    public void setReportGroup(int reportGroup) {
        // only stores the value. NOTE(review): where the throughput logging is set up is outside this section
        this.reportGroup = reportGroup;
    }
1293
    /**
     * Whether a copy of the incoming {@link Exchange} is made and stored, instead of the
     * incoming exchange itself, when received at this mock endpoint.
     */
    public boolean isCopyOnExchange() {
        return copyOnExchange;
    }
1297
1298    /**
1299     * Sets whether to make a deep copy of the incoming {@link Exchange} when received at this mock endpoint.
1300     * <p/>
1301     * Is by default <tt>true</tt>.
1302     */
    public void setCopyOnExchange(boolean copyOnExchange) {
        // read by onExchange for every incoming exchange
        this.copyOnExchange = copyOnExchange;
    }
1306
1307    // Implementation methods
1308    // -------------------------------------------------------------------------
    /**
     * Sets the expectation and bookkeeping state of this endpoint to its initial values.
     */
    private void init() {
        // counters and per-message processors; -1 means no expected count has been set
        expectedCount = -1;
        counter = 0;
        defaultProcessor = null;
        processors = new HashMap<>();
        // fresh collections for received exchanges, recorded failures and registered expectations
        receivedExchanges = new CopyOnWriteArrayList<>();
        failures = new CopyOnWriteArrayList<>();
        tests = new CopyOnWriteArrayList<>();
        // no latch until an expected (minimum) message count is set
        latch = null;
        // timing settings, 0 means not set
        sleepForEmptyTest = 0;
        resultWaitTime = 0;
        resultMinimumWaitTime = 0L;
        assertPeriod = 0L;
        expectedMinimumCount = -1;
        // expected values start out unset; the actual header/property maps are created lazily in performAssertions
        expectedBodyValues = null;
        actualBodyValues = new ArrayList<>();
        expectedHeaderValues = null;
        actualHeaderValues = null;
        expectedPropertyValues = null;
        actualPropertyValues = null;
        // -1 means retain all received exchanges
        retainFirst = -1;
        retainLast = -1;
    }
1332
1333    protected synchronized void onExchange(Exchange exchange) {
1334        try {
1335            if (reporter != null) {
1336                reporter.process(exchange);
1337            }
1338            Exchange copy = exchange;
1339            if (copyOnExchange) {
1340                // copy the exchange so the mock stores the copy and not the actual exchange
1341                copy = ExchangeHelper.createCopy(exchange, true);
1342            }
1343            performAssertions(exchange, copy);
1344        } catch (Throwable e) {
1345            // must catch java.lang.Throwable as AssertionError extends java.lang.Error
1346            failures.add(e);
1347        } finally {
1348            // make sure latch is counted down to avoid test hanging forever
1349            if (latch != null) {
1350                latch.countDown();
1351            }
1352        }
1353    }
1354
1355    /**
1356     * Performs the assertions on the incoming exchange.
1357     *
1358     * @param exchange   the actual exchange
1359     * @param copy       a copy of the exchange (only store this)
1360     * @throws Exception can be thrown if something went wrong
1361     */
    protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
        Message in = copy.getIn();
        Object actualBody = in.getBody();

        // headers are only collected when a header expectation has been set; the headers of
        // all received messages are merged into one map
        if (expectedHeaderValues != null) {
            if (actualHeaderValues == null) {
                actualHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
            }
            if (in.hasHeaders()) {
                actualHeaderValues.putAll(in.getHeaders());
            }
        }

        // likewise, exchange properties are only collected when a property expectation has been set
        if (expectedPropertyValues != null) {
            if (actualPropertyValues == null) {
                actualPropertyValues = getCamelContext().getHeadersMapFactory().newMap();
            }
            actualPropertyValues.putAll(copy.getProperties());
        }

        if (expectedBodyValues != null) {
            // the position of this body equals the number of bodies recorded so far
            int index = actualBodyValues.size();
            // bodies beyond the number of expected values are not recorded
            if (expectedBodyValues.size() > index) {
                Object expectedBody = expectedBodyValues.get(index);
                if (expectedBody != null) {
                    // prefer to convert body early, for example when using files
                    // we need to read the content at this time
                    Object body = in.getBody(expectedBody.getClass());
                    if (body != null) {
                        actualBody = body;
                    }
                }
                actualBodyValues.add(actualBody);
            }
        }

        // let counter be 0 index-based in the logs
        if (LOG.isDebugEnabled()) {
            String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
            if (copy.getIn().hasHeaders()) {
                msg += " and headers:" + copy.getIn().getHeaders();
            }
            LOG.debug(msg);
        }

        // record timestamp when exchange was received
        copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());

        // add a copy of the received exchange
        addReceivedExchange(copy);
        // and then increment counter after adding received exchange
        ++counter;

        // the counter has just been incremented, so processors are looked up by the 1-based
        // arrival number of this exchange, falling back to the default processor
        Processor processor = processors.get(getReceivedCounter()) != null
                ? processors.get(getReceivedCounter()) : defaultProcessor;

        if (processor != null) {
            try {
                // must process the incoming exchange and NOT the copy as the idea
                // is the end user can manipulate the exchange
                processor.process(exchange);
            } catch (Exception e) {
                // set exceptions on exchange so we can throw exceptions to simulate errors
                exchange.setException(e);
            }
        }
    }
1429
1430    /**
1431     * Adds the received exchange.
1432     * 
1433     * @param copy  a copy of the received exchange
1434     */
    protected void addReceivedExchange(Exchange copy) {
        if (retainFirst == 0 && retainLast == 0) {
            // do not retain any messages at all
        } else if (retainFirst < 0 && retainLast < 0) {
            // no limitation so keep them all
            receivedExchanges.add(copy);
        } else {
            // okay there is some sort of limitations, so figure out what to retain
            // (when called from performAssertions the counter has not yet been incremented
            // for this exchange, so it is the number of exchanges received before this one)
            if (retainFirst > 0 && counter < retainFirst) {
                // store a copy as its within the retain first limitation
                receivedExchanges.add(copy);
            } else if (retainLast > 0) {
                // remove the oldest from the last retained boundary,
                // index is negative while fewer than retainLast exchanges are stored
                int index = receivedExchanges.size() - retainLast;
                if (index >= 0) {
                    // but must be outside the first range as well
                    // otherwise we should not remove the oldest
                    if (retainFirst <= 0 || retainFirst <= index) {
                        receivedExchanges.remove(index);
                    }
                }
                // store a copy of the last n'th received
                receivedExchanges.add(copy);
            }
            // otherwise the exchange is past the retain first limit and no last limit applies, so it is dropped
        }
    }
1461
1462    protected void waitForCompleteLatch() throws InterruptedException {
1463        if (latch == null) {
1464            fail("Should have a latch!");
1465        }
1466
1467        StopWatch watch = new StopWatch();
1468        waitForCompleteLatch(resultWaitTime);
1469        long delta = watch.taken();
1470        LOG.debug("Took {} millis to complete latch", delta);
1471
1472        if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
1473            fail("Expected minimum " + resultMinimumWaitTime
1474                + " millis waiting on the result, but was faster with " + delta + " millis.");
1475        }
1476    }
1477
1478    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
1479        // Wait for a default 10 seconds if resultWaitTime is not set
1480        long waitTime = timeout == 0 ? 10000L : timeout;
1481
1482        // now let's wait for the results
1483        LOG.debug("Waiting on the latch for: {} millis", timeout);
1484        latch.await(waitTime, TimeUnit.MILLISECONDS);
1485    }
1486
1487    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
1488        if (!ObjectHelper.equal(expectedValue, actualValue)) {
1489            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
1490        }
1491    }
1492
1493    protected void assertTrue(String message, boolean predicate) {
1494        if (!predicate) {
1495            fail(message);
1496        }
1497    }
1498
1499    protected void fail(Object message) {
1500        if (LOG.isDebugEnabled()) {
1501            List<Exchange> list = getReceivedExchanges();
1502            int index = 0;
1503            for (Exchange exchange : list) {
1504                LOG.debug("{} failed and received[{}]: {}", getEndpointUri(), ++index, exchange);
1505            }
1506        }
1507        throw new AssertionError(getEndpointUri() + " " + message);
1508    }
1509
1510    public int getExpectedMinimumCount() {
1511        return expectedMinimumCount;
1512    }
1513
1514    public void await() throws InterruptedException {
1515        if (latch != null) {
1516            latch.await();
1517        }
1518    }
1519
1520    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
1521        if (latch != null) {
1522            return latch.await(timeout, unit);
1523        }
1524        return true;
1525    }
1526
1527    public boolean isSingleton() {
1528        return true;
1529    }
1530
1531    public boolean isLenientProperties() {
1532        return true;
1533    }
1534
1535    private Exchange getReceivedExchange(int index) {
1536        if (index <= receivedExchanges.size() - 1) {
1537            return receivedExchanges.get(index);
1538        } else {
1539            return null;
1540        }
1541    }
1542
1543}