001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import org.apache.camel.Consumer; 020import org.apache.camel.Exchange; 021import org.apache.camel.impl.EventDrivenPollingConsumer; 022import org.apache.camel.impl.ScheduledBatchPollingConsumer; 023import org.apache.camel.spi.PollingConsumerPollStrategy; 024import org.apache.camel.util.ObjectHelper; 025import org.apache.camel.util.ServiceHelper; 026import org.apache.camel.util.StopWatch; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { 031 032 private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class); 033 034 private final long delay; 035 036 public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception { 037 super(endpoint); 038 this.delay = endpoint.getDelay(); 039 } 040 041 @Override 042 protected Consumer createConsumer() throws Exception { 043 // lets add ourselves as a consumer 044 GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer(); 045 // do not start scheduler as we poll manually 046 consumer.setStartScheduler(false); 047 // when using polling consumer we poll only 1 file per poll so we can limit 048 consumer.setMaxMessagesPerPoll(1); 049 // however do not limit eager as we may sort the files and thus need to do a full scan so we can sort afterwards 050 consumer.setEagerLimitMaxMessagesPerPoll(false); 051 // we only want to poll once so disconnect by default 052 return consumer; 053 } 054 055 @Override 056 protected void doStart() throws Exception { 057 super.doStart(); 058 // ensure consumer is started 059 ServiceHelper.startService(getConsumer()); 060 } 061 062 @Override 063 protected void doStop() throws Exception { 064 super.doStop(); 065 } 066 067 @Override 068 protected void doShutdown() throws Exception { 069 super.doShutdown(); 070 } 071 072 @Override 073 protected GenericFileConsumer getConsumer() { 074 return (GenericFileConsumer) super.getConsumer(); 075 } 076 077 @Override 078 public Exchange receiveNoWait() { 079 if (LOG.isTraceEnabled()) { 080 LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint()); 081 } 082 int polled = doReceive(0); 083 if (polled > 0) { 084 return super.receive(0); 085 } else { 086 return null; 087 } 088 } 089 090 @Override 091 public Exchange receive() { 092 if (LOG.isTraceEnabled()) { 093 LOG.trace("receive polling file: {}", getConsumer().getEndpoint()); 094 } 095 int polled = doReceive(Long.MAX_VALUE); 096 if (polled > 0) { 097 return super.receive(); 098 } else { 099 return null; 100 } 101 } 102 103 @Override 104 public Exchange receive(long timeout) { 105 if (LOG.isTraceEnabled()) { 106 LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint()); 107 } 108 int polled = doReceive(timeout); 109 if (polled > 0) { 110 return super.receive(timeout); 111 } else { 112 return null; 113 } 114 } 115 116 protected int doReceive(long timeout) { 117 int retryCounter = -1; 118 boolean done = false; 119 Throwable cause = null; 120 int polledMessages = 0; 121 PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy(); 122 boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle(); 123 StopWatch watch = new StopWatch(); 124 125 while (!done) { 126 try { 127 cause = null; 128 // eager assume we are done 129 done = true; 130 if (isRunAllowed()) { 131 132 if (retryCounter == -1) { 133 LOG.trace("Starting to poll: {}", this.getEndpoint()); 134 } else { 135 LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); 136 } 137 138 // mark we are polling which should also include the begin/poll/commit 139 boolean begin = pollStrategy.begin(getConsumer(), getEndpoint()); 140 if (begin) { 141 retryCounter++; 142 polledMessages = getConsumer().poll(); 143 LOG.trace("Polled {} messages", polledMessages); 144 145 if (polledMessages == 0 && sendEmptyMessageWhenIdle) { 146 // send an "empty" exchange 147 processEmptyMessage(); 148 } else if (polledMessages == 0 && timeout > 0) { 149 // if we did not poll a file and we are using timeout then try to poll again 150 done = false; 151 } 152 153 pollStrategy.commit(getConsumer(), getEndpoint(), polledMessages); 154 } else { 155 LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); 156 } 157 } 158 159 LOG.trace("Finished polling: {}", this.getEndpoint()); 160 } catch (Exception e) { 161 try { 162 boolean retry = pollStrategy.rollback(getConsumer(), getEndpoint(), retryCounter, e); 163 if (retry) { 164 // do not set cause as we retry 165 done = false; 166 } else { 167 cause = e; 168 done = true; 169 } 170 } catch (Throwable t) { 171 cause = t; 172 done = true; 173 } 174 } catch (Throwable t) { 175 cause = t; 176 done = true; 177 } 178 179 if (!done && timeout > 0) { 180 // prepare for next attempt until we hit timeout 181 long left = timeout - watch.taken(); 182 long min = Math.min(left, delay); 183 if (min > 0) { 184 try { 185 // sleep for next pool 186 sleep(min); 187 } catch (InterruptedException e) { 188 // ignore 189 } 190 } else { 191 // timeout hit 192 done = true; 193 } 194 } 195 } 196 197 if (cause != null) { 198 throw ObjectHelper.wrapRuntimeCamelException(cause); 199 } 200 201 return polledMessages; 202 } 203 204 @Override 205 public void process(Exchange exchange) throws Exception { 206 Object name = exchange.getIn().getHeader(Exchange.FILE_NAME); 207 if (name != null) { 208 LOG.debug("Received file: {}", name); 209 } 210 super.process(exchange); 211 } 212 213 /** 214 * No messages to poll so send an empty message instead. 215 * 216 * @throws Exception is thrown if error processing the empty message. 217 */ 218 protected void processEmptyMessage() throws Exception { 219 Exchange exchange = getEndpoint().createExchange(); 220 log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint()); 221 process(exchange); 222 } 223 224 private static void sleep(long delay) throws InterruptedException { 225 if (delay <= 0) { 226 return; 227 } 228 LOG.trace("Sleeping for: {} millis", delay); 229 Thread.sleep(delay); 230 } 231 232}