001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.dataset; 018 019 import java.util.concurrent.atomic.AtomicInteger; 020 021 import org.apache.camel.Component; 022 import org.apache.camel.Consumer; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Message; 025 import org.apache.camel.Processor; 026 import org.apache.camel.Service; 027 import org.apache.camel.component.mock.MockEndpoint; 028 import org.apache.camel.processor.ThroughputLogger; 029 import org.apache.camel.util.CamelLogger; 030 import org.apache.camel.util.ExchangeHelper; 031 import org.apache.camel.util.ObjectHelper; 032 import org.apache.camel.util.URISupport; 033 import org.slf4j.Logger; 034 import org.slf4j.LoggerFactory; 035 036 /** 037 * Endpoint for DataSet. 038 * 039 * @version 040 */ 041 public class DataSetEndpoint extends MockEndpoint implements Service { 042 private final transient Logger log; 043 private DataSet dataSet; 044 private AtomicInteger receivedCounter = new AtomicInteger(); 045 private int minRate; 046 private long produceDelay = 3; 047 private long consumeDelay; 048 private long preloadSize; 049 private long initialDelay = 1000; 050 051 @Deprecated 052 public DataSetEndpoint() { 053 this.log = LoggerFactory.getLogger(DataSetEndpoint.class); 054 // optimize as we dont need to copy the exchange 055 copyOnExchange = false; 056 } 057 058 public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) { 059 super(endpointUri, component); 060 this.dataSet = dataSet; 061 this.log = LoggerFactory.getLogger(endpointUri); 062 // optimize as we dont need to copy the exchange 063 copyOnExchange = false; 064 } 065 066 public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) { 067 if (!ObjectHelper.equal(expected, actual)) { 068 throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders()); 069 } 070 } 071 072 @Override 073 public Consumer createConsumer(Processor processor) throws Exception { 074 return new DataSetConsumer(this, processor); 075 } 076 077 @Override 078 public void reset() { 079 super.reset(); 080 receivedCounter.set(0); 081 } 082 083 @Override 084 public int getReceivedCounter() { 085 return receivedCounter.get(); 086 } 087 088 /** 089 * Creates a message exchange for the given index in the {@link DataSet} 090 */ 091 public Exchange createExchange(long messageIndex) throws Exception { 092 Exchange exchange = createExchange(); 093 getDataSet().populateMessage(exchange, messageIndex); 094 095 Message in = exchange.getIn(); 096 in.setHeader(Exchange.DATASET_INDEX, messageIndex); 097 098 return exchange; 099 } 100 101 public int getMinRate() { 102 return minRate; 103 } 104 105 public void setMinRate(int minRate) { 106 this.minRate = minRate; 107 } 108 109 @Override 110 protected void waitForCompleteLatch(long timeout) throws InterruptedException { 111 super.waitForCompleteLatch(timeout); 112 113 if (minRate > 0) { 114 int count = getReceivedCounter(); 115 do { 116 // wait as long as we get a decent message rate 117 super.waitForCompleteLatch(1000L); 118 count = getReceivedCounter() - count; 119 } while (count >= minRate); 120 } 121 } 122 123 // Properties 124 //------------------------------------------------------------------------- 125 126 public DataSet getDataSet() { 127 return dataSet; 128 } 129 130 public void setDataSet(DataSet dataSet) { 131 this.dataSet = dataSet; 132 } 133 134 public long getPreloadSize() { 135 return preloadSize; 136 } 137 138 /** 139 * Sets how many messages should be preloaded (sent) before the route completes its initialization 140 */ 141 public void setPreloadSize(long preloadSize) { 142 this.preloadSize = preloadSize; 143 } 144 145 public long getConsumeDelay() { 146 return consumeDelay; 147 } 148 149 /** 150 * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers 151 */ 152 public void setConsumeDelay(long consumeDelay) { 153 this.consumeDelay = consumeDelay; 154 } 155 156 public long getProduceDelay() { 157 return produceDelay; 158 } 159 160 /** 161 * Allows a delay to be specified which causes producers to pause - to simulate slow producers 162 */ 163 public void setProduceDelay(long produceDelay) { 164 this.produceDelay = produceDelay; 165 } 166 167 public long getInitialDelay() { 168 return initialDelay; 169 } 170 171 public void setInitialDelay(long initialDelay) { 172 this.initialDelay = initialDelay; 173 } 174 175 // Implementation methods 176 //------------------------------------------------------------------------- 177 178 @Override 179 protected void performAssertions(Exchange actual, Exchange copy) throws Exception { 180 int receivedCount = receivedCounter.incrementAndGet(); 181 long index = receivedCount - 1; 182 Exchange expected = createExchange(index); 183 184 // now let's assert that they are the same 185 if (log.isDebugEnabled()) { 186 log.debug("Received message: {} (DataSet index={}) = {}", 187 new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy}); 188 } 189 190 assertMessageExpected(index, expected, copy); 191 192 if (consumeDelay > 0) { 193 Thread.sleep(consumeDelay); 194 } 195 } 196 197 protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception { 198 long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class); 199 assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual); 200 201 getDataSet().assertMessageExpected(this, expected, actual, index); 202 } 203 204 protected ThroughputLogger createReporter() { 205 // must sanitize uri to avoid logging sensitive information 206 String uri = URISupport.sanitizeUri(getEndpointUri()); 207 CamelLogger logger = new CamelLogger(uri); 208 ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount()); 209 answer.setAction("Received"); 210 return answer; 211 } 212 213 @Override 214 protected void doStart() throws Exception { 215 super.doStart(); 216 217 long size = getDataSet().getSize(); 218 expectedMessageCount((int) size); 219 if (reporter == null) { 220 reporter = createReporter(); 221 } 222 log.info(this + " expecting " + size + " messages"); 223 } 224 225 }