@ManagedResource(description="Kafka IdempotentRepository") public class KafkaIdempotentRepository extends org.apache.camel.support.ServiceSupport implements org.apache.camel.spi.IdempotentRepository<String>, org.apache.camel.CamelContextAware
A Kafka topic-based implementation of IdempotentRepository.
Uses a local cache of previously seen Message IDs. Mutations that come in via the add(String) or remove(String)
methods update the local cache and broadcast the change in state on a Kafka topic to
other instances. The cache is back-filled from the topic by a Kafka consumer.
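The interplay between the local cache, the broadcast, and the back-fill can be illustrated with a small in-memory model. This is only a sketch of the idea: `TopicStandIn` and `CachedRepositorySketch` are hypothetical names, the list stands in for the Kafka topic, and the real class may differ in details such as when it publishes or what its methods return.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for the Kafka topic (not a Camel or Kafka class).
final class TopicStandIn {
    // Each entry is {action, key}, where action is "add" or "remove".
    private final List<String[]> events = new ArrayList<>();

    void publish(String action, String key) {
        events.add(new String[] {action, key});
    }

    // Returns every event at or after the given offset.
    List<String[]> readFrom(int offset) {
        return new ArrayList<>(events.subList(offset, events.size()));
    }
}

// Hypothetical model of one repository instance sharing the topic with its peers.
final class CachedRepositorySketch {
    private final Set<String> cache = new HashSet<>();
    private final TopicStandIn topic;
    private int offset = 0; // each instance tracks its own position in the topic

    CachedRepositorySketch(TopicStandIn topic) {
        this.topic = topic;
    }

    // Returns false for a key that is already cached (a duplicate).
    boolean add(String key) {
        if (!cache.add(key)) {
            return false;
        }
        topic.publish("add", key);
        return true;
    }

    // Removes the key locally and broadcasts the removal to peers.
    boolean remove(String key) {
        boolean wasPresent = cache.remove(key);
        topic.publish("remove", key);
        return wasPresent;
    }

    boolean contains(String key) {
        return cache.contains(key);
    }

    // Back-fills the local cache with events this instance has not yet seen.
    void poll() {
        for (String[] event : topic.readFrom(offset)) {
            if ("add".equals(event[0])) {
                cache.add(event[1]);
            } else {
                cache.remove(event[1]);
            }
            offset++;
        }
    }
}
```

In this model a peer only learns about a key once it has polled, which mirrors the window during which instances can briefly disagree.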
The topic used must be unique per logical repository (i.e. two routes that de-duplicate using different repositories
must also use different topics).
This class makes no assumptions about the number of partitions (it is designed to consume from all at the
same time), or replication factor of the topic.
Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own
consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
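One way to picture "each instance controls its own consumer group" is a consumer configuration whose `group.id` is unique per instance. The sketch below uses only `java.util.Properties` and standard Kafka consumer property names; the class name, the `idempotent-repo-` prefix, and the use of a random UUID are assumptions for illustration, not a statement of how this class derives its group.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper; illustrates per-instance consumer groups only.
final class ConsumerGroupSketch {
    static Properties consumerPropertiesFor(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A group.id that no other instance shares, so offsets are tracked independently.
        props.setProperty("group.id", "idempotent-repo-" + UUID.randomUUID());
        // Start from the beginning of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

Because no two instances share a group, every instance receives every record from every partition instead of splitting the partitions among themselves.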
On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the
latest state. The cache will not be considered warmed up until one poll of pollDurationMs in length
returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the
latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end
of the topic.
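The warm-up rule described above (keep polling until one poll returns no records, or give up after a timeout) can be sketched as a simple loop. `WarmUpSketch` and its `pollRecordCount` parameter are hypothetical: the supplier stands in for a single Kafka consumer poll lasting up to pollDurationMs and returns the number of records fetched.

```java
import java.util.function.IntSupplier;

// Hypothetical illustration of the warm-up condition; not the actual implementation.
final class WarmUpSketch {
    // Returns true if an empty poll was observed before the timeout elapsed,
    // false if the timeout expired while records were still arriving.
    static boolean warmUp(IntSupplier pollRecordCount, long timeoutMs) {
        long start = System.nanoTime();
        long timeoutNanos = timeoutMs * 1_000_000L;
        while (System.nanoTime() - start < timeoutNanos) {
            if (pollRecordCount.getAsInt() == 0) {
                return true; // caught up to the end of the topic
            }
        }
        return false; // still behind; the cache may be inconsistent for a while
    }
}
```

With the 30-second limit mentioned above, the call would be `warmUp(poll, 30_000)`. A `false` result corresponds to the case where startup completes anyway and the consumer keeps catching up in the background.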
To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in
Spring/Blueprint, as it is CamelContext aware.
Constructor and Description |
---|
KafkaIdempotentRepository(): No-op constructor for XML/property-based object initialisation. |
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) |
KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs) |
KafkaIdempotentRepository(String topic, String bootstrapServers) |
KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) |
Modifier and Type | Method and Description |
---|---|
boolean | add(String key) |
void | clear() |
boolean | confirm(String key) |
boolean | contains(String key) |
protected void | doStart() |
protected void | doStop() |
String | getBootstrapServers() |
org.apache.camel.CamelContext | getCamelContext() |
Properties | getConsumerConfig() |
long | getDuplicateCount() |
int | getMaxCacheSize() |
int | getPollDurationMs() |
Properties | getProducerConfig() |
String | getTopic() |
boolean | isPollerRunning() |
boolean | remove(String key) |
void | setBootstrapServers(String bootstrapServers): Sets the bootstrap.servers property on the internal Kafka producer and consumer. |
void | setCamelContext(org.apache.camel.CamelContext camelContext) |
void | setConsumerConfig(Properties consumerConfig): Sets the properties that will be used by the Kafka consumer. |
void | setMaxCacheSize(int maxCacheSize): Sets the maximum size of the local key cache. |
void | setPollDurationMs(int pollDurationMs): Sets the poll duration of the Kafka consumer. |
void | setProducerConfig(Properties producerConfig): Sets the properties that will be used by the Kafka producer. |
void | setTopic(String topic): Sets the name of the Kafka topic used by this idempotent repository. |
Methods inherited from class org.apache.camel.support.ServiceSupport: doResume, doShutdown, doSuspend, getStatus, getVersion, isRunAllowed, isStarted, isStarting, isStopped, isStopping, isStoppingOrStopped, isSuspended, isSuspending, isSuspendingOrSuspended, resume, shutdown, start, stop, suspend
public KafkaIdempotentRepository()
public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs)
public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig)
public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs)
public String getTopic()
public void setTopic(String topic)
Sets the name of the Kafka topic used by this idempotent repository.
topic - The topic name.
public String getBootstrapServers()
public void setBootstrapServers(String bootstrapServers)
Sets the bootstrap.servers property on the internal Kafka producer and consumer. Use this as shorthand if not setting
consumerConfig and producerConfig. If used, this component will apply sensible
default configurations for the producer and consumer.
bootstrapServers - The bootstrap.servers value to use.
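When the bootstrapServers shorthand is not enough, consumerConfig and producerConfig are supplied as explicit Properties objects, which then have to carry `bootstrap.servers` themselves. A minimal sketch of such a configuration for an SSL-secured cluster follows; the class and method names are hypothetical, the property keys are standard Kafka client settings, and any further settings the clients need (serializers, key stores, and so on) are deliberately left out.

```java
import java.util.Properties;

// Hypothetical helper that builds explicit client properties for an SSL-secured cluster.
final class ExplicitConfigSketch {
    static Properties sslClientConfig(String bootstrapServers,
                                      String truststorePath,
                                      String truststorePassword) {
        Properties props = new Properties();
        // Explicit configuration must name the brokers itself.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

Two such objects, one for the consumer and one for the producer, could then be passed to the KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) constructor or to the corresponding setters.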
public Properties getProducerConfig()
public void setProducerConfig(Properties producerConfig)
Sets the properties that will be used by the Kafka producer. Overrides bootstrapServers, so must define
the bootstrap.servers property itself. Prefer using bootstrapServers for default configuration unless you
specifically need non-standard configuration options such as SSL/SASL.
producerConfig - The producer configuration properties.
public Properties getConsumerConfig()
public void setConsumerConfig(Properties consumerConfig)
Sets the properties that will be used by the Kafka consumer. Overrides bootstrapServers, so must define
the bootstrap.servers property itself. Prefer using bootstrapServers for default configuration unless you
specifically need non-standard configuration options such as SSL/SASL.
consumerConfig - The consumer configuration properties.
public int getMaxCacheSize()
public void setMaxCacheSize(int maxCacheSize)
Sets the maximum size of the local key cache.
maxCacheSize - The maximum key cache size.
public int getPollDurationMs()
public void setPollDurationMs(int pollDurationMs)
Sets the poll duration of the Kafka consumer. The default value is DEFAULT_POLL_DURATION_MS. If setting this value
explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic
between this repository's consumer and the Kafka brokers.
The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the
stream has been consumed up to the current point. If the poll duration is excessively long for the rate at
which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will
operate in an inconsistent state relative to its peers until it catches up.
pollDurationMs - The poll duration in milliseconds.
public void setCamelContext(org.apache.camel.CamelContext camelContext)
setCamelContext in interface org.apache.camel.CamelContextAware
public org.apache.camel.CamelContext getCamelContext()
getCamelContext in interface org.apache.camel.CamelContextAware
protected void doStart() throws Exception
doStart in class org.apache.camel.support.ServiceSupport
Throws: Exception
protected void doStop()
doStop in class org.apache.camel.support.ServiceSupport
public boolean add(String key)
add in interface org.apache.camel.spi.IdempotentRepository<String>
@ManagedOperation(description="Does the store contain the given key") public boolean contains(String key)
contains in interface org.apache.camel.spi.IdempotentRepository<String>
@ManagedOperation(description="Remove the key from the store") public boolean remove(String key)
remove in interface org.apache.camel.spi.IdempotentRepository<String>
public boolean confirm(String key)
confirm in interface org.apache.camel.spi.IdempotentRepository<String>
public void clear()
clear in interface org.apache.camel.spi.IdempotentRepository<String>
@ManagedOperation(description="Number of times duplicate messages have been detected") public long getDuplicateCount()
@ManagedOperation(description="Number of times duplicate messages have been detected") public boolean isPollerRunning()
Apache Camel