001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.lang.reflect.Method;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.InvokeOnHeader;
027import org.apache.camel.InvokeOnHeaders;
028import org.apache.camel.Message;
029import org.apache.camel.NoSuchHeaderException;
030import org.apache.camel.Processor;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * A selector-based produced which uses an header value to determine which processor
036 * should be invoked.
037 */
038public class HeaderSelectorProducer extends BaseSelectorProducer {
039    private static final Logger LOGGER = LoggerFactory.getLogger(HeaderSelectorProducer.class);
040
041    private final String header;
042    private final String defaultHeaderValue;
043    private final Object target;
044    private Map<String, Processor> handlers;
045
046    public HeaderSelectorProducer(Endpoint endpoint, String header) {
047        this(endpoint, header, null, null);
048    }
049
050    public HeaderSelectorProducer(Endpoint endpoint, String header, Object target) {
051        this(endpoint, header, null, target);
052    }
053
054    public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue) {
055        this(endpoint, header, defaultHeaderValue, null);
056    }
057
058    public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue, Object target) {
059        super(endpoint);
060
061        this.header = header;
062        this.defaultHeaderValue = defaultHeaderValue;
063        this.target = target != null ? target : this;
064        this.handlers = new HashMap<>();
065    }
066
067    @Override
068    protected void doStart() throws Exception {
069        for (final Method method : target.getClass().getDeclaredMethods()) {
070            InvokeOnHeaders annotation = method.getAnnotation(InvokeOnHeaders.class);
071            if (annotation != null) {
072                for (InvokeOnHeader processor : annotation.value()) {
073                    bind(processor, method);
074                }
075            } else {
076                bind(method.getAnnotation(InvokeOnHeader.class), method);
077            }
078        }
079
080        handlers = Collections.unmodifiableMap(handlers);
081
082        super.doStart();
083    }
084
085    @Override
086    protected void doStop() throws Exception {
087        super.doStop();
088
089        handlers.clear();
090    }
091
092    @Override
093    protected Processor getProcessor(Exchange exchange) throws Exception {
094        final String action = exchange.getIn().getHeader(header, defaultHeaderValue, String.class);
095        if (action == null) {
096            throw new NoSuchHeaderException(exchange, header, String.class);
097        }
098
099        return handlers.get(action);
100    }
101
102    protected void onMissingProcessor(Exchange exchange) throws Exception {
103        throw new IllegalStateException(
104            "Unsupported operation " + exchange.getIn().getHeader(header)
105        );
106    }
107
108    protected final void bind(String key, Processor processor) {
109        if (handlers.containsKey(key)) {
110            LOGGER.warn("A processor is already set for action {}", key);
111        }
112
113        this.handlers.put(key, processor);
114    }
115
116    private void bind(InvokeOnHeader handler, final Method method) {
117        final Class<?>[] types = method.getParameterTypes();
118
119        if (handler != null && types != null && types.length == 1) {
120            method.setAccessible(true);
121
122            final Class<?> type = types[0];
123
124            LOGGER.debug("bind key={}, class={}, method={}, type={}",
125                handler.value(), this.getClass(), method.getName(), type);
126
127            if (Message.class.isAssignableFrom(type)) {
128                bind(
129                    handler.value(), 
130                    new Processor() {
131                        public void process(Exchange exchange) throws Exception {
132                            method.invoke(target, exchange.getIn());
133                        }
134                    }
135                );
136            } else {
137                bind(
138                    handler.value(), 
139                    new Processor() {
140                        public void process(Exchange exchange) throws Exception {
141                            method.invoke(target, exchange);
142                        }
143                    }
144                );
145            }
146        }
147    }
148}