001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.dataset;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import org.apache.camel.Component;
022    import org.apache.camel.Consumer;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Message;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Service;
027    import org.apache.camel.component.mock.MockEndpoint;
028    import org.apache.camel.processor.ThroughputLogger;
029    import org.apache.camel.util.CamelLogger;
030    import org.apache.camel.util.ExchangeHelper;
031    import org.apache.camel.util.ObjectHelper;
032    import org.apache.camel.util.URISupport;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * Endpoint for DataSet.
038     *
039     * @version 
040     */
041    public class DataSetEndpoint extends MockEndpoint implements Service {
042        private final transient Logger log;
043        private DataSet dataSet;
044        private AtomicInteger receivedCounter = new AtomicInteger();
045        private int minRate;
046        private long produceDelay = 3;
047        private long consumeDelay;
048        private long preloadSize;
049        private long initialDelay = 1000;
050    
051        @Deprecated
052        public DataSetEndpoint() {
053            this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
054            // optimize as we dont need to copy the exchange
055            copyOnExchange = false;
056        }
057    
058        public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
059            super(endpointUri, component);
060            this.dataSet = dataSet;
061            this.log = LoggerFactory.getLogger(endpointUri);
062            // optimize as we dont need to copy the exchange
063            copyOnExchange = false;
064        }
065    
066        public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
067            if (!ObjectHelper.equal(expected, actual)) {
068                throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders());
069            }
070        }
071    
072        @Override
073        public Consumer createConsumer(Processor processor) throws Exception {
074            return new DataSetConsumer(this, processor);
075        }
076    
077        @Override
078        public void reset() {
079            super.reset();
080            receivedCounter.set(0);
081        }
082    
083        @Override
084        public int getReceivedCounter() {
085            return receivedCounter.get();
086        }
087    
088        /**
089         * Creates a message exchange for the given index in the {@link DataSet}
090         */
091        public Exchange createExchange(long messageIndex) throws Exception {
092            Exchange exchange = createExchange();
093            getDataSet().populateMessage(exchange, messageIndex);
094    
095            Message in = exchange.getIn();
096            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
097    
098            return exchange;
099        }
100    
101        public int getMinRate() {
102            return minRate;
103        }
104    
105        public void setMinRate(int minRate) {
106            this.minRate = minRate;
107        }
108    
109        @Override
110        protected void waitForCompleteLatch(long timeout) throws InterruptedException {
111            super.waitForCompleteLatch(timeout);
112    
113            if (minRate > 0) {
114                int count = getReceivedCounter();
115                do {
116                    // wait as long as we get a decent message rate
117                    super.waitForCompleteLatch(1000L);
118                    count = getReceivedCounter() - count;
119                } while (count >= minRate);
120            }
121        }
122    
123        // Properties
124        //-------------------------------------------------------------------------
125    
126        public DataSet getDataSet() {
127            return dataSet;
128        }
129    
130        public void setDataSet(DataSet dataSet) {
131            this.dataSet = dataSet;
132        }
133    
134        public long getPreloadSize() {
135            return preloadSize;
136        }
137    
138        /**
139         * Sets how many messages should be preloaded (sent) before the route completes its initialization
140         */
141        public void setPreloadSize(long preloadSize) {
142            this.preloadSize = preloadSize;
143        }
144    
145        public long getConsumeDelay() {
146            return consumeDelay;
147        }
148    
149        /**
150         * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers
151         */
152        public void setConsumeDelay(long consumeDelay) {
153            this.consumeDelay = consumeDelay;
154        }
155    
156        public long getProduceDelay() {
157            return produceDelay;
158        }
159    
160        /**
161         * Allows a delay to be specified which causes producers to pause - to simulate slow producers
162         */
163        public void setProduceDelay(long produceDelay) {
164            this.produceDelay = produceDelay;
165        }
166    
167        public long getInitialDelay() {
168            return initialDelay;
169        }
170    
171        public void setInitialDelay(long initialDelay) {
172            this.initialDelay = initialDelay;
173        }
174    
175        // Implementation methods
176        //-------------------------------------------------------------------------
177    
178        @Override
179        protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
180            int receivedCount = receivedCounter.incrementAndGet();
181            long index = receivedCount - 1;
182            Exchange expected = createExchange(index);
183    
184            // now let's assert that they are the same
185            if (log.isDebugEnabled()) {
186                log.debug("Received message: {} (DataSet index={}) = {}",
187                        new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy});
188            }
189    
190            assertMessageExpected(index, expected, copy);
191    
192            if (consumeDelay > 0) {
193                Thread.sleep(consumeDelay);
194            }
195        }
196    
197        protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
198            long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
199            assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
200    
201            getDataSet().assertMessageExpected(this, expected, actual, index);
202        }
203    
204        protected ThroughputLogger createReporter() {
205            // must sanitize uri to avoid logging sensitive information
206            String uri = URISupport.sanitizeUri(getEndpointUri());
207            CamelLogger logger = new CamelLogger(uri);
208            ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount());
209            answer.setAction("Received");
210            return answer;
211        }
212    
213        @Override
214        protected void doStart() throws Exception {
215            super.doStart();
216    
217            long size = getDataSet().getSize();
218            expectedMessageCount((int) size);
219            if (reporter == null) {
220                reporter = createReporter();
221            }
222            log.info(this + " expecting " + size + " messages");
223        }
224    
225    }