001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.List;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.CamelContext;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.Route;
026    import org.apache.camel.StatefulService;
027    import org.apache.camel.spi.RoutePolicy;
028    import org.apache.camel.support.SynchronizationAdapter;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * {@link Processor} which instruments the {@link RoutePolicy}.
034     *
035     * @version 
036     */
037    public class RoutePolicyProcessor extends DelegateAsyncProcessor {
038    
039        private static final Logger LOG = LoggerFactory.getLogger(RoutePolicyProcessor.class);
040        private final List<RoutePolicy> routePolicies;
041        private Route route;
042    
043        public RoutePolicyProcessor(Processor processor, List<RoutePolicy> routePolicies) {
044            super(processor);
045            this.routePolicies = routePolicies;
046        }
047    
048        @Override
049        public String toString() {
050            return "RoutePolicy[" + routePolicies + "]";
051        }
052    
053        @Override
054        public boolean process(Exchange exchange, AsyncCallback callback) {
055    
056            // invoke begin
057            for (RoutePolicy policy : routePolicies) {
058                try {
059                    if (isRoutePolicyRunAllowed(policy)) {
060                        policy.onExchangeBegin(route, exchange);
061                    }
062                } catch (Exception e) {
063                    LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
064                            + ". This exception will be ignored", e);
065                }
066            }
067    
068            // add on completion that invokes the policy callback on complete
069            // as the Exchange can be routed async and thus we need the callback to
070            // invoke when the route is completed
071            exchange.addOnCompletion(new SynchronizationAdapter() {
072                @Override
073                public void onDone(Exchange exchange) {
074                    // do not invoke it if Camel is stopping as we don't want
075                    // the policy to start a consumer during Camel is stopping
076                    if (isCamelStopping(exchange.getContext())) {
077                        return;
078                    }
079    
080                    for (RoutePolicy policy : routePolicies) {
081                        try {
082                            if (isRoutePolicyRunAllowed(policy)) {
083                                policy.onExchangeDone(route, exchange);
084                            }
085                        } catch (Exception e) {
086                            LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
087                                    + ". This exception will be ignored", e);
088                        }
089                    }
090                }
091    
092                @Override
093                public String toString() {
094                    return "RoutePolicyOnCompletion";
095                }
096            });
097    
098            return super.process(exchange, callback);
099        }
100    
101        /**
102         * Sets the route this policy applies.
103         *
104         * @param route the route
105         */
106        public void setRoute(Route route) {
107            this.route = route;
108        }
109    
110        /**
111         * Strategy to determine if this policy is allowed to run
112         *
113         * @param policy the policy
114         * @return <tt>true</tt> to run
115         */
116        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
117            if (policy instanceof StatefulService) {
118                StatefulService ss = (StatefulService) policy;
119                return ss.isRunAllowed();
120            }
121            return true;
122        }
123    
124        private static boolean isCamelStopping(CamelContext context) {
125            if (context instanceof StatefulService) {
126                StatefulService ss = (StatefulService) context;
127                return ss.isStopping() || ss.isStopped();
128            }
129            return false;
130        }
131    
132    }