sync subscription

This commit is contained in:
tsuki 2025-08-02 00:35:38 +08:00
parent 5a50d6b84b
commit f9072ead3c
10 changed files with 658 additions and 19 deletions

66
Cargo.lock generated
View File

@@ -2075,6 +2075,18 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-sys"
version = "1.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
@@ -2150,10 +2162,12 @@ dependencies = [
"dotenvy",
"futures-util",
"jsonwebtoken",
"rdkafka",
"rustls",
"sea-query",
"sea-query-binder",
"serde",
"serde_json",
"sqlx",
"tokio",
"tower 0.4.13",
@@ -2356,6 +2370,28 @@ dependencies = [
"libm",
]
[[package]]
name = "num_enum"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a973b4e44ce6cad84ce69d797acf9a044532e4184c4f267913d1b546a0727b7a"
dependencies = [
"num_enum_derive",
"rustversion",
]
[[package]]
name = "num_enum_derive"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.104",
]
[[package]]
name = "object"
version = "0.36.7"
@@ -2823,6 +2859,36 @@ dependencies = [
"getrandom 0.3.3",
]
[[package]]
name = "rdkafka"
version = "0.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f1856d72dbbbea0d2a5b2eaf6af7fb3847ef2746e883b11781446a51dbc85c0"
dependencies = [
"futures-channel",
"futures-util",
"libc",
"log",
"rdkafka-sys",
"serde",
"serde_derive",
"serde_json",
"slab",
"tokio",
]
[[package]]
name = "rdkafka-sys"
version = "4.9.0+2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872"
dependencies = [
"libc",
"libz-sys",
"num_enum",
"pkg-config",
]
[[package]]
name = "redox_syscall"
version = "0.5.13"

View File

@@ -11,6 +11,7 @@ async-graphql-axum = "7.0.17"
axum = { version = "0.8.4", features = ["ws", "macros"] }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sqlx = { version = "0.8", features = [
# "runtime-tokio-rustls",
"postgres",
@@ -46,4 +47,5 @@ sea-query-binder = {version = "0.7.0",features = [
axum-reverse-proxy = "1.0.3"
rustls = { version = "0.23", features = ["aws-lc-rs"] }
clap = { version = "4.0", features = ["derive"] }
rdkafka = "0.38.0"

View File

@@ -1,5 +1,6 @@
use axum_reverse_proxy::ReverseProxy;
use std::sync::Arc;
use tokio::sync::broadcast;
use async_graphql::{
http::{playground_source, GraphQLPlaygroundConfig},
@@ -9,7 +10,7 @@ use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
use axum::{
extract::{FromRef, State},
response::{Html, IntoResponse},
routing::{get, post},
routing::get,
Router,
};
use jsonwebtoken::DecodingKey;
@@ -17,16 +18,16 @@ use sqlx::PgPool;
use tower_http::cors::CorsLayer;
use crate::{
auth::{AuthUser, AuthUserState, Claims as MyClaims},
auth::{AuthUserState, Claims as MyClaims},
config::Config,
graphql::{MutationRoot, QueryRoot, SubscriptionRoot},
graphql::{subscription::StatusUpdate, MutationRoot, QueryRoot, SubscriptionRoot},
services::{
invite_code_service::InviteCodeService, system_config_service::SystemConfigService,
user_service::UserService,
},
};
use axum_jwt_auth::{Claims, JwtDecoderState, LocalDecoder};
use axum_jwt_auth::{JwtDecoderState, LocalDecoder};
use jsonwebtoken::Validation;
pub type AppSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
@@ -35,9 +36,14 @@ pub type AppSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
pub struct AppState {
pub schema: AppSchema,
pub decoder: JwtDecoderState<MyClaims>,
pub status_sender: Option<broadcast::Sender<StatusUpdate>>,
}
pub fn create_router(pool: PgPool, config: Config) -> Router {
pub fn create_router(
pool: PgPool,
config: Config,
status_sender: Option<broadcast::Sender<StatusUpdate>>,
) -> Router {
let user_service = UserService::new(pool.clone(), config.jwt_secret.clone());
let invite_code_service = InviteCodeService::new(pool.clone());
let system_config_service = SystemConfigService::new(pool.clone());
@@ -48,6 +54,7 @@ pub fn create_router(pool: PgPool, config: Config) -> Router {
.data(invite_code_service)
.data(system_config_service)
.data(config.clone())
.data(status_sender.clone())
.finish();
let keys = vec![DecodingKey::from_secret(config.jwt_secret.as_bytes())];
@@ -59,16 +66,18 @@ pub fn create_router(
.unwrap();
let app_state = AppState {
schema,
schema: schema.clone(),
decoder: JwtDecoderState {
decoder: Arc::new(decoder),
},
status_sender,
};
let router = ReverseProxy::new("/api", &config.tile_server_url.as_str());
Router::new()
.route("/", get(graphql_playground))
.route("/graphql", get(graphql_playground).post(graphql_handler))
.route_service("/ws", GraphQLSubscription::new(schema))
.layer(CorsLayer::permissive())
.merge(router)
.with_state(app_state)
@@ -90,12 +99,3 @@ async fn graphql_handler(
async fn graphql_playground() -> impl IntoResponse {
Html(playground_source(GraphQLPlaygroundConfig::new("/graphql")))
}
// async fn graphql_subscription_handler(
// State(state): State<AppState>,
// ws: axum::extract::WebSocketUpgrade,
// ) -> Response {
// ws.on_upgrade(move |socket| async move {
// GraphQLSubscription::new(socket, state.schema).serve().await
// })
// }
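The .data(status_sender.clone()) call above is what later lets resolvers retrieve the sender through ctx.data::<Option<broadcast::Sender<StatusUpdate>>>(). A minimal sketch of that async-graphql injection pattern, with simplified types that are not part of this commit:

use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema};

struct Query;

#[Object]
impl Query {
    // Reads back the value registered with .data(...) at build time.
    async fn answer(&self, ctx: &Context<'_>) -> i32 {
        *ctx.data::<i32>().expect("registered at build time")
    }
}

fn build_schema() -> Schema<Query, EmptyMutation, EmptySubscription> {
    Schema::build(Query, EmptyMutation, EmptySubscription)
        .data(42i32) // same mechanism as .data(status_sender.clone()) in the router
        .finish()
}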

View File

@@ -39,6 +39,10 @@ pub struct ServeArgs {
/// Whether to enable verbose logging
#[arg(short, long)]
pub verbose: bool,
/// Whether to enable the Kafka message listener
#[arg(short, long)]
pub kafka: bool,
}
#[derive(Args)]
@@ -63,6 +67,7 @@ impl Default for Commands {
host: "0.0.0.0".parse().unwrap(),
dev: false,
verbose: false,
kafka: false,
})
}
}
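With the flag above in place, starting the server with the listener enabled would look something like the following, assuming the ServeArgs command is exposed as a serve subcommand (the Commands enum itself is not shown in this diff):

cargo run -- serve --kafka

clap derives -k/--kafka from the field name here, alongside the existing -v/--verbose.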

View File

@@ -6,6 +6,10 @@ pub struct Config {
pub jwt_secret: String,
pub port: u16,
pub tile_server_url: String,
pub kafka_brokers: String,
pub kafka_topic: String,
pub kafka_group_id: String,
}
impl Config {
@@ -20,6 +24,10 @@ impl Config {
.parse()
.unwrap_or(3000),
tile_server_url: env::var("TILE_SERVER")?,
kafka_brokers: env::var("KAFKA_BROKERS")?,
kafka_topic: env::var("KAFKA_TOPIC")?,
kafka_group_id: env::var("KAFKA_GROUP_ID")?,
})
}
}
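For reference, a sketch of the environment this loader now expects; the values below are placeholders, not taken from this commit:

# placeholder values - adjust per deployment
TILE_SERVER=http://localhost:8080
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=processing-results
KAFKA_GROUP_ID=backend-consumer

Since the Kafka keys are read with the ? operator, a missing variable makes Config::from_env fail at startup even when the --kafka flag is off; only the port falls back to a default (3000).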

View File

@@ -1,7 +1,21 @@
use async_graphql::{Subscription, Result};
use async_graphql::{Context, Result, SimpleObject, Subscription};
use chrono::{DateTime, Utc};
use futures_util::{Stream, StreamExt};
use std::time::Duration;
use tokio::time::interval;
use tokio::sync::broadcast;
use tokio::time::{interval, timeout};
use tracing::{error, info, warn};
use uuid::Uuid;
#[derive(SimpleObject, Clone, Debug)]
pub struct StatusUpdate {
pub id: Uuid,
pub status: String,
pub message: String,
pub timestamp: DateTime<Utc>,
pub data: Option<String>,
pub newest_dt: Option<String>,
}
pub struct SubscriptionRoot;
@@ -19,4 +33,212 @@ impl SubscriptionRoot {
}
}
}
}
/// Status update subscription - clients can subscribe to this to receive status updates from Kafka messages
async fn status_updates(&self, ctx: &Context<'_>) -> Result<impl Stream<Item = StatusUpdate>> {
// Try to get the status sender from the context
if let Ok(status_sender) = ctx.data::<Option<broadcast::Sender<StatusUpdate>>>() {
if let Some(sender) = status_sender {
// Create a receiver
let mut receiver = sender.subscribe();
info!("New WebSocket client connected to the status update subscription");
return Ok(async_stream::stream! {
// Send an initial connection confirmation message
let welcome = StatusUpdate {
id: Uuid::new_v4(),
status: "connected".to_string(),
message: "WebSocket连接已建立".to_string(),
timestamp: Utc::now(),
data: Some("websocket_connected".to_string()),
newest_dt: None,
};
yield welcome;
// Set up the heartbeat interval
let mut heartbeat_interval = interval(Duration::from_secs(30));
loop {
tokio::select! {
// Handle status updates coming from Kafka
result = timeout(Duration::from_secs(5), receiver.recv()) => {
match result {
Ok(Ok(update)) => {
info!("转发状态更新到WebSocket客户端: {}", update.message);
yield update;
}
Ok(Err(broadcast::error::RecvError::Lagged(n))) => {
warn!("WebSocket客户端消息滞后 {} 条消息,尝试恢复", n);
// 发送滞后警告但继续订阅
let lag_warning = StatusUpdate {
id: Uuid::new_v4(),
status: "warning".to_string(),
message: format!("连接滞后 {} 条消息", n),
timestamp: Utc::now(),
data: None,
newest_dt: None,
};
yield lag_warning;
continue;
}
Ok(Err(broadcast::error::RecvError::Closed)) => {
error!("广播通道已关闭");
break;
}
Err(_) => {
// Timed out; continue to the next iteration (avoids blocking)
continue;
}
}
}
// Send a heartbeat
_ = heartbeat_interval.tick() => {
let heartbeat = StatusUpdate {
id: Uuid::new_v4(),
status: "heartbeat".to_string(),
message: "连接正常".to_string(),
timestamp: Utc::now(),
data: Some("heartbeat".to_string()),
newest_dt: None,
};
yield heartbeat;
}
}
}
// Cleanup when the connection closes
info!("WebSocket client disconnected");
}
.boxed());
}
}
// If no Kafka sender is available, fall back to a simulated stream
let mut interval = interval(Duration::from_secs(5));
Ok(async_stream::stream! {
loop {
interval.tick().await;
// Simulated status update
let update = StatusUpdate {
id: Uuid::new_v4(),
status: "demo".to_string(),
message: "模拟状态更新 - Kafka未启用".to_string(),
timestamp: Utc::now(),
data: Some("demo/path/sample.png".to_string()),
newest_dt: None,
};
yield update;
}
}
.boxed())
}
/// Status update subscription for a specific user
async fn user_status_updates(
&self,
ctx: &Context<'_>,
user_id: Uuid,
) -> Result<impl Stream<Item = StatusUpdate>> {
// Try to get the status sender from the context
if let Ok(status_sender) = ctx.data::<Option<broadcast::Sender<StatusUpdate>>>() {
if let Some(sender) = status_sender {
let mut receiver = sender.subscribe();
info!("用户 {} 连接到状态更新订阅", user_id);
return Ok(async_stream::stream! {
// Send a user-specific welcome message
let welcome = StatusUpdate {
id: Uuid::new_v4(),
status: "connected".to_string(),
message: format!("用户 {} WebSocket连接已建立", user_id),
timestamp: Utc::now(),
data: Some(user_id.to_string()),
newest_dt: None,
};
yield welcome;
let mut heartbeat_interval = interval(Duration::from_secs(30));
loop {
tokio::select! {
result = timeout(Duration::from_secs(5), receiver.recv()) => {
match result {
Ok(Ok(update)) => {
// Filter: only forward updates for this user (a simple ID comparison here)
if update.id == user_id {
yield update;
}
}
Ok(Err(broadcast::error::RecvError::Lagged(n))) => {
warn!("用户 {} 的连接滞后 {} 条消息", user_id, n);
let lag_warning = StatusUpdate {
id: user_id,
status: "warning".to_string(),
message: format!("连接滞后 {} 条消息", n),
timestamp: Utc::now(),
data: None,
newest_dt: None,
};
yield lag_warning;
continue;
}
Ok(Err(broadcast::error::RecvError::Closed)) => {
error!("用户 {} 的广播通道已关闭", user_id);
break;
}
Err(_) => {
continue;
}
}
}
_ = heartbeat_interval.tick() => {
let heartbeat = StatusUpdate {
id: user_id,
status: "heartbeat".to_string(),
message: format!("用户 {} 连接正常", user_id),
timestamp: Utc::now(),
data: Some("heartbeat".to_string()),
newest_dt: None,
};
yield heartbeat;
}
}
}
info!("用户 {} WebSocket连接断开", user_id);
}
.boxed());
}
}
// If no Kafka sender is available, fall back to a simulated stream
let mut interval = interval(Duration::from_secs(3));
Ok(async_stream::stream! {
loop {
interval.tick().await;
// Simulated user-specific status update
let update = StatusUpdate {
id: user_id,
status: "demo".to_string(),
message: format!("模拟用户 {} 的状态更新", user_id),
timestamp: Utc::now(),
data: None,
newest_dt: None,
};
yield update;
}
}
.boxed())
}
}
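On the client side these resolvers surface as subscription fields. With async-graphql's default camelCase renaming, a subscription document sent over the /ws route would look roughly like this (a sketch, assuming no custom field naming is configured):

subscription {
  statusUpdates {
    id
    status
    message
    timestamp
    data
    newestDt
  }
}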

238
src/listener/mod.rs Normal file
View File

@@ -0,0 +1,238 @@
use rdkafka::{
config::ClientConfig,
consumer::{Consumer, StreamConsumer},
error::KafkaError,
message::Message,
};
use serde_json;
use tokio::sync::broadcast;
use tracing::{error, info, warn};
use crate::{cli, config::Config, graphql::subscription::StatusUpdate, models::KafkaMessage};
use chrono::Utc;
use uuid::Uuid;
pub struct KafkaListener {
pub client: StreamConsumer,
pub status_sender: broadcast::Sender<StatusUpdate>,
}
#[derive(Debug)]
pub enum ListenerError {
KafkaError(KafkaError),
JsonError(serde_json::Error),
MessageError(String),
}
impl From<KafkaError> for ListenerError {
fn from(err: KafkaError) -> Self {
ListenerError::KafkaError(err)
}
}
impl From<serde_json::Error> for ListenerError {
fn from(err: serde_json::Error) -> Self {
ListenerError::JsonError(err)
}
}
impl KafkaListener {
pub fn new(config: &Config) -> Result<(Self, broadcast::Receiver<StatusUpdate>), KafkaError> {
let client: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", &config.kafka_brokers)
.set("group.id", &config.kafka_group_id)
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "true")
.create()?;
client.subscribe(&[&config.kafka_topic])?;
// Create the broadcast channel, with capacity for 1000 messages
let (status_sender, status_receiver) = broadcast::channel(1000);
Ok((
Self {
client,
status_sender,
},
status_receiver,
))
}
pub async fn run(&mut self) -> Result<(), ListenerError> {
info!("Kafka listener 开始运行...");
loop {
match self.client.recv().await {
Ok(msg) => {
if let Some(payload) = msg.payload() {
match self.process_message(payload).await {
Ok(_) => {
info!("消息处理成功");
}
Err(e) => {
error!("消息处理失败: {:?}", e);
// 发送错误状态更新
self.send_error_status(&format!("消息处理失败: {:?}", e))
.await;
}
}
} else {
warn!("接收到空消息");
}
}
Err(e) => {
error!("接收消息时发生错误: {:?}", e);
self.send_error_status(&format!("接收消息错误: {:?}", e))
.await;
return Err(ListenerError::KafkaError(e));
}
}
}
}
async fn process_message(&self, payload: &[u8]) -> Result<(), ListenerError> {
// Convert the byte slice into a string
let message_str = match std::str::from_utf8(payload) {
Ok(s) => s,
Err(e) => {
return Err(ListenerError::MessageError(format!(
"无法将消息转换为UTF-8字符串: {}",
e
)));
}
};
info!("接收到原始消息: {}", message_str);
info!("消息长度: {} 字节", message_str.len());
// 检查消息是否被截断(检查最后几个字符)
if message_str.len() > 10 {
let last_chars = &message_str[message_str.len().saturating_sub(10)..];
info!("消息末尾10个字符: {:?}", last_chars);
}
// Basic sanity check of JSON integrity
let brace_count = message_str.chars().filter(|&c| c == '{').count();
let close_brace_count = message_str.chars().filter(|&c| c == '}').count();
if brace_count != close_brace_count {
error!(
"JSON大括号不匹配: 开括号 {} 个,闭括号 {} 个",
brace_count, close_brace_count
);
}
// Parse the JSON message, with more detailed error handling
let kafka_message: KafkaMessage = match serde_json::from_str(message_str) {
Ok(msg) => msg,
Err(e) => {
error!("JSON解析失败 - 错误: {}", e);
error!("原始消息内容: {}", message_str);
error!("消息长度: {} 字节", message_str.len());
error!("错误位置: {}", e.to_string());
// 尝试部分解析来定位问题
if let Ok(value) = serde_json::from_str::<serde_json::Value>(message_str) {
error!("消息可以解析为通用JSON值问题可能在结构映射上");
error!("解析后的JSON结构: {:#}", value);
} else {
error!("消息无法解析为有效JSON");
}
return Err(ListenerError::JsonError(e));
}
};
info!(
"解析消息成功 - ID: {}, 区域: {}, 文件路径: {}",
kafka_message.output_message.id,
kafka_message.get_area(),
kafka_message.get_file_path()
);
self.handle_success_message(&kafka_message).await?;
Ok(())
}
async fn handle_success_message(&self, message: &KafkaMessage) -> Result<(), ListenerError> {
info!(
"处理成功消息 - 数据时间: {}, OSS文件: {}",
message.get_data_time(),
message.get_file_path()
);
// Send a success status update to WebSocket clients
let status_update = StatusUpdate {
id: Uuid::new_v4(),
status: "success".to_string(),
message: format!(
"文件处理成功 - 区域: {}, 时间: {}, 文件: {}",
message.get_area(),
message.get_data_time(),
message.get_file_path()
),
timestamp: Utc::now(),
data: Some(message.get_file_path().to_string()),
newest_dt: Some(message.get_data_time().to_string()),
};
if let Err(e) = self.status_sender.send(status_update) {
// Having no subscribers is the normal case here; log at debug level rather than warn
if self.status_sender.receiver_count() == 0 {
tracing::debug!("No active status update subscribers");
} else {
warn!("Failed to send status update: {:?}", e);
}
}
// TODO: Add concrete business logic here
// For example:
// - Update database state
// - Notify other services
// - Cache file information, etc.
self.update_file_status(message).await?;
Ok(())
}
async fn update_file_status(&self, message: &KafkaMessage) -> Result<(), ListenerError> {
// TODO: Implement the database update logic
// Database operations to update the file processing status can go here
info!(
"File status update complete - area: {}, time: {}, file: {}",
message.get_area(),
message.get_data_time(),
message.get_file_path()
);
Ok(())
}
async fn send_error_status(&self, error_message: &str) {
let status_update = StatusUpdate {
id: Uuid::new_v4(),
status: "error".to_string(),
message: error_message.to_string(),
timestamp: Utc::now(),
data: None,
newest_dt: None,
};
if let Err(e) = self.status_sender.send(status_update) {
// Having no subscribers is the normal case here; log at debug level rather than warn
if self.status_sender.receiver_count() == 0 {
tracing::debug!("No active status update subscribers");
} else {
warn!("Failed to send error status update: {:?}", e);
}
}
}
/// Get a new receiver for status updates
pub fn get_status_receiver(&self) -> broadcast::Receiver<StatusUpdate> {
self.status_sender.subscribe()
}
}
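The send side of the listener leans on tokio's broadcast semantics: send() with zero receivers returns Err (the "no active subscribers" branch above), every subscribe() yields an independent receiver, and a receiver that falls more than the channel capacity behind sees RecvError::Lagged. A minimal standalone sketch of those semantics, not part of this commit:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, rx0) = broadcast::channel::<&'static str>(16);
    drop(rx0); // no receivers yet

    // With no subscribers, send() returns Err; the listener logs this at debug level.
    assert!(tx.send("dropped").is_err());

    // Each subscribe() creates an independent receiver with its own cursor.
    let mut rx = tx.subscribe();
    tx.send("delivered").unwrap();
    assert_eq!(rx.recv().await.unwrap(), "delivered");
}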

View File

@@ -4,6 +4,7 @@ mod cli;
mod config;
mod db;
mod graphql;
mod listener;
mod models;
mod services;
@@ -12,8 +13,10 @@ use clap::Parser;
use cli::{Cli, Commands, MigrateArgs, ServeArgs};
use config::Config;
use db::{create_pool, run_migrations};
use listener::KafkaListener;
use rustls;
use std::process;
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -76,6 +79,17 @@ async fn serve_command(args: ServeArgs) -> Result<(), Box<dyn std::error::Error>>
// Print configuration info
print_config_info(&config, &args);
// Print Kafka status
if args.kafka {
println!("📡 Kafka listener: enabled");
println!(" 🔗 Kafka brokers: {}", config.kafka_brokers);
println!(" 📺 Topic: {}", config.kafka_topic);
println!(" 👥 Consumer group: {}", config.kafka_group_id);
} else {
println!("📡 Kafka listener: disabled");
}
println!();
println!("🔗 正在连接数据库...");
@ -89,8 +103,32 @@ async fn serve_command(args: ServeArgs) -> Result<(), Box<dyn std::error::Error>
println!("✅ 数据库迁移完成");
}
// 启动Kafka监听器如果启用
let status_sender = if args.kafka {
let kafka_config = config.clone();
match KafkaListener::new(&kafka_config) {
Ok((mut listener, _status_receiver)) => {
let sender = listener.status_sender.clone();
task::spawn(async move {
tracing::info!("正在启动 Kafka 监听器...");
if let Err(e) = listener.run().await {
tracing::error!("Kafka 监听器错误: {:?}", e);
}
});
println!("📡 Kafka监听器已启动");
Some(sender)
}
Err(e) => {
tracing::error!("无法创建 Kafka 监听器: {:?}", e);
None
}
}
} else {
None
};
println!("⚙️ 正在创建GraphQL路由...");
let router = create_router(pool, config.clone());
let router = create_router(pool, config.clone(), status_sender);
let bind_addr = format!("{}:{}", args.host, config.port);
println!("🔌 正在绑定到地址: {}", bind_addr);

View File

@@ -0,0 +1,58 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use uuid::Uuid;
// Custom deserializer to handle timestamp strings without a timezone
fn deserialize_datetime<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
// If the string does not end with 'Z', append a 'Z' suffix to mark it as UTC
let time_str = if s.ends_with('Z') {
s
} else {
format!("{}Z", s)
};
DateTime::parse_from_rfc3339(&time_str)
.map(|dt| dt.with_timezone(&Utc))
.map_err(serde::de::Error::custom)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputMessage {
#[serde(default = "Uuid::new_v4")]
pub id: Uuid,
pub oss_url: String,
pub area: String,
pub data_time: String,
#[serde(deserialize_with = "deserialize_datetime")]
pub ingestion_time: DateTime<Utc>,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessage {
pub success: bool,
pub output_message: OutputMessage,
#[serde(default = "Uuid::new_v4")]
pub record_id: Uuid,
}
impl KafkaMessage {
/// Get the file path
pub fn get_file_path(&self) -> &str {
&self.output_message.oss_url
}
/// Get the data time
pub fn get_data_time(&self) -> &str {
&self.output_message.data_time
}
/// Get the area
pub fn get_area(&self) -> &str {
&self.output_message.area
}
}
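A hypothetical payload that exercises the defaults and the custom deserializer above: both UUID fields are omitted (filled in by Uuid::new_v4) and ingestion_time has no trailing Z (normalized to UTC by deserialize_datetime). All values are made up for illustration:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_payload_without_ids_or_timezone() {
        // Made-up payload; field values are illustrative only.
        let raw = r#"{
            "success": true,
            "output_message": {
                "oss_url": "oss://bucket/demo/sample.png",
                "area": "demo-area",
                "data_time": "2025-08-01 16:00:00",
                "ingestion_time": "2025-08-01T16:35:38",
                "status": "success"
            }
        }"#;
        let msg: KafkaMessage = serde_json::from_str(raw).expect("payload should parse");
        assert!(msg.success);
        assert_eq!(msg.get_area(), "demo-area");
        assert_eq!(msg.get_file_path(), "oss://bucket/demo/sample.png");
    }
}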

View File

@@ -1,5 +1,7 @@
pub mod invite_code;
pub mod kafka_message;
pub mod user;
pub use invite_code::*;
pub use kafka_message::*;
pub use user::*;