public class CassandraAggregationRepository
extends org.apache.camel.support.ServiceSupport
implements org.apache.camel.spi.RecoverableAggregationRepository
AggregationRepository
using Cassandra table to store
exchanges.
Advice: use LeveledCompaction for this table and tune read/write consistency levels.
Warning: Cassandra is not the best tool for queuing use cases
See: http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasetsConstructor and Description |
---|
CassandraAggregationRepository() |
CassandraAggregationRepository(com.datastax.driver.core.Cluster cluster,
String keyspace) |
CassandraAggregationRepository(com.datastax.driver.core.Session session) |
Modifier and Type | Method and Description |
---|---|
org.apache.camel.Exchange |
add(org.apache.camel.CamelContext camelContext,
String key,
org.apache.camel.Exchange exchange)
Insert or update exchange in aggregation table.
|
void |
confirm(org.apache.camel.CamelContext camelContext,
String exchangeId)
Remove exchange by Id from aggregation table.
|
protected void |
doStart() |
protected void |
doStop() |
org.apache.camel.Exchange |
get(org.apache.camel.CamelContext camelContext,
String key)
Get exchange from aggregation table by aggregation key.
|
String |
getDeadLetterUri() |
String |
getExchangeColumn() |
String |
getExchangeIdColumn() |
Set<String> |
getKeys()
Get aggregation exchangeIds from aggregation table.
|
int |
getMaximumRedeliveries() |
String[] |
getPKColumns() |
protected Object[] |
getPKValues(String key)
Generate primary key values from aggregation key.
|
Object[] |
getPrefixPKValues() |
com.datastax.driver.core.ConsistencyLevel |
getReadConsistencyLevel() |
long |
getRecoveryIntervalInMillis() |
com.datastax.driver.core.Session |
getSession() |
String |
getTable() |
Integer |
getTtl() |
com.datastax.driver.core.ConsistencyLevel |
getWriteConsistencyLevel() |
protected void |
initSelectStatement() |
boolean |
isAllowSerializedHeaders() |
boolean |
isUseRecovery() |
org.apache.camel.Exchange |
recover(org.apache.camel.CamelContext camelContext,
String exchangeId)
Get exchange by exchange ID.
|
void |
remove(org.apache.camel.CamelContext camelContext,
String key,
org.apache.camel.Exchange exchange)
Remove exchange by aggregation key from aggregation table.
|
Set<String> |
scan(org.apache.camel.CamelContext camelContext)
Get exchange IDs to be recovered
|
protected List<com.datastax.driver.core.Row> |
selectKeyIds() |
void |
setAllowSerializedHeaders(boolean allowSerializedHeaders) |
void |
setDeadLetterUri(String deadLetterUri) |
void |
setExchangeColumn(String exchangeColumnName) |
void |
setExchangeIdColumn(String exchangeIdColumn) |
void |
setMaximumRedeliveries(int maximumRedeliveries) |
void |
setPKColumns(String... pkColumns) |
void |
setPrefixPKValues(Object... prefixPKValues) |
void |
setReadConsistencyLevel(com.datastax.driver.core.ConsistencyLevel readConsistencyLevel) |
void |
setRecoveryInterval(long recoveryIntervalInMillis) |
void |
setRecoveryInterval(long interval,
TimeUnit timeUnit) |
void |
setRecoveryIntervalInMillis(long recoveryIntervalInMillis) |
void |
setSession(com.datastax.driver.core.Session session) |
void |
setTable(String table) |
void |
setTtl(Integer ttl) |
void |
setUseRecovery(boolean useRecovery) |
void |
setWriteConsistencyLevel(com.datastax.driver.core.ConsistencyLevel writeConsistencyLevel) |
public CassandraAggregationRepository()
public CassandraAggregationRepository(com.datastax.driver.core.Session session)
public CassandraAggregationRepository(com.datastax.driver.core.Cluster cluster, String keyspace)
protected Object[] getPKValues(String key)
protected void doStart() throws Exception
doStart
in class org.apache.camel.support.ServiceSupport
Exception
protected void doStop() throws Exception
doStop
in class org.apache.camel.support.ServiceSupport
Exception
public org.apache.camel.Exchange add(org.apache.camel.CamelContext camelContext, String key, org.apache.camel.Exchange exchange)
add
in interface org.apache.camel.spi.AggregationRepository
protected void initSelectStatement()
public org.apache.camel.Exchange get(org.apache.camel.CamelContext camelContext, String key)
get
in interface org.apache.camel.spi.AggregationRepository
public void confirm(org.apache.camel.CamelContext camelContext, String exchangeId)
confirm
in interface org.apache.camel.spi.AggregationRepository
public void remove(org.apache.camel.CamelContext camelContext, String key, org.apache.camel.Exchange exchange)
remove
in interface org.apache.camel.spi.AggregationRepository
protected List<com.datastax.driver.core.Row> selectKeyIds()
public Set<String> getKeys()
getKeys
in interface org.apache.camel.spi.AggregationRepository
public Set<String> scan(org.apache.camel.CamelContext camelContext)
scan
in interface org.apache.camel.spi.RecoverableAggregationRepository
public org.apache.camel.Exchange recover(org.apache.camel.CamelContext camelContext, String exchangeId)
recover
in interface org.apache.camel.spi.RecoverableAggregationRepository
public com.datastax.driver.core.Session getSession()
public void setSession(com.datastax.driver.core.Session session)
public String getTable()
public void setTable(String table)
public Object[] getPrefixPKValues()
public void setPrefixPKValues(Object... prefixPKValues)
public String[] getPKColumns()
public void setPKColumns(String... pkColumns)
public String getExchangeIdColumn()
public void setExchangeIdColumn(String exchangeIdColumn)
public com.datastax.driver.core.ConsistencyLevel getWriteConsistencyLevel()
public void setWriteConsistencyLevel(com.datastax.driver.core.ConsistencyLevel writeConsistencyLevel)
public com.datastax.driver.core.ConsistencyLevel getReadConsistencyLevel()
public void setReadConsistencyLevel(com.datastax.driver.core.ConsistencyLevel readConsistencyLevel)
public String getExchangeColumn()
public void setExchangeColumn(String exchangeColumnName)
public Integer getTtl()
public void setTtl(Integer ttl)
public long getRecoveryIntervalInMillis()
getRecoveryIntervalInMillis
in interface org.apache.camel.spi.RecoverableAggregationRepository
public void setRecoveryIntervalInMillis(long recoveryIntervalInMillis)
public void setRecoveryInterval(long interval, TimeUnit timeUnit)
setRecoveryInterval
in interface org.apache.camel.spi.RecoverableAggregationRepository
public void setRecoveryInterval(long recoveryIntervalInMillis)
setRecoveryInterval
in interface org.apache.camel.spi.RecoverableAggregationRepository
public boolean isUseRecovery()
isUseRecovery
in interface org.apache.camel.spi.RecoverableAggregationRepository
public void setUseRecovery(boolean useRecovery)
setUseRecovery
in interface org.apache.camel.spi.RecoverableAggregationRepository
public String getDeadLetterUri()
getDeadLetterUri
in interface org.apache.camel.spi.RecoverableAggregationRepository
public void setDeadLetterUri(String deadLetterUri)
setDeadLetterUri
in interface org.apache.camel.spi.RecoverableAggregationRepository
public int getMaximumRedeliveries()
getMaximumRedeliveries
in interface org.apache.camel.spi.RecoverableAggregationRepository
public void setMaximumRedeliveries(int maximumRedeliveries)
setMaximumRedeliveries
in interface org.apache.camel.spi.RecoverableAggregationRepository
public boolean isAllowSerializedHeaders()
public void setAllowSerializedHeaders(boolean allowSerializedHeaders)
Apache Camel