diff --git a/src/listener/mod.rs b/src/listener/mod.rs
index 8ca8d9d..98882ee 100644
--- a/src/listener/mod.rs
+++ b/src/listener/mod.rs
@@ -9,7 +9,7 @@ use rdkafka::{
 use serde_json;
 use thiserror::Error;
 use tokio::sync::broadcast;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 
 use crate::{cli, config::Config, graphql::subscription::StatusUpdate, models::KafkaMessage};
 use chrono::Utc;
@@ -28,6 +28,8 @@ pub enum ListenerError {
     JsonError(#[from] serde_json::Error),
     #[error("Message error: {0}")]
     MessageError(String),
+    #[error("Configuration error: {0}")]
+    ConfigError(String),
 }
 
 impl KafkaListener {
@@ -40,10 +42,18 @@ impl KafkaListener {
         };
 
         let client: StreamConsumer = ClientConfig::new()
-            .set("bootstrap.servers", &config.kafka_brokers)
             .set("group.id", &config.kafka_group_id)
-            .set("auto.offset.reset", offset_reset)
+            .set("bootstrap.servers", &config.kafka_brokers)
+            .set("enable.partition.eof", "false")
+            .set("session.timeout.ms", "30000")
             .set("enable.auto.commit", "true")
+            .set("auto.offset.reset", offset_reset)
+            .set("heartbeat.interval.ms", "10000")
+            .set("max.poll.interval.ms", "300000")
+            .set("fetch.wait.max.ms", "500")
+            .set("socket.timeout.ms", "60000")
+            .set("request.timeout.ms", "30000")
+            .set("message.timeout.ms", "300000")
             .create()?;
 
         client.subscribe(&[&config.kafka_topic])?;
@@ -62,7 +72,66 @@
 
     pub async fn run(&mut self, config: &Config) -> Result<(), ListenerError> {
         info!("Kafka listener starting...");
+
+        let mut reconnect_attempts = 0;
+        let max_reconnect_attempts = 5;
+        let mut backoff_delay = std::time::Duration::from_secs(1);
+
+        loop {
+            match self.run_consumer_loop(config).await {
+                Ok(_) => {
+                    // Clean exit; reset the reconnect counters
+                    reconnect_attempts = 0;
+                    backoff_delay = std::time::Duration::from_secs(1);
+                    break;
+                }
+                Err(e) => {
+                    reconnect_attempts += 1;
+
+                    match &e {
+                        ListenerError::KafkaError(kafka_err) => {
+                            if self.is_connection_error(kafka_err) {
+                                if reconnect_attempts <= max_reconnect_attempts {
+                                    warn!("Kafka connection error (attempt {}/{}): {:?}, retrying in {}s...",
+                                        reconnect_attempts, max_reconnect_attempts, kafka_err, backoff_delay.as_secs());
+
+                                    tokio::time::sleep(backoff_delay).await;
+                                    backoff_delay = std::cmp::min(backoff_delay * 2, std::time::Duration::from_secs(60));
+
+                                    // Try to rebuild the connection
+                                    match self.rebuild_client(config).await {
+                                        Ok(_) => {
+                                            info!("Kafka client rebuilt successfully; resuming listening...");
+                                            continue;
+                                        }
+                                        Err(rebuild_err) => {
+                                            error!("Failed to rebuild Kafka client: {:?}", rebuild_err);
+                                        }
+                                    }
+                                } else {
+                                    error!("Kafka reconnect attempts reached the limit ({}); stopping listener", max_reconnect_attempts);
+                                    return Err(e);
+                                }
+                            } else {
+                                // Not a connection error; return immediately
+                                error!("Non-connection Kafka error: {:?}", kafka_err);
+                                return Err(e);
+                            }
+                        }
+                        _ => {
+                            // Any other error type; return immediately
+                            error!("Listener run error: {:?}", e);
+                            return Err(e);
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn run_consumer_loop(&mut self, config: &Config) -> Result<(), ListenerError> {
         // If configured to skip historical messages, seek to the latest position
         if config.kafka_skip_historical {
             if let Err(e) = self.seek_to_end(&config.kafka_topic).await {
@@ -76,7 +145,7 @@ if let Some(payload) = msg.payload() {
                 match self.process_message(payload).await {
                     Ok(_) => {
-                        info!("Message processed successfully");
+                        debug!("Message processed successfully");
                     }
                     Err(e) => {
                         error!("Message processing failed: {:?}", e);
                     }
@@ -99,6 +168,68 @@
         }
     }
 
+    /// Check whether an error is connection-related
+    fn is_connection_error(&self, kafka_error: &rdkafka::error::KafkaError) -> bool {
+        use rdkafka::error::{KafkaError, RDKafkaErrorCode};
+
+        match kafka_error {
+            KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut) => true,
+            KafkaError::MessageConsumption(RDKafkaErrorCode::NetworkException) => true,
+            KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown) => true,
+            KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerNotAvailable) => true,
+            KafkaError::Global(RDKafkaErrorCode::OperationTimedOut) => true,
+            KafkaError::Global(RDKafkaErrorCode::NetworkException) => true,
+            KafkaError::Global(RDKafkaErrorCode::AllBrokersDown) => true,
+            KafkaError::Global(RDKafkaErrorCode::BrokerNotAvailable) => true,
+            _ => {
+                // Check whether the error message contains connection-related keywords
+                let error_str = format!("{:?}", kafka_error);
+                error_str.contains("timeout") ||
+                error_str.contains("disconnect") ||
+                error_str.contains("network") ||
+                error_str.contains("broker") ||
+                error_str.contains("connection")
+            }
+        }
+    }
+
+    /// Rebuild the Kafka client
+    async fn rebuild_client(&mut self, config: &Config) -> Result<(), ListenerError> {
+        info!("Rebuilding Kafka client...");
+
+        // Decide whether to skip historical messages based on the config
+        let offset_reset = if config.kafka_skip_historical {
+            "latest" // start from the latest messages, skipping history
+        } else {
+            "earliest" // start from the earliest messages
+        };
+
+        let consumer: StreamConsumer = ClientConfig::new()
+            .set("group.id", &config.kafka_group_id)
+            .set("bootstrap.servers", &config.kafka_brokers)
+            .set("enable.partition.eof", "false")
+            .set("session.timeout.ms", "30000")
+            .set("enable.auto.commit", "true")
+            .set("auto.offset.reset", offset_reset)
+            .set("heartbeat.interval.ms", "10000")
+            .set("max.poll.interval.ms", "300000")
+            .set("fetch.wait.max.ms", "500")
+            .set("socket.timeout.ms", "60000")
+            .set("request.timeout.ms", "30000")
+            .set("message.timeout.ms", "300000")
+            .create()
+            .map_err(|e| ListenerError::ConfigError(format!("Failed to create new Kafka consumer: {}", e)))?;
+
+        consumer
+            .subscribe(&[&config.kafka_topic])
+            .map_err(|e| ListenerError::ConfigError(format!("Failed to subscribe to topic: {}", e)))?;
+
+        self.client = consumer;
+        info!("Kafka client rebuild complete");
+
+        Ok(())
+    }
+
     async fn process_message(&self, payload: &[u8]) -> Result<(), ListenerError> {
         // Convert the byte slice to a string
         let message_str = match std::str::from_utf8(payload) {