001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.builder;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.EventObject;
022import java.util.List;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.apache.camel.CamelContext;
031import org.apache.camel.Endpoint;
032import org.apache.camel.Exchange;
033import org.apache.camel.Expression;
034import org.apache.camel.Predicate;
035import org.apache.camel.Producer;
036import org.apache.camel.component.direct.DirectEndpoint;
037import org.apache.camel.component.mock.MockEndpoint;
038import org.apache.camel.management.event.ExchangeCompletedEvent;
039import org.apache.camel.management.event.ExchangeCreatedEvent;
040import org.apache.camel.management.event.ExchangeFailedEvent;
041import org.apache.camel.management.event.ExchangeSentEvent;
042import org.apache.camel.support.EventNotifierSupport;
043import org.apache.camel.util.EndpointHelper;
044import org.apache.camel.util.ObjectHelper;
045import org.apache.camel.util.ServiceHelper;
046import org.apache.camel.util.StringHelper;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A builder to build an expression based on {@link org.apache.camel.spi.EventNotifier} notifications
052 * about {@link Exchange} being routed.
053 * <p/>
054 * This builder can be used for testing purposes where you want to know when a test is supposed to be done.
 * The idea is that you can build an expression that explains when the test is done. For example, when Camel
 * has finished routing 5 messages. You can then, in your test, wait for this condition to occur.
057 *
058 * @version 
059 */
060public class NotifyBuilder {
061
062    private static final Logger LOG = LoggerFactory.getLogger(NotifyBuilder.class);
063
064    private final CamelContext context;
065
066    // notifier to hook into Camel to listen for events
067    private final EventNotifierSupport eventNotifier;
068
069    // the predicates build with this builder
070    private final List<EventPredicateHolder> predicates = new ArrayList<>();
071
072    // latch to be used to signal predicates matches
073    private CountDownLatch latch = new CountDownLatch(1);
074
075    // the current state while building an event predicate where we use a stack and the operation
076    private final List<EventPredicate> stack = new ArrayList<>();
077    private EventOperation operation;
078    private boolean created;
079    // keep state of how many wereSentTo we have added
080    private int wereSentToIndex;
081
082    // computed value whether all the predicates matched
083    private volatile boolean matches;
084
085    /**
086     * Creates a new builder.
087     *
088     * @param context the Camel context
089     */
090    public NotifyBuilder(CamelContext context) {
091        this.context = context;
092        eventNotifier = new ExchangeNotifier();
093        try {
094            ServiceHelper.startService(eventNotifier);
095        } catch (Exception e) {
096            throw ObjectHelper.wrapRuntimeCamelException(e);
097        }
098        context.getManagementStrategy().addEventNotifier(eventNotifier);
099    }
100
101    /**
102     * Optionally a <tt>from</tt> endpoint which means that this expression should only be based
103     * on {@link Exchange} which is originated from the particular endpoint(s).
104     *
105     * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
106     * @return the builder
107     * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
108     */
109    public NotifyBuilder from(final String endpointUri) {
110        stack.add(new EventPredicateSupport() {
111
112            @Override
113            public boolean isAbstract() {
114                // is abstract as its a filter
115                return true;
116            }
117
118            @Override
119            public boolean onExchange(Exchange exchange) {
120                // filter non matching exchanges
121                return EndpointHelper.matchEndpoint(context, exchange.getFromEndpoint().getEndpointUri(), endpointUri);
122            }
123
124            public boolean matches() {
125                // should be true as we use the onExchange to filter
126                return true;
127            }
128
129            @Override
130            public String toString() {
131                return "from(" + endpointUri + ")";
132            }
133        });
134        return this;
135    }
136
137    /**
138     * Optionally a <tt>from</tt> route which means that this expression should only be based
139     * on {@link Exchange} which is originated from the particular route(s).
140     *
141     * @param routeId id of route or pattern (see the EndpointHelper javadoc)
142     * @return the builder
143     * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
144     */
145    public NotifyBuilder fromRoute(final String routeId) {
146        stack.add(new EventPredicateSupport() {
147
148            @Override
149            public boolean isAbstract() {
150                // is abstract as its a filter
151                return true;
152            }
153
154            @Override
155            public boolean onExchange(Exchange exchange) {
156                String id = EndpointHelper.getRouteIdFromEndpoint(exchange.getFromEndpoint());
157
158                if (id == null) {
159                    id = exchange.getFromRouteId();
160                }
161
162                // filter non matching exchanges
163                return EndpointHelper.matchPattern(id, routeId);
164            }
165
166            public boolean matches() {
167                // should be true as we use the onExchange to filter
168                return true;
169            }
170
171            @Override
172            public String toString() {
173                return "fromRoute(" + routeId + ")";
174            }
175        });
176        return this;
177    }
178
179    private NotifyBuilder fromRoutesOnly() {
180        // internal and should always be in top of stack
181        stack.add(0, new EventPredicateSupport() {
182
183            @Override
184            public boolean isAbstract() {
185                // is abstract as its a filter
186                return true;
187            }
188
189            @Override
190            public boolean onExchange(Exchange exchange) {
191                // always accept direct endpoints as they are a special case as it will create the UoW beforehand
192                // and just continue to route that on the consumer side, which causes the EventNotifier not to
193                // emit events when the consumer received the exchange, as its already done. For example by
194                // ProducerTemplate which creates the UoW before producing messages.
195                if (exchange.getFromEndpoint() instanceof DirectEndpoint) {
196                    return true;
197                }
198                return EndpointHelper.matchPattern(exchange.getFromRouteId(), "*");
199            }
200
201            public boolean matches() {
202                // should be true as we use the onExchange to filter
203                return true;
204            }
205
206            @Override
207            public String toString() {
208                // we dont want any to string output as this is an internal predicate to match only from routes
209                return "";
210            }
211        });
212        return this;
213    }
214
215    /**
216     * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
217     *
218     * @param predicate the predicate to use for the filter
219     * @return the builder
220     */
221    public NotifyBuilder filter(final Predicate predicate) {
222        stack.add(new EventPredicateSupport() {
223
224            @Override
225            public boolean isAbstract() {
226                // is abstract as its a filter
227                return true;
228            }
229
230            @Override
231            public boolean onExchange(Exchange exchange) {
232                // filter non matching exchanges
233                return predicate.matches(exchange);
234            }
235
236            public boolean matches() {
237                // should be true as we use the onExchange to filter
238                return true;
239            }
240
241            @Override
242            public String toString() {
243                return "filter(" + predicate + ")";
244            }
245        });
246        return this;
247    }
248
249    /**
250     * Optionally a filter to only allow matching {@link Exchange} to be used for matching.
251     *
252     * @return the builder
253     */
254    public ExpressionClauseSupport<NotifyBuilder> filter() {
255        final ExpressionClauseSupport<NotifyBuilder> clause = new ExpressionClauseSupport<>(this);
256        stack.add(new EventPredicateSupport() {
257
258            @Override
259            public boolean isAbstract() {
260                // is abstract as its a filter
261                return true;
262            }
263
264            @Override
265            public boolean onExchange(Exchange exchange) {
266                // filter non matching exchanges
267                Expression exp = clause.createExpression(exchange.getContext());
268                return exp.evaluate(exchange, Boolean.class);
269            }
270
271            public boolean matches() {
272                // should be true as we use the onExchange to filter
273                return true;
274            }
275
276            @Override
277            public String toString() {
278                return "filter(" + clause + ")";
279            }
280        });
281        return clause;
282    }
283
284    /**
285     * Optionally a <tt>sent to</tt> endpoint which means that this expression should only be based
286     * on {@link Exchange} which has been sent to the given endpoint uri.
287     * <p/>
288     * Notice the {@link Exchange} may have been sent to other endpoints as well. This condition will match
289     * if the {@link Exchange} has been sent at least once to the given endpoint.
290     *
291     * @param endpointUri uri of endpoint or pattern (see the EndpointHelper javadoc)
292     * @return the builder
293     * @see org.apache.camel.util.EndpointHelper#matchEndpoint(org.apache.camel.CamelContext, String, String)
294     */
295    public NotifyBuilder wereSentTo(final String endpointUri) {
296        // insert in start of stack but after the previous wereSentTo
297        stack.add(wereSentToIndex++, new EventPredicateSupport() {
298            private ConcurrentMap<String, String> sentTo = new ConcurrentHashMap<>();
299
300            @Override
301            public boolean isAbstract() {
302                // is abstract as its a filter
303                return true;
304            }
305
306            @Override
307            public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
308                if (EndpointHelper.matchEndpoint(context, endpoint.getEndpointUri(), endpointUri)) {
309                    sentTo.put(exchange.getExchangeId(), exchange.getExchangeId());
310                }
311                return onExchange(exchange);
312            }
313
314            @Override
315            public boolean onExchange(Exchange exchange) {
316                // filter only when sentTo
317                String sent = sentTo.get(exchange.getExchangeId());
318                return sent != null;
319            }
320
321            public boolean matches() {
322                // should be true as we use the onExchange to filter
323                return true;
324            }
325
326            @Override
327            public void reset() {
328                sentTo.clear();
329            }
330
331            @Override
332            public String toString() {
333                return "wereSentTo(" + endpointUri + ")";
334            }
335        });
336        return this;
337    }
338
339    /**
340     * Sets a condition when <tt>number</tt> of {@link Exchange} has been received.
341     * <p/>
342     * The number matching is <i>at least</i> based which means that if more messages received
343     * it will match also.
344     *
345     * @param number at least number of messages
346     * @return the builder
347     */
348    public NotifyBuilder whenReceived(final int number) {
349        stack.add(new EventPredicateSupport() {
350            private AtomicInteger current = new AtomicInteger();
351
352            @Override
353            public boolean onExchangeCreated(Exchange exchange) {
354                current.incrementAndGet();
355                return true;
356            }
357
358            public boolean matches() {
359                return current.get() >= number;
360            }
361
362            @Override
363            public void reset() {
364                current.set(0);
365            }
366
367            @Override
368            public String toString() {
369                return "whenReceived(" + number + ")";
370            }
371        });
372        return this;
373    }
374
375    /**
376     * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
377     * <p/>
378     * The number matching is <i>at least</i> based which means that if more messages received
379     * it will match also.
380     * <p/>
381     * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
382     * messages, where as completed is only successful processed messages.
383     *
384     * @param number at least number of messages
385     * @return the builder
386     */
387    public NotifyBuilder whenDone(final int number) {
388        stack.add(new EventPredicateSupport() {
389            private final AtomicInteger current = new AtomicInteger();
390
391            @Override
392            public boolean onExchangeCompleted(Exchange exchange) {
393                current.incrementAndGet();
394                return true;
395            }
396
397            @Override
398            public boolean onExchangeFailed(Exchange exchange) {
399                current.incrementAndGet();
400                return true;
401            }
402
403            public boolean matches() {
404                return current.get() >= number;
405            }
406
407            @Override
408            public void reset() {
409                current.set(0);
410            }
411
412            @Override
413            public String toString() {
414                return "whenDone(" + number + ")";
415            }
416        });
417        return this;
418    }
419
420    /**
     * Sets a condition when the <tt>n'th</tt> (by index) {@link Exchange} is done being processed.
422     * <p/>
423     * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
424     * messages, where as completed is only successful processed messages.
425     *
426     * @param index the message by index to be done
427     * @return the builder
428     */
429    public NotifyBuilder whenDoneByIndex(final int index) {
430        stack.add(new EventPredicateSupport() {
431            private AtomicInteger current = new AtomicInteger();
432            private String id;
433            private AtomicBoolean done = new AtomicBoolean();
434
435            @Override
436            public boolean onExchangeCreated(Exchange exchange) {
437                if (current.get() == index) {
438                    id = exchange.getExchangeId();
439                }
440                current.incrementAndGet();
441                return true;
442            }
443
444            @Override
445            public boolean onExchangeCompleted(Exchange exchange) {
446                if (exchange.getExchangeId().equals(id)) {
447                    done.set(true);
448                }
449                return true;
450            }
451
452            @Override
453            public boolean onExchangeFailed(Exchange exchange) {
454                if (exchange.getExchangeId().equals(id)) {
455                    done.set(true);
456                }
457                return true;
458            }
459
460            public boolean matches() {
461                return done.get();
462            }
463
464            @Override
465            public void reset() {
466                current.set(0);
467                id = null;
468                done.set(false);
469            }
470
471            @Override
472            public String toString() {
473                return "whenDoneByIndex(" + index + ")";
474            }
475        });
476        return this;
477    }
478
479    /**
480     * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
481     * <p/>
482     * The number matching is <i>at least</i> based which means that if more messages received
483     * it will match also.
484     * <p/>
485     * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
486     * messages, where as completed is only successful processed messages.
487     *
488     * @param number at least number of messages
489     * @return the builder
490     */
491    public NotifyBuilder whenCompleted(final int number) {
492        stack.add(new EventPredicateSupport() {
493            private AtomicInteger current = new AtomicInteger();
494
495            @Override
496            public boolean onExchangeCompleted(Exchange exchange) {
497                current.incrementAndGet();
498                return true;
499            }
500
501            public boolean matches() {
502                return current.get() >= number;
503            }
504
505            @Override
506            public void reset() {
507                current.set(0);
508            }
509
510            @Override
511            public String toString() {
512                return "whenCompleted(" + number + ")";
513            }
514        });
515        return this;
516    }
517
518    /**
519     * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
520     * <p/>
521     * The number matching is <i>at least</i> based which means that if more messages received
522     * it will match also.
523     *
524     * @param number at least number of messages
525     * @return the builder
526     */
527    public NotifyBuilder whenFailed(final int number) {
528        stack.add(new EventPredicateSupport() {
529            private AtomicInteger current = new AtomicInteger();
530
531            @Override
532            public boolean onExchangeFailed(Exchange exchange) {
533                current.incrementAndGet();
534                return true;
535            }
536
537            public boolean matches() {
538                return current.get() >= number;
539            }
540
541            @Override
542            public void reset() {
543                current.set(0);
544            }
545
546            @Override
547            public String toString() {
548                return "whenFailed(" + number + ")";
549            }
550        });
551        return this;
552    }
553
554    /**
555     * Sets a condition when <tt>number</tt> of {@link Exchange} is done being processed.
556     * <p/>
     * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
     * messages, where as completed is only successful processed messages.
558     *
559     * @param number exactly number of messages
560     * @return the builder
561     */
562    public NotifyBuilder whenExactlyDone(final int number) {
563        stack.add(new EventPredicateSupport() {
564            private AtomicInteger current = new AtomicInteger();
565
566            @Override
567            public boolean onExchangeCompleted(Exchange exchange) {
568                current.incrementAndGet();
569                return true;
570            }
571
572            @Override
573            public boolean onExchangeFailed(Exchange exchange) {
574                current.incrementAndGet();
575                return true;
576            }
577
578            public boolean matches() {
579                return current.get() == number;
580            }
581
582            @Override
583            public void reset() {
584                current.set(0);
585            }
586
587            @Override
588            public String toString() {
589                return "whenExactlyDone(" + number + ")";
590            }
591        });
592        return this;
593    }
594
595    /**
596     * Sets a condition when <tt>number</tt> of {@link Exchange} has been completed.
597     * <p/>
598     * The difference between <i>done</i> and <i>completed</i> is that done can also include failed
599     * messages, where as completed is only successful processed messages.
600     *
601     * @param number exactly number of messages
602     * @return the builder
603     */
604    public NotifyBuilder whenExactlyCompleted(final int number) {
605        stack.add(new EventPredicateSupport() {
606            private AtomicInteger current = new AtomicInteger();
607
608            @Override
609            public boolean onExchangeCompleted(Exchange exchange) {
610                current.incrementAndGet();
611                return true;
612            }
613
614            public boolean matches() {
615                return current.get() == number;
616            }
617
618            @Override
619            public void reset() {
620                current.set(0);
621            }
622
623            @Override
624            public String toString() {
625                return "whenExactlyCompleted(" + number + ")";
626            }
627        });
628        return this;
629    }
630
631    /**
632     * Sets a condition when <tt>number</tt> of {@link Exchange} has failed.
633     *
634     * @param number exactly number of messages
635     * @return the builder
636     */
637    public NotifyBuilder whenExactlyFailed(final int number) {
638        stack.add(new EventPredicateSupport() {
639            private AtomicInteger current = new AtomicInteger();
640
641            @Override
642            public boolean onExchangeFailed(Exchange exchange) {
643                current.incrementAndGet();
644                return true;
645            }
646
647            public boolean matches() {
648                return current.get() == number;
649            }
650
651            @Override
652            public void reset() {
653                current.set(0);
654            }
655
656            @Override
657            public String toString() {
658                return "whenExactlyFailed(" + number + ")";
659            }
660        });
661        return this;
662    }
663
664    /**
665     * Sets a condition that <b>any received</b> {@link Exchange} should match the {@link Predicate}
666     *
667     * @param predicate the predicate
668     * @return the builder
669     */
670    public NotifyBuilder whenAnyReceivedMatches(final Predicate predicate) {
671        return doWhenAnyMatches(predicate, true);
672    }
673
674    /**
675     * Sets a condition that <b>any done</b> {@link Exchange} should match the {@link Predicate}
676     *
677     * @param predicate the predicate
678     * @return the builder
679     */
680    public NotifyBuilder whenAnyDoneMatches(final Predicate predicate) {
681        return doWhenAnyMatches(predicate, false);
682    }
683
684    private NotifyBuilder doWhenAnyMatches(final Predicate predicate, final boolean received) {
685        stack.add(new EventPredicateSupport() {
686            private final AtomicBoolean matches = new AtomicBoolean();
687
688            @Override
689            public boolean onExchangeCompleted(Exchange exchange) {
690                if (!received && !matches.get()) {
691                    matches.set(predicate.matches(exchange));
692                }
693                return true;
694            }
695
696            @Override
697            public boolean onExchangeFailed(Exchange exchange) {
698                if (!received && !matches.get()) {
699                    matches.set(predicate.matches(exchange));
700                }
701                return true;
702            }
703
704            @Override
705            public boolean onExchangeCreated(Exchange exchange) {
706                if (received && !matches.get()) {
707                    matches.set(predicate.matches(exchange));
708                }
709                return true;
710            }
711
712            public boolean matches() {
713                return matches.get();
714            }
715
716            @Override
717            public void reset() {
718                matches.set(false);
719            }
720
721            @Override
722            public String toString() {
723                if (received) {
724                    return "whenAnyReceivedMatches(" + predicate + ")";
725                } else {
726                    return "whenAnyDoneMatches(" + predicate + ")";
727                }
728            }
729        });
730        return this;
731    }
732
733    /**
734     * Sets a condition that <b>all received</b> {@link Exchange} should match the {@link Predicate}
735     *
736     * @param predicate the predicate
737     * @return the builder
738     */
739    public NotifyBuilder whenAllReceivedMatches(final Predicate predicate) {
740        return doWhenAllMatches(predicate, true);
741    }
742
743    /**
744     * Sets a condition that <b>all done</b> {@link Exchange} should match the {@link Predicate}
745     *
746     * @param predicate the predicate
747     * @return the builder
748     */
749    public NotifyBuilder whenAllDoneMatches(final Predicate predicate) {
750        return doWhenAllMatches(predicate, false);
751    }
752
753    private NotifyBuilder doWhenAllMatches(final Predicate predicate, final boolean received) {
754        stack.add(new EventPredicateSupport() {
755            private final AtomicBoolean matches = new AtomicBoolean(true);
756
757            @Override
758            public boolean onExchangeCompleted(Exchange exchange) {
759                if (!received && matches.get()) {
760                    matches.set(predicate.matches(exchange));
761                }
762                return true;
763            }
764
765            @Override
766            public boolean onExchangeFailed(Exchange exchange) {
767                if (!received && matches.get()) {
768                    matches.set(predicate.matches(exchange));
769                }
770                return true;
771            }
772
773            @Override
774            public boolean onExchangeCreated(Exchange exchange) {
775                if (received && matches.get()) {
776                    matches.set(predicate.matches(exchange));
777                }
778                return true;
779            }
780
781            public boolean matches() {
782                return matches.get();
783            }
784
785            @Override
786            public void reset() {
787                matches.set(true);
788            }
789
790            @Override
791            public String toString() {
792                if (received) {
793                    return "whenAllReceivedMatches(" + predicate + ")";
794                } else {
795                    return "whenAllDoneMatches(" + predicate + ")";
796                }
797            }
798        });
799        return this;
800    }
801
802    /**
803     * Sets a condition when the provided mock is satisfied based on {@link Exchange}
804     * being sent to it when they are <b>done</b>.
805     * <p/>
806     * The idea is that you can use Mock for setting fine grained expectations
807     * and then use that together with this builder. The mock provided does <b>NOT</b>
808     * have to already exist in the route. You can just create a new pseudo mock
809     * and this builder will send the done {@link Exchange} to it. So its like
810     * adding the mock to the end of your route(s).
811     *
812     * @param mock the mock
813     * @return the builder
814     */
815    public NotifyBuilder whenDoneSatisfied(final MockEndpoint mock) {
816        return doWhenSatisfied(mock, false);
817    }
818
819    /**
820     * Sets a condition when the provided mock is satisfied based on {@link Exchange}
821     * being sent to it when they are <b>received</b>.
822     * <p/>
823     * The idea is that you can use Mock for setting fine grained expectations
824     * and then use that together with this builder. The mock provided does <b>NOT</b>
825     * have to already exist in the route. You can just create a new pseudo mock
     * and this builder will send the received {@link Exchange} to it. So its like
827     * adding the mock to the end of your route(s).
828     *
829     * @param mock the mock
830     * @return the builder
831     */
832    public NotifyBuilder whenReceivedSatisfied(final MockEndpoint mock) {
833        return doWhenSatisfied(mock, true);
834    }
835
836    private NotifyBuilder doWhenSatisfied(final MockEndpoint mock, final boolean received) {
837        stack.add(new EventPredicateSupport() {
838            private Producer producer;
839
840            @Override
841            public boolean onExchangeCreated(Exchange exchange) {
842                if (received) {
843                    sendToMock(exchange);
844                }
845                return true;
846            }
847
848            @Override
849            public boolean onExchangeFailed(Exchange exchange) {
850                if (!received) {
851                    sendToMock(exchange);
852                }
853                return true;
854            }
855
856            @Override
857            public boolean onExchangeCompleted(Exchange exchange) {
858                if (!received) {
859                    sendToMock(exchange);
860                }
861                return true;
862            }
863
864            private void sendToMock(Exchange exchange) {
865                // send the exchange when its completed to the mock
866                try {
867                    if (producer == null) {
868                        producer = mock.createProducer();
869                    }
870                    producer.process(exchange);
871                } catch (Exception e) {
872                    throw ObjectHelper.wrapRuntimeCamelException(e);
873                }
874            }
875
876            public boolean matches() {
877                try {
878                    return mock.await(0, TimeUnit.SECONDS);
879                } catch (InterruptedException e) {
880                    throw ObjectHelper.wrapRuntimeCamelException(e);
881                }
882            }
883
884            @Override
885            public void reset() {
886                mock.reset();
887            }
888
889            @Override
890            public String toString() {
891                if (received) {
892                    return "whenReceivedSatisfied(" + mock + ")";
893                } else {
894                    return "whenDoneSatisfied(" + mock + ")";
895                }
896            }
897        });
898        return this;
899    }
900
901    /**
902     * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
903     * being sent to it when they are <b>received</b>.
904     * <p/>
905     * The idea is that you can use Mock for setting fine grained expectations
906     * and then use that together with this builder. The mock provided does <b>NOT</b>
907     * have to already exist in the route. You can just create a new pseudo mock
908     * and this builder will send the done {@link Exchange} to it. So its like
909     * adding the mock to the end of your route(s).
910     *
911     * @param mock the mock
912     * @return the builder
913     */
914    public NotifyBuilder whenReceivedNotSatisfied(final MockEndpoint mock) {
915        return doWhenNotSatisfied(mock, true);
916    }
917
918    /**
919     * Sets a condition when the provided mock is <b>not</b> satisfied based on {@link Exchange}
920     * being sent to it when they are <b>done</b>.
921     * <p/>
922     * The idea is that you can use Mock for setting fine grained expectations
923     * and then use that together with this builder. The mock provided does <b>NOT</b>
924     * have to already exist in the route. You can just create a new pseudo mock
925     * and this builder will send the done {@link Exchange} to it. So its like
926     * adding the mock to the end of your route(s).
927     *
928     * @param mock the mock
929     * @return the builder
930     */
931    public NotifyBuilder whenDoneNotSatisfied(final MockEndpoint mock) {
932        return doWhenNotSatisfied(mock, false);
933    }
934
935    private NotifyBuilder doWhenNotSatisfied(final MockEndpoint mock, final boolean received) {
936        stack.add(new EventPredicateSupport() {
937            private Producer producer;
938
939            @Override
940            public boolean onExchangeCreated(Exchange exchange) {
941                if (received) {
942                    sendToMock(exchange);
943                }
944                return true;
945            }
946
947            @Override
948            public boolean onExchangeFailed(Exchange exchange) {
949                if (!received) {
950                    sendToMock(exchange);
951                }
952                return true;
953            }
954
955            @Override
956            public boolean onExchangeCompleted(Exchange exchange) {
957                if (!received) {
958                    sendToMock(exchange);
959                }
960                return true;
961            }
962
963            private void sendToMock(Exchange exchange) {
964                // send the exchange when its completed to the mock
965                try {
966                    if (producer == null) {
967                        producer = mock.createProducer();
968                    }
969                    producer.process(exchange);
970                } catch (Exception e) {
971                    throw ObjectHelper.wrapRuntimeCamelException(e);
972                }
973            }
974
975            public boolean matches() {
976                try {
977                    return !mock.await(0, TimeUnit.SECONDS);
978                } catch (InterruptedException e) {
979                    throw ObjectHelper.wrapRuntimeCamelException(e);
980                }
981            }
982
983            @Override
984            public void reset() {
985                mock.reset();
986            }
987
988            @Override
989            public String toString() {
990                if (received) {
991                    return "whenReceivedNotSatisfied(" + mock + ")";
992                } else {
993                    return "whenDoneNotSatisfied(" + mock + ")";
994                }
995            }
996        });
997        return this;
998    }
999
1000    /**
1001     * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
1002     * <p/>
1003     * This condition will discard any additional messages. If you need a more strict condition
1004     * then use {@link #whenExactBodiesReceived(Object...)}
1005     *
1006     * @param bodies the expected bodies
1007     * @return the builder
1008     * @see #whenExactBodiesReceived(Object...)
1009     */
1010    public NotifyBuilder whenBodiesReceived(Object... bodies) {
1011        List<Object> bodyList = new ArrayList<Object>();
1012        bodyList.addAll(Arrays.asList(bodies));
1013        return doWhenBodies(bodyList, true, false);
1014    }
1015
1016    /**
1017     * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1018     * <p/>
1019     * This condition will discard any additional messages. If you need a more strict condition
1020     * then use {@link #whenExactBodiesDone(Object...)}
1021     *
1022     * @param bodies the expected bodies
1023     * @return the builder
1024     * @see #whenExactBodiesDone(Object...)
1025     */
1026    public NotifyBuilder whenBodiesDone(Object... bodies) {
1027        List<Object> bodyList = new ArrayList<Object>();
1028        bodyList.addAll(Arrays.asList(bodies));
1029        return doWhenBodies(bodyList, false, false);
1030    }
1031
1032    /**
1033     * Sets a condition that the bodies is expected to be <b>received</b> in the order as well.
1034     * <p/>
1035     * This condition is strict which means that it only expect that exact number of bodies
1036     *
1037     * @param bodies the expected bodies
1038     * @return the builder
1039     * @see #whenBodiesReceived(Object...)
1040     */
1041    public NotifyBuilder whenExactBodiesReceived(Object... bodies) {
1042        List<Object> bodyList = new ArrayList<Object>();
1043        bodyList.addAll(Arrays.asList(bodies));
1044        return doWhenBodies(bodyList, true, true);
1045    }
1046
1047    /**
1048     * Sets a condition that the bodies is expected to be <b>done</b> in the order as well.
1049     * <p/>
1050     * This condition is strict which means that it only expect that exact number of bodies
1051     *
1052     * @param bodies the expected bodies
1053     * @return the builder
1054     * @see #whenExactBodiesDone(Object...)
1055     */
1056    public NotifyBuilder whenExactBodiesDone(Object... bodies) {
1057        List<Object> bodyList = new ArrayList<>();
1058        bodyList.addAll(Arrays.asList(bodies));
1059        return doWhenBodies(bodyList, false, true);
1060    }
1061
1062    private NotifyBuilder doWhenBodies(final List<?> bodies, final boolean received, final boolean exact) {
1063        stack.add(new EventPredicateSupport() {
1064            private volatile boolean matches;
1065            private final AtomicInteger current = new AtomicInteger();
1066
1067            @Override
1068            public boolean onExchangeCreated(Exchange exchange) {
1069                if (received) {
1070                    matchBody(exchange);
1071                }
1072                return true;
1073            }
1074
1075            @Override
1076            public boolean onExchangeFailed(Exchange exchange) {
1077                if (!received) {
1078                    matchBody(exchange);
1079                }
1080                return true;
1081            }
1082
1083            @Override
1084            public boolean onExchangeCompleted(Exchange exchange) {
1085                if (!received) {
1086                    matchBody(exchange);
1087                }
1088                return true;
1089            }
1090
1091            private void matchBody(Exchange exchange) {
1092                if (current.incrementAndGet() > bodies.size()) {
1093                    // out of bounds
1094                    return;
1095                }
1096
1097                Object actual = exchange.getIn().getBody();
1098                Object expected = bodies.get(current.get() - 1);
1099                matches = ObjectHelper.equal(expected, actual);
1100            }
1101
1102            public boolean matches() {
1103                if (exact) {
1104                    return matches && current.get() == bodies.size();
1105                } else {
1106                    return matches && current.get() >= bodies.size();
1107                }
1108            }
1109
1110            @Override
1111            public void reset() {
1112                matches = false;
1113                current.set(0);
1114            }
1115
1116            @Override
1117            public String toString() {
1118                if (received) {
1119                    return "" + (exact ? "whenExactBodiesReceived(" : "whenBodiesReceived(") + bodies + ")";
1120                } else {
1121                    return "" + (exact ? "whenExactBodiesDone(" : "whenBodiesDone(") + bodies + ")";
1122                }
1123            }
1124        });
1125        return this;
1126    }
1127
1128    /**
1129     * Prepares to append an additional expression using the <i>and</i> operator.
1130     *
1131     * @return the builder
1132     */
1133    public NotifyBuilder and() {
1134        doCreate(EventOperation.and);
1135        return this;
1136    }
1137
1138    /**
1139     * Prepares to append an additional expression using the <i>or</i> operator.
1140     *
1141     * @return the builder
1142     */
1143    public NotifyBuilder or() {
1144        doCreate(EventOperation.or);
1145        return this;
1146    }
1147
1148    /**
1149     * Prepares to append an additional expression using the <i>not</i> operator.
1150     *
1151     * @return the builder
1152     */
1153    public NotifyBuilder not() {
1154        doCreate(EventOperation.not);
1155        return this;
1156    }
1157
1158    /**
1159     * Creates the expression this builder should use for matching.
1160     * <p/>
1161     * You must call this method when you are finished building the expressions.
1162     *
1163     * @return the created builder ready for matching
1164     */
1165    public NotifyBuilder create() {
1166        doCreate(EventOperation.and);
1167        if (eventNotifier.isStopped()) {
1168            throw new IllegalStateException("A destroyed NotifyBuilder cannot be re-created.");
1169        }
1170        created = true;
1171        return this;
1172    }
1173
1174    /**
1175     * De-registers this builder from its {@link CamelContext}.
1176     * <p/>
1177     * Once destroyed, this instance will not function again.
1178     */
1179    public void destroy() {
1180        context.getManagementStrategy().removeEventNotifier(eventNotifier);
1181        try {
1182            ServiceHelper.stopService(eventNotifier);
1183        } catch (Exception e) {
1184            throw ObjectHelper.wrapRuntimeCamelException(e);
1185        }
1186        created = false;
1187    }
1188
1189    /**
1190     * Does all the expression match?
1191     * <p/>
1192     * This operation will return immediately which means it can be used for testing at this very moment.
1193     *
1194     * @return <tt>true</tt> if matching, <tt>false</tt> otherwise
1195     */
1196    public boolean matches() {
1197        if (!created) {
1198            throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1199        }
1200        return matches;
1201    }
1202
1203    /**
1204     * Does all the expression match?
1205     * <p/>
1206     * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
1207     * which means <tt>false</tt> will be returned.
1208     *
1209     * @param timeout  the timeout value
1210     * @param timeUnit the time unit
1211     * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
1212     */
1213    public boolean matches(long timeout, TimeUnit timeUnit) {
1214        if (!created) {
1215            throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1216        }
1217        try {
1218            latch.await(timeout, timeUnit);
1219        } catch (InterruptedException e) {
1220            throw ObjectHelper.wrapRuntimeCamelException(e);
1221        }
1222        return matches();
1223    }
1224
1225    /**
1226     * Does all the expressions match?
1227     * <p/>
1228     * This operation will wait until the match is <tt>true</tt> or otherwise a timeout occur
1229     * which means <tt>false</tt> will be returned.
1230     * <p/>
1231     * The timeout value is by default 10 seconds. But it will use the highest <i>maximum result wait time</i>
1232     * from the configured mocks, if such a value has been configured.
1233     * <p/>
1234     * This method is convenient to use in unit tests to have it adhere and wait
1235     * as long as the mock endpoints.
1236     *
1237     * @return <tt>true</tt> if matching, <tt>false</tt> otherwise due to timeout
1238     */
1239    public boolean matchesMockWaitTime() {
1240        if (!created) {
1241            throw new IllegalStateException("NotifyBuilder has not been created. Invoke the create() method before matching.");
1242        }
1243        long timeout = 0;
1244        for (Endpoint endpoint : context.getEndpoints()) {
1245            if (endpoint instanceof MockEndpoint) {
1246                long waitTime = ((MockEndpoint) endpoint).getResultWaitTime();
1247                if (waitTime > 0) {
1248                    timeout = Math.max(timeout, waitTime);
1249                }
1250            }
1251        }
1252
1253        // use 10 sec as default
1254        if (timeout == 0) {
1255            timeout = 10000;
1256        }
1257
1258        return matches(timeout, TimeUnit.MILLISECONDS);
1259    }
1260
1261    /**
1262     * Resets the notifier.
1263     */
1264    public void reset() {
1265        for (EventPredicateHolder predicate : predicates) {
1266            predicate.reset();
1267        }
1268        latch = new CountDownLatch(1);
1269        matches = false;
1270    }
1271
1272    @Override
1273    public String toString() {
1274        StringBuilder sb = new StringBuilder();
1275        for (EventPredicateHolder eventPredicateHolder : predicates) {
1276            if (sb.length() > 0) {
1277                sb.append(".");
1278            }
1279            sb.append(eventPredicateHolder.toString());
1280        }
1281        // a crude way of skipping the first invisible operation
1282        return StringHelper.after(sb.toString(), "().");
1283    }
1284
1285    private void doCreate(EventOperation newOperation) {
1286        // init operation depending on the newOperation
1287        if (operation == null) {
1288            // if the first new operation is an or then this operation must be an or as well
1289            // otherwise it should be and based
1290            operation = newOperation == EventOperation.or ? EventOperation.or : EventOperation.and;
1291        }
1292
1293        // we have some predicates
1294        if (!stack.isEmpty()) {
1295            // we only want to match from routes, so skip for example events
1296            // which is triggered by producer templates etc.
1297            fromRoutesOnly();
1298
1299            // the stack must have at least one non abstract
1300            boolean found = false;
1301            for (EventPredicate predicate : stack) {
1302                if (!predicate.isAbstract()) {
1303                    found = true;
1304                    break;
1305                }
1306            }
1307            if (!found) {
1308                throw new IllegalArgumentException("NotifyBuilder must contain at least one non-abstract predicate (such as whenDone)");
1309            }
1310
1311            CompoundEventPredicate compound = new CompoundEventPredicate(stack);
1312            stack.clear();
1313            predicates.add(new EventPredicateHolder(operation, compound));
1314        }
1315
1316        operation = newOperation;
1317        // reset wereSentTo index position as this its a new group
1318        wereSentToIndex = 0;
1319    }
1320
1321    /**
1322     * Notifier which hooks into Camel to listen for {@link Exchange} relevant events for this builder
1323     */
1324    private final class ExchangeNotifier extends EventNotifierSupport {
1325
1326        public void notify(EventObject event) throws Exception {
1327            if (event instanceof ExchangeCreatedEvent) {
1328                onExchangeCreated((ExchangeCreatedEvent) event);
1329            } else if (event instanceof ExchangeCompletedEvent) {
1330                onExchangeCompleted((ExchangeCompletedEvent) event);
1331            } else if (event instanceof ExchangeFailedEvent) {
1332                onExchangeFailed((ExchangeFailedEvent) event);
1333            } else if (event instanceof ExchangeSentEvent) {
1334                onExchangeSent((ExchangeSentEvent) event);
1335            }
1336
1337            // now compute whether we matched
1338            computeMatches();
1339        }
1340
1341        public boolean isEnabled(EventObject event) {
1342            return true;
1343        }
1344
1345        private void onExchangeCreated(ExchangeCreatedEvent event) {
1346            for (EventPredicateHolder predicate : predicates) {
1347                predicate.getPredicate().onExchangeCreated(event.getExchange());
1348            }
1349        }
1350
1351        private void onExchangeCompleted(ExchangeCompletedEvent event) {
1352            for (EventPredicateHolder predicate : predicates) {
1353                predicate.getPredicate().onExchangeCompleted(event.getExchange());
1354            }
1355        }
1356
1357        private void onExchangeFailed(ExchangeFailedEvent event) {
1358            for (EventPredicateHolder predicate : predicates) {
1359                predicate.getPredicate().onExchangeFailed(event.getExchange());
1360            }
1361        }
1362
1363        private void onExchangeSent(ExchangeSentEvent event) {
1364            for (EventPredicateHolder predicate : predicates) {
1365                predicate.getPredicate().onExchangeSent(event.getExchange(), event.getEndpoint(), event.getTimeTaken());
1366            }
1367        }
1368
1369        private synchronized void computeMatches() {
1370            // use a temporary answer until we have computed the value to assign
1371            Boolean answer = null;
1372
1373            for (EventPredicateHolder holder : predicates) {
1374                EventOperation operation = holder.getOperation();
1375                if (EventOperation.and == operation) {
1376                    if (holder.getPredicate().matches()) {
1377                        answer = true;
1378                    } else {
1379                        answer = false;
1380                        // and break out since its an AND so it must match
1381                        break;
1382                    }
1383                } else if (EventOperation.or == operation) {
1384                    if (holder.getPredicate().matches()) {
1385                        answer = true;
1386                    }
1387                } else if (EventOperation.not == operation) {
1388                    if (holder.getPredicate().matches()) {
1389                        answer = false;
1390                        // and break out since its a NOT so it must not match
1391                        break;
1392                    } else {
1393                        answer = true;
1394                    }
1395                }
1396            }
1397
1398            // if we did compute a value then assign that
1399            if (answer != null) {
1400                matches = answer;
1401                if (matches) {
1402                    // signal completion
1403                    latch.countDown();
1404                }
1405            }
1406        }
1407
1408        @Override
1409        protected void doStart() throws Exception {
1410            // we only care about Exchange events
1411            setIgnoreCamelContextEvents(true);
1412            setIgnoreRouteEvents(true);
1413            setIgnoreServiceEvents(true);
1414        }
1415
1416        @Override
1417        protected void doStop() throws Exception {
1418        }
1419    }
1420
1421    private enum EventOperation {
1422        and, or, not
1423    }
1424
1425    private interface EventPredicate {
1426
1427        /**
1428         * Evaluates whether the predicate matched or not.
1429         *
1430         * @return <tt>true</tt> if matched, <tt>false</tt> otherwise
1431         */
1432        boolean matches();
1433
1434        /**
1435         * Resets the predicate
1436         */
1437        void reset();
1438
1439        /**
1440         * Whether the predicate is abstract
1441         */
1442        boolean isAbstract();
1443
1444        /**
1445         * Callback for {@link Exchange} lifecycle
1446         *
1447         * @param exchange the exchange
1448         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
1449         */
1450        boolean onExchangeCreated(Exchange exchange);
1451
1452        /**
1453         * Callback for {@link Exchange} lifecycle
1454         *
1455         * @param exchange the exchange
1456         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
1457         */
1458        boolean onExchangeCompleted(Exchange exchange);
1459
1460        /**
1461         * Callback for {@link Exchange} lifecycle
1462         *
1463         * @param exchange the exchange
1464         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
1465         */
1466        boolean onExchangeFailed(Exchange exchange);
1467
1468        /**
1469         * Callback for {@link Exchange} lifecycle
1470         *
1471         * @param exchange the exchange
1472         * @param endpoint the endpoint sent to
1473         * @param timeTaken time taken in millis to send the to endpoint
1474         * @return <tt>true</tt> to allow continue evaluating, <tt>false</tt> to stop immediately
1475         */
1476        boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken);
1477    }
1478
1479    private abstract class EventPredicateSupport implements EventPredicate {
1480
1481        public boolean isAbstract() {
1482            return false;
1483        }
1484
1485        public void reset() {
1486            // noop
1487        }
1488
1489        public boolean onExchangeCreated(Exchange exchange) {
1490            return onExchange(exchange);
1491        }
1492
1493        public boolean onExchangeCompleted(Exchange exchange) {
1494            return onExchange(exchange);
1495        }
1496
1497        public boolean onExchangeFailed(Exchange exchange) {
1498            return onExchange(exchange);
1499        }
1500
1501        public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1502            // no need to invoke onExchange as this is a special case when the Exchange
1503            // was sent to a specific endpoint
1504            return true;
1505        }
1506
1507        public boolean onExchange(Exchange exchange) {
1508            return true;
1509        }
1510    }
1511
1512    /**
1513     * To hold an operation and predicate
1514     */
1515    private final class EventPredicateHolder {
1516        private final EventOperation operation;
1517        private final EventPredicate predicate;
1518
1519        private EventPredicateHolder(EventOperation operation, EventPredicate predicate) {
1520            this.operation = operation;
1521            this.predicate = predicate;
1522        }
1523
1524        public EventOperation getOperation() {
1525            return operation;
1526        }
1527
1528        public EventPredicate getPredicate() {
1529            return predicate;
1530        }
1531
1532        public void reset() {
1533            predicate.reset();
1534        }
1535
1536        @Override
1537        public String toString() {
1538            return operation.name() + "()." + predicate;
1539        }
1540    }
1541
1542    /**
1543     * To hold multiple predicates which are part of same expression
1544     */
1545    private final class CompoundEventPredicate implements EventPredicate {
1546
1547        private List<EventPredicate> predicates = new ArrayList<>();
1548
1549        private CompoundEventPredicate(List<EventPredicate> predicates) {
1550            this.predicates.addAll(predicates);
1551        }
1552
1553        public boolean isAbstract() {
1554            return false;
1555        }
1556
1557        public boolean matches() {
1558            for (EventPredicate predicate : predicates) {
1559                boolean answer = predicate.matches();
1560                LOG.trace("matches() {} -> {}", predicate, answer);
1561                if (!answer) {
1562                    // break at first false
1563                    return false;
1564                }
1565            }
1566            return true;
1567        }
1568
1569        public void reset() {
1570            for (EventPredicate predicate : predicates) {
1571                LOG.trace("reset() {}", predicate);
1572                predicate.reset();
1573            }
1574        }
1575
1576        public boolean onExchangeCreated(Exchange exchange) {
1577            for (EventPredicate predicate : predicates) {
1578                boolean answer = predicate.onExchangeCreated(exchange);
1579                LOG.trace("onExchangeCreated() {} -> {}", predicate, answer);
1580                if (!answer) {
1581                    // break at first false
1582                    return false;
1583                }
1584            }
1585            return true;
1586        }
1587
1588        public boolean onExchangeCompleted(Exchange exchange) {
1589            for (EventPredicate predicate : predicates) {
1590                boolean answer = predicate.onExchangeCompleted(exchange);
1591                LOG.trace("onExchangeCompleted() {} -> {}", predicate, answer);
1592                if (!answer) {
1593                    // break at first false
1594                    return false;
1595                }
1596            }
1597            return true;
1598        }
1599
1600        public boolean onExchangeFailed(Exchange exchange) {
1601            for (EventPredicate predicate : predicates) {
1602                boolean answer = predicate.onExchangeFailed(exchange);
1603                LOG.trace("onExchangeFailed() {} -> {}", predicate, answer);
1604                if (!answer) {
1605                    // break at first false
1606                    return false;
1607                }
1608            }
1609            return true;
1610        }
1611
1612        @Override
1613        public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
1614            for (EventPredicate predicate : predicates) {
1615                boolean answer = predicate.onExchangeSent(exchange, endpoint, timeTaken);
1616                LOG.trace("onExchangeSent() {} {} -> {}", endpoint, predicate, answer);
1617                if (!answer) {
1618                    // break at first false
1619                    return false;
1620                }
1621            }
1622            return true;
1623        }
1624
1625        @Override
1626        public String toString() {
1627            StringBuilder sb = new StringBuilder();
1628            for (EventPredicate eventPredicate : predicates) {
1629                if (sb.length() > 0) {
1630                    sb.append(".");
1631                }
1632                sb.append(eventPredicate.toString());
1633            }
1634            return sb.toString();
1635        }
1636    }
1637
1638}