001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.camel.CamelContext;
027import org.apache.camel.Endpoint;
028import org.apache.camel.NoSuchEndpointException;
029import org.apache.camel.Processor;
030import org.apache.camel.Route;
031import org.apache.camel.RuntimeCamelException;
032import org.apache.camel.ShutdownRoute;
033import org.apache.camel.ShutdownRunningTask;
034import org.apache.camel.model.FromDefinition;
035import org.apache.camel.model.ProcessorDefinition;
036import org.apache.camel.model.RouteDefinition;
037import org.apache.camel.processor.CamelInternalProcessor;
038import org.apache.camel.processor.ContractAdvice;
039import org.apache.camel.processor.Pipeline;
040import org.apache.camel.spi.Contract;
041import org.apache.camel.spi.InterceptStrategy;
042import org.apache.camel.spi.RouteContext;
043import org.apache.camel.spi.RouteController;
044import org.apache.camel.spi.RouteError;
045import org.apache.camel.spi.RoutePolicy;
046import org.apache.camel.util.CamelContextHelper;
047import org.apache.camel.util.ObjectHelper;
048
049/**
050 * The context used to activate new routing rules
051 *
052 * @version 
053 */
054public class DefaultRouteContext implements RouteContext {
055    private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
056    private final RouteDefinition route;
057    private FromDefinition from;
058    private final Collection<Route> routes;
059    private Endpoint endpoint;
060    private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
061    private CamelContext camelContext;
062    private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
063    private InterceptStrategy managedInterceptStrategy;
064    private boolean routeAdded;
065    private Boolean trace;
066    private Boolean messageHistory;
067    private Boolean logMask;
068    private Boolean logExhaustedMessageBody;
069    private Boolean streamCache;
070    private Boolean handleFault;
071    private Long delay;
072    private Boolean autoStartup = Boolean.TRUE;
073    private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
074    private ShutdownRoute shutdownRoute;
075    private ShutdownRunningTask shutdownRunningTask;
076    private RouteError routeError;
077    private RouteController routeController;
078
079    public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
080        this.camelContext = camelContext;
081        this.route = route;
082        this.from = from;
083        this.routes = routes;
084    }
085
086    /**
087     * Only used for lazy construction from inside ExpressionType
088     */
089    public DefaultRouteContext(CamelContext camelContext) {
090        this.camelContext = camelContext;
091        this.routes = new ArrayList<Route>();
092        this.route = new RouteDefinition("temporary");
093    }
094
095    public Endpoint getEndpoint() {
096        if (endpoint == null) {
097            endpoint = from.resolveEndpoint(this);
098        }
099        return endpoint;
100    }
101
102    public FromDefinition getFrom() {
103        return from;
104    }
105
106    public RouteDefinition getRoute() {
107        return route;
108    }
109
110    public CamelContext getCamelContext() {
111        return camelContext;
112    }
113
114    public Endpoint resolveEndpoint(String uri) {
115        return route.resolveEndpoint(getCamelContext(), uri);
116    }
117
118    public Endpoint resolveEndpoint(String uri, String ref) {
119        Endpoint endpoint = null;
120        if (uri != null) {
121            endpoint = resolveEndpoint(uri);
122            if (endpoint == null) {
123                throw new NoSuchEndpointException(uri);
124            }
125        }
126        if (ref != null) {
127            endpoint = lookup(ref, Endpoint.class);
128            if (endpoint == null) {
129                throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
130            }
131            // Check the endpoint has the right CamelContext 
132            if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
133                throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
134            }
135            try {
136                // need add the endpoint into service
137                getCamelContext().addService(endpoint);
138            } catch (Exception ex) {
139                throw new RuntimeCamelException(ex);
140            }
141        }
142        if (endpoint == null) {
143            throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
144        } else {
145            return endpoint;
146        }
147    }
148
149    public <T> T lookup(String name, Class<T> type) {
150        return getCamelContext().getRegistry().lookupByNameAndType(name, type);
151    }
152
153    public <T> Map<String, T> lookupByType(Class<T> type) {
154        return getCamelContext().getRegistry().findByTypeWithName(type);
155    }
156
157    @Override
158    public <T> T mandatoryLookup(String name, Class<T> type) {
159        return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
160    }
161
162    public void commit() {
163        // now lets turn all of the event driven consumer processors into a single route
164        if (!eventDrivenProcessors.isEmpty()) {
165            Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
166
167            // force creating the route id so its known ahead of the route is started
168            String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
169
170            // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
171            CamelInternalProcessor internal = new CamelInternalProcessor(target);
172            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));
173
174            // and then optionally add route policy processor if a custom policy is set
175            List<RoutePolicy> routePolicyList = getRoutePolicyList();
176            if (routePolicyList != null && !routePolicyList.isEmpty()) {
177                for (RoutePolicy policy : routePolicyList) {
178                    // add policy as service if we have not already done that (eg possible if two routes have the same service)
179                    // this ensures Camel can control the lifecycle of the policy
180                    if (!camelContext.hasService(policy)) {
181                        try {
182                            camelContext.addService(policy);
183                        } catch (Exception e) {
184                            throw ObjectHelper.wrapRuntimeCamelException(e);
185                        }
186                    }
187                }
188
189                internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
190            }
191
192            // wrap in route inflight processor to track number of inflight exchanges for the route
193            internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));
194
195            // wrap in JMX instrumentation processor that is used for performance stats
196            internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route"));
197
198            // wrap in route lifecycle
199            internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
200
201            // wrap in REST binding
202            if (route.getRestBindingDefinition() != null) {
203                try {
204                    internal.addAdvice(route.getRestBindingDefinition().createRestBindingAdvice(this));
205                } catch (Exception e) {
206                    throw ObjectHelper.wrapRuntimeCamelException(e);
207                }
208            }
209
210            // wrap in contract
211            if (route.getInputType() != null || route.getOutputType() != null) {
212                Contract contract = new Contract();
213                if (route.getInputType() != null) {
214                    contract.setInputType(route.getInputType().getUrn());
215                    contract.setValidateInput(route.getInputType().isValidate());
216                }
217                if (route.getOutputType() != null) {
218                    contract.setOutputType(route.getOutputType().getUrn());
219                    contract.setValidateOutput(route.getOutputType().isValidate());
220                }
221                internal.addAdvice(new ContractAdvice(contract));
222                // make sure to enable data type as its in use when using input/output types on routes
223                camelContext.setUseDataType(true);
224            }
225
226            // and create the route that wraps the UoW
227            Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
228            edcr.getProperties().put(Route.ID_PROPERTY, routeId);
229            edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
230            edcr.getProperties().put(Route.DESCRIPTION_PROPERTY, route.getDescriptionText());
231            if (route.getGroup() != null) {
232                edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
233            }
234            String rest = "false";
235            if (route.isRest() != null && route.isRest()) {
236                rest = "true";
237            }
238            edcr.getProperties().put(Route.REST_PROPERTY, rest);
239
240            // after the route is created then set the route on the policy processor so we get hold of it
241            CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
242            if (task != null) {
243                task.setRoute(edcr);
244            }
245            CamelInternalProcessor.RouteLifecycleAdvice task2 = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class);
246            if (task2 != null) {
247                task2.setRoute(edcr);
248            }
249
250            // invoke init on route policy
251            if (routePolicyList != null && !routePolicyList.isEmpty()) {
252                for (RoutePolicy policy : routePolicyList) {
253                    policy.onInit(edcr);
254                }
255            }
256
257            routes.add(edcr);
258        }
259    }
260
261    public void addEventDrivenProcessor(Processor processor) {
262        eventDrivenProcessors.add(processor);
263    }
264
265    public List<InterceptStrategy> getInterceptStrategies() {
266        return interceptStrategies;
267    }
268
269    public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
270        this.interceptStrategies = interceptStrategies;
271    }
272
273    public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
274        getInterceptStrategies().add(interceptStrategy);
275    }
276
277    public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
278        this.managedInterceptStrategy = interceptStrategy;
279    }
280
281    public InterceptStrategy getManagedInterceptStrategy() {
282        return managedInterceptStrategy;
283    }
284
285    public boolean isRouteAdded() {
286        return routeAdded;
287    }
288
289    public void setIsRouteAdded(boolean routeAdded) {
290        this.routeAdded = routeAdded;
291    }
292
293    public void setTracing(Boolean tracing) {
294        this.trace = tracing;
295    }
296
297    public Boolean isTracing() {
298        if (trace != null) {
299            return trace;
300        } else {
301            // fallback to the option from camel context
302            return getCamelContext().isTracing();
303        }
304    }
305
306    public void setMessageHistory(Boolean messageHistory) {
307        this.messageHistory = messageHistory;
308    }
309
310    public Boolean isMessageHistory() {
311        if (messageHistory != null) {
312            return messageHistory;
313        } else {
314            // fallback to the option from camel context
315            return getCamelContext().isMessageHistory();
316        }
317    }
318
319    public void setLogMask(Boolean logMask) {
320        this.logMask = logMask;
321    }
322
323    public Boolean isLogMask() {
324        if (logMask != null) {
325            return logMask;
326        } else {
327            // fallback to the option from camel context
328            return getCamelContext().isLogMask();
329        }
330    }
331
332    public void setLogExhaustedMessageBody(Boolean logExhaustedMessageBody) {
333        this.logExhaustedMessageBody = logExhaustedMessageBody;
334    }
335
336    public Boolean isLogExhaustedMessageBody() {
337        if (logExhaustedMessageBody != null) {
338            return logExhaustedMessageBody;
339        } else {
340            // fallback to the option from camel context
341            return getCamelContext().isLogExhaustedMessageBody();
342        }
343    }
344
345    public void setStreamCaching(Boolean cache) {
346        this.streamCache = cache;
347    }
348
349    public Boolean isStreamCaching() {
350        if (streamCache != null) {
351            return streamCache;
352        } else {
353            // fallback to the option from camel context
354            return getCamelContext().isStreamCaching();
355        }
356    }
357
358    public void setHandleFault(Boolean handleFault) {
359        this.handleFault = handleFault;
360    }
361
362    public Boolean isHandleFault() {
363        if (handleFault != null) {
364            return handleFault;
365        } else {
366            // fallback to the option from camel context
367            return getCamelContext().isHandleFault();
368        }
369    }
370
371    public void setDelayer(Long delay) {
372        this.delay = delay;
373    }
374
375    public Long getDelayer() {
376        if (delay != null) {
377            return delay;
378        } else {
379            // fallback to the option from camel context
380            return getCamelContext().getDelayer();
381        }
382    }
383
384    public void setAutoStartup(Boolean autoStartup) {
385        this.autoStartup = autoStartup;
386    }
387
388    public Boolean isAutoStartup() {
389        if (autoStartup != null) {
390            return autoStartup;
391        }
392        // default to true
393        return true;
394    }
395
396    public void setShutdownRoute(ShutdownRoute shutdownRoute) {
397        this.shutdownRoute = shutdownRoute;
398    }
399
400    public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) {
401        // can only be configured on CamelContext
402        getCamelContext().setAllowUseOriginalMessage(allowUseOriginalMessage);
403    }
404
405    public Boolean isAllowUseOriginalMessage() {
406        // can only be configured on CamelContext
407        return getCamelContext().isAllowUseOriginalMessage();
408    }
409
410    public ShutdownRoute getShutdownRoute() {
411        if (shutdownRoute != null) {
412            return shutdownRoute;
413        } else {
414            // fallback to the option from camel context
415            return getCamelContext().getShutdownRoute();
416        }
417    }
418
419    public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
420        this.shutdownRunningTask = shutdownRunningTask;
421    }
422
423    public ShutdownRunningTask getShutdownRunningTask() {
424        if (shutdownRunningTask != null) {
425            return shutdownRunningTask;
426        } else {
427            // fallback to the option from camel context
428            return getCamelContext().getShutdownRunningTask();
429        }
430    }
431    
432    public int getAndIncrement(ProcessorDefinition<?> node) {
433        AtomicInteger count = nodeIndex.get(node);
434        if (count == null) {
435            count = new AtomicInteger();
436            nodeIndex.put(node, count);
437        }
438        return count.getAndIncrement();
439    }
440
441    public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
442        this.routePolicyList = routePolicyList;
443    }
444
445    public List<RoutePolicy> getRoutePolicyList() {
446        return routePolicyList;
447    }
448
449    @Override
450    public RouteError getLastError() {
451        return routeError;
452    }
453
454    @Override
455    public void setLastError(RouteError routeError) {
456        this.routeError = routeError;
457    }
458
459    @Override
460    public RouteController getRouteController() {
461        return routeController;
462    }
463
464    @Override
465    public void setRouteController(RouteController routeController) {
466        this.routeController = routeController;
467    }
468}