sync mapp

This commit is contained in:
tsuki 2025-07-30 21:32:14 +08:00
parent 378094744b
commit 5a50d6b84b
13 changed files with 1932 additions and 168 deletions

3
.env
View File

@ -1,4 +1,5 @@
DATABASE_URL=postgresql://mmap:yjhcfzXWrzslzl1331@8.217.64.157:5433/mmap DATABASE_URL=postgresql://mmap:yjhcfzXWrzslzl1331@8.217.64.157:5433/mmap
JWT_SECRET="JvGpWgGWLHAhvhxN7BuOVtUWfMXm6xAqjClaTwOcAnI=" JWT_SECRET="JvGpWgGWLHAhvhxN7BuOVtUWfMXm6xAqjClaTwOcAnI="
RUST_LOG=debug RUST_LOG=debug
PORT=3050 PORT=3050
TILE_SERVER="http://127.0.0.1:3060/api"

1489
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,8 @@
name = "mapp" name = "mapp"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
authors = ["Developer <dev@example.com>"]
description = "基于GraphQL的现代地图应用服务器"
[dependencies] [dependencies]
async-graphql = { version = "7.0.17", features = ["chrono", "uuid"] } async-graphql = { version = "7.0.17", features = ["chrono", "uuid"] }
@ -10,7 +12,7 @@ axum = { version = "0.8.4", features = ["ws", "macros"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
sqlx = { version = "0.8", features = [ sqlx = { version = "0.8", features = [
"runtime-tokio-rustls", # "runtime-tokio-rustls",
"postgres", "postgres",
"chrono", "chrono",
"uuid", "uuid",
@ -28,3 +30,20 @@ futures-util = "0.3"
tower = "0.4" tower = "0.4"
async-stream = "0.3" async-stream = "0.3"
axum-jwt-auth = "0.5.1" axum-jwt-auth = "0.5.1"
sea-query = "0.32.6"
sea-query-binder = {version = "0.7.0",features = [
"sqlx-postgres",
"with-chrono",
"with-json",
"with-rust_decimal",
"with-bigdecimal",
"with-uuid",
"with-time",
"with-ipnetwork",
"with-mac_address",
"runtime-async-std-native-tls",
]}
axum-reverse-proxy = "1.0.3"
rustls = { version = "0.23", features = ["aws-lc-rs"] }
clap = { version = "4.0", features = ["derive"] }

View File

@ -0,0 +1,59 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- 数据入库记录表
CREATE TABLE IF NOT EXISTS data_ingestion (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
ingestion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
data_time TIMESTAMP NOT NULL,
source VARCHAR(25) NOT NULL,
storage_url TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 添加字段注释
COMMENT ON COLUMN data_ingestion.id IS '数据唯一标识符';
COMMENT ON COLUMN data_ingestion.ingestion_time IS '入库时间';
COMMENT ON COLUMN data_ingestion.data_time IS '数据表示的时间';
COMMENT ON COLUMN data_ingestion.source IS '存放源/数据来源';
COMMENT ON COLUMN data_ingestion.storage_url IS '存放URL或路径';
-- 为快速查找创建索引
CREATE INDEX IF NOT EXISTS idx_data_ingestion_ingestion_time ON data_ingestion(ingestion_time);
CREATE INDEX IF NOT EXISTS idx_data_ingestion_id_ingestion_time ON data_ingestion(id, ingestion_time);
CREATE INDEX IF NOT EXISTS idx_data_ingestion_data_time ON data_ingestion(data_time);
CREATE INDEX IF NOT EXISTS idx_data_ingestion_source ON data_ingestion(source);
-- 可选的复合索引(根据实际查询需求启用)
-- 如果经常按时间范围+来源查询,取消注释下面的索引
-- CREATE INDEX IF NOT EXISTS idx_data_ingestion_data_time_source ON data_ingestion(data_time, source);
-- CREATE INDEX IF NOT EXISTS idx_data_ingestion_source_data_time ON data_ingestion(source, data_time);
-- 创建更新时间自动更新的触发器函数
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
-- 为表创建更新时间触发器
CREATE TRIGGER update_data_ingestion_updated_at
BEFORE UPDATE ON data_ingestion
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- 创建用于查询的视图(可选)
CREATE OR REPLACE VIEW v_data_ingestion_summary AS
SELECT
id,
source,
COUNT(*) as record_count,
MIN(data_time) as earliest_data_time,
MAX(data_time) as latest_data_time,
MIN(ingestion_time) as first_ingestion_time,
MAX(ingestion_time) as last_ingestion_time
FROM data_ingestion
GROUP BY id, source;

View File

@ -1,3 +1,4 @@
use axum_reverse_proxy::ReverseProxy;
use std::sync::Arc; use std::sync::Arc;
use async_graphql::{ use async_graphql::{
@ -64,11 +65,12 @@ pub fn create_router(pool: PgPool, config: Config) -> Router {
}, },
}; };
let router = ReverseProxy::new("/api", &config.tile_server_url.as_str());
Router::new() Router::new()
.route("/", get(graphql_playground)) .route("/", get(graphql_playground))
.route("/graphql", get(graphql_playground).post(graphql_handler)) .route("/graphql", get(graphql_playground).post(graphql_handler))
// .route("/ws", get(graphql_subscription_handler))
.layer(CorsLayer::permissive()) .layer(CorsLayer::permissive())
.merge(router)
.with_state(app_state) .with_state(app_state)
} }

68
src/cli.rs Normal file
View File

@ -0,0 +1,68 @@
use clap::{Args, Parser, Subcommand};
use std::net::IpAddr;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(name = "mapp")]
#[command(about = "Map应用服务器 - 一个基于GraphQL的地图应用服务器")]
pub struct Cli {
#[command(subcommand)]
pub command: Option<Commands>,
}
#[derive(Subcommand)]
pub enum Commands {
/// 启动GraphQL服务器
Serve(ServeArgs),
/// 运行数据库迁移
Migrate(MigrateArgs),
/// 显示版本信息
Version,
/// 显示配置信息
Config,
}
#[derive(Args)]
pub struct ServeArgs {
/// 服务器监听端口
#[arg(short, long, value_name = "PORT")]
pub port: Option<u16>,
/// 服务器监听IP地址
#[arg(short, long, value_name = "IP", default_value = "0.0.0.0")]
pub host: IpAddr,
/// 是否启用开发模式
#[arg(short, long)]
pub dev: bool,
/// 是否启用详细日志
#[arg(short, long)]
pub verbose: bool,
}
#[derive(Args)]
pub struct MigrateArgs {
/// 要迁移到的特定版本
#[arg(short, long, value_name = "VERSION")]
pub version: Option<String>,
/// 只显示迁移状态,不执行迁移
#[arg(short, long)]
pub dry_run: bool,
/// 强制重新运行所有迁移
#[arg(short, long)]
pub force: bool,
}
impl Default for Commands {
fn default() -> Self {
Commands::Serve(ServeArgs {
port: None,
host: "0.0.0.0".parse().unwrap(),
dev: false,
verbose: false,
})
}
}

View File

@ -5,12 +5,13 @@ pub struct Config {
pub database_url: String, pub database_url: String,
pub jwt_secret: String, pub jwt_secret: String,
pub port: u16, pub port: u16,
pub tile_server_url: String,
} }
impl Config { impl Config {
pub fn from_env() -> Result<Self, env::VarError> { pub fn from_env() -> Result<Self, env::VarError> {
dotenvy::dotenv().ok(); dotenvy::dotenv().ok();
Ok(Config { Ok(Config {
database_url: env::var("DATABASE_URL")?, database_url: env::var("DATABASE_URL")?,
jwt_secret: env::var("JWT_SECRET")?, jwt_secret: env::var("JWT_SECRET")?,
@ -18,6 +19,7 @@ impl Config {
.unwrap_or_else(|_| "3000".to_string()) .unwrap_or_else(|_| "3000".to_string())
.parse() .parse()
.unwrap_or(3000), .unwrap_or(3000),
tile_server_url: env::var("TILE_SERVER")?,
}) })
} }
} }

View File

@ -2,7 +2,7 @@ use crate::auth::get_auth_user;
use crate::graphql::guards::RequireRole; use crate::graphql::guards::RequireRole;
use crate::graphql::types::UserInfoRespnose; use crate::graphql::types::UserInfoRespnose;
use crate::models::invite_code::InviteCode; use crate::models::invite_code::InviteCode;
use crate::models::user::{Role, User}; use crate::models::user::{Role, User, UserInfoRow};
use crate::services::invite_code_service::InviteCodeService; use crate::services::invite_code_service::InviteCodeService;
use crate::services::user_service::UserService; use crate::services::user_service::UserService;
use async_graphql::{Context, Object, Result}; use async_graphql::{Context, Object, Result};
@ -36,11 +36,12 @@ impl QueryRoot {
async fn users( async fn users(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
offset: Option<i64>, offset: Option<u64>,
limit: Option<i64>, limit: Option<u64>,
sort_by: Option<String>, sort_by: Option<String>,
sort_order: Option<String>, sort_order: Option<String>,
) -> Result<Vec<User>> { filter: Option<String>,
) -> Result<Vec<UserInfoRow>> {
let user_service = ctx.data::<UserService>()?; let user_service = ctx.data::<UserService>()?;
info!("users im here"); info!("users im here");
let offset = offset.unwrap_or(0); let offset = offset.unwrap_or(0);
@ -48,7 +49,7 @@ impl QueryRoot {
let sort_by = sort_by.unwrap_or("created_at".to_string()); let sort_by = sort_by.unwrap_or("created_at".to_string());
let sort_order = sort_order.unwrap_or("desc".to_string()); let sort_order = sort_order.unwrap_or("desc".to_string());
user_service user_service
.get_all_users(offset, limit, sort_by, sort_order) .get_all_users(offset, limit, sort_by, sort_order, filter)
.await .await
} }
@ -73,18 +74,20 @@ impl QueryRoot {
async fn users_info( async fn users_info(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
offset: Option<i64>, offset: Option<u64>,
limit: Option<i64>, limit: Option<u64>,
sort_by: Option<String>,
sort_order: Option<String>,
filter: Option<String>,
) -> Result<UserInfoRespnose> { ) -> Result<UserInfoRespnose> {
let user_service = ctx.data::<UserService>()?; let user_service = ctx.data::<UserService>()?;
let offset = offset.unwrap_or(0); let offset = offset.unwrap_or(0);
let limit = limit.unwrap_or(20); let limit = limit.unwrap_or(20);
let sort_by = sort_by.unwrap_or("created_at".to_string());
let sort_by = "created_at"; let sort_order = sort_order.unwrap_or("desc".to_string());
let sort_order = "desc";
user_service user_service
.users_info(offset, limit, sort_by, sort_order) .users_info(offset, limit, sort_by, sort_order, filter)
.await .await
} }
} }

View File

@ -68,5 +68,5 @@ pub struct UserInfoRespnose {
pub total_admin_users: i64, pub total_admin_users: i64,
pub total_user_users: i64, pub total_user_users: i64,
pub users: Vec<crate::models::user::User>, pub users: Vec<crate::models::user::UserInfoRow>,
} }

View File

@ -1,5 +1,6 @@
mod app; mod app;
mod auth; mod auth;
mod cli;
mod config; mod config;
mod db; mod db;
mod graphql; mod graphql;
@ -7,30 +8,236 @@ mod models;
mod services; mod services;
use app::create_router; use app::create_router;
use clap::Parser;
use cli::{Cli, Commands, MigrateArgs, ServeArgs};
use config::Config; use config::Config;
use db::{create_pool, run_migrations}; use db::{create_pool, run_migrations};
use rustls;
use std::process;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init(); let cli = Cli::parse();
let config = Config::from_env()?; match cli.command.unwrap_or_default() {
Commands::Serve(args) => serve_command(args).await,
Commands::Migrate(args) => migrate_command(args).await,
Commands::Version => version_command(),
Commands::Config => config_command(),
}
}
async fn serve_command(args: ServeArgs) -> Result<(), Box<dyn std::error::Error>> {
// 初始化日志
let log_level = if args.verbose {
tracing::Level::DEBUG
} else if args.dev {
tracing::Level::INFO
} else {
tracing::Level::WARN
};
tracing_subscriber::fmt()
.with_max_level(log_level)
.with_target(args.verbose)
.with_thread_ids(args.verbose)
.with_file(args.verbose)
.with_line_number(args.verbose)
.init();
// 打印启动横幅
print_banner();
// 显示模式信息
if args.dev {
println!("🔧 运行模式: 开发模式");
} else {
println!("🚀 运行模式: 生产模式");
}
if args.verbose {
println!("📝 日志级别: 详细 (DEBUG)");
} else if args.dev {
println!("📝 日志级别: 信息 (INFO)");
} else {
println!("📝 日志级别: 警告 (WARN)");
}
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.map_err(|_| "Failed to install rustls crypto provider")?;
let mut config = Config::from_env()?;
// 命令行参数覆盖配置文件
if let Some(port) = args.port {
config.port = port;
}
// 显示配置信息
print_config_info(&config, &args);
println!();
println!("🔗 正在连接数据库...");
let pool = create_pool(&config.database_url).await?; let pool = create_pool(&config.database_url).await?;
println!("✅ 数据库连接成功");
run_migrations(&pool).await?; // 自动运行迁移
if args.dev {
println!("🗄️ 开发模式:自动运行数据库迁移");
run_migrations(&pool).await?;
println!("✅ 数据库迁移完成");
}
println!("⚙️ 正在创建GraphQL路由...");
let router = create_router(pool, config.clone()); let router = create_router(pool, config.clone());
let listener = tokio::net::TcpListener::bind(&format!("0.0.0.0:{}", config.port)).await?; let bind_addr = format!("{}:{}", args.host, config.port);
println!("🔌 正在绑定到地址: {}", bind_addr);
let listener = tokio::net::TcpListener::bind(&bind_addr).await?;
tracing::info!("GraphQL server running on http://0.0.0.0:{}", config.port); println!();
tracing::info!( println!("🎉 服务器启动成功!");
"GraphiQL playground: http://0.0.0.0:{}/graphql",
config.port if args.dev {
); println!("💡 提示: 使用 Ctrl+C 停止服务器");
tracing::info!("WebSocket subscriptions: ws://0.0.0.0:{}/ws", config.port); }
tracing::info!("服务器正在监听 {}", bind_addr);
axum::serve(listener, router).await?; axum::serve(listener, router).await?;
Ok(()) Ok(())
} }
async fn migrate_command(args: MigrateArgs) -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
println!("🗄️ 数据库迁移工具");
println!("━━━━━━━━━━━━━━━━━");
let config = Config::from_env()?;
println!("📡 连接数据库: {}", mask_database_url(&config.database_url));
let pool = create_pool(&config.database_url).await?;
println!("✅ 数据库连接成功");
if args.dry_run {
println!("🔍 检查数据库迁移状态...");
// 这里可以添加检查迁移状态的逻辑
println!("✅ 数据库迁移状态检查完成");
return Ok(());
}
println!("⚡ 开始运行数据库迁移...");
run_migrations(&pool).await?;
println!("🎉 数据库迁移完成!");
Ok(())
}
fn version_command() -> Result<(), Box<dyn std::error::Error>> {
println!();
println!("🗺️ Map Application Server");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("📦 版本: {}", env!("CARGO_PKG_VERSION"));
println!(
"🏗️ 构建时间: {}",
option_env!("VERGEN_BUILD_TIMESTAMP").unwrap_or("未知")
);
println!(
"🔗 Git提交: {}",
option_env!("VERGEN_GIT_SHA").unwrap_or("未知")
);
println!(
"🦀 Rust版本: {}",
option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("未知")
);
println!("👨‍💻 作者: {}", env!("CARGO_PKG_AUTHORS"));
println!("📝 描述: {}", env!("CARGO_PKG_DESCRIPTION"));
println!();
Ok(())
}
fn config_command() -> Result<(), Box<dyn std::error::Error>> {
match Config::from_env() {
Ok(config) => {
println!();
println!("⚙️ 应用配置信息");
println!("━━━━━━━━━━━━━━━━━");
println!("🌐 服务端口: {}", config.port);
println!("🗄️ 数据库URL: {}", mask_database_url(&config.database_url));
println!(
"🔐 JWT密钥: {}",
if config.jwt_secret.is_empty() {
"❌ 未设置"
} else {
"✅ 已设置"
}
);
println!("🗺️ 瓦片服务器: {}", config.tile_server_url);
println!();
println!("💡 提示: 配置通过环境变量加载");
println!(" DATABASE_URL, JWT_SECRET, PORT, TILE_SERVER");
}
Err(e) => {
eprintln!();
eprintln!("❌ 配置加载失败");
eprintln!("━━━━━━━━━━━━━━━━");
eprintln!("错误: {}", e);
eprintln!();
eprintln!("💡 请检查以下环境变量是否正确设置:");
eprintln!(" - DATABASE_URL: PostgreSQL连接字符串");
eprintln!(" - JWT_SECRET: JWT签名密钥");
eprintln!(" - PORT: 服务器端口 (可选默认3000)");
eprintln!(" - TILE_SERVER: 瓦片服务器URL");
eprintln!();
process::exit(1);
}
}
Ok(())
}
fn mask_database_url(url: &str) -> String {
if let Some(at_pos) = url.find('@') {
if let Some(scheme_end) = url.find("://") {
let scheme = &url[..scheme_end + 3];
let rest = &url[at_pos..];
format!("{}***{}", scheme, rest)
} else {
"***".to_string()
}
} else {
url.to_string()
}
}
fn print_banner() {
println!();
println!("███╗ ███╗ █████╗ ██████╗ ██████╗ ");
println!("████╗ ████║██╔══██╗██╔══██╗██╔══██╗");
println!("██╔████╔██║███████║██████╔╝██████╔╝");
println!("██║╚██╔╝██║██╔══██║██╔═══╝ ██╔═══╝ ");
println!("██║ ╚═╝ ██║██║ ██║██║ ██║ ");
println!("╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝ ");
println!();
println!("🗺️ Map Application Server v{}", env!("CARGO_PKG_VERSION"));
println!(" 基于GraphQL的现代地图应用服务器");
println!();
}
fn print_config_info(config: &Config, args: &ServeArgs) {
println!("📋 配置信息:");
println!(" 🌐 服务地址: {}:{}", args.host, config.port);
println!(" 🗄️ 数据库: {}", mask_database_url(&config.database_url));
println!(
" 🔐 JWT密钥: {}",
if config.jwt_secret.is_empty() {
"❌ 未设置"
} else {
"✅ 已配置"
}
);
println!(" 🗺️ 瓦片服务: {}", config.tile_server_url);
}

View File

@ -1,5 +1,6 @@
use async_graphql::{Enum, SimpleObject}; use async_graphql::{Enum, SimpleObject};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sea_query::Iden;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::FromRow; use sqlx::FromRow;
use uuid::Uuid; use uuid::Uuid;
@ -45,3 +46,43 @@ pub struct LoginInput {
pub username: String, pub username: String,
pub password: String, pub password: String,
} }
#[derive(Iden, PartialEq, Eq)]
pub enum Users {
Table,
Id,
Username,
Email,
Role,
IsActivate,
CreatedAt,
UpdatedAt,
}
impl TryFrom<String> for Users {
type Error = async_graphql::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"id" => Ok(Users::Id),
"username" => Ok(Users::Username),
"email" => Ok(Users::Email),
"role" => Ok(Users::Role),
"is_activate" => Ok(Users::IsActivate),
"created_at" => Ok(Users::CreatedAt),
"updated_at" => Ok(Users::UpdatedAt),
_ => Err(async_graphql::Error::new("Invalid column name")),
}
}
}
#[derive(sqlx::FromRow, Debug, SimpleObject)]
pub struct UserInfoRow {
pub id: Uuid,
pub username: String,
pub email: String,
pub role: Role,
pub is_activate: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

View File

@ -5,7 +5,7 @@ use uuid::Uuid;
use crate::{ use crate::{
graphql::types::CreateInviteCodeInput, graphql::types::CreateInviteCodeInput,
models::invite_code::{InviteCode, UseInviteCodeInput, ValidateInviteCodeInput}, models::invite_code::{InviteCode, ValidateInviteCodeInput},
}; };
pub struct InviteCodeService { pub struct InviteCodeService {
@ -41,7 +41,10 @@ impl InviteCodeService {
let invite_code = sqlx::query_as!( let invite_code = sqlx::query_as!(
InviteCode, InviteCode,
r#" r#"
SELECT id, code, created_by, used_by, is_used, expires_at, created_at, used_at SELECT id, code, created_by, used_by, is_used,
expires_at as "expires_at: chrono::DateTime<chrono::Utc>",
created_at as "created_at: chrono::DateTime<chrono::Utc>",
used_at as "used_at: chrono::DateTime<chrono::Utc>"
FROM invite_codes FROM invite_codes
WHERE code = $1 WHERE code = $1
"#, "#,
@ -97,7 +100,10 @@ impl InviteCodeService {
let invite_codes = sqlx::query_as!( let invite_codes = sqlx::query_as!(
InviteCode, InviteCode,
r#" r#"
SELECT id, code, created_by, used_by, is_used, expires_at, created_at, used_at SELECT id, code, created_by, used_by, is_used,
expires_at as "expires_at: chrono::DateTime<chrono::Utc>",
created_at as "created_at: chrono::DateTime<chrono::Utc>",
used_at as "used_at: chrono::DateTime<chrono::Utc>"
FROM invite_codes FROM invite_codes
WHERE created_by = $1 WHERE created_by = $1
ORDER BY created_at DESC ORDER BY created_at DESC
@ -115,7 +121,10 @@ impl InviteCodeService {
let invite_code = sqlx::query_as!( let invite_code = sqlx::query_as!(
InviteCode, InviteCode,
r#" r#"
SELECT id, code, created_by, used_by, is_used, expires_at, created_at, used_at SELECT id, code, created_by, used_by, is_used,
expires_at as "expires_at: chrono::DateTime<chrono::Utc>",
created_at as "created_at: chrono::DateTime<chrono::Utc>",
used_at as "used_at: chrono::DateTime<chrono::Utc>"
FROM invite_codes FROM invite_codes
WHERE id = $1 WHERE id = $1
"#, "#,

View File

@ -1,6 +1,8 @@
use argon2::password_hash::{rand_core::OsRng, SaltString}; use argon2::password_hash::{rand_core::OsRng, SaltString};
use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier};
use async_graphql::{Error, Result}; use async_graphql::{Error, Result};
use sea_query::{Expr, PostgresQueryBuilder};
use sea_query_binder::SqlxBinder;
use sqlx::PgPool; use sqlx::PgPool;
use tracing::info; use tracing::info;
use uuid::Uuid; use uuid::Uuid;
@ -10,6 +12,7 @@ use crate::graphql::types::{
CreateUserInput, LoginInput, LoginResponse, RegisterInput, UserInfoRespnose, CreateUserInput, LoginInput, LoginResponse, RegisterInput, UserInfoRespnose,
}; };
use crate::models::user::{Role, User}; use crate::models::user::{Role, User};
use crate::services::invite_code_service::InviteCodeService; use crate::services::invite_code_service::InviteCodeService;
use crate::services::system_config_service::SystemConfigService; use crate::services::system_config_service::SystemConfigService;
@ -48,7 +51,9 @@ impl UserService {
r#" r#"
INSERT INTO users (username, email, password_hash, role, is_activate) INSERT INTO users (username, email, password_hash, role, is_activate)
VALUES ($1, $2, $3, $4, $5) VALUES ($1, $2, $3, $4, $5)
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
"#, "#,
input.username, input.username,
input.email, input.email,
@ -87,7 +92,9 @@ impl UserService {
User, User,
r#" r#"
UPDATE users SET invite_code_id = $1 WHERE id = $2 UPDATE users SET invite_code_id = $1 WHERE id = $2
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
"#, "#,
invite_code_id, invite_code_id,
user.id user.id
@ -114,7 +121,9 @@ impl UserService {
r#" r#"
INSERT INTO users (username, email, password_hash, role, is_activate) INSERT INTO users (username, email, password_hash, role, is_activate)
VALUES ($1, $2, $3, $4, $5) VALUES ($1, $2, $3, $4, $5)
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
"#, "#,
input.username, input.username,
input.email, input.email,
@ -139,7 +148,9 @@ impl UserService {
let user = sqlx::query_as!( let user = sqlx::query_as!(
User, User,
r#" r#"
SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
FROM users WHERE username = $1 FROM users WHERE username = $1
"#, "#,
input.username input.username
@ -165,7 +176,9 @@ impl UserService {
let user = sqlx::query_as!( let user = sqlx::query_as!(
User, User,
r#" r#"
SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
FROM users WHERE id = $1 FROM users WHERE id = $1
"#, "#,
id id
@ -179,32 +192,46 @@ impl UserService {
pub async fn get_all_users( pub async fn get_all_users(
&self, &self,
offset: i64, offset: u64,
limit: i64, limit: u64,
sort_by: String, sort_by: String,
sort_order: String, sort_order: String,
) -> Result<Vec<User>> { filter: Option<String>,
// 验证排序字段防止SQL注入 ) -> Result<Vec<crate::models::user::UserInfoRow>> {
let sort_by = match sort_by.as_str() { use crate::models::user::Users;
"username" | "email" | "created_at" | "updated_at" | "role" | "is_activate" => sort_by, use sea_query::{Expr, Order, Query};
_ => "created_at".to_string(), // 默认排序字段 let sort_by = Users::try_from(sort_by)?;
let sort_order = if sort_order == "asc" {
Order::Asc
} else {
Order::Desc
}; };
let sort_order = if sort_order == "asc" { "ASC" } else { "DESC" }; let (sql, values) = Query::select()
.columns([
Users::Id,
Users::Username,
Users::Email,
Users::Role,
Users::IsActivate,
Users::CreatedAt,
Users::UpdatedAt,
])
.from(Users::Table)
.and_where_option(filter.map(|r| Expr::col(Users::Role).eq(r)))
.order_by(sort_by, sort_order)
.limit(limit)
.offset(offset)
.build_sqlx(PostgresQueryBuilder);
// 动态构建SQL查询因为列名和排序方向不能参数化 info!("sql: {:?}", sql);
let query = format!( info!("values: {:?}", values);
r#"SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at FROM users ORDER BY {} {} LIMIT $1 OFFSET $2"#,
sort_by, sort_order
);
let users = sqlx::query_as!(User, &query, limit, offset) let users = sqlx::query_as_with::<_, crate::models::user::UserInfoRow, _>(&sql, values)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| Error::new(format!("Database error: {}", e)))?; .map_err(|e| Error::new(format!("Database error: {}", e)))?;
info!("users: {:?}", users);
Ok(users) Ok(users)
} }
@ -245,7 +272,9 @@ impl UserService {
r#" r#"
INSERT INTO users (username, email, password_hash, role, is_activate) INSERT INTO users (username, email, password_hash, role, is_activate)
VALUES ($1, $2, $3, $4, $5) VALUES ($1, $2, $3, $4, $5)
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate,
created_at as "created_at: chrono::DateTime<chrono::Utc>",
updated_at as "updated_at: chrono::DateTime<chrono::Utc>"
"#, "#,
username, username,
email, email,
@ -298,52 +327,43 @@ impl UserService {
pub async fn users_info( pub async fn users_info(
&self, &self,
offset: i64, offset: u64,
limit: i64, limit: u64,
sort_by: impl Into<String>, sort_by: impl Into<String>,
sort_order: impl Into<String>, sort_order: impl Into<String>,
filter: Option<impl Into<String>>,
) -> Result<UserInfoRespnose> { ) -> Result<UserInfoRespnose> {
let sort_by = sort_by.into(); let sort_by = sort_by.into();
let sort_order = sort_order.into(); let sort_order = sort_order.into();
let filter = filter.map(|f| f.into());
let total_users = sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users"#) let stats = sqlx::query!(
.fetch_one(&self.pool) r#"
.await SELECT
.map_err(|e| Error::new(format!("Database error: {}", e)))?; COUNT(*) as total_users,
COUNT(CASE WHEN is_activate = true THEN 1 END) as total_active_users,
COUNT(CASE WHEN is_activate = false THEN 1 END) as total_inactive_users,
COUNT(CASE WHEN role = 'Admin' THEN 1 END) as total_admin_users,
COUNT(CASE WHEN role = 'User' THEN 1 END) as total_user_users
FROM users
"#
)
.fetch_one(&self.pool)
.await
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
let total_active_users = // 并行获取用户列表
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE is_activate = true"#) let users = self
.fetch_one(&self.pool) .get_all_users(offset, limit, sort_by, sort_order, filter)
.await .await?;
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
let total_inactive_users =
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE is_activate = false"#)
.fetch_one(&self.pool)
.await
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
let total_admin_users =
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE role = 'Admin'"#)
.fetch_one(&self.pool)
.await
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
let total_user_users =
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE role = 'user'"#)
.fetch_one(&self.pool)
.await
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
let users = self.get_all_users(offset, limit, sort_by, sort_order).await;
Ok(UserInfoRespnose { Ok(UserInfoRespnose {
total_users: total_users.unwrap_or(0), total_users: stats.total_users.unwrap_or(0),
total_active_users: total_active_users.unwrap_or(0), total_active_users: stats.total_active_users.unwrap_or(0),
total_inactive_users: total_inactive_users.unwrap_or(0), total_inactive_users: stats.total_inactive_users.unwrap_or(0),
total_admin_users: total_admin_users.unwrap_or(0), total_admin_users: stats.total_admin_users.unwrap_or(0),
total_user_users: total_user_users.unwrap_or(0), total_user_users: stats.total_user_users.unwrap_or(0),
users: users.unwrap_or_default(), users,
}) })
} }
} }