001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.camel.CamelContext;
027import org.apache.camel.Endpoint;
028import org.apache.camel.NoSuchEndpointException;
029import org.apache.camel.Processor;
030import org.apache.camel.Route;
031import org.apache.camel.RuntimeCamelException;
032import org.apache.camel.ShutdownRoute;
033import org.apache.camel.ShutdownRunningTask;
034import org.apache.camel.model.FromDefinition;
035import org.apache.camel.model.ProcessorDefinition;
036import org.apache.camel.model.RouteDefinition;
037import org.apache.camel.processor.CamelInternalProcessor;
038import org.apache.camel.processor.Pipeline;
039import org.apache.camel.spi.InterceptStrategy;
040import org.apache.camel.spi.RouteContext;
041import org.apache.camel.spi.RoutePolicy;
042import org.apache.camel.util.CamelContextHelper;
043import org.apache.camel.util.ObjectHelper;
044
045/**
046 * The context used to activate new routing rules
047 *
048 * @version 
049 */
050public class DefaultRouteContext implements RouteContext {
051    private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
052    private final RouteDefinition route;
053    private FromDefinition from;
054    private final Collection<Route> routes;
055    private Endpoint endpoint;
056    private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
057    private CamelContext camelContext;
058    private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
059    private InterceptStrategy managedInterceptStrategy;
060    private boolean routeAdded;
061    private Boolean trace;
062    private Boolean messageHistory;
063    private Boolean logExhaustedMessageBody;
064    private Boolean streamCache;
065    private Boolean handleFault;
066    private Long delay;
067    private Boolean autoStartup = Boolean.TRUE;
068    private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
069    private ShutdownRoute shutdownRoute;
070    private ShutdownRunningTask shutdownRunningTask;
071
072    public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
073        this.camelContext = camelContext;
074        this.route = route;
075        this.from = from;
076        this.routes = routes;
077    }
078
079    /**
080     * Only used for lazy construction from inside ExpressionType
081     */
082    public DefaultRouteContext(CamelContext camelContext) {
083        this.camelContext = camelContext;
084        this.routes = new ArrayList<Route>();
085        this.route = new RouteDefinition("temporary");
086    }
087
088    public Endpoint getEndpoint() {
089        if (endpoint == null) {
090            endpoint = from.resolveEndpoint(this);
091        }
092        return endpoint;
093    }
094
095    public FromDefinition getFrom() {
096        return from;
097    }
098
099    public RouteDefinition getRoute() {
100        return route;
101    }
102
103    public CamelContext getCamelContext() {
104        return camelContext;
105    }
106
107    public Endpoint resolveEndpoint(String uri) {
108        return route.resolveEndpoint(getCamelContext(), uri);
109    }
110
111    public Endpoint resolveEndpoint(String uri, String ref) {
112        Endpoint endpoint = null;
113        if (uri != null) {
114            endpoint = resolveEndpoint(uri);
115            if (endpoint == null) {
116                throw new NoSuchEndpointException(uri);
117            }
118        }
119        if (ref != null) {
120            endpoint = lookup(ref, Endpoint.class);
121            if (endpoint == null) {
122                throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
123            }
124            // Check the endpoint has the right CamelContext 
125            if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
126                throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
127            }
128            try {
129                // need add the endpoint into service
130                getCamelContext().addService(endpoint);
131            } catch (Exception ex) {
132                throw new RuntimeCamelException(ex);
133            }
134        }
135        if (endpoint == null) {
136            throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
137        } else {
138            return endpoint;
139        }
140    }
141
142    public <T> T lookup(String name, Class<T> type) {
143        return getCamelContext().getRegistry().lookupByNameAndType(name, type);
144    }
145
146    public <T> Map<String, T> lookupByType(Class<T> type) {
147        return getCamelContext().getRegistry().findByTypeWithName(type);
148    }
149
150    @Override
151    public <T> T mandatoryLookup(String name, Class<T> type) {
152        return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
153    }
154
155    public void commit() {
156        // now lets turn all of the event driven consumer processors into a single route
157        if (!eventDrivenProcessors.isEmpty()) {
158            Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
159
160            // force creating the route id so its known ahead of the route is started
161            String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
162
163            // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
164            CamelInternalProcessor internal = new CamelInternalProcessor(target);
165            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));
166
167            // and then optionally add route policy processor if a custom policy is set
168            List<RoutePolicy> routePolicyList = getRoutePolicyList();
169            if (routePolicyList != null && !routePolicyList.isEmpty()) {
170                for (RoutePolicy policy : routePolicyList) {
171                    // add policy as service if we have not already done that (eg possible if two routes have the same service)
172                    // this ensures Camel can control the lifecycle of the policy
173                    if (!camelContext.hasService(policy)) {
174                        try {
175                            camelContext.addService(policy);
176                        } catch (Exception e) {
177                            throw ObjectHelper.wrapRuntimeCamelException(e);
178                        }
179                    }
180                }
181
182                internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
183            }
184
185            // wrap in route inflight processor to track number of inflight exchanges for the route
186            internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));
187
188            // wrap in JMX instrumentation processor that is used for performance stats
189            internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route"));
190
191            // wrap in route lifecycle
192            internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
193
194            // and create the route that wraps the UoW
195            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
196            edcr.getProperties().put(Route.ID_PROPERTY, routeId);
197            edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
198            edcr.getProperties().put(Route.DESCRIPTION_PROPERTY, route.getDescriptionText());
199            if (route.getGroup() != null) {
200                edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
201            }
202            String rest = "false";
203            if (route.isRest() != null && route.isRest()) {
204                rest = "true";
205            }
206            edcr.getProperties().put(Route.REST_PROPERTY, rest);
207
208            // after the route is created then set the route on the policy processor so we get hold of it
209            CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
210            if (task != null) {
211                task.setRoute(edcr);
212            }
213            CamelInternalProcessor.RouteLifecycleAdvice task2 = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class);
214            if (task2 != null) {
215                task2.setRoute(edcr);
216            }
217
218            // invoke init on route policy
219            if (routePolicyList != null && !routePolicyList.isEmpty()) {
220                for (RoutePolicy policy : routePolicyList) {
221                    policy.onInit(edcr);
222                }
223            }
224
225            routes.add(edcr);
226        }
227    }
228
229    public void addEventDrivenProcessor(Processor processor) {
230        eventDrivenProcessors.add(processor);
231    }
232
233    public List<InterceptStrategy> getInterceptStrategies() {
234        return interceptStrategies;
235    }
236
237    public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
238        this.interceptStrategies = interceptStrategies;
239    }
240
241    public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
242        getInterceptStrategies().add(interceptStrategy);
243    }
244
245    public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
246        this.managedInterceptStrategy = interceptStrategy;
247    }
248
249    public InterceptStrategy getManagedInterceptStrategy() {
250        return managedInterceptStrategy;
251    }
252
253    public boolean isRouteAdded() {
254        return routeAdded;
255    }
256
257    public void setIsRouteAdded(boolean routeAdded) {
258        this.routeAdded = routeAdded;
259    }
260
261    public void setTracing(Boolean tracing) {
262        this.trace = tracing;
263    }
264
265    public Boolean isTracing() {
266        if (trace != null) {
267            return trace;
268        } else {
269            // fallback to the option from camel context
270            return getCamelContext().isTracing();
271        }
272    }
273
274    public void setMessageHistory(Boolean messageHistory) {
275        this.messageHistory = messageHistory;
276    }
277
278    public Boolean isMessageHistory() {
279        if (messageHistory != null) {
280            return messageHistory;
281        } else {
282            // fallback to the option from camel context
283            return getCamelContext().isMessageHistory();
284        }
285    }
286
287    public void setLogExhaustedMessageBody(Boolean logExhaustedMessageBody) {
288        this.logExhaustedMessageBody = logExhaustedMessageBody;
289    }
290
291    public Boolean isLogExhaustedMessageBody() {
292        if (logExhaustedMessageBody != null) {
293            return logExhaustedMessageBody;
294        } else {
295            // fallback to the option from camel context
296            return getCamelContext().isLogExhaustedMessageBody();
297        }
298    }
299
300    public void setStreamCaching(Boolean cache) {
301        this.streamCache = cache;
302    }
303
304    public Boolean isStreamCaching() {
305        if (streamCache != null) {
306            return streamCache;
307        } else {
308            // fallback to the option from camel context
309            return getCamelContext().isStreamCaching();
310        }
311    }
312
313    public void setHandleFault(Boolean handleFault) {
314        this.handleFault = handleFault;
315    }
316
317    public Boolean isHandleFault() {
318        if (handleFault != null) {
319            return handleFault;
320        } else {
321            // fallback to the option from camel context
322            return getCamelContext().isHandleFault();
323        }
324    }
325
326    public void setDelayer(Long delay) {
327        this.delay = delay;
328    }
329
330    public Long getDelayer() {
331        if (delay != null) {
332            return delay;
333        } else {
334            // fallback to the option from camel context
335            return getCamelContext().getDelayer();
336        }
337    }
338
339    public void setAutoStartup(Boolean autoStartup) {
340        this.autoStartup = autoStartup;
341    }
342
343    public Boolean isAutoStartup() {
344        if (autoStartup != null) {
345            return autoStartup;
346        }
347        // default to true
348        return true;
349    }
350
351    public void setShutdownRoute(ShutdownRoute shutdownRoute) {
352        this.shutdownRoute = shutdownRoute;
353    }
354
355    public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) {
356        // can only be configured on CamelContext
357        getCamelContext().setAllowUseOriginalMessage(allowUseOriginalMessage);
358    }
359
360    public Boolean isAllowUseOriginalMessage() {
361        // can only be configured on CamelContext
362        return getCamelContext().isAllowUseOriginalMessage();
363    }
364
365    public ShutdownRoute getShutdownRoute() {
366        if (shutdownRoute != null) {
367            return shutdownRoute;
368        } else {
369            // fallback to the option from camel context
370            return getCamelContext().getShutdownRoute();
371        }
372    }
373
374    public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
375        this.shutdownRunningTask = shutdownRunningTask;
376    }
377
378    public ShutdownRunningTask getShutdownRunningTask() {
379        if (shutdownRunningTask != null) {
380            return shutdownRunningTask;
381        } else {
382            // fallback to the option from camel context
383            return getCamelContext().getShutdownRunningTask();
384        }
385    }
386    
387    public int getAndIncrement(ProcessorDefinition<?> node) {
388        AtomicInteger count = nodeIndex.get(node);
389        if (count == null) {
390            count = new AtomicInteger();
391            nodeIndex.put(node, count);
392        }
393        return count.getAndIncrement();
394    }
395
396    public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
397        this.routePolicyList = routePolicyList;
398    }
399
400    public List<RoutePolicy> getRoutePolicyList() {
401        return routePolicyList;
402    }
403}