Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processor): Fix client handling and number of active connections #4597

Merged
merged 1 commit into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2425,6 +2425,7 @@ impl Config {
Some(create_redis_pools(
redis_configs,
self.cpu_concurrency() as u32,
self.pool_concurrency() as u32,
))
}

Expand Down
14 changes: 11 additions & 3 deletions relay-config/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,20 @@ pub(super) fn create_redis_pool(
}
}

pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs {
pub(super) fn create_redis_pools(
configs: &RedisConfigs,
cpu_concurrency: u32,
max_pool_concurrency: u32,
) -> RedisPoolConfigs {
// Default `max_connections` for the `project_configs` pool.
// In a unified config, this is used for all pools.
let project_configs_default_connections =
std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);

// The number of default connections is equal to how many threads we have times the number of
// futures we can concurrently drive times some leeway since we might use more connections.
let default_connections = cpu_concurrency * max_pool_concurrency * 2;

match configs {
RedisConfigs::Unified(cfg) => {
let pool = create_redis_pool(cfg, project_configs_default_connections);
Expand All @@ -291,8 +299,8 @@ pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -
} => {
let project_configs =
create_redis_pool(project_configs, project_configs_default_connections);
let cardinality = create_redis_pool(cardinality, cpu_concurrency);
let quotas = create_redis_pool(quotas, cpu_concurrency);
let cardinality = create_redis_pool(cardinality, default_connections);
let quotas = create_redis_pool(quotas, default_connections);
RedisPoolConfigs::Individual {
project_configs,
cardinality,
Expand Down
5 changes: 4 additions & 1 deletion relay-quotas/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ impl<T: GlobalLimiter> RedisRateLimiter<T> {
quantity: usize,
over_accept_once: bool,
) -> Result<RateLimits, RateLimitingError> {
let mut client = self.pool.client().map_err(RateLimitingError::Redis)?;
let timestamp = UnixTimestamp::now();
let mut invocation = self.script.prepare_invoke();
let mut tracked_quotas = Vec::new();
Expand Down Expand Up @@ -338,6 +337,10 @@ impl<T: GlobalLimiter> RedisRateLimiter<T> {
return Ok(rate_limits);
}

// We get the redis client after the global rate limiting since we don't want to hold the
// client across await points, otherwise it might be held for too long, and we will run out
// of connections.
let mut client = self.pool.client().map_err(RateLimitingError::Redis)?;
let rejections: Vec<bool> = invocation
.invoke(&mut client.connection().map_err(RateLimitingError::Redis)?)
.map_err(RedisError::Redis)
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ pub enum ServiceError {
#[error("could not initialize kafka producer: {0}")]
Kafka(String),

/// Initializing the Redis cluster client failed.
/// Initializing the Redis client failed.
#[cfg(feature = "processing")]
#[error("could not initialize redis cluster client")]
#[error("could not initialize redis client during startup")]
Redis,
}

Expand Down
Loading