001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.NoSuchEndpointException;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Route;
031    import org.apache.camel.ShutdownRoute;
032    import org.apache.camel.ShutdownRunningTask;
033    import org.apache.camel.management.InstrumentationProcessor;
034    import org.apache.camel.model.FromDefinition;
035    import org.apache.camel.model.ProcessorDefinition;
036    import org.apache.camel.model.RouteDefinition;
037    import org.apache.camel.processor.Pipeline;
038    import org.apache.camel.processor.RouteInflightRepositoryProcessor;
039    import org.apache.camel.processor.RoutePolicyProcessor;
040    import org.apache.camel.processor.UnitOfWorkProcessor;
041    import org.apache.camel.spi.InterceptStrategy;
042    import org.apache.camel.spi.RouteContext;
043    import org.apache.camel.spi.RoutePolicy;
044    import org.apache.camel.util.CamelContextHelper;
045    import org.apache.camel.util.ObjectHelper;
046    
047    /**
048     * The context used to activate new routing rules
049     *
050     * @version 
051     */
052    public class DefaultRouteContext implements RouteContext {
053        private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
054        private final RouteDefinition route;
055        private FromDefinition from;
056        private final Collection<Route> routes;
057        private Endpoint endpoint;
058        private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
059        private CamelContext camelContext;
060        private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
061        private InterceptStrategy managedInterceptStrategy;
062        private boolean routeAdded;
063        private Boolean trace;
064        private Boolean streamCache;
065        private Boolean handleFault;
066        private Long delay;
067        private Boolean autoStartup = Boolean.TRUE;
068        private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
069        private ShutdownRoute shutdownRoute;
070        private ShutdownRunningTask shutdownRunningTask;
071    
072        public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
073            this.camelContext = camelContext;
074            this.route = route;
075            this.from = from;
076            this.routes = routes;
077        }
078    
079        /**
080         * Only used for lazy construction from inside ExpressionType
081         */
082        public DefaultRouteContext(CamelContext camelContext) {
083            this.camelContext = camelContext;
084            this.routes = new ArrayList<Route>();
085            this.route = new RouteDefinition("temporary");
086        }
087    
088        public Endpoint getEndpoint() {
089            if (endpoint == null) {
090                endpoint = from.resolveEndpoint(this);
091            }
092            return endpoint;
093        }
094    
095        public FromDefinition getFrom() {
096            return from;
097        }
098    
099        public RouteDefinition getRoute() {
100            return route;
101        }
102    
103        public CamelContext getCamelContext() {
104            return camelContext;
105        }
106    
107        public Endpoint resolveEndpoint(String uri) {
108            return route.resolveEndpoint(getCamelContext(), uri);
109        }
110    
111        public Endpoint resolveEndpoint(String uri, String ref) {
112            Endpoint endpoint = null;
113            if (uri != null) {
114                endpoint = resolveEndpoint(uri);
115                if (endpoint == null) {
116                    throw new NoSuchEndpointException(uri);
117                }
118            }
119            if (ref != null) {
120                endpoint = lookup(ref, Endpoint.class);
121                if (endpoint == null) {
122                    throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
123                }
124                // Check the endpoint has the right CamelContext 
125                if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
126                    throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
127                }
128            }
129            if (endpoint == null) {
130                throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
131            } else {
132                return endpoint;
133            }
134        }
135    
136        public <T> T lookup(String name, Class<T> type) {
137            return getCamelContext().getRegistry().lookup(name, type);
138        }
139    
140        public <T> Map<String, T> lookupByType(Class<T> type) {
141            return getCamelContext().getRegistry().lookupByType(type);
142        }
143    
144        @Override
145        public <T> T mandatoryLookup(String name, Class<T> type) {
146            return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
147        }
148    
149        public void commit() {
150            // now lets turn all of the event driven consumer processors into a single route
151            if (!eventDrivenProcessors.isEmpty()) {
152                Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
153    
154                // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
155                UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target);
156    
157                // and then optionally add route policy processor if a custom policy is set
158                RoutePolicyProcessor routePolicyProcessor = null;
159                List<RoutePolicy> routePolicyList = getRoutePolicyList();
160                if (routePolicyList != null && !routePolicyList.isEmpty()) {
161                    for (RoutePolicy policy : routePolicyList) {
162                        // add policy as service if we have not already done that (eg possible if two routes have the same service)
163                        // this ensures Camel can control the lifecycle of the policy
164                        if (!camelContext.hasService(policy)) {
165                            try {
166                                camelContext.addService(policy);
167                            } catch (Exception e) {
168                                throw ObjectHelper.wrapRuntimeCamelException(e);
169                            }
170                        }
171                    }
172                    routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
173                    target = routePolicyProcessor;
174                } else {
175                    target = unitOfWorkProcessor;
176                }
177    
178                // wrap in route inflight processor to track number of inflight exchanges for the route
179                RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target);
180    
181                // and wrap it by a instrumentation processor that is to be used for performance stats
182                // for this particular route
183                InstrumentationProcessor instrument = new InstrumentationProcessor();
184                instrument.setType("route");
185                instrument.setProcessor(inflight);
186    
187                // and create the route that wraps the UoW
188                Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument);
189                // create the route id
190                String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
191                edcr.getProperties().put(Route.ID_PROPERTY, routeId);
192                edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
193                if (route.getGroup() != null) {
194                    edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
195                }
196    
197                // after the route is created then set the route on the policy processor so we get hold of it
198                if (routePolicyProcessor != null) {
199                    routePolicyProcessor.setRoute(edcr);
200                }
201                // after the route is created then set the route on the inflight processor so we get hold of it
202                inflight.setRoute(edcr);
203    
204                // invoke init on route policy
205                if (routePolicyList != null && !routePolicyList.isEmpty()) {
206                    for (RoutePolicy policy : routePolicyList) {
207                        policy.onInit(edcr);
208                    }
209                }
210    
211                routes.add(edcr);
212            }
213        }
214    
215        public void addEventDrivenProcessor(Processor processor) {
216            eventDrivenProcessors.add(processor);
217        }
218    
219        public List<InterceptStrategy> getInterceptStrategies() {
220            return interceptStrategies;
221        }
222    
223        public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
224            this.interceptStrategies = interceptStrategies;
225        }
226    
227        public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
228            getInterceptStrategies().add(interceptStrategy);
229        }
230    
231        public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
232            this.managedInterceptStrategy = interceptStrategy;
233        }
234    
235        public InterceptStrategy getManagedInterceptStrategy() {
236            return managedInterceptStrategy;
237        }
238    
239        public boolean isRouteAdded() {
240            return routeAdded;
241        }
242    
243        public void setIsRouteAdded(boolean routeAdded) {
244            this.routeAdded = routeAdded;
245        }
246    
247        public void setTracing(Boolean tracing) {
248            this.trace = tracing;
249        }
250    
251        public Boolean isTracing() {
252            if (trace != null) {
253                return trace;
254            } else {
255                // fallback to the option from camel context
256                return getCamelContext().isTracing();
257            }
258        }
259    
260        public void setStreamCaching(Boolean cache) {
261            this.streamCache = cache;
262        }
263    
264        public Boolean isStreamCaching() {
265            if (streamCache != null) {
266                return streamCache;
267            } else {
268                // fallback to the option from camel context
269                return getCamelContext().isStreamCaching();
270            }
271        }
272    
273        public void setHandleFault(Boolean handleFault) {
274            this.handleFault = handleFault;
275        }
276    
277        public Boolean isHandleFault() {
278            if (handleFault != null) {
279                return handleFault;
280            } else {
281                // fallback to the option from camel context
282                return getCamelContext().isHandleFault();
283            }
284        }
285    
286        public void setDelayer(Long delay) {
287            this.delay = delay;
288        }
289    
290        public Long getDelayer() {
291            if (delay != null) {
292                return delay;
293            } else {
294                // fallback to the option from camel context
295                return getCamelContext().getDelayer();
296            }
297        }
298    
299        public void setAutoStartup(Boolean autoStartup) {
300            this.autoStartup = autoStartup;
301        }
302    
303        public Boolean isAutoStartup() {
304            if (autoStartup != null) {
305                return autoStartup;
306            }
307            // default to true
308            return true;
309        }
310    
311        public void setShutdownRoute(ShutdownRoute shutdownRoute) {
312            this.shutdownRoute = shutdownRoute;
313        }
314    
315        public ShutdownRoute getShutdownRoute() {
316            if (shutdownRoute != null) {
317                return shutdownRoute;
318            } else {
319                // fallback to the option from camel context
320                return getCamelContext().getShutdownRoute();
321            }
322        }
323    
324        public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
325            this.shutdownRunningTask = shutdownRunningTask;
326        }
327    
328        public ShutdownRunningTask getShutdownRunningTask() {
329            if (shutdownRunningTask != null) {
330                return shutdownRunningTask;
331            } else {
332                // fallback to the option from camel context
333                return getCamelContext().getShutdownRunningTask();
334            }
335        }
336        
337        public int getAndIncrement(ProcessorDefinition<?> node) {
338            AtomicInteger count = nodeIndex.get(node);
339            if (count == null) {
340                count = new AtomicInteger();
341                nodeIndex.put(node, count);
342            }
343            return count.getAndIncrement();
344        }
345    
346        public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
347            this.routePolicyList = routePolicyList;
348        }
349    
350        public List<RoutePolicy> getRoutePolicyList() {
351            return routePolicyList;
352        }
353    }