001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.mock; 018 019import java.io.File; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.CopyOnWriteArrayList; 030import java.util.concurrent.CopyOnWriteArraySet; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.TimeUnit; 033 034import org.apache.camel.AsyncCallback; 035import org.apache.camel.CamelContext; 036import org.apache.camel.Component; 037import org.apache.camel.Consumer; 038import org.apache.camel.Endpoint; 039import org.apache.camel.Exchange; 040import org.apache.camel.ExchangePattern; 041import org.apache.camel.Expression; 042import org.apache.camel.Handler; 043import org.apache.camel.Message; 044import org.apache.camel.Predicate; 045import org.apache.camel.Processor; 046import org.apache.camel.Producer; 047import org.apache.camel.builder.ProcessorBuilder; 048import org.apache.camel.impl.DefaultAsyncProducer; 
049import org.apache.camel.impl.DefaultEndpoint; 050import org.apache.camel.impl.InterceptSendToEndpoint; 051import org.apache.camel.spi.BrowsableEndpoint; 052import org.apache.camel.spi.Metadata; 053import org.apache.camel.spi.UriEndpoint; 054import org.apache.camel.spi.UriParam; 055import org.apache.camel.spi.UriPath; 056import org.apache.camel.util.CamelContextHelper; 057import org.apache.camel.util.CaseInsensitiveMap; 058import org.apache.camel.util.ExchangeHelper; 059import org.apache.camel.util.ExpressionComparator; 060import org.apache.camel.util.FileUtil; 061import org.apache.camel.util.ObjectHelper; 062import org.apache.camel.util.StopWatch; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * The mock component is used for testing routes and mediation rules using mocks. 068 * <p/> 069 * A Mock endpoint which provides a literate, fluent API for testing routes 070 * using a <a href="http://jmock.org/">JMock style</a> API. 071 * <p/> 072 * The mock endpoint have two set of methods 073 * <ul> 074 * <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li> 075 * <li>assertXXX - To assert assertions, after the test has been executed</li> 076 * </ul> 077 * Its <b>important</b> to know the difference between the two set. The former is used to 078 * set expectations before the test is being started (eg before the mock receives messages). 079 * The latter is used after the test has been executed, to verify the expectations; or 080 * other assertions which you can perform after the test has been completed. 
 * <p/>
 * <b>Beware:</b> If you want to expect that a mock does not receive any messages, by calling
 * {@link #setExpectedMessageCount(int)} with <tt>0</tt>, then take extra care,
 * as <tt>0</tt> matches when the test starts, so you need to set an assert period
 * to let the test run for a while to make sure that no messages have arrived; for
 * that use {@link #setAssertPeriod(long)}.
 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
 * This allows you to not use a fixed assert period, to speed up testing times.
 * <p/>
 * <b>Important:</b> If using {@link #expectedMessageCount(int)} and also {@link #expectedBodiesReceived(java.util.List)} or
 * {@link #expectedHeaderReceived(String, Object)} then the latter overrides the number of expected messages based on the
 * number of values provided in the bodies/headers.
094 * 095 * @version 096 */ 097@UriEndpoint(scheme = "mock", title = "Mock", syntax = "mock:name", producerOnly = true, label = "core,testing", lenientProperties = true) 098public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 099 private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class); 100 // must be volatile so changes is visible between the thread which performs the assertions 101 // and the threads which process the exchanges when routing messages in Camel 102 protected volatile Processor reporter; 103 104 private volatile Processor defaultProcessor; 105 private volatile Map<Integer, Processor> processors; 106 private volatile List<Exchange> receivedExchanges; 107 private volatile List<Throwable> failures; 108 private volatile List<Runnable> tests; 109 private volatile CountDownLatch latch; 110 private volatile int expectedMinimumCount; 111 private volatile List<?> expectedBodyValues; 112 private volatile List<Object> actualBodyValues; 113 private volatile Map<String, Object> expectedHeaderValues; 114 private volatile Map<String, Object> actualHeaderValues; 115 private volatile Map<String, Object> expectedPropertyValues; 116 private volatile Map<String, Object> actualPropertyValues; 117 118 private volatile int counter; 119 120 @UriPath(description = "Name of mock endpoint") @Metadata(required = "true") 121 private String name; 122 @UriParam(label = "producer", defaultValue = "-1") 123 private int expectedCount; 124 @UriParam(label = "producer", defaultValue = "0") 125 private long sleepForEmptyTest; 126 @UriParam(label = "producer", defaultValue = "0") 127 private long resultWaitTime; 128 @UriParam(label = "producer", defaultValue = "0") 129 private long resultMinimumWaitTime; 130 @UriParam(label = "producer", defaultValue = "0") 131 private long assertPeriod; 132 @UriParam(label = "producer", defaultValue = "-1") 133 private int retainFirst; 134 @UriParam(label = "producer", defaultValue = "-1") 135 private int 
retainLast; 136 @UriParam(label = "producer") 137 private int reportGroup; 138 @UriParam(label = "producer,advanced", defaultValue = "true") 139 private boolean copyOnExchange = true; 140 141 public MockEndpoint(String endpointUri, Component component) { 142 super(endpointUri, component); 143 init(); 144 } 145 146 @Deprecated 147 public MockEndpoint(String endpointUri) { 148 super(endpointUri); 149 init(); 150 } 151 152 public MockEndpoint() { 153 this(null); 154 } 155 156 /** 157 * A helper method to resolve the mock endpoint of the given URI on the given context 158 * 159 * @param context the camel context to try resolve the mock endpoint from 160 * @param uri the uri of the endpoint to resolve 161 * @return the endpoint 162 */ 163 public static MockEndpoint resolve(CamelContext context, String uri) { 164 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class); 165 } 166 167 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 168 long start = System.currentTimeMillis(); 169 long left = unit.toMillis(timeout); 170 long end = start + left; 171 for (MockEndpoint endpoint : endpoints) { 172 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 173 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 174 } 175 left = end - System.currentTimeMillis(); 176 if (left <= 0) { 177 left = 0; 178 } 179 } 180 } 181 182 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 183 assertWait(timeout, unit, endpoints); 184 for (MockEndpoint endpoint : endpoints) { 185 endpoint.assertIsSatisfied(); 186 } 187 } 188 189 public static void assertIsSatisfied(MockEndpoint... 
endpoints) throws InterruptedException { 190 for (MockEndpoint endpoint : endpoints) { 191 endpoint.assertIsSatisfied(); 192 } 193 } 194 195 196 /** 197 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 198 * in the given context are valid 199 * 200 * @param context the camel context used to find all the available endpoints to be asserted 201 */ 202 public static void assertIsSatisfied(CamelContext context) throws InterruptedException { 203 ObjectHelper.notNull(context, "camelContext"); 204 Collection<Endpoint> endpoints = context.getEndpoints(); 205 for (Endpoint endpoint : endpoints) { 206 // if the endpoint was intercepted we should get the delegate 207 if (endpoint instanceof InterceptSendToEndpoint) { 208 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 209 } 210 if (endpoint instanceof MockEndpoint) { 211 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 212 mockEndpoint.assertIsSatisfied(); 213 } 214 } 215 } 216 217 /** 218 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 219 * in the given context are valid 220 * 221 * @param context the camel context used to find all the available endpoints to be asserted 222 * @param timeout timeout 223 * @param unit time unit 224 */ 225 public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException { 226 ObjectHelper.notNull(context, "camelContext"); 227 ObjectHelper.notNull(unit, "unit"); 228 Collection<Endpoint> endpoints = context.getEndpoints(); 229 long millis = unit.toMillis(timeout); 230 for (Endpoint endpoint : endpoints) { 231 // if the endpoint was intercepted we should get the delegate 232 if (endpoint instanceof InterceptSendToEndpoint) { 233 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 234 } 235 if (endpoint instanceof MockEndpoint) { 236 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 237 mockEndpoint.setResultWaitTime(millis); 238 
mockEndpoint.assertIsSatisfied(); 239 } 240 } 241 } 242 243 /** 244 * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered 245 * in the given context. 246 * 247 * @param context the camel context used to find all the available endpoints 248 * @param period the period in millis 249 */ 250 public static void setAssertPeriod(CamelContext context, long period) { 251 ObjectHelper.notNull(context, "camelContext"); 252 Collection<Endpoint> endpoints = context.getEndpoints(); 253 for (Endpoint endpoint : endpoints) { 254 // if the endpoint was intercepted we should get the delegate 255 if (endpoint instanceof InterceptSendToEndpoint) { 256 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 257 } 258 if (endpoint instanceof MockEndpoint) { 259 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 260 mockEndpoint.setAssertPeriod(period); 261 } 262 } 263 } 264 265 /** 266 * Reset all mock endpoints 267 * 268 * @param context the camel context used to find all the available endpoints to reset 269 */ 270 public static void resetMocks(CamelContext context) { 271 ObjectHelper.notNull(context, "camelContext"); 272 Collection<Endpoint> endpoints = context.getEndpoints(); 273 for (Endpoint endpoint : endpoints) { 274 // if the endpoint was intercepted we should get the delegate 275 if (endpoint instanceof InterceptSendToEndpoint) { 276 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 277 } 278 if (endpoint instanceof MockEndpoint) { 279 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 280 mockEndpoint.reset(); 281 } 282 } 283 } 284 285 public static void expectsMessageCount(int count, MockEndpoint... 
endpoints) throws InterruptedException { 286 for (MockEndpoint endpoint : endpoints) { 287 endpoint.setExpectedMessageCount(count); 288 } 289 } 290 291 public List<Exchange> getExchanges() { 292 return getReceivedExchanges(); 293 } 294 295 public Consumer createConsumer(Processor processor) throws Exception { 296 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 297 } 298 299 public Producer createProducer() throws Exception { 300 return new DefaultAsyncProducer(this) { 301 public boolean process(Exchange exchange, AsyncCallback callback) { 302 onExchange(exchange); 303 callback.done(true); 304 return true; 305 } 306 }; 307 } 308 309 public void reset() { 310 init(); 311 } 312 313 314 // Testing API 315 // ------------------------------------------------------------------------- 316 317 /** 318 * Handles the incoming exchange. 319 * <p/> 320 * This method turns this mock endpoint into a bean which you can use 321 * in the Camel routes, which allows you to inject MockEndpoint as beans 322 * in your routes and use the features of the mock to control the bean. 323 * 324 * @param exchange the exchange 325 * @throws Exception can be thrown 326 */ 327 @Handler 328 public void handle(Exchange exchange) throws Exception { 329 onExchange(exchange); 330 } 331 332 /** 333 * Set the processor that will be invoked when the index 334 * message is received. 335 */ 336 public void whenExchangeReceived(int index, Processor processor) { 337 this.processors.put(index, processor); 338 } 339 340 /** 341 * Set the processor that will be invoked when the some message 342 * is received. 343 * 344 * This processor could be overwritten by 345 * {@link #whenExchangeReceived(int, Processor)} method. 
346 */ 347 public void whenAnyExchangeReceived(Processor processor) { 348 this.defaultProcessor = processor; 349 } 350 351 /** 352 * Set the expression which value will be set to the message body 353 * @param expression which is use to set the message body 354 */ 355 public void returnReplyBody(Expression expression) { 356 this.defaultProcessor = ProcessorBuilder.setBody(expression); 357 } 358 359 /** 360 * Set the expression which value will be set to the message header 361 * @param headerName that will be set value 362 * @param expression which is use to set the message header 363 */ 364 public void returnReplyHeader(String headerName, Expression expression) { 365 this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression); 366 } 367 368 369 /** 370 * Validates that all the available expectations on this endpoint are 371 * satisfied; or throw an exception 372 */ 373 public void assertIsSatisfied() throws InterruptedException { 374 assertIsSatisfied(sleepForEmptyTest); 375 } 376 377 /** 378 * Validates that all the available expectations on this endpoint are 379 * satisfied; or throw an exception 380 * 381 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 382 * should wait for the test to be true 383 */ 384 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 385 LOG.info("Asserting: " + this + " is satisfied"); 386 doAssertIsSatisfied(timeoutForEmptyEndpoints); 387 if (assertPeriod > 0) { 388 // if an assert period was set then re-assert again to ensure the assertion is still valid 389 Thread.sleep(assertPeriod); 390 LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis"); 391 // do not use timeout when we re-assert 392 doAssertIsSatisfied(0); 393 } 394 } 395 396 protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 397 if (expectedCount == 0) { 398 if (timeoutForEmptyEndpoints > 0) { 399 LOG.debug("Sleeping for: " + 
timeoutForEmptyEndpoints + " millis to check there really are no messages received"); 400 Thread.sleep(timeoutForEmptyEndpoints); 401 } 402 assertEquals("Received message count", expectedCount, getReceivedCounter()); 403 } else if (expectedCount > 0) { 404 if (expectedCount != getReceivedCounter()) { 405 waitForCompleteLatch(); 406 } 407 assertEquals("Received message count", expectedCount, getReceivedCounter()); 408 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 409 waitForCompleteLatch(); 410 } 411 412 if (expectedMinimumCount >= 0) { 413 int receivedCounter = getReceivedCounter(); 414 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter); 415 } 416 417 for (Runnable test : tests) { 418 test.run(); 419 } 420 421 for (Throwable failure : failures) { 422 if (failure != null) { 423 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 424 fail("Failed due to caught exception: " + failure); 425 } 426 } 427 } 428 429 /** 430 * Validates that the assertions fail on this endpoint 431 */ 432 public void assertIsNotSatisfied() throws InterruptedException { 433 boolean failed = false; 434 try { 435 assertIsSatisfied(); 436 // did not throw expected error... fail! 437 failed = true; 438 } catch (AssertionError e) { 439 LOG.info("Caught expected failure: " + e); 440 } 441 if (failed) { 442 // fail() throws the AssertionError to indicate the test failed. 
443 fail("Expected assertion failure but test succeeded!"); 444 } 445 } 446 447 /** 448 * Validates that the assertions fail on this endpoint 449 450 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 451 * should wait for the test to be true 452 */ 453 public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 454 boolean failed = false; 455 try { 456 assertIsSatisfied(timeoutForEmptyEndpoints); 457 // did not throw expected error... fail! 458 failed = true; 459 } catch (AssertionError e) { 460 LOG.info("Caught expected failure: " + e); 461 } 462 if (failed) { 463 // fail() throws the AssertionError to indicate the test failed. 464 fail("Expected assertion failure but test succeeded!"); 465 } 466 } 467 468 /** 469 * Specifies the expected number of message exchanges that should be 470 * received by this endpoint 471 * 472 * If you want to assert that <b>exactly</b> n messages arrives to this mock 473 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 474 * 475 * @param expectedCount the number of message exchanges that should be 476 * expected by this endpoint 477 * @see #setAssertPeriod(long) 478 */ 479 public void expectedMessageCount(int expectedCount) { 480 setExpectedMessageCount(expectedCount); 481 } 482 483 /** 484 * Sets a grace period after which the mock endpoint will re-assert 485 * to ensure the preliminary assertion is still valid. 486 * <p/> 487 * This is used for example to assert that <b>exactly</b> a number of messages 488 * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then 489 * the assertion is satisfied when 5 or more message arrives. To ensure that 490 * exactly 5 messages arrives, then you would need to wait a little period 491 * to ensure no further message arrives. This is what you can use this 492 * {@link #setAssertPeriod(long)} method for. 493 * <p/> 494 * By default this period is disabled. 
495 * 496 * @param period grace period in millis 497 */ 498 public void setAssertPeriod(long period) { 499 this.assertPeriod = period; 500 } 501 502 /** 503 * Specifies the minimum number of expected message exchanges that should be 504 * received by this endpoint 505 * 506 * @param expectedCount the number of message exchanges that should be 507 * expected by this endpoint 508 */ 509 public void expectedMinimumMessageCount(int expectedCount) { 510 setMinimumExpectedMessageCount(expectedCount); 511 } 512 513 /** 514 * Sets an expectation that the given header name & value are received by this endpoint 515 * <p/> 516 * You can set multiple expectations for different header names. 517 * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt> 518 * <p/> 519 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 520 * there must be 3 values. 521 * <p/> 522 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 523 */ 524 public void expectedHeaderReceived(final String name, final Object value) { 525 if (expectedHeaderValues == null) { 526 expectedHeaderValues = new CaseInsensitiveMap(); 527 // we just wants to expects to be called once 528 expects(new Runnable() { 529 public void run() { 530 for (int i = 0; i < getReceivedExchanges().size(); i++) { 531 Exchange exchange = getReceivedExchange(i); 532 for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) { 533 String key = entry.getKey(); 534 Object expectedValue = entry.getValue(); 535 536 // we accept that an expectedValue of null also means that the header may be absent 537 if (expectedValue != null) { 538 assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders()); 539 boolean hasKey = exchange.getIn().getHeaders().containsKey(key); 540 assertTrue("No header with name " + key + " found for message: " + i, hasKey); 
541 } 542 543 Object actualValue = exchange.getIn().getHeader(key); 544 actualValue = extractActualValue(exchange, actualValue, expectedValue); 545 546 assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue); 547 } 548 } 549 } 550 }); 551 } 552 expectedHeaderValues.put(name, value); 553 } 554 555 /** 556 * Adds an expectation that the given header values are received by this 557 * endpoint in any order. 558 * <p/> 559 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 560 * there must be 3 values. 561 * <p/> 562 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 563 */ 564 public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) { 565 expectedMessageCount(values.size()); 566 567 expects(new Runnable() { 568 public void run() { 569 // these are the expected values to find 570 final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<Object>(values); 571 572 for (int i = 0; i < getReceivedExchanges().size(); i++) { 573 Exchange exchange = getReceivedExchange(i); 574 575 Object actualValue = exchange.getIn().getHeader(name); 576 for (Object expectedValue : actualHeaderValues) { 577 actualValue = extractActualValue(exchange, actualValue, expectedValue); 578 // remove any found values 579 actualHeaderValues.remove(actualValue); 580 } 581 } 582 583 // should be empty, as we should find all the values 584 assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size()) 585 + " headers. 
Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty()); 586 } 587 }); 588 } 589 590 /** 591 * Adds an expectation that the given header values are received by this 592 * endpoint in any order 593 * <p/> 594 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 595 * there must be 3 values. 596 * <p/> 597 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 598 */ 599 public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) { 600 List<Object> valueList = new ArrayList<Object>(); 601 valueList.addAll(Arrays.asList(values)); 602 expectedHeaderValuesReceivedInAnyOrder(name, valueList); 603 } 604 605 /** 606 * Sets an expectation that the given property name & value are received by this endpoint 607 * <p/> 608 * You can set multiple expectations for different property names. 609 * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt> 610 */ 611 public void expectedPropertyReceived(final String name, final Object value) { 612 if (expectedPropertyValues == null) { 613 expectedPropertyValues = new ConcurrentHashMap<String, Object>(); 614 } 615 if (value != null) { 616 // ConcurrentHashMap cannot store null values 617 expectedPropertyValues.put(name, value); 618 } 619 620 expects(new Runnable() { 621 public void run() { 622 for (int i = 0; i < getReceivedExchanges().size(); i++) { 623 Exchange exchange = getReceivedExchange(i); 624 for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) { 625 String key = entry.getKey(); 626 Object expectedValue = entry.getValue(); 627 628 // we accept that an expectedValue of null also means that the header may be absent 629 if (expectedValue != null) { 630 assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty()); 631 boolean hasKey = 
exchange.getProperties().containsKey(key); 632 assertTrue("No property with name " + key + " found for message: " + i, hasKey); 633 } 634 635 Object actualValue = exchange.getProperty(key); 636 actualValue = extractActualValue(exchange, actualValue, expectedValue); 637 638 assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue); 639 } 640 } 641 } 642 }); 643 } 644 645 /** 646 * Adds an expectation that the given body values are received by this 647 * endpoint in the specified order 648 * <p/> 649 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 650 * there must be 3 values. 651 * <p/> 652 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 653 */ 654 public void expectedBodiesReceived(final List<?> bodies) { 655 expectedMessageCount(bodies.size()); 656 this.expectedBodyValues = bodies; 657 this.actualBodyValues = new ArrayList<Object>(); 658 659 expects(new Runnable() { 660 public void run() { 661 for (int i = 0; i < expectedBodyValues.size(); i++) { 662 Exchange exchange = getReceivedExchange(i); 663 assertTrue("No exchange received for counter: " + i, exchange != null); 664 665 Object expectedBody = expectedBodyValues.get(i); 666 Object actualBody = null; 667 if (i < actualBodyValues.size()) { 668 actualBody = actualBodyValues.get(i); 669 } 670 actualBody = extractActualValue(exchange, actualBody, expectedBody); 671 672 assertEquals("Body of message: " + i, expectedBody, actualBody); 673 } 674 } 675 }); 676 } 677 678 private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) { 679 if (actualValue == null) { 680 return null; 681 } 682 683 if (actualValue instanceof Expression) { 684 Class clazz = Object.class; 685 if (expectedValue != null) { 686 clazz = expectedValue.getClass(); 687 } 688 actualValue = ((Expression)actualValue).evaluate(exchange, clazz); 689 } else if 
(actualValue instanceof Predicate) { 690 actualValue = ((Predicate)actualValue).matches(exchange); 691 } else if (expectedValue != null) { 692 String from = actualValue.getClass().getName(); 693 String to = expectedValue.getClass().getName(); 694 actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue); 695 assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null); 696 } 697 return actualValue; 698 } 699 700 /** 701 * Sets an expectation that the given predicates matches the received messages by this endpoint 702 */ 703 public void expectedMessagesMatches(Predicate... predicates) { 704 for (int i = 0; i < predicates.length; i++) { 705 final int messageIndex = i; 706 final Predicate predicate = predicates[i]; 707 final AssertionClause clause = new AssertionClause(this) { 708 public void run() { 709 addPredicate(predicate); 710 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 711 } 712 }; 713 expects(clause); 714 } 715 } 716 717 /** 718 * Sets an expectation that the given body values are received by this endpoint 719 * <p/> 720 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 721 * there must be 3 bodies. 722 * <p/> 723 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 724 */ 725 public void expectedBodiesReceived(Object... 
bodies) { 726 List<Object> bodyList = new ArrayList<Object>(); 727 bodyList.addAll(Arrays.asList(bodies)); 728 expectedBodiesReceived(bodyList); 729 } 730 731 /** 732 * Adds an expectation that the given body value are received by this endpoint 733 */ 734 public AssertionClause expectedBodyReceived() { 735 expectedMessageCount(1); 736 final AssertionClause clause = new AssertionClause(this) { 737 public void run() { 738 Exchange exchange = getReceivedExchange(0); 739 assertTrue("No exchange received for counter: " + 0, exchange != null); 740 741 Object actualBody = exchange.getIn().getBody(); 742 Expression exp = createExpression(getCamelContext()); 743 Object expectedBody = exp.evaluate(exchange, Object.class); 744 745 assertEquals("Body of message: " + 0, expectedBody, actualBody); 746 } 747 }; 748 expects(clause); 749 return clause; 750 } 751 752 /** 753 * Adds an expectation that the given body values are received by this 754 * endpoint in any order 755 * <p/> 756 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 757 * there must be 3 bodies. 
758 * <p/> 759 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 760 */ 761 public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) { 762 expectedMessageCount(bodies.size()); 763 this.expectedBodyValues = bodies; 764 this.actualBodyValues = new ArrayList<Object>(); 765 766 expects(new Runnable() { 767 public void run() { 768 List<Object> actualBodyValuesSet = new ArrayList<Object>(actualBodyValues); 769 for (int i = 0; i < expectedBodyValues.size(); i++) { 770 Exchange exchange = getReceivedExchange(i); 771 assertTrue("No exchange received for counter: " + i, exchange != null); 772 773 Object expectedBody = expectedBodyValues.get(i); 774 assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody)); 775 } 776 } 777 }); 778 } 779 780 /** 781 * Adds an expectation that the given body values are received by this 782 * endpoint in any order 783 * <p/> 784 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 785 * there must be 3 bodies. 786 * <p/> 787 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 788 */ 789 public void expectedBodiesReceivedInAnyOrder(Object... bodies) { 790 List<Object> bodyList = new ArrayList<Object>(); 791 bodyList.addAll(Arrays.asList(bodies)); 792 expectedBodiesReceivedInAnyOrder(bodyList); 793 } 794 795 /** 796 * Adds an expectation that a file exists with the given name 797 * 798 * @param name name of file, will cater for / and \ on different OS platforms 799 */ 800 public void expectedFileExists(final String name) { 801 expectedFileExists(name, null); 802 } 803 804 /** 805 * Adds an expectation that a file exists with the given name 806 * <p/> 807 * Will wait at most 5 seconds while checking for the existence of the file. 
808 * 809 * @param name name of file, will cater for / and \ on different OS platforms 810 * @param content content of file to compare, can be <tt>null</tt> to not compare content 811 */ 812 public void expectedFileExists(final String name, final String content) { 813 final File file = new File(FileUtil.normalizePath(name)); 814 815 expects(new Runnable() { 816 public void run() { 817 // wait at most 5 seconds for the file to exists 818 final long timeout = System.currentTimeMillis() + 5000; 819 820 boolean stop = false; 821 while (!stop && !file.exists()) { 822 try { 823 Thread.sleep(50); 824 } catch (InterruptedException e) { 825 // ignore 826 } 827 stop = System.currentTimeMillis() > timeout; 828 } 829 830 assertTrue("The file should exists: " + name, file.exists()); 831 832 if (content != null) { 833 String body = getCamelContext().getTypeConverter().convertTo(String.class, file); 834 assertEquals("Content of file: " + name, content, body); 835 } 836 } 837 }); 838 } 839 840 /** 841 * Adds an expectation that messages received should have the given exchange pattern 842 */ 843 public void expectedExchangePattern(final ExchangePattern exchangePattern) { 844 expectedMessagesMatches(new Predicate() { 845 public boolean matches(Exchange exchange) { 846 return exchange.getPattern().equals(exchangePattern); 847 } 848 }); 849 } 850 851 /** 852 * Adds an expectation that messages received should have ascending values 853 * of the given expression such as a user generated counter value 854 */ 855 public void expectsAscending(final Expression expression) { 856 expects(new Runnable() { 857 public void run() { 858 assertMessagesAscending(expression); 859 } 860 }); 861 } 862 863 /** 864 * Adds an expectation that messages received should have ascending values 865 * of the given expression such as a user generated counter value 866 */ 867 public AssertionClause expectsAscending() { 868 final AssertionClause clause = new AssertionClause(this) { 869 public void run() { 870 
assertMessagesAscending(createExpression(getCamelContext())); 871 } 872 }; 873 expects(clause); 874 return clause; 875 } 876 877 /** 878 * Adds an expectation that messages received should have descending values 879 * of the given expression such as a user generated counter value 880 */ 881 public void expectsDescending(final Expression expression) { 882 expects(new Runnable() { 883 public void run() { 884 assertMessagesDescending(expression); 885 } 886 }); 887 } 888 889 /** 890 * Adds an expectation that messages received should have descending values 891 * of the given expression such as a user generated counter value 892 */ 893 public AssertionClause expectsDescending() { 894 final AssertionClause clause = new AssertionClause(this) { 895 public void run() { 896 assertMessagesDescending(createExpression(getCamelContext())); 897 } 898 }; 899 expects(clause); 900 return clause; 901 } 902 903 /** 904 * Adds an expectation that no duplicate messages should be received using 905 * the expression to determine the message ID 906 * 907 * @param expression the expression used to create a unique message ID for 908 * message comparison (which could just be the message 909 * payload if the payload can be tested for uniqueness using 910 * {@link Object#equals(Object)} and 911 * {@link Object#hashCode()} 912 */ 913 public void expectsNoDuplicates(final Expression expression) { 914 expects(new Runnable() { 915 public void run() { 916 assertNoDuplicates(expression); 917 } 918 }); 919 } 920 921 /** 922 * Adds an expectation that no duplicate messages should be received using 923 * the expression to determine the message ID 924 */ 925 public AssertionClause expectsNoDuplicates() { 926 final AssertionClause clause = new AssertionClause(this) { 927 public void run() { 928 assertNoDuplicates(createExpression(getCamelContext())); 929 } 930 }; 931 expects(clause); 932 return clause; 933 } 934 935 /** 936 * Asserts that the messages have ascending values of the given expression 937 */ 
938 public void assertMessagesAscending(Expression expression) { 939 assertMessagesSorted(expression, true); 940 } 941 942 /** 943 * Asserts that the messages have descending values of the given expression 944 */ 945 public void assertMessagesDescending(Expression expression) { 946 assertMessagesSorted(expression, false); 947 } 948 949 protected void assertMessagesSorted(Expression expression, boolean ascending) { 950 String type = ascending ? "ascending" : "descending"; 951 ExpressionComparator comparator = new ExpressionComparator(expression); 952 List<Exchange> list = getReceivedExchanges(); 953 for (int i = 1; i < list.size(); i++) { 954 int j = i - 1; 955 Exchange e1 = list.get(j); 956 Exchange e2 = list.get(i); 957 int result = comparator.compare(e1, e2); 958 if (result == 0) { 959 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " 960 + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 961 } else { 962 if (!ascending) { 963 result = result * -1; 964 } 965 if (result > 0) { 966 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class) 967 + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: " 968 + expression + ". Exchanges: " + e1 + " and " + e2); 969 } 970 } 971 } 972 } 973 974 public void assertNoDuplicates(Expression expression) { 975 Map<Object, Exchange> map = new HashMap<Object, Exchange>(); 976 List<Exchange> list = getReceivedExchanges(); 977 for (int i = 0; i < list.size(); i++) { 978 Exchange e2 = list.get(i); 979 Object key = expression.evaluate(e2, Object.class); 980 Exchange e1 = map.get(key); 981 if (e1 != null) { 982 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". 
Exchanges: " + e1 + " and " + e2); 983 } else { 984 map.put(key, e2); 985 } 986 } 987 } 988 989 /** 990 * Adds the expectation which will be invoked when enough messages are received 991 */ 992 public void expects(Runnable runnable) { 993 tests.add(runnable); 994 } 995 996 /** 997 * Adds an assertion to the given message index 998 * 999 * @param messageIndex the number of the message 1000 * @return the assertion clause 1001 */ 1002 public AssertionClause message(final int messageIndex) { 1003 final AssertionClause clause = new AssertionClause(this) { 1004 public void run() { 1005 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 1006 } 1007 }; 1008 expects(clause); 1009 return clause; 1010 } 1011 1012 /** 1013 * Adds an assertion to all the received messages 1014 * 1015 * @return the assertion clause 1016 */ 1017 public AssertionClause allMessages() { 1018 final AssertionClause clause = new AssertionClause(this) { 1019 public void run() { 1020 List<Exchange> list = getReceivedExchanges(); 1021 int index = 0; 1022 for (Exchange exchange : list) { 1023 applyAssertionOn(MockEndpoint.this, index++, exchange); 1024 } 1025 } 1026 }; 1027 expects(clause); 1028 return clause; 1029 } 1030 1031 /** 1032 * Asserts that the given index of message is received (starting at zero) 1033 */ 1034 public Exchange assertExchangeReceived(int index) { 1035 int count = getReceivedCounter(); 1036 assertTrue("Not enough messages received. 
Was: " + count, count > index); 1037 return getReceivedExchange(index); 1038 } 1039 1040 // Properties 1041 // ------------------------------------------------------------------------- 1042 1043 public String getName() { 1044 return name; 1045 } 1046 1047 public void setName(String name) { 1048 this.name = name; 1049 } 1050 1051 public List<Throwable> getFailures() { 1052 return failures; 1053 } 1054 1055 public int getReceivedCounter() { 1056 return counter; 1057 } 1058 1059 public List<Exchange> getReceivedExchanges() { 1060 return receivedExchanges; 1061 } 1062 1063 public int getExpectedCount() { 1064 return expectedCount; 1065 } 1066 1067 public long getSleepForEmptyTest() { 1068 return sleepForEmptyTest; 1069 } 1070 1071 /** 1072 * Allows a sleep to be specified to wait to check that this endpoint really 1073 * is empty when {@link #expectedMessageCount(int)} is called with zero 1074 * 1075 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 1076 * this endpoint really is empty 1077 */ 1078 public void setSleepForEmptyTest(long sleepForEmptyTest) { 1079 this.sleepForEmptyTest = sleepForEmptyTest; 1080 } 1081 1082 public long getResultWaitTime() { 1083 return resultWaitTime; 1084 } 1085 1086 /** 1087 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will 1088 * wait on a latch until it is satisfied 1089 */ 1090 public void setResultWaitTime(long resultWaitTime) { 1091 this.resultWaitTime = resultWaitTime; 1092 } 1093 1094 /** 1095 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will 1096 * wait on a latch until it is satisfied 1097 */ 1098 public void setResultMinimumWaitTime(long resultMinimumWaitTime) { 1099 this.resultMinimumWaitTime = resultMinimumWaitTime; 1100 } 1101 1102 /** 1103 * @deprecated use {@link #setResultMinimumWaitTime(long)} 1104 */ 1105 @Deprecated 1106 public void setMinimumResultWaitTime(long resultMinimumWaitTime) { 1107 
setResultMinimumWaitTime(resultMinimumWaitTime); 1108 } 1109 1110 /** 1111 * Specifies the expected number of message exchanges that should be 1112 * received by this endpoint. 1113 * <p/> 1114 * <b>Beware:</b> If you want to expect that <tt>0</tt> messages, then take extra care, 1115 * as <tt>0</tt> matches when the tests starts, so you need to set a assert period time 1116 * to let the test run for a while to make sure there are still no messages arrived; for 1117 * that use {@link #setAssertPeriod(long)}. 1118 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier 1119 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks. 1120 * This allows you to not use a fixed assert period, to speedup testing times. 1121 * <p/> 1122 * If you want to assert that <b>exactly</b> n'th message arrives to this mock 1123 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 
1124 * 1125 * @param expectedCount the number of message exchanges that should be 1126 * expected by this endpoint 1127 * @see #setAssertPeriod(long) 1128 */ 1129 public void setExpectedCount(int expectedCount) { 1130 setExpectedMessageCount(expectedCount); 1131 } 1132 1133 /** 1134 * @see #setExpectedCount(int) 1135 */ 1136 public void setExpectedMessageCount(int expectedCount) { 1137 this.expectedCount = expectedCount; 1138 if (expectedCount <= 0) { 1139 latch = null; 1140 } else { 1141 latch = new CountDownLatch(expectedCount); 1142 } 1143 } 1144 1145 /** 1146 * Specifies the minimum number of expected message exchanges that should be 1147 * received by this endpoint 1148 * 1149 * @param expectedCount the number of message exchanges that should be 1150 * expected by this endpoint 1151 */ 1152 public void setMinimumExpectedMessageCount(int expectedCount) { 1153 this.expectedMinimumCount = expectedCount; 1154 if (expectedCount <= 0) { 1155 latch = null; 1156 } else { 1157 latch = new CountDownLatch(expectedMinimumCount); 1158 } 1159 } 1160 1161 public Processor getReporter() { 1162 return reporter; 1163 } 1164 1165 /** 1166 * Allows a processor to added to the endpoint to report on progress of the test 1167 */ 1168 public void setReporter(Processor reporter) { 1169 this.reporter = reporter; 1170 } 1171 1172 /** 1173 * Specifies to only retain the first n'th number of received {@link Exchange}s. 1174 * <p/> 1175 * This is used when testing with big data, to reduce memory consumption by not storing 1176 * copies of every {@link Exchange} this mock endpoint receives. 1177 * <p/> 1178 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1179 * will still return the actual number of received {@link Exchange}s. 
For example 1180 * if we have received 5000 {@link Exchange}s, and have configured to only retain the first 1181 * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1182 * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and 1183 * {@link #getReceivedExchanges()} methods. 1184 * <p/> 1185 * When using this method, then some of the other expectation methods is not supported, 1186 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1187 * number of bodies received. 1188 * <p/> 1189 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1190 * to limit both the first and last received. 1191 * 1192 * @param retainFirst to limit and only keep the first n'th received {@link Exchange}s, use 1193 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1194 * @see #setRetainLast(int) 1195 */ 1196 public void setRetainFirst(int retainFirst) { 1197 this.retainFirst = retainFirst; 1198 } 1199 1200 /** 1201 * Specifies to only retain the last n'th number of received {@link Exchange}s. 1202 * <p/> 1203 * This is used when testing with big data, to reduce memory consumption by not storing 1204 * copies of every {@link Exchange} this mock endpoint receives. 1205 * <p/> 1206 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1207 * will still return the actual number of received {@link Exchange}s. For example 1208 * if we have received 5000 {@link Exchange}s, and have configured to only retain the last 1209 * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1210 * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and 1211 * {@link #getReceivedExchanges()} methods. 
1212 * <p/> 1213 * When using this method, then some of the other expectation methods is not supported, 1214 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1215 * number of bodies received. 1216 * <p/> 1217 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1218 * to limit both the first and last received. 1219 * 1220 * @param retainLast to limit and only keep the last n'th received {@link Exchange}s, use 1221 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1222 * @see #setRetainFirst(int) 1223 */ 1224 public void setRetainLast(int retainLast) { 1225 this.retainLast = retainLast; 1226 } 1227 1228 public int isReportGroup() { 1229 return reportGroup; 1230 } 1231 1232 /** 1233 * A number that is used to turn on throughput logging based on groups of the size. 1234 */ 1235 public void setReportGroup(int reportGroup) { 1236 this.reportGroup = reportGroup; 1237 } 1238 1239 public boolean isCopyOnExchange() { 1240 return copyOnExchange; 1241 } 1242 1243 /** 1244 * Sets whether to make a deep copy of the incoming {@link Exchange} when received at this mock endpoint. 1245 * <p/> 1246 * Is by default <tt>true</tt>. 
1247 */ 1248 public void setCopyOnExchange(boolean copyOnExchange) { 1249 this.copyOnExchange = copyOnExchange; 1250 } 1251 1252 // Implementation methods 1253 // ------------------------------------------------------------------------- 1254 private void init() { 1255 expectedCount = -1; 1256 counter = 0; 1257 defaultProcessor = null; 1258 processors = new HashMap<Integer, Processor>(); 1259 receivedExchanges = new CopyOnWriteArrayList<Exchange>(); 1260 failures = new CopyOnWriteArrayList<Throwable>(); 1261 tests = new CopyOnWriteArrayList<Runnable>(); 1262 latch = null; 1263 sleepForEmptyTest = 0; 1264 resultWaitTime = 0; 1265 resultMinimumWaitTime = 0L; 1266 assertPeriod = 0L; 1267 expectedMinimumCount = -1; 1268 expectedBodyValues = null; 1269 actualBodyValues = new ArrayList<Object>(); 1270 expectedHeaderValues = null; 1271 actualHeaderValues = null; 1272 expectedPropertyValues = null; 1273 actualPropertyValues = null; 1274 retainFirst = -1; 1275 retainLast = -1; 1276 } 1277 1278 protected synchronized void onExchange(Exchange exchange) { 1279 try { 1280 if (reporter != null) { 1281 reporter.process(exchange); 1282 } 1283 Exchange copy = exchange; 1284 if (copyOnExchange) { 1285 // copy the exchange so the mock stores the copy and not the actual exchange 1286 copy = ExchangeHelper.createCopy(exchange, true); 1287 } 1288 performAssertions(exchange, copy); 1289 } catch (Throwable e) { 1290 // must catch java.lang.Throwable as AssertionError extends java.lang.Error 1291 failures.add(e); 1292 } finally { 1293 // make sure latch is counted down to avoid test hanging forever 1294 if (latch != null) { 1295 latch.countDown(); 1296 } 1297 } 1298 } 1299 1300 /** 1301 * Performs the assertions on the incoming exchange. 
1302 * 1303 * @param exchange the actual exchange 1304 * @param copy a copy of the exchange (only store this) 1305 * @throws Exception can be thrown if something went wrong 1306 */ 1307 protected void performAssertions(Exchange exchange, Exchange copy) throws Exception { 1308 Message in = copy.getIn(); 1309 Object actualBody = in.getBody(); 1310 1311 if (expectedHeaderValues != null) { 1312 if (actualHeaderValues == null) { 1313 actualHeaderValues = new CaseInsensitiveMap(); 1314 } 1315 if (in.hasHeaders()) { 1316 actualHeaderValues.putAll(in.getHeaders()); 1317 } 1318 } 1319 1320 if (expectedPropertyValues != null) { 1321 if (actualPropertyValues == null) { 1322 actualPropertyValues = new ConcurrentHashMap<String, Object>(); 1323 } 1324 actualPropertyValues.putAll(copy.getProperties()); 1325 } 1326 1327 if (expectedBodyValues != null) { 1328 int index = actualBodyValues.size(); 1329 if (expectedBodyValues.size() > index) { 1330 Object expectedBody = expectedBodyValues.get(index); 1331 if (expectedBody != null) { 1332 // prefer to convert body early, for example when using files 1333 // we need to read the content at this time 1334 Object body = in.getBody(expectedBody.getClass()); 1335 if (body != null) { 1336 actualBody = body; 1337 } 1338 } 1339 actualBodyValues.add(actualBody); 1340 } 1341 } 1342 1343 // let counter be 0 index-based in the logs 1344 if (LOG.isDebugEnabled()) { 1345 String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody; 1346 if (copy.getIn().hasHeaders()) { 1347 msg += " and headers:" + copy.getIn().getHeaders(); 1348 } 1349 LOG.debug(msg); 1350 } 1351 1352 // record timestamp when exchange was received 1353 copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date()); 1354 1355 // add a copy of the received exchange 1356 addReceivedExchange(copy); 1357 // and then increment counter after adding received exchange 1358 ++counter; 1359 1360 Processor processor = processors.get(getReceivedCounter()) != null 
1361 ? processors.get(getReceivedCounter()) : defaultProcessor; 1362 1363 if (processor != null) { 1364 try { 1365 // must process the incoming exchange and NOT the copy as the idea 1366 // is the end user can manipulate the exchange 1367 processor.process(exchange); 1368 } catch (Exception e) { 1369 // set exceptions on exchange so we can throw exceptions to simulate errors 1370 exchange.setException(e); 1371 } 1372 } 1373 } 1374 1375 /** 1376 * Adds the received exchange. 1377 * 1378 * @param copy a copy of the received exchange 1379 */ 1380 protected void addReceivedExchange(Exchange copy) { 1381 if (retainFirst == 0 && retainLast == 0) { 1382 // do not retain any messages at all 1383 } else if (retainFirst < 0 && retainLast < 0) { 1384 // no limitation so keep them all 1385 receivedExchanges.add(copy); 1386 } else { 1387 // okay there is some sort of limitations, so figure out what to retain 1388 if (retainFirst > 0 && counter < retainFirst) { 1389 // store a copy as its within the retain first limitation 1390 receivedExchanges.add(copy); 1391 } else if (retainLast > 0) { 1392 // remove the oldest from the last retained boundary, 1393 int index = receivedExchanges.size() - retainLast; 1394 if (index >= 0) { 1395 // but must be outside the first range as well 1396 // otherwise we should not remove the oldest 1397 if (retainFirst <= 0 || retainFirst <= index) { 1398 receivedExchanges.remove(index); 1399 } 1400 } 1401 // store a copy of the last n'th received 1402 receivedExchanges.add(copy); 1403 } 1404 } 1405 } 1406 1407 protected void waitForCompleteLatch() throws InterruptedException { 1408 if (latch == null) { 1409 fail("Should have a latch!"); 1410 } 1411 1412 StopWatch watch = new StopWatch(); 1413 waitForCompleteLatch(resultWaitTime); 1414 long delta = watch.stop(); 1415 LOG.debug("Took {} millis to complete latch", delta); 1416 1417 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) { 1418 fail("Expected minimum " + resultMinimumWaitTime 1419 
+ " millis waiting on the result, but was faster with " + delta + " millis."); 1420 } 1421 } 1422 1423 protected void waitForCompleteLatch(long timeout) throws InterruptedException { 1424 // Wait for a default 10 seconds if resultWaitTime is not set 1425 long waitTime = timeout == 0 ? 10000L : timeout; 1426 1427 // now let's wait for the results 1428 LOG.debug("Waiting on the latch for: " + timeout + " millis"); 1429 latch.await(waitTime, TimeUnit.MILLISECONDS); 1430 } 1431 1432 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 1433 if (!ObjectHelper.equal(expectedValue, actualValue)) { 1434 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 1435 } 1436 } 1437 1438 protected void assertTrue(String message, boolean predicate) { 1439 if (!predicate) { 1440 fail(message); 1441 } 1442 } 1443 1444 protected void fail(Object message) { 1445 if (LOG.isDebugEnabled()) { 1446 List<Exchange> list = getReceivedExchanges(); 1447 int index = 0; 1448 for (Exchange exchange : list) { 1449 LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange}); 1450 } 1451 } 1452 throw new AssertionError(getEndpointUri() + " " + message); 1453 } 1454 1455 public int getExpectedMinimumCount() { 1456 return expectedMinimumCount; 1457 } 1458 1459 public void await() throws InterruptedException { 1460 if (latch != null) { 1461 latch.await(); 1462 } 1463 } 1464 1465 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 1466 if (latch != null) { 1467 return latch.await(timeout, unit); 1468 } 1469 return true; 1470 } 1471 1472 public boolean isSingleton() { 1473 return true; 1474 } 1475 1476 public boolean isLenientProperties() { 1477 return true; 1478 } 1479 1480 private Exchange getReceivedExchange(int index) { 1481 if (index <= receivedExchanges.size() - 1) { 1482 return receivedExchanges.get(index); 1483 } else { 1484 return null; 1485 } 1486 } 1487 1488}