001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import org.apache.camel.Consumer;
020import org.apache.camel.Exchange;
021import org.apache.camel.impl.EventDrivenPollingConsumer;
022import org.apache.camel.impl.ScheduledBatchPollingConsumer;
023import org.apache.camel.spi.PollingConsumerPollStrategy;
024import org.apache.camel.util.ObjectHelper;
025import org.apache.camel.util.ServiceHelper;
026import org.apache.camel.util.StopWatch;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
031
032    private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class);
033
034    private final long delay;
035
036    public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception {
037        super(endpoint);
038        this.delay = endpoint.getDelay();
039    }
040
041    @Override
042    protected Consumer createConsumer() throws Exception {
043        // lets add ourselves as a consumer
044        GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer();
045        // do not start scheduler as we poll manually
046        consumer.setStartScheduler(false);
047        // when using polling consumer we poll only 1 file per poll so we can limit
048        consumer.setMaxMessagesPerPoll(1);
049        // however do not limit eager as we may sort the files and thus need to do a full scan so we can sort afterwards
050        consumer.setEagerLimitMaxMessagesPerPoll(false);
051        // we only want to poll once so disconnect by default
052        return consumer;
053    }
054
055    @Override
056    protected void doStart() throws Exception {
057        super.doStart();
058        // ensure consumer is started
059        ServiceHelper.startService(getConsumer());
060    }
061
062    @Override
063    protected void doStop() throws Exception {
064        super.doStop();
065    }
066
067    @Override
068    protected void doShutdown() throws Exception {
069        super.doShutdown();
070    }
071
072    @Override
073    protected GenericFileConsumer getConsumer() {
074        return (GenericFileConsumer) super.getConsumer();
075    }
076
077    @Override
078    public Exchange receiveNoWait() {
079        if (LOG.isTraceEnabled()) {
080            LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint());
081        }
082        int polled = doReceive(0);
083        if (polled > 0) {
084            return super.receive(0);
085        } else {
086            return null;
087        }
088    }
089
090    @Override
091    public Exchange receive() {
092        if (LOG.isTraceEnabled()) {
093            LOG.trace("receive polling file: {}", getConsumer().getEndpoint());
094        }
095        int polled = doReceive(Long.MAX_VALUE);
096        if (polled > 0) {
097            return super.receive();
098        } else {
099            return null;
100        }
101    }
102
103    @Override
104    public Exchange receive(long timeout) {
105        if (LOG.isTraceEnabled()) {
106            LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint());
107        }
108        int polled = doReceive(timeout);
109        if (polled > 0) {
110            return super.receive(timeout);
111        } else {
112            return null;
113        }
114    }
115
116    protected int doReceive(long timeout) {
117        int retryCounter = -1;
118        boolean done = false;
119        Throwable cause = null;
120        int polledMessages = 0;
121        PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy();
122        boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle();
123        StopWatch watch = new StopWatch();
124
125        while (!done) {
126            try {
127                cause = null;
128                // eager assume we are done
129                done = true;
130                if (isRunAllowed()) {
131
132                    if (retryCounter == -1) {
133                        LOG.trace("Starting to poll: {}", this.getEndpoint());
134                    } else {
135                        LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
136                    }
137
138                    // mark we are polling which should also include the begin/poll/commit
139                    boolean begin = pollStrategy.begin(getConsumer(), getEndpoint());
140                    if (begin) {
141                        retryCounter++;
142                        polledMessages = getConsumer().poll();
143                        LOG.trace("Polled {} messages", polledMessages);
144
145                        if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
146                            // send an "empty" exchange
147                            processEmptyMessage();
148                        } else if (polledMessages == 0 && timeout > 0) {
149                            // if we did not poll a file and we are using timeout then try to poll again
150                            done = false;
151                        }
152
153                        pollStrategy.commit(getConsumer(), getEndpoint(), polledMessages);
154                    } else {
155                        LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
156                    }
157                }
158
159                LOG.trace("Finished polling: {}", this.getEndpoint());
160            } catch (Exception e) {
161                try {
162                    boolean retry = pollStrategy.rollback(getConsumer(), getEndpoint(), retryCounter, e);
163                    if (retry) {
164                        // do not set cause as we retry
165                        done = false;
166                    } else {
167                        cause = e;
168                        done = true;
169                    }
170                } catch (Throwable t) {
171                    cause = t;
172                    done = true;
173                }
174            } catch (Throwable t) {
175                cause = t;
176                done = true;
177            }
178
179            if (!done && timeout > 0) {
180                // prepare for next attempt until we hit timeout
181                long left = timeout - watch.taken();
182                long min = Math.min(left, delay);
183                if (min > 0) {
184                    try {
185                        // sleep for next pool
186                        sleep(min);
187                    } catch (InterruptedException e) {
188                        // ignore
189                    }
190                } else {
191                    // timeout hit
192                    done = true;
193                }
194            }
195        }
196
197        if (cause != null) {
198            throw ObjectHelper.wrapRuntimeCamelException(cause);
199        }
200
201        return polledMessages;
202    }
203
204    @Override
205    public void process(Exchange exchange) throws Exception {
206        Object name = exchange.getIn().getHeader(Exchange.FILE_NAME);
207        if (name != null) {
208            LOG.debug("Received file: {}", name);
209        }
210        super.process(exchange);
211    }
212
213    /**
214     * No messages to poll so send an empty message instead.
215     *
216     * @throws Exception is thrown if error processing the empty message.
217     */
218    protected void processEmptyMessage() throws Exception {
219        Exchange exchange = getEndpoint().createExchange();
220        log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
221        process(exchange);
222    }
223
224    private static void sleep(long delay) throws InterruptedException {
225        if (delay <= 0) {
226            return;
227        }
228        LOG.trace("Sleeping for: {} millis", delay);
229        Thread.sleep(delay);
230    }
231
232}