This commit is contained in:
parent
e04984192c
commit
ef58da8853
@ -9,7 +9,7 @@ use rdkafka::{
|
||||
use serde_json;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{cli, config::Config, graphql::subscription::StatusUpdate, models::KafkaMessage};
|
||||
use chrono::Utc;
|
||||
@ -28,6 +28,8 @@ pub enum ListenerError {
|
||||
JsonError(#[from] serde_json::Error),
|
||||
#[error("消息错误: {0}")]
|
||||
MessageError(String),
|
||||
#[error("配置错误: {0}")]
|
||||
ConfigError(String),
|
||||
}
|
||||
|
||||
impl KafkaListener {
|
||||
@ -40,10 +42,18 @@ impl KafkaListener {
|
||||
};
|
||||
|
||||
let client: StreamConsumer = ClientConfig::new()
|
||||
.set("bootstrap.servers", &config.kafka_brokers)
|
||||
.set("group.id", &config.kafka_group_id)
|
||||
.set("auto.offset.reset", offset_reset)
|
||||
.set("bootstrap.servers", &config.kafka_brokers)
|
||||
.set("enable.partition.eof", "false")
|
||||
.set("session.timeout.ms", "30000")
|
||||
.set("enable.auto.commit", "true")
|
||||
.set("auto.offset.reset", offset_reset)
|
||||
.set("heartbeat.interval.ms", "10000")
|
||||
.set("max.poll.interval.ms", "300000")
|
||||
.set("fetch.wait.max.ms", "500")
|
||||
.set("socket.timeout.ms", "60000")
|
||||
.set("request.timeout.ms", "30000")
|
||||
.set("message.timeout.ms", "300000")
|
||||
.create()?;
|
||||
|
||||
client.subscribe(&[&config.kafka_topic])?;
|
||||
@ -63,6 +73,65 @@ impl KafkaListener {
|
||||
pub async fn run(&mut self, config: &Config) -> Result<(), ListenerError> {
|
||||
info!("Kafka listener 开始运行...");
|
||||
|
||||
let mut reconnect_attempts = 0;
|
||||
let max_reconnect_attempts = 5;
|
||||
let mut backoff_delay = std::time::Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
match self.run_consumer_loop(config).await {
|
||||
Ok(_) => {
|
||||
// 正常退出,重置重连计数
|
||||
reconnect_attempts = 0;
|
||||
backoff_delay = std::time::Duration::from_secs(1);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
reconnect_attempts += 1;
|
||||
|
||||
match &e {
|
||||
ListenerError::KafkaError(kafka_err) => {
|
||||
if self.is_connection_error(kafka_err) {
|
||||
if reconnect_attempts <= max_reconnect_attempts {
|
||||
warn!("Kafka 连接错误 (尝试 {}/{}): {:?}, {}秒后重试...",
|
||||
reconnect_attempts, max_reconnect_attempts, kafka_err, backoff_delay.as_secs());
|
||||
|
||||
tokio::time::sleep(backoff_delay).await;
|
||||
backoff_delay = std::cmp::min(backoff_delay * 2, std::time::Duration::from_secs(60));
|
||||
|
||||
// 尝试重建连接
|
||||
match self.rebuild_client(config).await {
|
||||
Ok(_) => {
|
||||
info!("Kafka 客户端重建成功,继续监听...");
|
||||
continue;
|
||||
}
|
||||
Err(rebuild_err) => {
|
||||
error!("重建 Kafka 客户端失败: {:?}", rebuild_err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Kafka 重连尝试次数已达上限 ({}), 停止监听器", max_reconnect_attempts);
|
||||
return Err(e);
|
||||
}
|
||||
} else {
|
||||
// 非连接错误,直接返回
|
||||
error!("Kafka 非连接错误: {:?}", kafka_err);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// 其他类型错误,直接返回
|
||||
error!("监听器运行错误: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_consumer_loop(&mut self, config: &Config) -> Result<(), ListenerError> {
|
||||
// 如果配置为跳过历史消息,则寻找到最新位置
|
||||
if config.kafka_skip_historical {
|
||||
if let Err(e) = self.seek_to_end(&config.kafka_topic).await {
|
||||
@ -76,7 +145,7 @@ impl KafkaListener {
|
||||
if let Some(payload) = msg.payload() {
|
||||
match self.process_message(payload).await {
|
||||
Ok(_) => {
|
||||
info!("消息处理成功");
|
||||
debug!("消息处理成功");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("消息处理失败: {:?}", e);
|
||||
@ -99,6 +168,68 @@ impl KafkaListener {
|
||||
}
|
||||
}
|
||||
|
||||
/// 检查是否为连接相关错误
|
||||
fn is_connection_error(&self, kafka_error: &rdkafka::error::KafkaError) -> bool {
|
||||
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
|
||||
|
||||
match kafka_error {
|
||||
KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut) => true,
|
||||
KafkaError::MessageConsumption(RDKafkaErrorCode::NetworkException) => true,
|
||||
KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown) => true,
|
||||
KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerNotAvailable) => true,
|
||||
KafkaError::Global(RDKafkaErrorCode::OperationTimedOut) => true,
|
||||
KafkaError::Global(RDKafkaErrorCode::NetworkException) => true,
|
||||
KafkaError::Global(RDKafkaErrorCode::AllBrokersDown) => true,
|
||||
KafkaError::Global(RDKafkaErrorCode::BrokerNotAvailable) => true,
|
||||
_ => {
|
||||
// 检查错误消息中是否包含连接相关关键词
|
||||
let error_str = format!("{:?}", kafka_error);
|
||||
error_str.contains("timeout") ||
|
||||
error_str.contains("disconnect") ||
|
||||
error_str.contains("network") ||
|
||||
error_str.contains("broker") ||
|
||||
error_str.contains("connection")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 重建 Kafka 客户端
|
||||
async fn rebuild_client(&mut self, config: &Config) -> Result<(), ListenerError> {
|
||||
info!("正在重建 Kafka 客户端...");
|
||||
|
||||
// 根据配置决定是否跳过历史消息
|
||||
let offset_reset = if config.kafka_skip_historical {
|
||||
"latest" // 从最新消息开始,跳过历史消息
|
||||
} else {
|
||||
"earliest" // 从最早消息开始
|
||||
};
|
||||
|
||||
let consumer: StreamConsumer = ClientConfig::new()
|
||||
.set("group.id", &config.kafka_group_id)
|
||||
.set("bootstrap.servers", &config.kafka_brokers)
|
||||
.set("enable.partition.eof", "false")
|
||||
.set("session.timeout.ms", "30000")
|
||||
.set("enable.auto.commit", "true")
|
||||
.set("auto.offset.reset", offset_reset)
|
||||
.set("heartbeat.interval.ms", "10000")
|
||||
.set("max.poll.interval.ms", "300000")
|
||||
.set("fetch.wait.max.ms", "500")
|
||||
.set("socket.timeout.ms", "60000")
|
||||
.set("request.timeout.ms", "30000")
|
||||
.set("message.timeout.ms", "300000")
|
||||
.create()
|
||||
.map_err(|e| ListenerError::ConfigError(format!("创建新的Kafka consumer失败: {}", e)))?;
|
||||
|
||||
consumer
|
||||
.subscribe(&[&config.kafka_topic])
|
||||
.map_err(|e| ListenerError::ConfigError(format!("订阅主题失败: {}", e)))?;
|
||||
|
||||
self.client = consumer;
|
||||
info!("Kafka 客户端重建完成");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_message(&self, payload: &[u8]) -> Result<(), ListenerError> {
|
||||
// 将字节数组转换为字符串
|
||||
let message_str = match std::str::from_utf8(payload) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user