001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.builder;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.EventObject;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.Endpoint;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Expression;
032    import org.apache.camel.Predicate;
033    import org.apache.camel.Producer;
034    import org.apache.camel.component.direct.DirectEndpoint;
035    import org.apache.camel.component.mock.MockEndpoint;
036    import org.apache.camel.management.event.ExchangeCompletedEvent;
037    import org.apache.camel.management.event.ExchangeCreatedEvent;
038    import org.apache.camel.management.event.ExchangeFailedEvent;
039    import org.apache.camel.management.event.ExchangeSentEvent;
040    import org.apache.camel.spi.EventNotifier;
041    import org.apache.camel.support.EventNotifierSupport;
042    import org.apache.camel.util.EndpointHelper;
043    import org.apache.camel.util.ObjectHelper;
044    import org.apache.camel.util.ServiceHelper;
045    
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
/**
 * A builder to build an expression based on {@link org.apache.camel.spi.EventNotifier} notifications
 * about {@link Exchange} being routed.
 * <p/>
 * This builder can be used for testing purposes where you want to know when a test is supposed to be done.
 * The idea is that you can build an expression that explains when the test is done, for example when Camel
 * has finished routing 5 messages. You can then in your test wait for this condition to occur.
 *
 * @version 
 */
059    public class NotifyBuilder {
060    
    // logger for this builder
    private static final Logger LOG = LoggerFactory.getLogger(NotifyBuilder.class);

    // the Camel context this builder was created with
    private final CamelContext context;

    // notifier to hook into Camel to listen for events
    private final EventNotifier eventNotifier;

    // the predicates built with this builder
    private final List<EventPredicateHolder> predicates = new ArrayList<EventPredicateHolder>();

    // latch to be used to signal when the predicates match
    private CountDownLatch latch = new CountDownLatch(1);

    // the current state while building an event predicate where we use a stack and the operation;
    // the builder methods add their event predicates to the stack
    private final List<EventPredicate> stack = new ArrayList<EventPredicate>();
    private EventOperation operation;
    private boolean created;
    // keeps state of how many wereSentTo we have added, as each new wereSentTo
    // is inserted into the stack at this index
    private int wereSentToIndex;

    // computed value whether all the predicates matched
    private volatile boolean matches;
083    
084        /**
085         * Creates a new builder.
086         *
087         * @param context the Camel context
088         */
089        public NotifyBuilder(CamelContext context) {
090            this.context = context;
091            eventNotifier = new ExchangeNotifier();
092            try {
093                ServiceHelper.startService(eventNotifier);
094            } catch (Exception e) {
095                throw ObjectHelper.wrapRuntimeCamelException(e);
096            }
097            context.getManagementStrategy().addEventNotifier(eventNotifier);
098        }
099    
100        /**
101         * Optionally a <tt>from</tt> endpoint which means that this expression should only be based
102         * on {@link Exchange} which is originated from the particular endpoint(s).
103         *
104         * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
105         * @return the builder
106         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
107         */
108        public NotifyBuilder from(final String endpointUri) {
109            stack.add(new EventPredicateSupport() {
110    
111                @Override
112                public boolean isAbstract() {
113                    // is abstract as its a filter
114                    return true;
115                }
116    
117                @Override
118                public boolean onExchange(Exchange exchange) {
119                    // filter non matching exchanges
120                    return EndpointHelper.matchEndpoint(context, exchange.getFromEndpoint().getEndpointUri(), endpointUri);
121                }
122    
123                public boolean matches() {
124                    // should be true as we use the onExchange to filter
125                    return true;
126                }
127    
128                @Override
129                public String toString() {
130                    return "from(" + endpointUri + ")";
131                }
132            });
133            return this;
134        }
135    
136        /**
137         * Optionally a <tt>from</tt> route which means that this expression should only be based
138         * on {@link Exchange} which is originated from the particular route(s).
139         *
140         * @param routeId id of route or pattern (see the EndpointHelper javadoc)
141         * @return the builder
142         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
143         */
144        public NotifyBuilder fromRoute(final String routeId) {
145            stack.add(new EventPredicateSupport() {
146    
147                @Override
148                public boolean isAbstract() {
149                    // is abstract as its a filter
150                    return true;
151                }
152    
153                @Override
154                public boolean onExchange(Exchange exchange) {
155                    String id = EndpointHelper.getRouteIdFromEndpoint(exchange.getFromEndpoint());
156                    // filter non matching exchanges
157                    return EndpointHelper.matchPattern(id, routeId);
158                }
159    
160                public boolean matches() {
161                    // should be true as we use the onExchange to filter
162                    return true;
163                }
164    
165                @Override
166                public String toString() {
167                    return "fromRoute(" + routeId + ")";
168                }
169            });
170            return this;
171        }
172    
173        private NotifyBuilder fromRoutesOnly() {
174            // internal and should always be in top of stack
175            stack.add(0, new EventPredicateSupport() {
176    
177                @Override
178                public boolean isAbstract() {
179                    // is abstract as its a filter
180                    return true;
181                }
182    
183                @Override
184                public boolean onExchange(Exchange exchange) {
185                    // always accept direct endpoints as they are a special case as it will create the UoW beforehand
186                    // and just continue to route that on the consumer side, which causes the EventNotifier not to
187                    // emit events when the consumer received the exchange, as its already done. For example by
188                    // ProducerTemplate which creates the UoW before producing messages.
189                    if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint() instanceof DirectEndpoint) {
190                        return true;
191                    }
192                    return EndpointHelper.matchPattern(exchange.getFromRouteId(), "*");
193                }
194    
195                public boolean matches() {
196                    // should be true as we use the onExchange to filter
197                    return true;
198                }
199    
200                @Override
201                public String toString() {
202                    // we dont want any to string output as this is an internal predicate to match only from routes
203                    return "";
204                }
205            });
206            return this;
207        }
208    
209        /**
210         * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
211         *
212         * @param predicate the predicate to use for the filter
213         * @return the builder
214         */
215        public NotifyBuilder filter(final Predicate predicate) {
216            stack.add(new EventPredicateSupport() {
217    
218                @Override
219                public boolean isAbstract() {
220                    // is abstract as its a filter
221                    return true;
222                }
223    
224                @Override
225                public boolean onExchange(Exchange exchange) {
226                    // filter non matching exchanges
227                    return predicate.matches(exchange);
228                }
229    
230                public boolean matches() {
231                    // should be true as we use the onExchange to filter
232                    return true;
233                }
234    
235                @Override
236                public String toString() {
237                    return "filter(" + predicate + ")";
238                }
239            });
240            return this;
241        }
242    
243        /**
244         * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
245         *
246         * @return the builder
247         */
248        public ExpressionClauseSupport<NotifyBuilder> filter() {
249            final ExpressionClauseSupport<NotifyBuilder> clause = new ExpressionClauseSupport<NotifyBuilder>(this);
250            stack.add(new EventPredicateSupport() {
251    
252                @Override
253                public boolean isAbstract() {
254                    // is abstract as its a filter
255                    return true;
256                }
257    
258                @Override
259                public boolean onExchange(Exchange exchange) {
260                    // filter non matching exchanges
261                    Expression exp = clause.createExpression(exchange.getContext());
262                    return exp.evaluate(exchange, Boolean.class);
263                }
264    
265                public boolean matches() {
266                    // should be true as we use the onExchange to filter
267                    return true;
268                }
269    
270                @Override
271                public String toString() {
272                    return "filter(" + clause + ")";
273                }
274            });
275            return clause;
276        }
277    
278        /**
279         * Optionally a <tt>sent to</tt> endpoint which means that this expression should only be based
280         * on {@link Exchange} which has been sent to the given endpoint uri.
281         * <p/>
282         * Notice the {@link Exchange} may have been sent to other endpoints as well. This condition will match
283         * if the {@link Exchange} has been sent at least once to the given endpoint.
284         *
285         * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
286         * @return the builder
287         * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
288         */
289        public NotifyBuilder wereSentTo(final String endpointUri) {
290            // insert in start of stack but after the previous wereSentTo
291            stack.add(wereSentToIndex++, new EventPredicateSupport() {
292                private AtomicBoolean sentTo = new AtomicBoolean();
293    
294                @Override
295                public boolean isAbstract() {
296                    // is abstract as its a filter
297                    return true;
298                }
299    
300                @Override
301                public boolean onExchangeCreated(Exchange exchange) {
302                    // reset when a new exchange is created
303                    sentTo.set(false);
304                    return onExchange(exchange);
305                }
306    
307                @Override
308                public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
309                    if (EndpointHelper.matchEndpoint(context, endpoint.getEndpointUri(), endpointUri)) {
310                        sentTo.set(true);
311                    }
312                    return onExchange(exchange);
313                }
314    
315                @Override
316                public boolean onExchange(Exchange exchange) {
317                    // filter only when sentTo
318                    return sentTo.get();
319                }
320    
321                public boolean matches() {
322                    // should be true as we use the onExchange to filter
323                    return true;
324                }
325    
326                @Override
327                public void reset() {
328                    sentTo.set(false);
329                }
330    
331                @Override
332                public String toString() {
333                    return "wereSentTo(" + endpointUri + ")";
334                }
335            });
336            return this;
337        }
338    
339        /**
340         * Sets a condition when <tt>number</tt> of {@link Exchange} has been received.
341         * <p/>
342         * The number matching is <i>at least</i> based which means that if more messages received
343         * it will match also.
344         *
345         * @param number at least number of messages
346         * @return the builder
347         */
348        public NotifyBuilder whenReceived(final int number) {
349            stack.add(new EventPredicateSupport() {
350                private AtomicInteger current = new AtomicInteger();
351    
352                @Override
353                public boolean onExchangeCreated(Exchange exchange) {
354                    current.incrementAndGet();
355                    return true;
356                }
357    
358                public boolean matches() {
359                    return current.get() >= number;
360                }
361    
362                @Override
363                public void reset() {
364                    current.set(0);
365                }
366    
367                @Override
368                public String toString() {
369                    return "whenReceived(" + number + ")";
370                }
371            });
372            return this;
373        }
374    
375        /**
376         * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
377         * <p/>
378         * The number matching is <i>at least</i> based which means that if more messages received
379         * it will match also.
380         * <p/>
381         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
382         * messages, where as completed is only successful processed messages.
383         *
384         * @param number at least number of messages
385         * @return the builder
386         */
387        public NotifyBuilder whenDone(final int number) {
388            stack.add(new EventPredicateSupport() {
389                private final AtomicInteger current = new AtomicInteger();
390    
391                @Override
392                public boolean onExchangeCompleted(Exchange exchange) {
393                    current.incrementAndGet();
394                    return true;
395                }
396    
397                @Override
398                public boolean onExchangeFailed(Exchange exchange) {
399                    current.incrementAndGet();
400                    return true;
401                }
402    
403                public boolean matches() {
404                    return current.get() >= number;
405                }
406    
407                @Override
408                public void reset() {
409                    current.set(0);
410                }
411    
412                @Override
413                public String toString() {
414                    return "whenDone(" + number + ")";
415                }
416            });
417            return this;
418        }
419    
420        /**
421         * Sets a condition when tne <tt>n'th</tt> (by index) {@link Exchange} is done being processed.
422         * <p/>
423         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
424         * messages, where as completed is only successful processed messages.
425         *
426         * @param index the message by index to be done
427         * @return the builder
428         */
429        public NotifyBuilder whenDoneByIndex(final int index) {
430            stack.add(new EventPredicateSupport() {
431                private AtomicInteger current = new AtomicInteger();
432                private String id;
433                private AtomicBoolean done = new AtomicBoolean();
434    
435                @Override
436                public boolean onExchangeCreated(Exchange exchange) {
437                    if (current.get() == index) {
438                        id = exchange.getExchangeId();
439                    }
440                    current.incrementAndGet();
441                    return true;
442                }
443    
444                @Override
445                public boolean onExchangeCompleted(Exchange exchange) {
446                    if (exchange.getExchangeId().equals(id)) {
447                        done.set(true);
448                    }
449                    return true;
450                }
451    
452                @Override
453                public boolean onExchangeFailed(Exchange exchange) {
454                    if (exchange.getExchangeId().equals(id)) {
455                        done.set(true);
456                    }
457                    return true;
458                }
459    
460                public boolean matches() {
461                    return done.get();
462                }
463    
464                @Override
465                public void reset() {
466                    current.set(0);
467                    id = null;
468                    done.set(false);
469                }
470    
471                @Override
472                public String toString() {
473                    return "whenDoneByIndex(" + index + ")";
474                }
475            });
476            return this;
477        }
478    
479        /**
480         * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
481         * <p/>
482         * The number matching is <i>at least</i> based which means that if more messages received
483         * it will match also.
484         * <p/>
485         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
486         * messages, where as completed is only successful processed messages.
487         *
488         * @param number at least number of messages
489         * @return the builder
490         */
491        public NotifyBuilder whenCompleted(final int number) {
492            stack.add(new EventPredicateSupport() {
493                private AtomicInteger current = new AtomicInteger();
494    
495                @Override
496                public boolean onExchangeCompleted(Exchange exchange) {
497                    current.incrementAndGet();
498                    return true;
499                }
500    
501                public boolean matches() {
502                    return current.get() >= number;
503                }
504    
505                @Override
506                public void reset() {
507                    current.set(0);
508                }
509    
510                @Override
511                public String toString() {
512                    return "whenCompleted(" + number + ")";
513                }
514            });
515            return this;
516        }
517    
518        /**
519         * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
520         * <p/>
521         * The number matching is <i>at least</i> based which means that if more messages received
522         * it will match also.
523         *
524         * @param number at least number of messages
525         * @return the builder
526         */
527        public NotifyBuilder whenFailed(final int number) {
528            stack.add(new EventPredicateSupport() {
529                private AtomicInteger current = new AtomicInteger();
530    
531                @Override
532                public boolean onExchangeFailed(Exchange exchange) {
533                    current.incrementAndGet();
534                    return true;
535                }
536    
537                public boolean matches() {
538                    return current.get() >= number;
539                }
540    
541                @Override
542                public void reset() {
543                    current.set(0);
544                }
545    
546                @Override
547                public String toString() {
548                    return "whenFailed(" + number + ")";
549                }
550            });
551            return this;
552        }
553    
554        /**
555         * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
556         * <p/>
557         * messages, where as completed is only successful processed messages.
558         *
559         * @param number exactly number of messages
560         * @return the builder
561         */
562        public NotifyBuilder whenExactlyDone(final int number) {
563            stack.add(new EventPredicateSupport() {
564                private AtomicInteger current = new AtomicInteger();
565    
566                @Override
567                public boolean onExchangeCompleted(Exchange exchange) {
568                    current.incrementAndGet();
569                    return true;
570                }
571    
572                @Override
573                public boolean onExchangeFailed(Exchange exchange) {
574                    current.incrementAndGet();
575                    return true;
576                }
577    
578                public boolean matches() {
579                    return current.get() == number;
580                }
581    
582                @Override
583                public void reset() {
584                    current.set(0);
585                }
586    
587                @Override
588                public String toString() {
589                    return "whenExactlyDone(" + number + ")";
590                }
591            });
592            return this;
593        }
594    
595        /**
596         * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
597         * <p/>
598         * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
599         * messages, where as completed is only successful processed messages.
600         *
601         * @param number exactly number of messages
602         * @return the builder
603         */
604        public NotifyBuilder whenExactlyCompleted(final int number) {
605            stack.add(new EventPredicateSupport() {
606                private AtomicInteger current = new AtomicInteger();
607    
608                @Override
609                public boolean onExchangeCompleted(Exchange exchange) {
610                    current.incrementAndGet();
611                    return true;
612                }
613    
614                public boolean matches() {
615                    return current.get() == number;
616                }
617    
618                @Override
619                public void reset() {
620                    current.set(0);
621                }
622    
623                @Override
624                public String toString() {
625                    return "whenExactlyCompleted(" + number + ")";
626                }
627            });
628            return this;
629        }
630    
631        /**
632         * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
633         *
634         * @param number exactly number of messages
635         * @return the builder
636         */
637        public NotifyBuilder whenExactlyFailed(final int number) {
638            stack.add(new EventPredicateSupport() {
639                private AtomicInteger current = new AtomicInteger();
640    
641                @Override
642                public boolean onExchangeFailed(Exchange exchange) {
643                    current.incrementAndGet();
644                    return true;
645                }
646    
647                public boolean matches() {
648                    return current.get() == number;
649                }
650    
651                @Override
652                public void reset() {
653                    current.set(0);
654                }
655    
656                @Override
657                public String toString() {
658                    return "whenExactlyFailed(" + number + ")";
659                }
660            });
661            return this;
662        }
663    
    /**
     * Sets a condition that <b>any received</b> {@link Exchange} should match the {@link Predicate}.
     * <p/>
     * The predicate is evaluated when an {@link Exchange} is created. When an
     * {@link Exchange} has matched, the predicate is no longer evaluated.
     *
     * @param predicate the predicate
     * @return the builder
     */
    public NotifyBuilder whenAnyReceivedMatches(final Predicate predicate) {
        return doWhenAnyMatches(predicate, true);
    }
673    
    /**
     * Sets a condition that <b>any done</b> {@link Exchange} should match the {@link Predicate}.
     * <p/>
     * The predicate is evaluated when an {@link Exchange} is completed or failed. When an
     * {@link Exchange} has matched, the predicate is no longer evaluated.
     *
     * @param predicate the predicate
     * @return the builder
     */
    public NotifyBuilder whenAnyDoneMatches(final Predicate predicate) {
        return doWhenAnyMatches(predicate, false);
    }
683    
684        private NotifyBuilder doWhenAnyMatches(final Predicate predicate, final boolean received) {
685            stack.add(new EventPredicateSupport() {
686                private final AtomicBoolean matches = new AtomicBoolean();
687    
688                @Override
689                public boolean onExchangeCompleted(Exchange exchange) {
690                    if (!received && !matches.get()) {
691                        matches.set(predicate.matches(exchange));
692                    }
693                    return true;
694                }
695    
696                @Override
697                public boolean onExchangeFailed(Exchange exchange) {
698                    if (!received && !matches.get()) {
699                        matches.set(predicate.matches(exchange));
700                    }
701                    return true;
702                }
703    
704                @Override
705                public boolean onExchangeCreated(Exchange exchange) {
706                    if (received && !matches.get()) {
707                        matches.set(predicate.matches(exchange));
708                    }
709                    return true;
710                }
711    
712                public boolean matches() {
713                    return matches.get();
714                }
715    
716                @Override
717                public void reset() {
718                    matches.set(false);
719                }
720    
721                @Override
722                public String toString() {
723                    if (received) {
724                        return "whenAnyReceivedMatches(" + predicate + ")";
725                    } else {
726                        return "whenAnyDoneMatches(" + predicate + ")";
727                    }
728                }
729            });
730            return this;
731        }
732    
    /**
     * Sets a condition that <b>all received</b> {@link Exchange} should match the {@link Predicate}.
     * <p/>
     * The predicate is evaluated when an {@link Exchange} is created. The condition starts out
     * as matching, and no longer matches once an {@link Exchange} did not match the predicate.
     *
     * @param predicate the predicate
     * @return the builder
     */
    public NotifyBuilder whenAllReceivedMatches(final Predicate predicate) {
        return doWhenAllMatches(predicate, true);
    }
742    
    /**
     * Sets a condition that <b>all done</b> {@link Exchange} should match the {@link Predicate}.
     * <p/>
     * The predicate is evaluated when an {@link Exchange} is completed or failed. The condition starts
     * out as matching, and no longer matches once an {@link Exchange} did not match the predicate.
     *
     * @param predicate the predicate
     * @return the builder
     */
    public NotifyBuilder whenAllDoneMatches(final Predicate predicate) {
        return doWhenAllMatches(predicate, false);
    }
752    
753        private NotifyBuilder doWhenAllMatches(final Predicate predicate, final boolean received) {
754            stack.add(new EventPredicateSupport() {
755                private final AtomicBoolean matches = new AtomicBoolean(true);
756    
757                @Override
758                public boolean onExchangeCompleted(Exchange exchange) {
759                    if (!received && matches.get()) {
760                        matches.set(predicate.matches(exchange));
761                    }
762                    return true;
763                }
764    
765                @Override
766                public boolean onExchangeFailed(Exchange exchange) {
767                    if (!received && matches.get()) {
768                        matches.set(predicate.matches(exchange));
769                    }
770                    return true;
771                }
772    
773                @Override
774                public boolean onExchangeCreated(Exchange exchange) {
775                    if (received && matches.get()) {
776                        matches.set(predicate.matches(exchange));
777                    }
778                    return true;
779                }
780    
781                public boolean matches() {
782                    return matches.get();
783                }
784    
785                @Override
786                public void reset() {
787                    matches.set(true);
788                }
789    
790                @Override
791                public String toString() {
792                    if (received) {
793                        return "whenAllReceivedMatches(" + predicate + ")";
794                    } else {
795                        return "whenAllDoneMatches(" + predicate + ")";
796                    }
797                }
798            });
799            return this;
800        }
801    
    /**
     * Sets a condition when the provided mock is satisfied based on {@link Exchange}
     * being sent to it when they are <b>done</b>.
     * <p/>
     * The idea is that you can use Mock for setting fine grained expectations
     * and then use that together with this builder. The mock provided does <b>NOT</b>
     * have to already exist in the route. You can just create a new pseudo mock
     * and this builder will send the done {@link Exchange} to it. So it is like
     * adding the mock to the end of your route(s).
     *
     * @param mock the mock
     * @return the builder
     */
    public NotifyBuilder whenDoneSatisfied(final MockEndpoint mock) {
        return doWhenSatisfied(mock, false);
    }
818    
    /**
     * Sets a condition when the provided mock is satisfied based on {@link Exchange}
     * being sent to it when they are <b>received</b>.
     * <p/>
     * The idea is that you can use Mock for setting fine grained expectations
     * and then use that together with this builder. The mock provided does <b>NOT</b>
     * have to already exist in the route. You can just create a new pseudo mock
     * and this builder will send each {@link Exchange} to it as soon as the
     * {@link Exchange} has been received (created), and not when it is done.
     *
     * @param mock the mock
     * @return the builder
     */
    public NotifyBuilder whenReceivedSatisfied(final MockEndpoint mock) {
        // received=true: exchanges are forwarded to the mock when they are created
        return doWhenSatisfied(mock, true);
    }
835    
836        private NotifyBuilder doWhenSatisfied(final MockEndpoint mock, final boolean received) {
837            stack.add(new EventPredicateSupport() {
838                private Producer producer;
839    
840                @Override
841                public boolean onExchangeCreated(Exchange exchange) {
842                    if (received) {
843                        sendToMock(exchange);
844                    }
845                    return true;
846                }
847    
848                @Override
849                public boolean onExchangeFailed(Exchange exchange) {
850                    if (!received) {
851                        sendToMock(exchange);
852                    }
853                    return true;
854                }
855    
856                @Override
857                public boolean onExchangeCompleted(Exchange exchange) {
858                    if (!received) {
859                        sendToMock(exchange);
860                    }
861                    return true;
862                }
863    
864                private void sendToMock(Exchange exchange) {
865                    // send the exchange when its completed to the mock
866                    try {
867                        if (producer == null) {
868                            producer = mock.createProducer();
869                        }
870                        producer.process(exchange);
871                    } catch (Exception e) {
872                        throw ObjectHelper.wrapRuntimeCamelException(e);
873                    }
874                }
875    
876                public boolean matches() {
877                    try {
878                        return mock.await(0, TimeUnit.SECONDS);
879                    } catch (InterruptedException e) {
880                        throw ObjectHelper.wrapRuntimeCamelException(e);
881                    }
882                }
883    
884                @Override
885                public void reset() {
886                    mock.reset();
887                }
888    
889                @Override
890                public String toString() {
891                    if (received) {
892                        return "whenReceivedSatisfied(" + mock + ")";
893                    } else {
894                        return "whenDoneSatisfied(" + mock + ")";
895                    }
896                }
897            });
898            return this;
899        }
900    
    /**
     * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
     * being sent to it when they are <b>received</b>.
     * <p/>
     * The idea is that you can use Mock for setting fine grained expectations
     * and then use that together with this builder. The mock provided does <b>NOT</b>
     * have to already exist in the route. You can just create a new pseudo mock
     * and this builder will send each {@link Exchange} to it as soon as the
     * {@link Exchange} has been received (created), and not when it is done.
     *
     * @param mock the mock
     * @return the builder
     */
    public NotifyBuilder whenReceivedNotSatisfied(final MockEndpoint mock) {
        // received=true: exchanges are forwarded to the mock when they are created
        return doWhenNotSatisfied(mock, true);
    }
917    
    /**
     * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
     * being sent to it when they are <b>done</b>.
     * <p/>
     * The idea is that you can use Mock for setting fine grained expectations
     * and then use that together with this builder. The mock provided does <b>NOT</b>
     * have to already exist in the route. You can just create a new pseudo mock
     * and this builder will send the done {@link Exchange} to it. So it's like
     * adding the mock to the end of your route(s).
     * <p/>
     * An {@link Exchange} is sent to the mock both when it completed and when it failed.
     *
     * @param mock the mock
     * @return the builder
     */
    public NotifyBuilder whenDoneNotSatisfied(final MockEndpoint mock) {
        // received=false: exchanges are forwarded to the mock when they are done
        return doWhenNotSatisfied(mock, false);
    }
934    
935        private NotifyBuilder doWhenNotSatisfied(final MockEndpoint mock, final boolean received) {
936            stack.add(new EventPredicateSupport() {
937                private Producer producer;
938    
939                @Override
940                public boolean onExchangeCreated(Exchange exchange) {
941                    if (received) {
942                        sendToMock(exchange);
943                    }
944                    return true;
945                }
946    
947                @Override
948                public boolean onExchangeFailed(Exchange exchange) {
949                    if (!received) {
950                        sendToMock(exchange);
951                    }
952                    return true;
953                }
954    
955                @Override
956                public boolean onExchangeCompleted(Exchange exchange) {
957                    if (!received) {
958                        sendToMock(exchange);
959                    }
960                    return true;
961                }
962    
963                private void sendToMock(Exchange exchange) {
964                    // send the exchange when its completed to the mock
965                    try {
966                        if (producer == null) {
967                            producer = mock.createProducer();
968                        }
969                        producer.process(exchange);
970                    } catch (Exception e) {
971                        throw ObjectHelper.wrapRuntimeCamelException(e);
972                    }
973                }
974    
975                public boolean matches() {
976                    try {
977                        return !mock.await(0, TimeUnit.SECONDS);
978                    } catch (InterruptedException e) {
979                        throw ObjectHelper.wrapRuntimeCamelException(e);
980                    }
981                }
982    
983                @Override
984                public void reset() {
985                    mock.reset();
986                }
987    
988                @Override
989                public String toString() {
990                    if (received) {
991                        return "whenReceivedNotSatisfied(" + mock + ")";
992                    } else {
993                        return "whenDoneNotSatisfied(" + mock + ")";
994                    }
995                }
996            });
997            return this;
998        }
999    
1000        /**
1001         * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
1002         * <p/>
1003         * This condition will discard any additional messages. If you need a more strict condition
1004         * then use {@link #whenExactBodiesReceived(Object...)}
1005         *
1006         * @param bodies the expected bodies
1007         * @return the builder
1008         * @see #whenExactBodiesReceived(Object...)
1009         */
1010        public NotifyBuilder whenBodiesReceived(Object... bodies) {
1011            List<Object> bodyList = new ArrayList<Object>();
1012            bodyList.addAll(Arrays.asList(bodies));
1013            return doWhenBodies(bodyList, true, false);
1014        }
1015    
1016        /**
1017         * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1018         * <p/>
1019         * This condition will discard any additional messages. If you need a more strict condition
1020         * then use {@link #whenExactBodiesDone(Object...)}
1021         *
1022         * @param bodies the expected bodies
1023         * @return the builder
1024         * @see #whenExactBodiesDone(Object...)
1025         */
1026        public NotifyBuilder whenBodiesDone(Object... bodies) {
1027            List<Object> bodyList = new ArrayList<Object>();
1028            bodyList.addAll(Arrays.asList(bodies));
1029            return doWhenBodies(bodyList, false, false);
1030        }
1031    
1032        /**
1033         * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
1034         * <p/>
1035         * This condition is strict which means that it only expect that exact number of bodies
1036         *
1037         * @param bodies the expected bodies
1038         * @return the builder
1039         * @see #whenBodiesReceived(Object...)
1040         */
1041        public NotifyBuilder whenExactBodiesReceived(Object... bodies) {
1042            List<Object> bodyList = new ArrayList<Object>();
1043            bodyList.addAll(Arrays.asList(bodies));
1044            return doWhenBodies(bodyList, true, true);
1045        }
1046    
1047        /**
1048         * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1049         * <p/>
1050         * This condition is strict which means that it only expect that exact number of bodies
1051         *
1052         * @param bodies the expected bodies
1053         * @return the builder
1054         * @see #whenExactBodiesDone(Object...)
1055         */
1056        public NotifyBuilder whenExactBodiesDone(Object... bodies) {
1057            List<Object> bodyList = new ArrayList<Object>();
1058            bodyList.addAll(Arrays.asList(bodies));
1059            return doWhenBodies(bodyList, false, true);
1060        }
1061    
1062        private NotifyBuilder doWhenBodies(final List<?> bodies, final boolean received, final boolean exact) {
1063            stack.add(new EventPredicateSupport() {
1064                private volatile boolean matches;
1065                private final AtomicInteger current = new AtomicInteger();
1066    
1067                @Override
1068                public boolean onExchangeCreated(Exchange exchange) {
1069                    if (received) {
1070                        matchBody(exchange);
1071                    }
1072                    return true;
1073                }
1074    
1075                @Override
1076                public boolean onExchangeFailed(Exchange exchange) {
1077                    if (!received) {
1078                        matchBody(exchange);
1079                    }
1080                    return true;
1081                }
1082    
1083                @Override
1084                public boolean onExchangeCompleted(Exchange exchange) {
1085                    if (!received) {
1086                        matchBody(exchange);
1087                    }
1088                    return true;
1089                }
1090    
1091                private void matchBody(Exchange exchange) {
1092                    if (current.incrementAndGet() > bodies.size()) {
1093                        // out of bounds
1094                        return;
1095                    }
1096    
1097                    Object actual = exchange.getIn().getBody();
1098                    Object expected = bodies.get(current.get() - 1);
1099                    matches = ObjectHelper.equal(expected, actual);
1100                }
1101    
1102                public boolean matches() {
1103                    if (exact) {
1104                        return matches && current.get() == bodies.size();
1105                    } else {
1106                        return matches && current.get() >= bodies.size();
1107                    }
1108                }
1109    
1110                @Override
1111                public void reset() {
1112                    matches = false;
1113                    current.set(0);
1114                }
1115    
1116                @Override
1117                public String toString() {
1118                    if (received) {
1119                        return "" + (exact ? "whenExactBodiesReceived(" : "whenBodiesReceived(") + bodies + ")";
1120                    } else {
1121                        return "" + (exact ? "whenExactBodiesDone(" : "whenBodiesDone(") + bodies + ")";
1122                    }
1123                }
1124            });
1125            return this;
1126        }
1127    
    /**
     * Prepares to append an additional expression using the <i>and</i> operator.
     * <p/>
     * The predicates added so far (if any) are closed as one group, and the
     * predicates added after this call form a new group combined using <i>and</i>.
     *
     * @return the builder
     */
    public NotifyBuilder and() {
        // closes the current group and makes "and" the operation of the next group
        doCreate(EventOperation.and);
        return this;
    }
1137    
    /**
     * Prepares to append an additional expression using the <i>or</i> operator.
     * <p/>
     * The predicates added so far (if any) are closed as one group, and the
     * predicates added after this call form a new group combined using <i>or</i>.
     *
     * @return the builder
     */
    public NotifyBuilder or() {
        // closes the current group and makes "or" the operation of the next group
        doCreate(EventOperation.or);
        return this;
    }
1147    
    /**
     * Prepares to append an additional expression using the <i>not</i> operator.
     * <p/>
     * The predicates added so far (if any) are closed as one group, and the
     * predicates added after this call form a new group combined using <i>not</i>.
     *
     * @return the builder
     */
    public NotifyBuilder not() {
        // closes the current group and makes "not" the operation of the next group
        doCreate(EventOperation.not);
        return this;
    }
1157    
    /**
     * Creates the expression this builder should use for matching.
     * <p/>
     * You must call this method when you are finished building the expressions.
     * The <tt>matches</tt> methods throw an {@link IllegalStateException} until this
     * method has been invoked.
     *
     * @return the created builder ready for matching
     */
    public NotifyBuilder create() {
        // closes the last group of predicates (if any)
        doCreate(EventOperation.and);
        // from now on the matches methods are allowed to be invoked
        created = true;
        return this;
    }
1170    
    /**
     * Does all the expression match?
     * <p/>
     * This operation will return immediately which means it can be used for testing at this very moment.
     *
     * @return <tt>true</tt> if matching, <tt>false</tt> otherwise
     * @throws IllegalStateException if {@link #create()} has not been invoked
     */
    public boolean matches() {
        if (!created) {
            throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
        }
        // the most recently computed match state, nothing is evaluated here
        return matches;
    }
1184    
    /**
     * Does all the expression match?
     * <p/>
     * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
     * which means <tt>false</tt> will be returned.
     * <p/>
     * If the waiting thread is interrupted the {@link InterruptedException} is rethrown
     * wrapped by <tt>ObjectHelper.wrapRuntimeCamelException</tt>.
     *
     * @param timeout  the timeout value
     * @param timeUnit the time unit
     * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
     * @throws IllegalStateException if {@link #create()} has not been invoked
     */
    public boolean matches(long timeout, TimeUnit timeUnit) {
        if (!created) {
            throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
        }
        try {
            // the outcome of the wait is deliberately not used; the match state is read below
            latch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return matches();
    }
1206    
1207        /**
1208         * Does all the expression match?
1209         * <p/>
1210         * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
1211         * which means <tt>false</tt> will be returned.
1212         * <p/>
1213         * The timeout value is by default 10 seconds. But it will use the highest <i>maximum result wait time</i>
1214         * from the configured mocks, if such a value has been configured.
1215         * <p/>
1216         * This method is convenient to use in unit tests to have it adhere and wait
1217         * as long as the mock endpoints.
1218         *
1219         * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
1220         */
1221        public boolean matchesMockWaitTime() {
1222            if (!created) {
1223                throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1224            }
1225            long timeout = 0;
1226            for (Endpoint endpoint : context.getEndpoints()) {
1227                if (endpoint instanceof MockEndpoint) {
1228                    long waitTime = ((MockEndpoint) endpoint).getResultWaitTime();
1229                    if (waitTime > 0) {
1230                        timeout = Math.max(timeout, waitTime);
1231                    }
1232                }
1233            }
1234    
1235            // use 10 sec as default
1236            if (timeout == 0) {
1237                timeout = 10000;
1238            }
1239    
1240            return matches(timeout, TimeUnit.MILLISECONDS);
1241        }
1242    
    /**
     * Resets the notifier.
     * <p/>
     * Every predicate is reset, the latch used by the waiting <tt>matches</tt> methods
     * is replaced with a new one, and the match state is set back to <tt>false</tt>.
     */
    public void reset() {
        for (EventPredicateHolder predicate : predicates) {
            predicate.reset();
        }
        // a latch cannot be counted up again, so a new one is needed for waiting once more
        latch = new CountDownLatch(1);
        matches = false;
    }
1253    
1254        @Override
1255        public String toString() {
1256            StringBuilder sb = new StringBuilder();
1257            for (EventPredicateHolder eventPredicateHolder : predicates) {
1258                if (sb.length() > 0) {
1259                    sb.append(".");
1260                }
1261                sb.append(eventPredicateHolder.toString());
1262            }
1263            // a crude way of skipping the first invisible operation
1264            return ObjectHelper.after(sb.toString(), "().");
1265        }
1266    
1267        private void doCreate(EventOperation newOperation) {
1268            // init operation depending on the newOperation
1269            if (operation == null) {
1270                // if the first new operation is an or then this operation must be an or as well
1271                // otherwise it should be and based
1272                operation = newOperation == EventOperation.or ? EventOperation.or : EventOperation.and;
1273            }
1274    
1275            // we have some predicates
1276            if (!stack.isEmpty()) {
1277                // we only want to match from routes, so skip for example events
1278                // which is triggered by producer templates etc.
1279                fromRoutesOnly();
1280    
1281                // the stack must have at least one non abstract
1282                boolean found = false;
1283                for (EventPredicate predicate : stack) {
1284                    if (!predicate.isAbstract()) {
1285                        found = true;
1286                        break;
1287                    }
1288                }
1289                if (!found) {
1290                    throw new IllegalArgumentException("NotifyBuilder must contain at least one non-abstract predicate (such as whenDone)");
1291                }
1292    
1293                CompoundEventPredicate compound = new CompoundEventPredicate(stack);
1294                stack.clear();
1295                predicates.add(new EventPredicateHolder(operation, compound));
1296            }
1297    
1298            operation = newOperation;
1299            // reset wereSentTo index position as this its a new group
1300            wereSentToIndex = 0;
1301        }
1302    
1303        /**
1304         * Notifier which hooks into Camel to listen for {@link Exchange} relevant events for this builder
1305         */
1306        private final class ExchangeNotifier extends EventNotifierSupport {
1307    
1308            public void notify(EventObject event) throws Exception {
1309                if (event instanceof ExchangeCreatedEvent) {
1310                    onExchangeCreated((ExchangeCreatedEvent) event);
1311                } else if (event instanceof ExchangeCompletedEvent) {
1312                    onExchangeCompleted((ExchangeCompletedEvent) event);
1313                } else if (event instanceof ExchangeFailedEvent) {
1314                    onExchangeFailed((ExchangeFailedEvent) event);
1315                } else if (event instanceof ExchangeSentEvent) {
1316                    onExchangeSent((ExchangeSentEvent) event);
1317                }
1318    
1319                // now compute whether we matched
1320                computeMatches();
1321            }
1322    
1323            public boolean isEnabled(EventObject event) {
1324                return true;
1325            }
1326    
1327            private void onExchangeCreated(ExchangeCreatedEvent event) {
1328                for (EventPredicateHolder predicate : predicates) {
1329                    predicate.getPredicate().onExchangeCreated(event.getExchange());
1330                }
1331            }
1332    
1333            private void onExchangeCompleted(ExchangeCompletedEvent event) {
1334                for (EventPredicateHolder predicate : predicates) {
1335                    predicate.getPredicate().onExchangeCompleted(event.getExchange());
1336                }
1337            }
1338    
1339            private void onExchangeFailed(ExchangeFailedEvent event) {
1340                for (EventPredicateHolder predicate : predicates) {
1341                    predicate.getPredicate().onExchangeFailed(event.getExchange());
1342                }
1343            }
1344    
1345            private void onExchangeSent(ExchangeSentEvent event) {
1346                for (EventPredicateHolder predicate : predicates) {
1347                    predicate.getPredicate().onExchangeSent(event.getExchange(), event.getEndpoint(), event.getTimeTaken());
1348                }
1349            }
1350    
1351            private synchronized void computeMatches() {
1352                // use a temporary answer until we have computed the value to assign
1353                Boolean answer = null;
1354    
1355                for (EventPredicateHolder holder : predicates) {
1356                    EventOperation operation = holder.getOperation();
1357                    if (EventOperation.and == operation) {
1358                        if (holder.getPredicate().matches()) {
1359                            answer = true;
1360                        } else {
1361                            answer = false;
1362                            // and break out since its an AND so it must match
1363                            break;
1364                        }
1365                    } else if (EventOperation.or == operation) {
1366                        if (holder.getPredicate().matches()) {
1367                            answer = true;
1368                        }
1369                    } else if (EventOperation.not == operation) {
1370                        if (holder.getPredicate().matches()) {
1371                            answer = false;
1372                            // and break out since its a NOT so it must not match
1373                            break;
1374                        } else {
1375                            answer = true;
1376                        }
1377                    }
1378                }
1379    
1380                // if we did compute a value then assign that
1381                if (answer != null) {
1382                    matches = answer;
1383                    if (matches) {
1384                        // signal completion
1385                        latch.countDown();
1386                    }
1387                }
1388            }
1389    
1390            @Override
1391            protected void doStart() throws Exception {
1392                // we only care about Exchange events
1393                setIgnoreCamelContextEvents(true);
1394                setIgnoreRouteEvents(true);
1395                setIgnoreServiceEvents(true);
1396            }
1397    
1398            @Override
1399            protected void doStop() throws Exception {
1400            }
1401        }
1402    
    /**
     * The logical operations which can be used for combining the groups of predicates.
     * <p/>
     * The names are in lower case as they are used as is in the text representation of the builder.
     */
    private enum EventOperation {
        and, or, not;
    }
1406    
    /**
     * A condition which is fed with the {@link Exchange} events observed by the
     * builder and which can be asked whether it currently matches.
     */
    private interface EventPredicate {

        /**
         * Evaluates whether the predicate matched or not.
         *
         * @return <tt>true</tt> if matched, <tt>false</tt> otherwise
         */
        boolean matches();

        /**
         * Resets the predicate
         */
        void reset();

        /**
         * Whether the predicate is abstract
         * <p/>
         * A group of predicates which only consists of abstract predicates is
         * rejected when the group is created.
         */
        boolean isAbstract();

        /**
         * Callback for {@link Exchange} lifecycle, invoked when an {@link Exchange} has been created.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeCreated(Exchange exchange);

        /**
         * Callback for {@link Exchange} lifecycle, invoked when an {@link Exchange} has been completed.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeCompleted(Exchange exchange);

        /**
         * Callback for {@link Exchange} lifecycle, invoked when an {@link Exchange} has failed.
         *
         * @param exchange the exchange
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeFailed(Exchange exchange);

        /**
         * Callback for {@link Exchange} lifecycle, invoked when an {@link Exchange} has been
         * sent to an endpoint.
         *
         * @param exchange the exchange
         * @param endpoint the endpoint sent to
         * @param timeTaken time taken in millis to send the to endpoint
         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
         */
        boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken);
    }
1460    
1461        private abstract class EventPredicateSupport implements EventPredicate {
1462    
1463            public boolean isAbstract() {
1464                return false;
1465            }
1466    
1467            public void reset() {
1468                // noop
1469            }
1470    
1471            public boolean onExchangeCreated(Exchange exchange) {
1472                return onExchange(exchange);
1473            }
1474    
1475            public boolean onExchangeCompleted(Exchange exchange) {
1476                return onExchange(exchange);
1477            }
1478    
1479            public boolean onExchangeFailed(Exchange exchange) {
1480                return onExchange(exchange);
1481            }
1482    
1483            public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1484                // no need to invoke onExchange as this is a special case when the Exchange
1485                // was sent to a specific endpoint
1486                return true;
1487            }
1488    
1489            public boolean onExchange(Exchange exchange) {
1490                return true;
1491            }
1492        }
1493    
    /**
     * To hold an operation and predicate
     * <p/>
     * Both are set when the holder is constructed and are never changed afterwards.
     */
    private final class EventPredicateHolder {
        // how the predicate is combined when the overall match is computed
        private final EventOperation operation;
        private final EventPredicate predicate;

        private EventPredicateHolder(EventOperation operation, EventPredicate predicate) {
            this.operation = operation;
            this.predicate = predicate;
        }

        public EventOperation getOperation() {
            return operation;
        }

        public EventPredicate getPredicate() {
            return predicate;
        }

        /**
         * Resets the predicate being held
         */
        public void reset() {
            predicate.reset();
        }

        /**
         * Returns the name of the operation followed by <tt>().</tt> and the predicate.
         */
        @Override
        public String toString() {
            return operation.name() + "()." + predicate;
        }
    }
1523    
1524        /**
1525         * To hold multiple predicates which are part of same expression
1526         */
1527        private final class CompoundEventPredicate implements EventPredicate {
1528    
1529            private List<EventPredicate> predicates = new ArrayList<EventPredicate>();
1530    
1531            private CompoundEventPredicate(List<EventPredicate> predicates) {
1532                this.predicates.addAll(predicates);
1533            }
1534    
1535            public boolean isAbstract() {
1536                return false;
1537            }
1538    
1539            public boolean matches() {
1540                for (EventPredicate predicate : predicates) {
1541                    boolean answer = predicate.matches();
1542                    LOG.trace("matches() {} -> {}", predicate, answer);
1543                    if (!answer) {
1544                        // break at first false
1545                        return false;
1546                    }
1547                }
1548                return true;
1549            }
1550    
1551            public void reset() {
1552                for (EventPredicate predicate : predicates) {
1553                    LOG.trace("reset() {}", predicate);
1554                    predicate.reset();
1555                }
1556            }
1557    
1558            public boolean onExchangeCreated(Exchange exchange) {
1559                for (EventPredicate predicate : predicates) {
1560                    boolean answer = predicate.onExchangeCreated(exchange);
1561                    LOG.trace("onExchangeCreated() {} -> {}", predicate, answer);
1562                    if (!answer) {
1563                        // break at first false
1564                        return false;
1565                    }
1566                }
1567                return true;
1568            }
1569    
1570            public boolean onExchangeCompleted(Exchange exchange) {
1571                for (EventPredicate predicate : predicates) {
1572                    boolean answer = predicate.onExchangeCompleted(exchange);
1573                    LOG.trace("onExchangeCompleted() {} -> {}", predicate, answer);
1574                    if (!answer) {
1575                        // break at first false
1576                        return false;
1577                    }
1578                }
1579                return true;
1580            }
1581    
1582            public boolean onExchangeFailed(Exchange exchange) {
1583                for (EventPredicate predicate : predicates) {
1584                    boolean answer = predicate.onExchangeFailed(exchange);
1585                    LOG.trace("onExchangeFailed() {} -> {}", predicate, answer);
1586                    if (!answer) {
1587                        // break at first false
1588                        return false;
1589                    }
1590                }
1591                return true;
1592            }
1593    
1594            @Override
1595            public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1596                for (EventPredicate predicate : predicates) {
1597                    boolean answer = predicate.onExchangeSent(exchange, endpoint, timeTaken);
1598                    LOG.trace("onExchangeSent() {} {} -> {}", new Object[]{endpoint, predicate, answer});
1599                    if (!answer) {
1600                        // break at first false
1601                        return false;
1602                    }
1603                }
1604                return true;
1605            }
1606    
1607            @Override
1608            public String toString() {
1609                StringBuilder sb = new StringBuilder();
1610                for (EventPredicate eventPredicate : predicates) {
1611                    if (sb.length() > 0) {
1612                        sb.append(".");
1613                    }
1614                    sb.append(eventPredicate.toString());
1615                }
1616                return sb.toString();
1617            }
1618        }
1619    
1620    }