001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.mock; 018 019import java.io.File; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.CopyOnWriteArrayList; 029import java.util.concurrent.CopyOnWriteArraySet; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.camel.AsyncCallback; 034import org.apache.camel.CamelContext; 035import org.apache.camel.Component; 036import org.apache.camel.Consumer; 037import org.apache.camel.Endpoint; 038import org.apache.camel.Exchange; 039import org.apache.camel.ExchangePattern; 040import org.apache.camel.Expression; 041import org.apache.camel.Handler; 042import org.apache.camel.Message; 043import org.apache.camel.Predicate; 044import org.apache.camel.Processor; 045import org.apache.camel.Producer; 046import org.apache.camel.builder.ProcessorBuilder; 047import org.apache.camel.impl.DefaultAsyncProducer; 048import org.apache.camel.impl.DefaultEndpoint; 049import org.apache.camel.impl.InterceptSendToEndpoint; 050import org.apache.camel.spi.BrowsableEndpoint; 051import org.apache.camel.spi.Metadata; 052import org.apache.camel.spi.UriEndpoint; 053import org.apache.camel.spi.UriParam; 054import org.apache.camel.spi.UriPath; 055import org.apache.camel.util.CamelContextHelper; 056import org.apache.camel.util.ExchangeHelper; 057import org.apache.camel.util.ExpressionComparator; 058import org.apache.camel.util.FileUtil; 059import org.apache.camel.util.ObjectHelper; 060import org.apache.camel.util.StopWatch; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * The mock component is used for testing routes and mediation rules using mocks. 066 * <p/> 067 * A Mock endpoint which provides a literate, fluent API for testing routes 068 * using a <a href="http://jmock.org/">JMock style</a> API. 069 * <p/> 070 * The mock endpoint have two set of methods 071 * <ul> 072 * <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li> 073 * <li>assertXXX - To assert assertions, after the test has been executed</li> 074 * </ul> 075 * Its <b>important</b> to know the difference between the two set. The former is used to 076 * set expectations before the test is being started (eg before the mock receives messages). 077 * The latter is used after the test has been executed, to verify the expectations; or 078 * other assertions which you can perform after the test has been completed. 079 * <p/> 080 * <b>Beware:</b> If you want to expect a mock does not receive any messages, by calling 081 * {@link #setExpectedMessageCount(int)} with <tt>0</tt>, then take extra care, 082 * as <tt>0</tt> matches when the tests starts, so you need to set a assert period time 083 * to let the test run for a while to make sure there are still no messages arrived; for 084 * that use {@link #setAssertPeriod(long)}. 085 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier 086 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks. 087 * This allows you to not use a fixed assert period, to speedup testing times. 088 * <p/> 089 * <b>Important:</b> If using {@link #expectedMessageCount(int)} and also {@link #expectedBodiesReceived(java.util.List)} or 090 * {@link #expectedHeaderReceived(String, Object)} then the latter overrides the number of expected message based on the 091 * number of values provided in the bodies/headers. 092 * 093 * @version 094 */ 095@UriEndpoint(firstVersion = "1.0.0", scheme = "mock", title = "Mock", syntax = "mock:name", producerOnly = true, label = "core,testing", lenientProperties = true) 096public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 097 private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class); 098 // must be volatile so changes is visible between the thread which performs the assertions 099 // and the threads which process the exchanges when routing messages in Camel 100 protected volatile Processor reporter; 101 102 private volatile Processor defaultProcessor; 103 private volatile Map<Integer, Processor> processors; 104 private volatile List<Exchange> receivedExchanges; 105 private volatile List<Throwable> failures; 106 private volatile List<Runnable> tests; 107 private volatile CountDownLatch latch; 108 private volatile int expectedMinimumCount; 109 private volatile List<?> expectedBodyValues; 110 private volatile List<Object> actualBodyValues; 111 private volatile Map<String, Object> expectedHeaderValues; 112 private volatile Map<String, Object> actualHeaderValues; 113 private volatile Map<String, Object> expectedPropertyValues; 114 private volatile Map<String, Object> actualPropertyValues; 115 116 private volatile int counter; 117 118 @UriPath(description = "Name of mock endpoint") @Metadata(required = "true") 119 private String name; 120 @UriParam(label = "producer", defaultValue = "-1") 121 private int expectedCount; 122 @UriParam(label = "producer", defaultValue = "0") 123 private long sleepForEmptyTest; 124 @UriParam(label = "producer", defaultValue = "0") 125 private long resultWaitTime; 126 @UriParam(label = "producer", defaultValue = "0") 127 private long resultMinimumWaitTime; 128 @UriParam(label = "producer", defaultValue = "0") 129 private long assertPeriod; 130 @UriParam(label = "producer", defaultValue = "-1") 131 private int retainFirst; 132 @UriParam(label = "producer", defaultValue = "-1") 133 private int retainLast; 134 @UriParam(label = "producer") 135 private int reportGroup; 136 @UriParam(label = "producer,advanced", defaultValue = "true") 137 private boolean copyOnExchange = true; 138 139 public MockEndpoint(String endpointUri, Component component) { 140 super(endpointUri, component); 141 init(); 142 } 143 144 @Deprecated 145 public MockEndpoint(String endpointUri) { 146 super(endpointUri); 147 init(); 148 } 149 150 public MockEndpoint() { 151 this(null); 152 } 153 154 /** 155 * A helper method to resolve the mock endpoint of the given URI on the given context 156 * 157 * @param context the camel context to try resolve the mock endpoint from 158 * @param uri the uri of the endpoint to resolve 159 * @return the endpoint 160 */ 161 public static MockEndpoint resolve(CamelContext context, String uri) { 162 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class); 163 } 164 165 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 166 long start = System.currentTimeMillis(); 167 long left = unit.toMillis(timeout); 168 long end = start + left; 169 for (MockEndpoint endpoint : endpoints) { 170 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 171 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 172 } 173 left = end - System.currentTimeMillis(); 174 if (left <= 0) { 175 left = 0; 176 } 177 } 178 } 179 180 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 181 assertWait(timeout, unit, endpoints); 182 for (MockEndpoint endpoint : endpoints) { 183 endpoint.assertIsSatisfied(); 184 } 185 } 186 187 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException { 188 for (MockEndpoint endpoint : endpoints) { 189 endpoint.assertIsSatisfied(); 190 } 191 } 192 193 194 /** 195 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 196 * in the given context are valid 197 * 198 * @param context the camel context used to find all the available endpoints to be asserted 199 */ 200 public static void assertIsSatisfied(CamelContext context) throws InterruptedException { 201 ObjectHelper.notNull(context, "camelContext"); 202 Collection<Endpoint> endpoints = context.getEndpoints(); 203 for (Endpoint endpoint : endpoints) { 204 // if the endpoint was intercepted we should get the delegate 205 if (endpoint instanceof InterceptSendToEndpoint) { 206 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 207 } 208 if (endpoint instanceof MockEndpoint) { 209 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 210 mockEndpoint.assertIsSatisfied(); 211 } 212 } 213 } 214 215 /** 216 * Asserts that all the expectations on any {@link MockEndpoint} instances registered 217 * in the given context are valid 218 * 219 * @param context the camel context used to find all the available endpoints to be asserted 220 * @param timeout timeout 221 * @param unit time unit 222 */ 223 public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException { 224 ObjectHelper.notNull(context, "camelContext"); 225 ObjectHelper.notNull(unit, "unit"); 226 Collection<Endpoint> endpoints = context.getEndpoints(); 227 long millis = unit.toMillis(timeout); 228 for (Endpoint endpoint : endpoints) { 229 // if the endpoint was intercepted we should get the delegate 230 if (endpoint instanceof InterceptSendToEndpoint) { 231 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 232 } 233 if (endpoint instanceof MockEndpoint) { 234 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 235 mockEndpoint.setResultWaitTime(millis); 236 mockEndpoint.assertIsSatisfied(); 237 } 238 } 239 } 240 241 /** 242 * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered 243 * in the given context. 244 * 245 * @param context the camel context used to find all the available endpoints 246 * @param period the period in millis 247 */ 248 public static void setAssertPeriod(CamelContext context, long period) { 249 ObjectHelper.notNull(context, "camelContext"); 250 Collection<Endpoint> endpoints = context.getEndpoints(); 251 for (Endpoint endpoint : endpoints) { 252 // if the endpoint was intercepted we should get the delegate 253 if (endpoint instanceof InterceptSendToEndpoint) { 254 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 255 } 256 if (endpoint instanceof MockEndpoint) { 257 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 258 mockEndpoint.setAssertPeriod(period); 259 } 260 } 261 } 262 263 /** 264 * Reset all mock endpoints 265 * 266 * @param context the camel context used to find all the available endpoints to reset 267 */ 268 public static void resetMocks(CamelContext context) { 269 ObjectHelper.notNull(context, "camelContext"); 270 Collection<Endpoint> endpoints = context.getEndpoints(); 271 for (Endpoint endpoint : endpoints) { 272 // if the endpoint was intercepted we should get the delegate 273 if (endpoint instanceof InterceptSendToEndpoint) { 274 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 275 } 276 if (endpoint instanceof MockEndpoint) { 277 MockEndpoint mockEndpoint = (MockEndpoint) endpoint; 278 mockEndpoint.reset(); 279 } 280 } 281 } 282 283 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException { 284 for (MockEndpoint endpoint : endpoints) { 285 endpoint.setExpectedMessageCount(count); 286 } 287 } 288 289 public List<Exchange> getExchanges() { 290 return getReceivedExchanges(); 291 } 292 293 public Consumer createConsumer(Processor processor) throws Exception { 294 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 295 } 296 297 public Producer createProducer() throws Exception { 298 return new DefaultAsyncProducer(this) { 299 public boolean process(Exchange exchange, AsyncCallback callback) { 300 onExchange(exchange); 301 callback.done(true); 302 return true; 303 } 304 }; 305 } 306 307 public void reset() { 308 init(); 309 } 310 311 312 // Testing API 313 // ------------------------------------------------------------------------- 314 315 /** 316 * Handles the incoming exchange. 317 * <p/> 318 * This method turns this mock endpoint into a bean which you can use 319 * in the Camel routes, which allows you to inject MockEndpoint as beans 320 * in your routes and use the features of the mock to control the bean. 321 * 322 * @param exchange the exchange 323 * @throws Exception can be thrown 324 */ 325 @Handler 326 public void handle(Exchange exchange) throws Exception { 327 onExchange(exchange); 328 } 329 330 /** 331 * Set the processor that will be invoked when the index 332 * message is received. 333 */ 334 public void whenExchangeReceived(int index, Processor processor) { 335 this.processors.put(index, processor); 336 } 337 338 /** 339 * Set the processor that will be invoked when the some message 340 * is received. 341 * 342 * This processor could be overwritten by 343 * {@link #whenExchangeReceived(int, Processor)} method. 344 */ 345 public void whenAnyExchangeReceived(Processor processor) { 346 this.defaultProcessor = processor; 347 } 348 349 /** 350 * Set the expression which value will be set to the message body 351 * @param expression which is use to set the message body 352 */ 353 public void returnReplyBody(Expression expression) { 354 this.defaultProcessor = ProcessorBuilder.setBody(expression); 355 } 356 357 /** 358 * Set the expression which value will be set to the message header 359 * @param headerName that will be set value 360 * @param expression which is use to set the message header 361 */ 362 public void returnReplyHeader(String headerName, Expression expression) { 363 this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression); 364 } 365 366 367 /** 368 * Validates that all the available expectations on this endpoint are 369 * satisfied; or throw an exception 370 */ 371 public void assertIsSatisfied() throws InterruptedException { 372 assertIsSatisfied(sleepForEmptyTest); 373 } 374 375 /** 376 * Validates that all the available expectations on this endpoint are 377 * satisfied; or throw an exception 378 * 379 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 380 * should wait for the test to be true 381 */ 382 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 383 LOG.info("Asserting: {} is satisfied", this); 384 doAssertIsSatisfied(timeoutForEmptyEndpoints); 385 if (assertPeriod > 0) { 386 // if an assert period was set then re-assert again to ensure the assertion is still valid 387 Thread.sleep(assertPeriod); 388 LOG.info("Re-asserting: {} is satisfied after {} millis", this, assertPeriod); 389 // do not use timeout when we re-assert 390 doAssertIsSatisfied(0); 391 } 392 } 393 394 protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 395 if (expectedCount == 0) { 396 if (timeoutForEmptyEndpoints > 0) { 397 LOG.debug("Sleeping for: {} millis to check there really are no messages received", timeoutForEmptyEndpoints); 398 Thread.sleep(timeoutForEmptyEndpoints); 399 } 400 assertEquals("Received message count", expectedCount, getReceivedCounter()); 401 } else if (expectedCount > 0) { 402 if (expectedCount != getReceivedCounter()) { 403 waitForCompleteLatch(); 404 } 405 assertEquals("Received message count", expectedCount, getReceivedCounter()); 406 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 407 waitForCompleteLatch(); 408 } 409 410 if (expectedMinimumCount >= 0) { 411 int receivedCounter = getReceivedCounter(); 412 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter); 413 } 414 415 for (Runnable test : tests) { 416 test.run(); 417 } 418 419 for (Throwable failure : failures) { 420 if (failure != null) { 421 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 422 fail("Failed due to caught exception: " + failure); 423 } 424 } 425 } 426 427 /** 428 * Validates that the assertions fail on this endpoint 429 */ 430 public void assertIsNotSatisfied() throws InterruptedException { 431 boolean failed = false; 432 try { 433 assertIsSatisfied(); 434 // did not throw expected error... fail! 435 failed = true; 436 } catch (AssertionError e) { 437 if (LOG.isDebugEnabled()) { 438 // log incl stacktrace 439 LOG.debug("Caught expected failure: " + e.getMessage(), e); 440 } else { 441 LOG.info("Caught expected failure: " + e.getMessage()); 442 } 443 } 444 if (failed) { 445 // fail() throws the AssertionError to indicate the test failed. 446 fail("Expected assertion failure but test succeeded!"); 447 } 448 } 449 450 /** 451 * Validates that the assertions fail on this endpoint 452 453 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 454 * should wait for the test to be true 455 */ 456 public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 457 boolean failed = false; 458 try { 459 assertIsSatisfied(timeoutForEmptyEndpoints); 460 // did not throw expected error... fail! 461 failed = true; 462 } catch (AssertionError e) { 463 if (LOG.isDebugEnabled()) { 464 // log incl stacktrace 465 LOG.debug("Caught expected failure: " + e.getMessage(), e); 466 } else { 467 LOG.info("Caught expected failure: " + e.getMessage()); 468 } 469 } 470 if (failed) { 471 // fail() throws the AssertionError to indicate the test failed. 472 fail("Expected assertion failure but test succeeded!"); 473 } 474 } 475 476 /** 477 * Specifies the expected number of message exchanges that should be 478 * received by this endpoint 479 * 480 * If you want to assert that <b>exactly</b> n messages arrives to this mock 481 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 482 * 483 * @param expectedCount the number of message exchanges that should be 484 * expected by this endpoint 485 * @see #setAssertPeriod(long) 486 */ 487 public void expectedMessageCount(int expectedCount) { 488 setExpectedMessageCount(expectedCount); 489 } 490 491 /** 492 * Sets a grace period after which the mock endpoint will re-assert 493 * to ensure the preliminary assertion is still valid. 494 * <p/> 495 * This is used for example to assert that <b>exactly</b> a number of messages 496 * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then 497 * the assertion is satisfied when 5 or more message arrives. To ensure that 498 * exactly 5 messages arrives, then you would need to wait a little period 499 * to ensure no further message arrives. This is what you can use this 500 * {@link #setAssertPeriod(long)} method for. 501 * <p/> 502 * By default this period is disabled. 503 * 504 * @param period grace period in millis 505 */ 506 public void setAssertPeriod(long period) { 507 this.assertPeriod = period; 508 } 509 510 /** 511 * Specifies the minimum number of expected message exchanges that should be 512 * received by this endpoint 513 * 514 * @param expectedCount the number of message exchanges that should be 515 * expected by this endpoint 516 */ 517 public void expectedMinimumMessageCount(int expectedCount) { 518 setMinimumExpectedMessageCount(expectedCount); 519 } 520 521 /** 522 * Sets an expectation that the given header name & value are received by this endpoint 523 * <p/> 524 * You can set multiple expectations for different header names. 525 * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt> 526 * <p/> 527 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 528 */ 529 public void expectedHeaderReceived(final String name, final Object value) { 530 if (expectedCount == -1) { 531 expectedMessageCount(1); 532 } 533 if (expectedHeaderValues == null) { 534 expectedHeaderValues = getCamelContext().getHeadersMapFactory().newMap(); 535 // we just wants to expects to be called once 536 expects(new Runnable() { 537 public void run() { 538 for (int i = 0; i < getReceivedExchanges().size(); i++) { 539 Exchange exchange = getReceivedExchange(i); 540 for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) { 541 String key = entry.getKey(); 542 Object expectedValue = entry.getValue(); 543 544 // we accept that an expectedValue of null also means that the header may be absent 545 if (expectedValue != null) { 546 assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders()); 547 boolean hasKey = exchange.getIn().getHeaders().containsKey(key); 548 assertTrue("No header with name " + key + " found for message: " + i, hasKey); 549 } 550 551 Object actualValue = exchange.getIn().getHeader(key); 552 actualValue = extractActualValue(exchange, actualValue, expectedValue); 553 554 assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue); 555 } 556 } 557 } 558 }); 559 } 560 expectedHeaderValues.put(name, value); 561 } 562 563 /** 564 * Adds an expectation that the given header values are received by this 565 * endpoint in any order. 566 * <p/> 567 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 568 * there must be 3 values. 569 * <p/> 570 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 571 */ 572 public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) { 573 expectedMessageCount(values.size()); 574 575 expects(new Runnable() { 576 public void run() { 577 // these are the expected values to find 578 final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<>(values); 579 580 for (int i = 0; i < getReceivedExchanges().size(); i++) { 581 Exchange exchange = getReceivedExchange(i); 582 583 Object actualValue = exchange.getIn().getHeader(name); 584 for (Object expectedValue : actualHeaderValues) { 585 actualValue = extractActualValue(exchange, actualValue, expectedValue); 586 // remove any found values 587 actualHeaderValues.remove(actualValue); 588 } 589 } 590 591 // should be empty, as we should find all the values 592 assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size()) 593 + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty()); 594 } 595 }); 596 } 597 598 /** 599 * Adds an expectation that the given header values are received by this 600 * endpoint in any order 601 * <p/> 602 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 603 * there must be 3 values. 604 * <p/> 605 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 606 */ 607 public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) { 608 List<Object> valueList = new ArrayList<>(); 609 valueList.addAll(Arrays.asList(values)); 610 expectedHeaderValuesReceivedInAnyOrder(name, valueList); 611 } 612 613 /** 614 * Sets an expectation that the given property name & value are received by this endpoint 615 * <p/> 616 * You can set multiple expectations for different property names. 617 * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt> 618 */ 619 public void expectedPropertyReceived(final String name, final Object value) { 620 if (expectedPropertyValues == null) { 621 expectedPropertyValues = new HashMap<>(); 622 } 623 expectedPropertyValues.put(name, value); 624 625 expects(new Runnable() { 626 public void run() { 627 for (int i = 0; i < getReceivedExchanges().size(); i++) { 628 Exchange exchange = getReceivedExchange(i); 629 for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) { 630 String key = entry.getKey(); 631 Object expectedValue = entry.getValue(); 632 633 // we accept that an expectedValue of null also means that the property may be absent 634 if (expectedValue != null) { 635 assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty()); 636 boolean hasKey = exchange.getProperties().containsKey(key); 637 assertTrue("No property with name " + key + " found for message: " + i, hasKey); 638 } 639 640 Object actualValue = exchange.getProperty(key); 641 actualValue = extractActualValue(exchange, actualValue, expectedValue); 642 643 assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue); 644 } 645 } 646 } 647 }); 648 } 649 650 /** 651 * Adds an expectation that the given property values are received by this 652 * endpoint in any order. 653 * <p/> 654 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 655 * there must be 3 values. 656 * <p/> 657 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 658 */ 659 public void expectedPropertyValuesReceivedInAnyOrder(final String name, final List<?> values) { 660 expectedMessageCount(values.size()); 661 662 expects(new Runnable() { 663 public void run() { 664 // these are the expected values to find 665 final Set<Object> actualPropertyValues = new CopyOnWriteArraySet<>(values); 666 667 for (int i = 0; i < getReceivedExchanges().size(); i++) { 668 Exchange exchange = getReceivedExchange(i); 669 670 Object actualValue = exchange.getProperty(name); 671 for (Object expectedValue : actualPropertyValues) { 672 actualValue = extractActualValue(exchange, actualValue, expectedValue); 673 // remove any found values 674 actualPropertyValues.remove(actualValue); 675 } 676 } 677 678 // should be empty, as we should find all the values 679 assertTrue("Expected " + values.size() + " properties with key[" + name + "], received " + (values.size() - actualPropertyValues.size()) 680 + " properties. Expected property values: " + actualPropertyValues, actualPropertyValues.isEmpty()); 681 } 682 }); 683 } 684 685 /** 686 * Adds an expectation that the given property values are received by this 687 * endpoint in any order 688 * <p/> 689 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 690 * there must be 3 values. 691 * <p/> 692 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 693 */ 694 public void expectedPropertyValuesReceivedInAnyOrder(String name, Object... values) { 695 List<Object> valueList = new ArrayList<>(); 696 valueList.addAll(Arrays.asList(values)); 697 expectedPropertyValuesReceivedInAnyOrder(name, valueList); 698 } 699 700 /** 701 * Adds an expectation that the given body values are received by this 702 * endpoint in the specified order 703 * <p/> 704 * <b>Important:</b> The number of values must match the expected number of messages, so if you expect 3 messages, then 705 * there must be 3 values. 706 * <p/> 707 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 708 */ 709 public void expectedBodiesReceived(final List<?> bodies) { 710 expectedMessageCount(bodies.size()); 711 this.expectedBodyValues = bodies; 712 this.actualBodyValues = new ArrayList<>(); 713 714 expects(new Runnable() { 715 public void run() { 716 for (int i = 0; i < expectedBodyValues.size(); i++) { 717 Exchange exchange = getReceivedExchange(i); 718 assertTrue("No exchange received for counter: " + i, exchange != null); 719 720 Object expectedBody = expectedBodyValues.get(i); 721 Object actualBody = null; 722 if (i < actualBodyValues.size()) { 723 actualBody = actualBodyValues.get(i); 724 } 725 actualBody = extractActualValue(exchange, actualBody, expectedBody); 726 727 assertEquals("Body of message: " + i, expectedBody, actualBody); 728 } 729 } 730 }); 731 } 732 733 private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) { 734 if (actualValue == null) { 735 return null; 736 } 737 738 if (actualValue instanceof Expression) { 739 Class clazz = Object.class; 740 if (expectedValue != null) { 741 clazz = expectedValue.getClass(); 742 } 743 actualValue = ((Expression)actualValue).evaluate(exchange, clazz); 744 } else if (actualValue instanceof Predicate) { 745 actualValue = ((Predicate)actualValue).matches(exchange); 746 } else if (expectedValue != null) { 747 String from = actualValue.getClass().getName(); 748 String to = expectedValue.getClass().getName(); 749 actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue); 750 assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null); 751 } 752 return actualValue; 753 } 754 755 /** 756 * Sets an expectation that the given predicates matches the received messages by this endpoint 757 */ 758 public void expectedMessagesMatches(Predicate... predicates) { 759 for (int i = 0; i < predicates.length; i++) { 760 final int messageIndex = i; 761 final Predicate predicate = predicates[i]; 762 final AssertionClause clause = new AssertionClause(this) { 763 public void run() { 764 addPredicate(predicate); 765 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 766 } 767 }; 768 expects(clause); 769 } 770 } 771 772 /** 773 * Sets an expectation that the given body values are received by this endpoint 774 * <p/> 775 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 776 * there must be 3 bodies. 777 * <p/> 778 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 779 */ 780 public void expectedBodiesReceived(Object... bodies) { 781 List<Object> bodyList = new ArrayList<>(); 782 bodyList.addAll(Arrays.asList(bodies)); 783 expectedBodiesReceived(bodyList); 784 } 785 786 /** 787 * Adds an expectation that the given body value are received by this endpoint 788 */ 789 public AssertionClause expectedBodyReceived() { 790 expectedMessageCount(1); 791 final AssertionClause clause = new AssertionClause(this) { 792 public void run() { 793 Exchange exchange = getReceivedExchange(0); 794 assertTrue("No exchange received for counter: " + 0, exchange != null); 795 796 Object actualBody = exchange.getIn().getBody(); 797 Expression exp = createExpression(getCamelContext()); 798 Object expectedBody = exp.evaluate(exchange, Object.class); 799 800 assertEquals("Body of message: " + 0, expectedBody, actualBody); 801 } 802 }; 803 expects(clause); 804 return clause; 805 } 806 807 /** 808 * Adds an expectation that the given body values are received by this 809 * endpoint in any order 810 * <p/> 811 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 812 * there must be 3 bodies. 813 * <p/> 814 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 815 */ 816 public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) { 817 expectedMessageCount(bodies.size()); 818 this.expectedBodyValues = bodies; 819 this.actualBodyValues = new ArrayList<>(); 820 821 expects(new Runnable() { 822 public void run() { 823 List<Object> actualBodyValuesSet = new ArrayList<>(actualBodyValues); 824 for (int i = 0; i < expectedBodyValues.size(); i++) { 825 Exchange exchange = getReceivedExchange(i); 826 assertTrue("No exchange received for counter: " + i, exchange != null); 827 828 Object expectedBody = expectedBodyValues.get(i); 829 assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody)); 830 } 831 } 832 }); 833 } 834 835 /** 836 * Adds an expectation that the given body values are received by this 837 * endpoint in any order 838 * <p/> 839 * <b>Important:</b> The number of bodies must match the expected number of messages, so if you expect 3 messages, then 840 * there must be 3 bodies. 841 * <p/> 842 * <b>Important:</b> This overrides any previous set value using {@link #expectedMessageCount(int)} 843 */ 844 public void expectedBodiesReceivedInAnyOrder(Object... bodies) { 845 List<Object> bodyList = new ArrayList<>(); 846 bodyList.addAll(Arrays.asList(bodies)); 847 expectedBodiesReceivedInAnyOrder(bodyList); 848 } 849 850 /** 851 * Adds an expectation that a file exists with the given name 852 * 853 * @param name name of file, will cater for / and \ on different OS platforms 854 */ 855 public void expectedFileExists(final String name) { 856 expectedFileExists(name, null); 857 } 858 859 /** 860 * Adds an expectation that a file exists with the given name 861 * <p/> 862 * Will wait at most 5 seconds while checking for the existence of the file. 863 * 864 * @param name name of file, will cater for / and \ on different OS platforms 865 * @param content content of file to compare, can be <tt>null</tt> to not compare content 866 */ 867 public void expectedFileExists(final String name, final String content) { 868 final File file = new File(FileUtil.normalizePath(name)); 869 870 expects(new Runnable() { 871 public void run() { 872 // wait at most 5 seconds for the file to exists 873 final long timeout = System.currentTimeMillis() + 5000; 874 875 boolean stop = false; 876 while (!stop && !file.exists()) { 877 try { 878 Thread.sleep(50); 879 } catch (InterruptedException e) { 880 // ignore 881 } 882 stop = System.currentTimeMillis() > timeout; 883 } 884 885 assertTrue("The file should exists: " + name, file.exists()); 886 887 if (content != null) { 888 String body = getCamelContext().getTypeConverter().convertTo(String.class, file); 889 assertEquals("Content of file: " + name, content, body); 890 } 891 } 892 }); 893 } 894 895 /** 896 * Adds an expectation that messages received should have the given exchange pattern 897 */ 898 public void expectedExchangePattern(final ExchangePattern exchangePattern) { 899 expectedMessagesMatches(new Predicate() { 900 public boolean matches(Exchange exchange) { 901 return exchange.getPattern().equals(exchangePattern); 902 } 903 }); 904 } 905 906 /** 907 * Adds an expectation that messages received should have ascending values 908 * of the given expression such as a user generated counter value 909 */ 910 public void expectsAscending(final Expression expression) { 911 expects(new Runnable() { 912 public void run() { 913 assertMessagesAscending(expression); 914 } 915 }); 916 } 917 918 /** 919 * Adds an expectation that messages received should have ascending values 920 * of the given expression such as a user generated counter value 921 */ 922 public AssertionClause expectsAscending() { 923 final AssertionClause clause = new AssertionClause(this) { 924 public void run() { 925 assertMessagesAscending(createExpression(getCamelContext())); 926 } 927 }; 928 expects(clause); 929 return clause; 930 } 931 932 /** 933 * Adds an expectation that messages received should have descending values 934 * of the given expression such as a user generated counter value 935 */ 936 public void expectsDescending(final Expression expression) { 937 expects(new Runnable() { 938 public void run() { 939 assertMessagesDescending(expression); 940 } 941 }); 942 } 943 944 /** 945 * Adds an expectation that messages received should have descending values 946 * of the given expression such as a user generated counter value 947 */ 948 public AssertionClause expectsDescending() { 949 final AssertionClause clause = new AssertionClause(this) { 950 public void run() { 951 assertMessagesDescending(createExpression(getCamelContext())); 952 } 953 }; 954 expects(clause); 955 return clause; 956 } 957 958 /** 959 * Adds an expectation that no duplicate messages should be received using 960 * the expression to determine the message ID 961 * 962 * @param expression the expression used to create a unique message ID for 963 * message comparison (which could just be the message 964 * payload if the payload can be tested for uniqueness using 965 * {@link Object#equals(Object)} and 966 * {@link Object#hashCode()} 967 */ 968 public void expectsNoDuplicates(final Expression expression) { 969 expects(new Runnable() { 970 public void run() { 971 assertNoDuplicates(expression); 972 } 973 }); 974 } 975 976 /** 977 * Adds an expectation that no duplicate messages should be received using 978 * the expression to determine the message ID 979 */ 980 public AssertionClause expectsNoDuplicates() { 981 final AssertionClause clause = new AssertionClause(this) { 982 public void run() { 983 assertNoDuplicates(createExpression(getCamelContext())); 984 } 985 }; 986 expects(clause); 987 return clause; 988 } 989 990 /** 991 * Asserts that the messages have ascending values of the given expression 992 */ 993 public void assertMessagesAscending(Expression expression) { 994 assertMessagesSorted(expression, true); 995 } 996 997 /** 998 * Asserts that the messages have descending values of the given expression 999 */ 1000 public void assertMessagesDescending(Expression expression) { 1001 assertMessagesSorted(expression, false); 1002 } 1003 1004 protected void assertMessagesSorted(Expression expression, boolean ascending) { 1005 String type = ascending ? "ascending" : "descending"; 1006 ExpressionComparator comparator = new ExpressionComparator(expression); 1007 List<Exchange> list = getReceivedExchanges(); 1008 for (int i = 1; i < list.size(); i++) { 1009 int j = i - 1; 1010 Exchange e1 = list.get(j); 1011 Exchange e2 = list.get(i); 1012 int result = comparator.compare(e1, e2); 1013 if (result == 0) { 1014 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " 1015 + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 1016 } else { 1017 if (!ascending) { 1018 result = result * -1; 1019 } 1020 if (result > 0) { 1021 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class) 1022 + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: " 1023 + expression + ". Exchanges: " + e1 + " and " + e2); 1024 } 1025 } 1026 } 1027 } 1028 1029 public void assertNoDuplicates(Expression expression) { 1030 Map<Object, Exchange> map = new HashMap<>(); 1031 List<Exchange> list = getReceivedExchanges(); 1032 for (int i = 0; i < list.size(); i++) { 1033 Exchange e2 = list.get(i); 1034 Object key = expression.evaluate(e2, Object.class); 1035 Exchange e1 = map.get(key); 1036 if (e1 != null) { 1037 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 1038 } else { 1039 map.put(key, e2); 1040 } 1041 } 1042 } 1043 1044 /** 1045 * Adds the expectation which will be invoked when enough messages are received 1046 */ 1047 public void expects(Runnable runnable) { 1048 tests.add(runnable); 1049 } 1050 1051 /** 1052 * Adds an assertion to the given message index 1053 * 1054 * @param messageIndex the number of the message 1055 * @return the assertion clause 1056 */ 1057 public AssertionClause message(final int messageIndex) { 1058 final AssertionClause clause = new AssertionClause(this) { 1059 public void run() { 1060 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 1061 } 1062 }; 1063 expects(clause); 1064 return clause; 1065 } 1066 1067 /** 1068 * Adds an assertion to all the received messages 1069 * 1070 * @return the assertion clause 1071 */ 1072 public AssertionClause allMessages() { 1073 final AssertionClause clause = new AssertionClause(this) { 1074 public void run() { 1075 List<Exchange> list = getReceivedExchanges(); 1076 int index = 0; 1077 for (Exchange exchange : list) { 1078 applyAssertionOn(MockEndpoint.this, index++, exchange); 1079 } 1080 } 1081 }; 1082 expects(clause); 1083 return clause; 1084 } 1085 1086 /** 1087 * Asserts that the given index of message is received (starting at zero) 1088 */ 1089 public Exchange assertExchangeReceived(int index) { 1090 int count = getReceivedCounter(); 1091 assertTrue("Not enough messages received. Was: " + count, count > index); 1092 return getReceivedExchange(index); 1093 } 1094 1095 // Properties 1096 // ------------------------------------------------------------------------- 1097 1098 public String getName() { 1099 return name; 1100 } 1101 1102 public void setName(String name) { 1103 this.name = name; 1104 } 1105 1106 public List<Throwable> getFailures() { 1107 return failures; 1108 } 1109 1110 public int getReceivedCounter() { 1111 return counter; 1112 } 1113 1114 public List<Exchange> getReceivedExchanges() { 1115 return receivedExchanges; 1116 } 1117 1118 public int getExpectedCount() { 1119 return expectedCount; 1120 } 1121 1122 public long getSleepForEmptyTest() { 1123 return sleepForEmptyTest; 1124 } 1125 1126 /** 1127 * Allows a sleep to be specified to wait to check that this endpoint really 1128 * is empty when {@link #expectedMessageCount(int)} is called with zero 1129 * 1130 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 1131 * this endpoint really is empty 1132 */ 1133 public void setSleepForEmptyTest(long sleepForEmptyTest) { 1134 this.sleepForEmptyTest = sleepForEmptyTest; 1135 } 1136 1137 public long getResultWaitTime() { 1138 return resultWaitTime; 1139 } 1140 1141 /** 1142 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will 1143 * wait on a latch until it is satisfied 1144 */ 1145 public void setResultWaitTime(long resultWaitTime) { 1146 this.resultWaitTime = resultWaitTime; 1147 } 1148 1149 /** 1150 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will 1151 * wait on a latch until it is satisfied 1152 */ 1153 public void setResultMinimumWaitTime(long resultMinimumWaitTime) { 1154 this.resultMinimumWaitTime = resultMinimumWaitTime; 1155 } 1156 1157 /** 1158 * @deprecated use {@link #setResultMinimumWaitTime(long)} 1159 */ 1160 @Deprecated 1161 public void setMinimumResultWaitTime(long resultMinimumWaitTime) { 1162 setResultMinimumWaitTime(resultMinimumWaitTime); 1163 } 1164 1165 /** 1166 * Specifies the expected number of message exchanges that should be 1167 * received by this endpoint. 1168 * <p/> 1169 * <b>Beware:</b> If you want to expect that <tt>0</tt> messages, then take extra care, 1170 * as <tt>0</tt> matches when the tests starts, so you need to set a assert period time 1171 * to let the test run for a while to make sure there are still no messages arrived; for 1172 * that use {@link #setAssertPeriod(long)}. 1173 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier 1174 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks. 1175 * This allows you to not use a fixed assert period, to speedup testing times. 1176 * <p/> 1177 * If you want to assert that <b>exactly</b> n'th message arrives to this mock 1178 * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details. 1179 * 1180 * @param expectedCount the number of message exchanges that should be 1181 * expected by this endpoint 1182 * @see #setAssertPeriod(long) 1183 */ 1184 public void setExpectedCount(int expectedCount) { 1185 setExpectedMessageCount(expectedCount); 1186 } 1187 1188 /** 1189 * @see #setExpectedCount(int) 1190 */ 1191 public void setExpectedMessageCount(int expectedCount) { 1192 this.expectedCount = expectedCount; 1193 if (expectedCount <= 0) { 1194 latch = null; 1195 } else { 1196 latch = new CountDownLatch(expectedCount); 1197 } 1198 } 1199 1200 /** 1201 * Specifies the minimum number of expected message exchanges that should be 1202 * received by this endpoint 1203 * 1204 * @param expectedCount the number of message exchanges that should be 1205 * expected by this endpoint 1206 */ 1207 public void setMinimumExpectedMessageCount(int expectedCount) { 1208 this.expectedMinimumCount = expectedCount; 1209 if (expectedCount <= 0) { 1210 latch = null; 1211 } else { 1212 latch = new CountDownLatch(expectedMinimumCount); 1213 } 1214 } 1215 1216 public Processor getReporter() { 1217 return reporter; 1218 } 1219 1220 /** 1221 * Allows a processor to added to the endpoint to report on progress of the test 1222 */ 1223 public void setReporter(Processor reporter) { 1224 this.reporter = reporter; 1225 } 1226 1227 /** 1228 * Specifies to only retain the first n'th number of received {@link Exchange}s. 1229 * <p/> 1230 * This is used when testing with big data, to reduce memory consumption by not storing 1231 * copies of every {@link Exchange} this mock endpoint receives. 1232 * <p/> 1233 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1234 * will still return the actual number of received {@link Exchange}s. For example 1235 * if we have received 5000 {@link Exchange}s, and have configured to only retain the first 1236 * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1237 * but there is only the first 10 {@link Exchange}s in the {@link #getExchanges()} and 1238 * {@link #getReceivedExchanges()} methods. 1239 * <p/> 1240 * When using this method, then some of the other expectation methods is not supported, 1241 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1242 * number of bodies received. 1243 * <p/> 1244 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1245 * to limit both the first and last received. 1246 * 1247 * @param retainFirst to limit and only keep the first n'th received {@link Exchange}s, use 1248 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1249 * @see #setRetainLast(int) 1250 */ 1251 public void setRetainFirst(int retainFirst) { 1252 this.retainFirst = retainFirst; 1253 } 1254 1255 /** 1256 * Specifies to only retain the last n'th number of received {@link Exchange}s. 1257 * <p/> 1258 * This is used when testing with big data, to reduce memory consumption by not storing 1259 * copies of every {@link Exchange} this mock endpoint receives. 1260 * <p/> 1261 * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()} 1262 * will still return the actual number of received {@link Exchange}s. For example 1263 * if we have received 5000 {@link Exchange}s, and have configured to only retain the last 1264 * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt> 1265 * but there is only the last 20 {@link Exchange}s in the {@link #getExchanges()} and 1266 * {@link #getReceivedExchanges()} methods. 1267 * <p/> 1268 * When using this method, then some of the other expectation methods is not supported, 1269 * for example the {@link #expectedBodiesReceived(Object...)} sets a expectation on the first 1270 * number of bodies received. 1271 * <p/> 1272 * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods, 1273 * to limit both the first and last received. 1274 * 1275 * @param retainLast to limit and only keep the last n'th received {@link Exchange}s, use 1276 * <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all. 1277 * @see #setRetainFirst(int) 1278 */ 1279 public void setRetainLast(int retainLast) { 1280 this.retainLast = retainLast; 1281 } 1282 1283 public int isReportGroup() { 1284 return reportGroup; 1285 } 1286 1287 /** 1288 * A number that is used to turn on throughput logging based on groups of the size. 1289 */ 1290 public void setReportGroup(int reportGroup) { 1291 this.reportGroup = reportGroup; 1292 } 1293 1294 public boolean isCopyOnExchange() { 1295 return copyOnExchange; 1296 } 1297 1298 /** 1299 * Sets whether to make a deep copy of the incoming {@link Exchange} when received at this mock endpoint. 1300 * <p/> 1301 * Is by default <tt>true</tt>. 1302 */ 1303 public void setCopyOnExchange(boolean copyOnExchange) { 1304 this.copyOnExchange = copyOnExchange; 1305 } 1306 1307 // Implementation methods 1308 // ------------------------------------------------------------------------- 1309 private void init() { 1310 expectedCount = -1; 1311 counter = 0; 1312 defaultProcessor = null; 1313 processors = new HashMap<>(); 1314 receivedExchanges = new CopyOnWriteArrayList<>(); 1315 failures = new CopyOnWriteArrayList<>(); 1316 tests = new CopyOnWriteArrayList<>(); 1317 latch = null; 1318 sleepForEmptyTest = 0; 1319 resultWaitTime = 0; 1320 resultMinimumWaitTime = 0L; 1321 assertPeriod = 0L; 1322 expectedMinimumCount = -1; 1323 expectedBodyValues = null; 1324 actualBodyValues = new ArrayList<>(); 1325 expectedHeaderValues = null; 1326 actualHeaderValues = null; 1327 expectedPropertyValues = null; 1328 actualPropertyValues = null; 1329 retainFirst = -1; 1330 retainLast = -1; 1331 } 1332 1333 protected synchronized void onExchange(Exchange exchange) { 1334 try { 1335 if (reporter != null) { 1336 reporter.process(exchange); 1337 } 1338 Exchange copy = exchange; 1339 if (copyOnExchange) { 1340 // copy the exchange so the mock stores the copy and not the actual exchange 1341 copy = ExchangeHelper.createCopy(exchange, true); 1342 } 1343 performAssertions(exchange, copy); 1344 } catch (Throwable e) { 1345 // must catch java.lang.Throwable as AssertionError extends java.lang.Error 1346 failures.add(e); 1347 } finally { 1348 // make sure latch is counted down to avoid test hanging forever 1349 if (latch != null) { 1350 latch.countDown(); 1351 } 1352 } 1353 } 1354 1355 /** 1356 * Performs the assertions on the incoming exchange. 1357 * 1358 * @param exchange the actual exchange 1359 * @param copy a copy of the exchange (only store this) 1360 * @throws Exception can be thrown if something went wrong 1361 */ 1362 protected void performAssertions(Exchange exchange, Exchange copy) throws Exception { 1363 Message in = copy.getIn(); 1364 Object actualBody = in.getBody(); 1365 1366 if (expectedHeaderValues != null) { 1367 if (actualHeaderValues == null) { 1368 actualHeaderValues = getCamelContext().getHeadersMapFactory().newMap(); 1369 } 1370 if (in.hasHeaders()) { 1371 actualHeaderValues.putAll(in.getHeaders()); 1372 } 1373 } 1374 1375 if (expectedPropertyValues != null) { 1376 if (actualPropertyValues == null) { 1377 actualPropertyValues = getCamelContext().getHeadersMapFactory().newMap(); 1378 } 1379 actualPropertyValues.putAll(copy.getProperties()); 1380 } 1381 1382 if (expectedBodyValues != null) { 1383 int index = actualBodyValues.size(); 1384 if (expectedBodyValues.size() > index) { 1385 Object expectedBody = expectedBodyValues.get(index); 1386 if (expectedBody != null) { 1387 // prefer to convert body early, for example when using files 1388 // we need to read the content at this time 1389 Object body = in.getBody(expectedBody.getClass()); 1390 if (body != null) { 1391 actualBody = body; 1392 } 1393 } 1394 actualBodyValues.add(actualBody); 1395 } 1396 } 1397 1398 // let counter be 0 index-based in the logs 1399 if (LOG.isDebugEnabled()) { 1400 String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody; 1401 if (copy.getIn().hasHeaders()) { 1402 msg += " and headers:" + copy.getIn().getHeaders(); 1403 } 1404 LOG.debug(msg); 1405 } 1406 1407 // record timestamp when exchange was received 1408 copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date()); 1409 1410 // add a copy of the received exchange 1411 addReceivedExchange(copy); 1412 // and then increment counter after adding received exchange 1413 ++counter; 1414 1415 Processor processor = processors.get(getReceivedCounter()) != null 1416 ? processors.get(getReceivedCounter()) : defaultProcessor; 1417 1418 if (processor != null) { 1419 try { 1420 // must process the incoming exchange and NOT the copy as the idea 1421 // is the end user can manipulate the exchange 1422 processor.process(exchange); 1423 } catch (Exception e) { 1424 // set exceptions on exchange so we can throw exceptions to simulate errors 1425 exchange.setException(e); 1426 } 1427 } 1428 } 1429 1430 /** 1431 * Adds the received exchange. 1432 * 1433 * @param copy a copy of the received exchange 1434 */ 1435 protected void addReceivedExchange(Exchange copy) { 1436 if (retainFirst == 0 && retainLast == 0) { 1437 // do not retain any messages at all 1438 } else if (retainFirst < 0 && retainLast < 0) { 1439 // no limitation so keep them all 1440 receivedExchanges.add(copy); 1441 } else { 1442 // okay there is some sort of limitations, so figure out what to retain 1443 if (retainFirst > 0 && counter < retainFirst) { 1444 // store a copy as its within the retain first limitation 1445 receivedExchanges.add(copy); 1446 } else if (retainLast > 0) { 1447 // remove the oldest from the last retained boundary, 1448 int index = receivedExchanges.size() - retainLast; 1449 if (index >= 0) { 1450 // but must be outside the first range as well 1451 // otherwise we should not remove the oldest 1452 if (retainFirst <= 0 || retainFirst <= index) { 1453 receivedExchanges.remove(index); 1454 } 1455 } 1456 // store a copy of the last n'th received 1457 receivedExchanges.add(copy); 1458 } 1459 } 1460 } 1461 1462 protected void waitForCompleteLatch() throws InterruptedException { 1463 if (latch == null) { 1464 fail("Should have a latch!"); 1465 } 1466 1467 StopWatch watch = new StopWatch(); 1468 waitForCompleteLatch(resultWaitTime); 1469 long delta = watch.taken(); 1470 LOG.debug("Took {} millis to complete latch", delta); 1471 1472 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) { 1473 fail("Expected minimum " + resultMinimumWaitTime 1474 + " millis waiting on the result, but was faster with " + delta + " millis."); 1475 } 1476 } 1477 1478 protected void waitForCompleteLatch(long timeout) throws InterruptedException { 1479 // Wait for a default 10 seconds if resultWaitTime is not set 1480 long waitTime = timeout == 0 ? 10000L : timeout; 1481 1482 // now let's wait for the results 1483 LOG.debug("Waiting on the latch for: {} millis", timeout); 1484 latch.await(waitTime, TimeUnit.MILLISECONDS); 1485 } 1486 1487 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 1488 if (!ObjectHelper.equal(expectedValue, actualValue)) { 1489 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 1490 } 1491 } 1492 1493 protected void assertTrue(String message, boolean predicate) { 1494 if (!predicate) { 1495 fail(message); 1496 } 1497 } 1498 1499 protected void fail(Object message) { 1500 if (LOG.isDebugEnabled()) { 1501 List<Exchange> list = getReceivedExchanges(); 1502 int index = 0; 1503 for (Exchange exchange : list) { 1504 LOG.debug("{} failed and received[{}]: {}", getEndpointUri(), ++index, exchange); 1505 } 1506 } 1507 throw new AssertionError(getEndpointUri() + " " + message); 1508 } 1509 1510 public int getExpectedMinimumCount() { 1511 return expectedMinimumCount; 1512 } 1513 1514 public void await() throws InterruptedException { 1515 if (latch != null) { 1516 latch.await(); 1517 } 1518 } 1519 1520 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 1521 if (latch != null) { 1522 return latch.await(timeout, unit); 1523 } 1524 return true; 1525 } 1526 1527 public boolean isSingleton() { 1528 return true; 1529 } 1530 1531 public boolean isLenientProperties() { 1532 return true; 1533 } 1534 1535 private Exchange getReceivedExchange(int index) { 1536 if (index <= receivedExchanges.size() - 1) { 1537 return receivedExchanges.get(index); 1538 } else { 1539 return null; 1540 } 1541 } 1542 1543}