/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.dataset;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.ThroughputLogger;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The dataset component provides a mechanism to easily perform load &amp; soak testing of your system.
 *
 * It works by allowing you to create DataSet instances both as a source of messages and as a way to assert that the data set is received.
 * Camel will use the throughput logger when sending datasets.
 */
@UriEndpoint(scheme = "dataset", title = "Dataset", syntax = "dataset:name", consumerClass = DataSetConsumer.class, label = "core,testing", lenientProperties = true)
public class DataSetEndpoint extends MockEndpoint implements Service {
    private final transient Logger log;
    // number of exchanges that have gone through performAssertions since the last reset()
    private final AtomicInteger receivedCounter = new AtomicInteger();
    @UriPath(name = "name", description = "Name of DataSet to lookup in the registry") @Metadata(required = "true")
    private volatile DataSet dataSet;
    @UriParam(label = "consumer", defaultValue = "0")
    private int minRate;
    @UriParam(label = "consumer", defaultValue = "3")
    private long produceDelay = 3;
    @UriParam(label = "producer", defaultValue = "0")
    private long consumeDelay;
    @UriParam(label = "consumer", defaultValue = "0")
    private long preloadSize;
    @UriParam(label = "consumer", defaultValue = "1000")
    private long initialDelay = 1000;
    @UriParam(enums = "strict,lenient,off", defaultValue = "lenient")
    private String dataSetIndex = "lenient";

    /**
     * Creates an endpoint without a URI, component or data set; the data set
     * has to be supplied afterwards through {@link #setDataSet(DataSet)}.
     */
    @Deprecated
    public DataSetEndpoint() {
        this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
        // optimize as we dont need to copy the exchange
        setCopyOnExchange(false);
    }

    /**
     * Creates an endpoint bound to the given data set. The endpoint logs under
     * a logger named after the endpoint URI.
     */
    public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
        super(endpointUri, component);
        this.dataSet = dataSet;
        this.log = LoggerFactory.getLogger(endpointUri);
        // optimize as we dont need to copy the exchange
        setCopyOnExchange(false);
    }

    /**
     * Asserts that the two values are equal according to {@link ObjectHelper#equal(Object, Object)}.
     *
     * @param description what is being compared, used as the prefix of the failure message
     * @param expected    the expected value
     * @param actual      the value that was actually found
     * @param exchange    the exchange being verified, reported (with its headers) on failure
     * @throws AssertionError if the values are not equal
     */
    public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
        if (!ObjectHelper.equal(expected, actual)) {
            throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders());
        }
    }

    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer answer = new DataSetConsumer(this, processor);
        configureConsumer(answer);
        return answer;
    }

    @Override
    public Producer createProducer() throws Exception {
        Producer answer = super.createProducer();

        // the producer side expects to receive every message of the data set
        // NOTE(review): the long size is narrowed to int here; data sets larger than Integer.MAX_VALUE would overflow
        long size = getDataSet().getSize();
        expectedMessageCount((int) size);

        return answer;
    }

    @Override
    public void reset() {
        super.reset();
        receivedCounter.set(0);
    }

    @Override
    public int getReceivedCounter() {
        return receivedCounter.get();
    }

    /**
     * Creates a message exchange for the given index in the {@link DataSet}
     * <p/>
     * The {@link Exchange#DATASET_INDEX} header is set to the index unless
     * the dataSetIndex option is <tt>off</tt>.
     */
    public Exchange createExchange(long messageIndex) throws Exception {
        Exchange exchange = createExchange();

        getDataSet().populateMessage(exchange, messageIndex);

        if (!"off".equals(getDataSetIndex())) {
            Message in = exchange.getIn();
            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
        }

        return exchange;
    }

    /**
     * Waits for the completion latch and then, when a minimum rate is configured,
     * keeps waiting in one second rounds for as long as each round receives
     * at least <tt>minRate</tt> further messages.
     */
    @Override
    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
        super.waitForCompleteLatch(timeout);

        if (minRate > 0) {
            int previousTotal = getReceivedCounter();
            int receivedInRound;
            do {
                // wait as long as we get a decent message rate
                super.waitForCompleteLatch(1000L);
                int currentTotal = getReceivedCounter();
                // compare against the total of the previous round, not against the previous delta
                receivedInRound = currentTotal - previousTotal;
                previousTotal = currentTotal;
            } while (receivedInRound >= minRate);
        }
    }

    // Properties
    //-------------------------------------------------------------------------

    public DataSet getDataSet() {
        return dataSet;
    }

    public void setDataSet(DataSet dataSet) {
        this.dataSet = dataSet;
    }

    public int getMinRate() {
        return minRate;
    }

    /**
     * Keep waiting for messages for as long as at least this number of messages is received per wait round (of one second).
     * A value of 0 or less turns this off.
     */
    public void setMinRate(int minRate) {
        this.minRate = minRate;
    }

    public long getPreloadSize() {
        return preloadSize;
    }

    /**
     * Sets how many messages should be preloaded (sent) before the route completes its initialization
     */
    public void setPreloadSize(long preloadSize) {
        this.preloadSize = preloadSize;
    }

    public long getConsumeDelay() {
        return consumeDelay;
    }

    /**
     * Allows a delay to be specified which causes a delay when a message is consumed by the producer (to simulate slow processing)
     */
    public void setConsumeDelay(long consumeDelay) {
        this.consumeDelay = consumeDelay;
    }

    public long getProduceDelay() {
        return produceDelay;
    }

    /**
     * Allows a delay to be specified which causes a delay when a message is sent by the consumer (to simulate slow processing)
     */
    public void setProduceDelay(long produceDelay) {
        this.produceDelay = produceDelay;
    }

    public long getInitialDelay() {
        return initialDelay;
    }

    /**
     * Time period in millis to wait before starting sending messages.
     */
    public void setInitialDelay(long initialDelay) {
        this.initialDelay = initialDelay;
    }

    /**
     * Controls the behaviour of the CamelDataSetIndex header.
     * For Consumers:
     * - off: the header will not be set
     * - strict/lenient: the header will be set
     * For Producers:
     * - off: the header value will not be verified, and will not be set if it is not present
     * - strict: the header value must be present and will be verified
     * - lenient: the header value will be verified if it is present, and will be set if it is not present
     *
     * @throws IllegalArgumentException if the value is null or not one of strict, lenient or off
     */
    public void setDataSetIndex(String dataSetIndex) {
        // guard against null first, as switching on a null String fails with a NullPointerException
        if (dataSetIndex != null) {
            switch (dataSetIndex) {
            case "off":
            case "lenient":
            case "strict":
                this.dataSetIndex = dataSetIndex;
                return;
            default:
                break;
            }
        }
        throw new IllegalArgumentException("Invalid value specified for the dataSetIndex URI parameter: " + dataSetIndex
                + ". Supported values are strict, lenient and off");
    }

    public String getDataSetIndex() {
        return dataSetIndex;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Verifies a received exchange against the exchange the data set produces
     * for the same position, and then sleeps for the consume delay, if any.
     */
    @Override
    protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
        int receivedCount = receivedCounter.incrementAndGet();
        // data set indexes are zero based
        long index = receivedCount - 1;
        Exchange expected = createExchange(index);

        // now let's assert that they are the same
        if (log.isDebugEnabled()) {
            // log the header as-is, so a value of an unexpected type cannot break the logging
            Object indexHeader = copy.getIn().getHeader(Exchange.DATASET_INDEX);
            if (indexHeader != null) {
                log.debug("Received message: {} (DataSet index={}) = {}",
                        new Object[]{index, indexHeader, copy});
            } else {
                log.debug("Received message: {} = {}", index, copy);
            }
        }

        assertMessageExpected(index, expected, copy);

        if (consumeDelay > 0) {
            Thread.sleep(consumeDelay);
        }
    }

    /**
     * Verifies the {@link Exchange#DATASET_INDEX} header according to the
     * dataSetIndex option, and then lets the {@link DataSet} verify the message.
     *
     * @param index    the zero based position at which the exchange was received
     * @param expected the exchange created from the data set for that position
     * @param actual   the exchange that was received
     */
    protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
        switch (getDataSetIndex()) {
        case "off":
            // the header is neither verified nor set
            break;
        case "strict":
            // the header must be present, otherwise getMandatoryHeader fails
            long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
            assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
            break;
        case "lenient":
        default:
            // Validate the header value if it is present
            Long dataSetIndexHeaderValue = actual.getIn().getHeader(Exchange.DATASET_INDEX, Long.class);
            if (dataSetIndexHeaderValue != null) {
                assertEquals("Header: " + Exchange.DATASET_INDEX, index, dataSetIndexHeaderValue, actual);
            } else {
                // set the header if it isn't there
                actual.getIn().setHeader(Exchange.DATASET_INDEX, index);
            }
            break;
        }

        getDataSet().assertMessageExpected(this, expected, actual, index);
    }

    /**
     * Creates the throughput logger used as the reporter of this endpoint. It logs
     * under the sanitized endpoint uri, using the report count of the data set.
     */
    protected ThroughputLogger createReporter() {
        // must sanitize uri to avoid logging sensitive information
        String uri = URISupport.sanitizeUri(getEndpointUri());
        CamelLogger logger = new CamelLogger(uri);
        ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount());
        answer.setAction("Received");
        return answer;
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();

        // only create a reporter if none has been configured
        if (reporter == null) {
            reporter = createReporter();
        }

        log.info("{} expecting {} messages", this, getExpectedCount());
    }

}