From f9072ead3c174dcba6e181d8748adaa6aceb44e8 Mon Sep 17 00:00:00 2001 From: tsuki Date: Sat, 2 Aug 2025 00:35:38 +0800 Subject: [PATCH] sync subscribtion --- Cargo.lock | 66 ++++++++++ Cargo.toml | 2 + src/app.rs | 30 ++--- src/cli.rs | 5 + src/config.rs | 8 ++ src/graphql/subscription.rs | 228 +++++++++++++++++++++++++++++++++- src/listener/mod.rs | 238 ++++++++++++++++++++++++++++++++++++ src/main.rs | 40 +++++- src/models/kafka_message.rs | 58 +++++++++ src/models/mod.rs | 2 + 10 files changed, 658 insertions(+), 19 deletions(-) create mode 100644 src/listener/mod.rs create mode 100644 src/models/kafka_message.rs diff --git a/Cargo.lock b/Cargo.lock index afef22c..96d5cf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2075,6 +2075,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -2150,10 +2162,12 @@ dependencies = [ "dotenvy", "futures-util", "jsonwebtoken", + "rdkafka", "rustls", "sea-query", "sea-query-binder", "serde", + "serde_json", "sqlx", "tokio", "tower 0.4.13", @@ -2356,6 +2370,28 @@ dependencies = [ "libm", ] +[[package]] +name = "num_enum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a973b4e44ce6cad84ce69d797acf9a044532e4184c4f267913d1b546a0727b7a" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "object" version = "0.36.7" @@ -2823,6 +2859,36 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rdkafka" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f1856d72dbbbea0d2a5b2eaf6af7fb3847ef2746e883b11781446a51dbc85c0" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.9.0+2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.5.13" diff --git a/Cargo.toml b/Cargo.toml index c83230f..9a65f11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ async-graphql-axum = "7.0.17" axum = { version = "0.8.4", features = ["ws", "macros"] } chrono = { version = "0.4", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" sqlx = { version = "0.8", features = [ # "runtime-tokio-rustls", "postgres", @@ -46,4 +47,5 @@ sea-query-binder = {version = "0.7.0",features = [ axum-reverse-proxy = "1.0.3" rustls = { version = "0.23", features = ["aws-lc-rs"] } clap = { version = "4.0", features = ["derive"] } +rdkafka = "0.38.0" diff --git a/src/app.rs b/src/app.rs index 20bafdb..88d1e7b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,5 +1,6 @@ use axum_reverse_proxy::ReverseProxy; use std::sync::Arc; +use tokio::sync::broadcast; use async_graphql::{ http::{playground_source, GraphQLPlaygroundConfig}, @@ -9,7 +10,7 @@ use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription}; use axum::{ extract::{FromRef, State}, response::{Html, IntoResponse}, - routing::{get, post}, + routing::get, Router, }; use jsonwebtoken::DecodingKey; @@ -17,16 +18,16 @@ use sqlx::PgPool; use tower_http::cors::CorsLayer; use crate::{ - auth::{AuthUser, AuthUserState, Claims as MyClaims}, + auth::{AuthUserState, Claims as MyClaims}, config::Config, - graphql::{MutationRoot, QueryRoot, SubscriptionRoot}, + graphql::{subscription::StatusUpdate, MutationRoot, QueryRoot, SubscriptionRoot}, services::{ invite_code_service::InviteCodeService, system_config_service::SystemConfigService, user_service::UserService, }, }; -use axum_jwt_auth::{Claims, JwtDecoderState, LocalDecoder}; +use axum_jwt_auth::{JwtDecoderState, LocalDecoder}; use jsonwebtoken::Validation; pub type AppSchema = Schema; @@ -35,9 +36,14 @@ pub type AppSchema = Schema; pub struct AppState { pub schema: AppSchema, pub decoder: JwtDecoderState, + pub status_sender: Option>, } -pub fn create_router(pool: PgPool, config: Config) -> Router { +pub fn create_router( + pool: PgPool, + config: Config, + status_sender: Option>, +) -> Router { let user_service = UserService::new(pool.clone(), config.jwt_secret.clone()); let invite_code_service = InviteCodeService::new(pool.clone()); let system_config_service = SystemConfigService::new(pool.clone()); @@ -48,6 +54,7 @@ pub fn create_router(pool: PgPool, config: Config) -> Router { .data(invite_code_service) .data(system_config_service) .data(config.clone()) + .data(status_sender.clone()) .finish(); let keys = vec![DecodingKey::from_secret(config.jwt_secret.as_bytes())]; @@ -59,16 +66,18 @@ pub fn create_router(pool: PgPool, config: Config) -> Router { .unwrap(); let app_state = AppState { - schema, + schema: schema.clone(), decoder: JwtDecoderState { decoder: Arc::new(decoder), }, + status_sender, }; let router = ReverseProxy::new("/api", &config.tile_server_url.as_str()); Router::new() .route("/", get(graphql_playground)) .route("/graphql", get(graphql_playground).post(graphql_handler)) + .route_service("/ws", GraphQLSubscription::new(schema)) .layer(CorsLayer::permissive()) .merge(router) .with_state(app_state) @@ -90,12 +99,3 @@ async fn graphql_handler( async fn graphql_playground() -> impl IntoResponse { Html(playground_source(GraphQLPlaygroundConfig::new("/graphql"))) } - -// async fn graphql_subscription_handler( -// State(state): State, -// ws: axum::extract::WebSocketUpgrade, -// ) -> Response { -// ws.on_upgrade(move |socket| async move { -// GraphQLSubscription::new(socket, state.schema).serve().await -// }) -// } diff --git a/src/cli.rs b/src/cli.rs index 7acc009..865286f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -39,6 +39,10 @@ pub struct ServeArgs { /// 是否启用详细日志 #[arg(short, long)] pub verbose: bool, + + /// 是否启用Kafka消息监听器 + #[arg(short, long)] + pub kafka: bool, } #[derive(Args)] @@ -63,6 +67,7 @@ impl Default for Commands { host: "0.0.0.0".parse().unwrap(), dev: false, verbose: false, + kafka: false, }) } } diff --git a/src/config.rs b/src/config.rs index 02fffb1..d875f00 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,10 @@ pub struct Config { pub jwt_secret: String, pub port: u16, pub tile_server_url: String, + + pub kafka_brokers: String, + pub kafka_topic: String, + pub kafka_group_id: String, } impl Config { @@ -20,6 +24,10 @@ impl Config { .parse() .unwrap_or(3000), tile_server_url: env::var("TILE_SERVER")?, + + kafka_brokers: env::var("KAFKA_BROKERS")?, + kafka_topic: env::var("KAFKA_TOPIC")?, + kafka_group_id: env::var("KAFKA_GROUP_ID")?, }) } } diff --git a/src/graphql/subscription.rs b/src/graphql/subscription.rs index da7c8f0..fb51798 100644 --- a/src/graphql/subscription.rs +++ b/src/graphql/subscription.rs @@ -1,7 +1,21 @@ -use async_graphql::{Subscription, Result}; +use async_graphql::{Context, Result, SimpleObject, Subscription}; +use chrono::{DateTime, Utc}; use futures_util::{Stream, StreamExt}; use std::time::Duration; -use tokio::time::interval; +use tokio::sync::broadcast; +use tokio::time::{interval, timeout}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +#[derive(SimpleObject, Clone, Debug)] +pub struct StatusUpdate { + pub id: Uuid, + pub status: String, + pub message: String, + pub timestamp: DateTime, + pub data: Option, + pub newest_dt: Option, +} pub struct SubscriptionRoot; @@ -19,4 +33,212 @@ impl SubscriptionRoot { } } } -} \ No newline at end of file + + /// 状态更新订阅 - 客户端可以订阅这个来接收Kafka消息的状态更新 + async fn status_updates(&self, ctx: &Context<'_>) -> Result> { + // 尝试从上下文中获取状态发送器 + if let Ok(status_sender) = ctx.data::>>() { + if let Some(sender) = status_sender { + // 创建接收器 + let mut receiver = sender.subscribe(); + + info!("新的WebSocket客户端连接到状态更新订阅"); + + return Ok(async_stream::stream! { + // 发送初始连接确认消息 + let welcome = StatusUpdate { + id: Uuid::new_v4(), + status: "connected".to_string(), + message: "WebSocket连接已建立".to_string(), + timestamp: Utc::now(), + data: Some("websocket_connected".to_string()), + newest_dt: None, + }; + yield welcome; + + // 设置心跳间隔 + let mut heartbeat_interval = interval(Duration::from_secs(30)); + + loop { + tokio::select! { + // 处理来自Kafka的状态更新 + result = timeout(Duration::from_secs(5), receiver.recv()) => { + match result { + Ok(Ok(update)) => { + info!("转发状态更新到WebSocket客户端: {}", update.message); + yield update; + } + Ok(Err(broadcast::error::RecvError::Lagged(n))) => { + warn!("WebSocket客户端消息滞后 {} 条消息,尝试恢复", n); + // 发送滞后警告但继续订阅 + let lag_warning = StatusUpdate { + id: Uuid::new_v4(), + status: "warning".to_string(), + message: format!("连接滞后 {} 条消息", n), + timestamp: Utc::now(), + data: None, + newest_dt: None, + }; + yield lag_warning; + continue; + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + error!("广播通道已关闭"); + break; + } + Err(_) => { + // 超时,继续下一次循环(避免阻塞) + continue; + } + } + } + + // 发送心跳 + _ = heartbeat_interval.tick() => { + let heartbeat = StatusUpdate { + id: Uuid::new_v4(), + status: "heartbeat".to_string(), + message: "连接正常".to_string(), + timestamp: Utc::now(), + data: Some("heartbeat".to_string()), + newest_dt: None, + }; + yield heartbeat; + } + } + } + + // 连接关闭时的清理 + info!("WebSocket客户端断开连接"); + } + .boxed()); + } + } + + // 如果没有Kafka发送器,返回模拟数据流 + let mut interval = interval(Duration::from_secs(5)); + + Ok(async_stream::stream! { + loop { + interval.tick().await; + + // 模拟状态更新 + let update = StatusUpdate { + id: Uuid::new_v4(), + status: "demo".to_string(), + message: "模拟状态更新 - Kafka未启用".to_string(), + timestamp: Utc::now(), + data: Some("demo/path/sample.png".to_string()), + newest_dt: None, + + }; + + yield update; + } + } + .boxed()) + } + + /// 特定用户的状态更新订阅 + async fn user_status_updates( + &self, + ctx: &Context<'_>, + user_id: Uuid, + ) -> Result> { + // 尝试从上下文中获取状态发送器 + if let Ok(status_sender) = ctx.data::>>() { + if let Some(sender) = status_sender { + let mut receiver = sender.subscribe(); + + info!("用户 {} 连接到状态更新订阅", user_id); + + return Ok(async_stream::stream! { + // 发送用户特定的欢迎消息 + let welcome = StatusUpdate { + id: Uuid::new_v4(), + status: "connected".to_string(), + message: format!("用户 {} WebSocket连接已建立", user_id), + timestamp: Utc::now(), + data: Some(user_id.to_string()), + newest_dt: None, + }; + yield welcome; + + let mut heartbeat_interval = interval(Duration::from_secs(30)); + + loop { + tokio::select! { + result = timeout(Duration::from_secs(5), receiver.recv()) => { + match result { + Ok(Ok(update)) => { + // 过滤只返回特定用户的更新(这里简单比较ID) + if update.id == user_id { + yield update; + } + } + Ok(Err(broadcast::error::RecvError::Lagged(n))) => { + warn!("用户 {} 的连接滞后 {} 条消息", user_id, n); + let lag_warning = StatusUpdate { + id: user_id, + status: "warning".to_string(), + message: format!("连接滞后 {} 条消息", n), + timestamp: Utc::now(), + data: None, + newest_dt: None, + }; + yield lag_warning; + continue; + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + error!("用户 {} 的广播通道已关闭", user_id); + break; + } + Err(_) => { + continue; + } + } + } + + _ = heartbeat_interval.tick() => { + let heartbeat = StatusUpdate { + id: user_id, + status: "heartbeat".to_string(), + message: format!("用户 {} 连接正常", user_id), + timestamp: Utc::now(), + data: Some("heartbeat".to_string()), + newest_dt: None, + }; + yield heartbeat; + } + } + } + + info!("用户 {} WebSocket连接断开", user_id); + } + .boxed()); + } + } + + // 如果没有Kafka发送器,返回模拟数据流 + let mut interval = interval(Duration::from_secs(3)); + + Ok(async_stream::stream! { + loop { + interval.tick().await; + + // 模拟用户特定的状态更新 + let update = StatusUpdate { + id: user_id, + status: "demo".to_string(), + message: format!("模拟用户 {} 的状态更新", user_id), + timestamp: Utc::now(), + data: None, + newest_dt: None, + }; + + yield update; + } + } + .boxed()) + } +} diff --git a/src/listener/mod.rs b/src/listener/mod.rs new file mode 100644 index 0000000..fcfb8b4 --- /dev/null +++ b/src/listener/mod.rs @@ -0,0 +1,238 @@ +use rdkafka::{ + config::ClientConfig, + consumer::{Consumer, StreamConsumer}, + error::KafkaError, + message::Message, +}; +use serde_json; +use tokio::sync::broadcast; +use tracing::{error, info, warn}; + +use crate::{cli, config::Config, graphql::subscription::StatusUpdate, models::KafkaMessage}; +use chrono::Utc; +use uuid::Uuid; + +pub struct KafkaListener { + pub client: StreamConsumer, + pub status_sender: broadcast::Sender, +} + +#[derive(Debug)] +pub enum ListenerError { + KafkaError(KafkaError), + JsonError(serde_json::Error), + MessageError(String), +} + +impl From for ListenerError { + fn from(err: KafkaError) -> Self { + ListenerError::KafkaError(err) + } +} + +impl From for ListenerError { + fn from(err: serde_json::Error) -> Self { + ListenerError::JsonError(err) + } +} + +impl KafkaListener { + pub fn new(config: &Config) -> Result<(Self, broadcast::Receiver), KafkaError> { + let client: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", &config.kafka_brokers) + .set("group.id", &config.kafka_group_id) + .set("auto.offset.reset", "earliest") + .set("enable.auto.commit", "true") + .create()?; + + client.subscribe(&[&config.kafka_topic])?; + + // 创建广播通道,容量为1000个消息 + let (status_sender, status_receiver) = broadcast::channel(1000); + + Ok(( + Self { + client, + status_sender, + }, + status_receiver, + )) + } + + pub async fn run(&mut self) -> Result<(), ListenerError> { + info!("Kafka listener 开始运行..."); + + loop { + match self.client.recv().await { + Ok(msg) => { + if let Some(payload) = msg.payload() { + match self.process_message(payload).await { + Ok(_) => { + info!("消息处理成功"); + } + Err(e) => { + error!("消息处理失败: {:?}", e); + // 发送错误状态更新 + self.send_error_status(&format!("消息处理失败: {:?}", e)) + .await; + } + } + } else { + warn!("接收到空消息"); + } + } + Err(e) => { + error!("接收消息时发生错误: {:?}", e); + self.send_error_status(&format!("接收消息错误: {:?}", e)) + .await; + return Err(ListenerError::KafkaError(e)); + } + } + } + } + + async fn process_message(&self, payload: &[u8]) -> Result<(), ListenerError> { + // 将字节数组转换为字符串 + let message_str = match std::str::from_utf8(payload) { + Ok(s) => s, + Err(e) => { + return Err(ListenerError::MessageError(format!( + "无法将消息转换为UTF-8字符串: {}", + e + ))); + } + }; + + info!("接收到原始消息: {}", message_str); + info!("消息长度: {} 字节", message_str.len()); + + // 检查消息是否被截断(检查最后几个字符) + if message_str.len() > 10 { + let last_chars = &message_str[message_str.len().saturating_sub(10)..]; + info!("消息末尾10个字符: {:?}", last_chars); + } + + // 验证JSON格式基本完整性 + let brace_count = message_str.chars().filter(|&c| c == '{').count(); + let close_brace_count = message_str.chars().filter(|&c| c == '}').count(); + if brace_count != close_brace_count { + error!( + "JSON大括号不匹配: 开括号 {} 个,闭括号 {} 个", + brace_count, close_brace_count + ); + } + + // 解析 JSON 消息,添加更详细的错误处理 + let kafka_message: KafkaMessage = match serde_json::from_str(message_str) { + Ok(msg) => msg, + Err(e) => { + error!("JSON解析失败 - 错误: {}", e); + error!("原始消息内容: {}", message_str); + error!("消息长度: {} 字节", message_str.len()); + error!("错误位置: {}", e.to_string()); + + // 尝试部分解析来定位问题 + if let Ok(value) = serde_json::from_str::(message_str) { + error!("消息可以解析为通用JSON值,问题可能在结构映射上"); + error!("解析后的JSON结构: {:#}", value); + } else { + error!("消息无法解析为有效JSON"); + } + + return Err(ListenerError::JsonError(e)); + } + }; + + info!( + "解析消息成功 - ID: {}, 区域: {}, 文件路径: {}", + kafka_message.output_message.id, + kafka_message.get_area(), + kafka_message.get_file_path() + ); + + self.handle_success_message(&kafka_message).await?; + + Ok(()) + } + + async fn handle_success_message(&self, message: &KafkaMessage) -> Result<(), ListenerError> { + info!( + "处理成功消息 - 数据时间: {}, OSS文件: {}", + message.get_data_time(), + message.get_file_path() + ); + + // 发送成功状态更新到WebSocket客户端 + let status_update = StatusUpdate { + id: Uuid::new_v4(), + status: "success".to_string(), + message: format!( + "文件处理成功 - 区域: {}, 时间: {}, 文件: {}", + message.get_area(), + message.get_data_time(), + message.get_file_path() + ), + timestamp: Utc::now(), + data: Some(message.get_file_path().to_string()), + newest_dt: Some(message.get_data_time().to_string()), + }; + + if let Err(e) = self.status_sender.send(status_update) { + // 当没有订阅者时,这是正常情况,使用debug级别而不是warn + if self.status_sender.receiver_count() == 0 { + tracing::debug!("没有活跃的状态更新订阅者"); + } else { + warn!("发送状态更新失败: {:?}", e); + } + } + + // TODO: 在这里添加具体的业务逻辑 + // 例如: + // - 更新数据库状态 + // - 通知其他服务 + // - 缓存文件信息等 + + self.update_file_status(message).await?; + + Ok(()) + } + + async fn update_file_status(&self, message: &KafkaMessage) -> Result<(), ListenerError> { + // TODO: 实现数据库更新逻辑 + // 这里可以添加数据库操作来更新文件处理状态 + + info!( + "文件状态更新完成 - 区域: {}, 时间: {}, 文件: {}", + message.get_area(), + message.get_data_time(), + message.get_file_path() + ); + + Ok(()) + } + + async fn send_error_status(&self, error_message: &str) { + let status_update = StatusUpdate { + id: Uuid::new_v4(), + status: "error".to_string(), + message: error_message.to_string(), + timestamp: Utc::now(), + data: None, + newest_dt: None, + }; + + if let Err(e) = self.status_sender.send(status_update) { + // 当没有订阅者时,这是正常情况,使用debug级别而不是warn + if self.status_sender.receiver_count() == 0 { + tracing::debug!("没有活跃的状态更新订阅者"); + } else { + warn!("发送错误状态更新失败: {:?}", e); + } + } + } + + /// 获取状态更新接收器的克隆 + pub fn get_status_receiver(&self) -> broadcast::Receiver { + self.status_sender.subscribe() + } +} diff --git a/src/main.rs b/src/main.rs index e5b3c1d..8bd4ab4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod cli; mod config; mod db; mod graphql; +mod listener; mod models; mod services; @@ -12,8 +13,10 @@ use clap::Parser; use cli::{Cli, Commands, MigrateArgs, ServeArgs}; use config::Config; use db::{create_pool, run_migrations}; +use listener::KafkaListener; use rustls; use std::process; +use tokio::task; #[tokio::main] async fn main() -> Result<(), Box> { @@ -76,6 +79,17 @@ async fn serve_command(args: ServeArgs) -> Result<(), Box // 显示配置信息 print_config_info(&config, &args); + + // 显示Kafka状态 + if args.kafka { + println!("📡 Kafka监听器: 启用"); + println!(" 🔗 Kafka集群: {}", config.kafka_brokers); + println!(" 📺 主题: {}", config.kafka_topic); + println!(" 👥 消费者组: {}", config.kafka_group_id); + } else { + println!("📡 Kafka监听器: 禁用"); + } + println!(); println!("🔗 正在连接数据库..."); @@ -89,8 +103,32 @@ async fn serve_command(args: ServeArgs) -> Result<(), Box println!("✅ 数据库迁移完成"); } + // 启动Kafka监听器(如果启用) + let status_sender = if args.kafka { + let kafka_config = config.clone(); + match KafkaListener::new(&kafka_config) { + Ok((mut listener, _status_receiver)) => { + let sender = listener.status_sender.clone(); + task::spawn(async move { + tracing::info!("正在启动 Kafka 监听器..."); + if let Err(e) = listener.run().await { + tracing::error!("Kafka 监听器错误: {:?}", e); + } + }); + println!("📡 Kafka监听器已启动"); + Some(sender) + } + Err(e) => { + tracing::error!("无法创建 Kafka 监听器: {:?}", e); + None + } + } + } else { + None + }; + println!("⚙️ 正在创建GraphQL路由..."); - let router = create_router(pool, config.clone()); + let router = create_router(pool, config.clone(), status_sender); let bind_addr = format!("{}:{}", args.host, config.port); println!("🔌 正在绑定到地址: {}", bind_addr); diff --git a/src/models/kafka_message.rs b/src/models/kafka_message.rs new file mode 100644 index 0000000..441f8af --- /dev/null +++ b/src/models/kafka_message.rs @@ -0,0 +1,58 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Deserializer, Serialize}; +use uuid::Uuid; + +// 自定义反序列化函数来处理不带时区的时间字符串 +fn deserialize_datetime<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + // 如果字符串不以Z结尾,添加Z后缀来表示UTC时间 + let time_str = if s.ends_with('Z') { + s + } else { + format!("{}Z", s) + }; + + DateTime::parse_from_rfc3339(&time_str) + .map(|dt| dt.with_timezone(&Utc)) + .map_err(serde::de::Error::custom) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OutputMessage { + #[serde(default = "Uuid::new_v4")] + pub id: Uuid, + pub oss_url: String, + pub area: String, + pub data_time: String, + #[serde(deserialize_with = "deserialize_datetime")] + pub ingestion_time: DateTime, + pub status: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KafkaMessage { + pub success: bool, + pub output_message: OutputMessage, + #[serde(default = "Uuid::new_v4")] + pub record_id: Uuid, +} + +impl KafkaMessage { + /// 获取文件路径 + pub fn get_file_path(&self) -> &str { + &self.output_message.oss_url + } + + /// 获取数据时间 + pub fn get_data_time(&self) -> &str { + &self.output_message.data_time + } + + /// 获取区域 + pub fn get_area(&self) -> &str { + &self.output_message.area + } +} diff --git a/src/models/mod.rs b/src/models/mod.rs index f30c7d6..f490ce1 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,5 +1,7 @@ pub mod invite_code; +pub mod kafka_message; pub mod user; pub use invite_code::*; +pub use kafka_message::*; pub use user::*;