001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.lang.reflect.Method;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.InvokeOnHeader;
027import org.apache.camel.InvokeOnHeaders;
028import org.apache.camel.Message;
029import org.apache.camel.NoSuchHeaderException;
030import org.apache.camel.Processor;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * A selector-based produced which uses an header value to determine which processor
036 * should be invoked.
037 */
038public class HeaderSelectorProducer extends BaseSelectorProducer {
039    private static final Logger LOGGER = LoggerFactory.getLogger(HeaderSelectorProducer.class);
040
041    private final String header;
042    private final String defaultHeaderValue;
043    private final Object target;
044    private Map<String, Processor> handlers;
045
046    public HeaderSelectorProducer(Endpoint endpoint, String header) {
047        this(endpoint, header, null, null);
048    }
049
050    public HeaderSelectorProducer(Endpoint endpoint, String header, Object target) {
051        this(endpoint, header, null, target);
052    }
053
054    public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue) {
055        this(endpoint, header, defaultHeaderValue, null);
056    }
057
058    public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue, Object target) {
059        super(endpoint);
060
061        this.header = header;
062        this.defaultHeaderValue = defaultHeaderValue;
063        this.target = target != null ? target : this;
064        this.handlers = new HashMap<>();
065    }
066
067    @Override
068    protected void doStart() throws Exception {
069        for (final Method method : target.getClass().getDeclaredMethods()) {
070            InvokeOnHeaders annotation = method.getAnnotation(InvokeOnHeaders.class);
071            if (annotation != null) {
072                for (InvokeOnHeader processor : annotation.value()) {
073                    bind(processor, method);
074                }
075            } else {
076                bind(method.getAnnotation(InvokeOnHeader.class), method);
077            }
078        }
079
080        handlers = Collections.unmodifiableMap(handlers);
081
082        super.doStart();
083    }
084
085    @Override
086    protected Processor getProcessor(Exchange exchange) throws Exception {
087        final String action = exchange.getIn().getHeader(header, defaultHeaderValue, String.class);
088        if (action == null) {
089            throw new NoSuchHeaderException(exchange, header, String.class);
090        }
091
092        return handlers.get(action);
093    }
094
095    protected void onMissingProcessor(Exchange exchange) throws Exception {
096        throw new IllegalStateException(
097            "Unsupported operation " + exchange.getIn().getHeader(header)
098        );
099    }
100
101    protected final void bind(String key, Processor processor) {
102        if (handlers.containsKey(key)) {
103            LOGGER.warn("A processor is already set for action {}", key);
104        }
105
106        this.handlers.put(key, processor);
107    }
108
109    private void bind(InvokeOnHeader handler, final Method method) {
110        if (handler != null && method.getParameterCount() == 1) {
111            method.setAccessible(true);
112
113            final Class<?> type = method.getParameterTypes()[0];
114
115            LOGGER.debug("bind key={}, class={}, method={}, type={}",
116                handler.value(), this.getClass(), method.getName(), type);
117
118            if (Message.class.isAssignableFrom(type)) {
119                bind(handler.value(), e -> method.invoke(target, e.getIn()));
120            } else {
121                bind(handler.value(), e -> method.invoke(target, e));
122            }
123        }
124    }
125}