//! User account service: registration, login, lookup, paginated listing,
//! first-admin bootstrap, and Argon2 password hashing/verification.
use argon2::password_hash::{rand_core::OsRng, SaltString};
|
||
use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier};
|
||
use async_graphql::{Error, Result};
|
||
use sqlx::PgPool;
|
||
use tracing::info;
|
||
use uuid::Uuid;
|
||
|
||
use crate::auth::Claims;
|
||
use crate::graphql::types::{
|
||
CreateUserInput, LoginInput, LoginResponse, RegisterInput, UserInfoRespnose,
|
||
};
|
||
use crate::models::user::{Role, User};
|
||
use crate::services::invite_code_service::InviteCodeService;
|
||
use crate::services::system_config_service::SystemConfigService;
|
||
|
||
pub struct UserService {
|
||
pool: PgPool,
|
||
jwt_secret: String,
|
||
}
|
||
|
||
impl UserService {
|
||
pub fn new(pool: PgPool, jwt_secret: String) -> Self {
|
||
Self { pool, jwt_secret }
|
||
}
|
||
|
||
pub async fn register(&self, input: RegisterInput) -> Result<User> {
|
||
let password_hash = self.hash_password(&input.password)?;
|
||
let role = input.role.unwrap_or(Role::User);
|
||
|
||
// Use transaction to ensure data consistency
|
||
let mut tx = self
|
||
.pool
|
||
.begin()
|
||
.await
|
||
.map_err(|e| Error::new(format!("Failed to start transaction: {}", e)))?;
|
||
|
||
// Validate invite code first
|
||
let invite_code_service = InviteCodeService::new(self.pool.clone());
|
||
invite_code_service
|
||
.validate_invite_code(crate::models::invite_code::ValidateInviteCodeInput {
|
||
code: input.invite_code.clone(),
|
||
})
|
||
.await?;
|
||
|
||
// Create user first
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
INSERT INTO users (username, email, password_hash, role, is_activate)
|
||
VALUES ($1, $2, $3, $4, $5)
|
||
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
"#,
|
||
input.username,
|
||
input.email,
|
||
password_hash,
|
||
role as Role,
|
||
true
|
||
)
|
||
.fetch_one(&mut *tx)
|
||
.await
|
||
.map_err(|e| {
|
||
if e.to_string().contains("duplicate key") {
|
||
Error::new("Username or email already exists")
|
||
} else {
|
||
Error::new(format!("Failed to create user: {}", e))
|
||
}
|
||
})?;
|
||
|
||
// Use the invite code
|
||
let invite_code_id = sqlx::query_scalar!(
|
||
r#"
|
||
UPDATE invite_codes
|
||
SET is_used = true, used_by = $1, used_at = NOW()
|
||
WHERE code = $2 AND is_used = false
|
||
RETURNING id
|
||
"#,
|
||
user.id,
|
||
input.invite_code
|
||
)
|
||
.fetch_optional(&mut *tx)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?
|
||
.ok_or_else(|| Error::new("Failed to use invite code"))?;
|
||
|
||
// Update user with invite code id
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
UPDATE users SET invite_code_id = $1 WHERE id = $2
|
||
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
"#,
|
||
invite_code_id,
|
||
user.id
|
||
)
|
||
.fetch_one(&mut *tx)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Failed to update user with invite code: {}", e)))?;
|
||
|
||
// Commit transaction
|
||
tx.commit()
|
||
.await
|
||
.map_err(|e| Error::new(format!("Failed to commit transaction: {}", e)))?;
|
||
|
||
Ok(user)
|
||
}
|
||
|
||
pub async fn create_user(&self, input: CreateUserInput) -> Result<User> {
|
||
let password_hash = self.hash_password(&input.password)?;
|
||
let role = input.role.unwrap_or(Role::User);
|
||
|
||
// Create user first
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
INSERT INTO users (username, email, password_hash, role, is_activate)
|
||
VALUES ($1, $2, $3, $4, $5)
|
||
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
"#,
|
||
input.username,
|
||
input.email,
|
||
password_hash,
|
||
role as Role,
|
||
true
|
||
)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| {
|
||
if e.to_string().contains("duplicate key") {
|
||
Error::new("Username or email already exists")
|
||
} else {
|
||
Error::new(format!("Failed to create user: {}", e))
|
||
}
|
||
})?;
|
||
|
||
Ok(user)
|
||
}
|
||
|
||
pub async fn login(&self, input: LoginInput) -> Result<LoginResponse> {
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
FROM users WHERE username = $1
|
||
"#,
|
||
input.username
|
||
)
|
||
.fetch_optional(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?
|
||
.ok_or_else(|| Error::new("Invalid credentials"))?;
|
||
|
||
if !self.verify_password(&input.password, &user.password_hash)? {
|
||
return Err(Error::new("Invalid credentials"));
|
||
}
|
||
|
||
let token = Claims::new(&user, &self.jwt_secret)?;
|
||
|
||
Ok(LoginResponse {
|
||
token,
|
||
user_id: user.id.to_string(),
|
||
})
|
||
}
|
||
|
||
pub async fn get_user_by_id(&self, id: Uuid) -> Result<Option<User>> {
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
FROM users WHERE id = $1
|
||
"#,
|
||
id
|
||
)
|
||
.fetch_optional(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
Ok(user)
|
||
}
|
||
|
||
pub async fn get_all_users(
|
||
&self,
|
||
offset: i64,
|
||
limit: i64,
|
||
sort_by: String,
|
||
sort_order: String,
|
||
) -> Result<Vec<User>> {
|
||
// 验证排序字段,防止SQL注入
|
||
let sort_by = match sort_by.as_str() {
|
||
"username" | "email" | "created_at" | "updated_at" | "role" | "is_activate" => sort_by,
|
||
_ => "created_at".to_string(), // 默认排序字段
|
||
};
|
||
|
||
let sort_order = if sort_order == "asc" { "ASC" } else { "DESC" };
|
||
|
||
// 动态构建SQL查询,因为列名和排序方向不能参数化
|
||
let query = format!(
|
||
r#"SELECT id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at FROM users ORDER BY {} {} LIMIT $1 OFFSET $2"#,
|
||
sort_by, sort_order
|
||
);
|
||
|
||
let users = sqlx::query_as!(User, &query, limit, offset)
|
||
.fetch_all(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
info!("users: {:?}", users);
|
||
|
||
Ok(users)
|
||
}
|
||
|
||
pub async fn initialize_admin(
|
||
&self,
|
||
username: String,
|
||
email: String,
|
||
password: String,
|
||
) -> Result<User> {
|
||
// Check if admin is already initialized
|
||
let system_config_service = SystemConfigService::new(self.pool.clone());
|
||
if system_config_service.is_admin_initialized().await? {
|
||
return Err(Error::new("Admin has already been initialized"));
|
||
}
|
||
|
||
// Check if any users exist
|
||
let user_count = sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
if user_count.unwrap_or(0) > 0 {
|
||
return Err(Error::new("Users already exist in the system"));
|
||
}
|
||
|
||
let password_hash = self.hash_password(&password)?;
|
||
|
||
// Use transaction to ensure data consistency
|
||
let mut tx = self
|
||
.pool
|
||
.begin()
|
||
.await
|
||
.map_err(|e| Error::new(format!("Failed to start transaction: {}", e)))?;
|
||
|
||
// Create admin user
|
||
let user = sqlx::query_as!(
|
||
User,
|
||
r#"
|
||
INSERT INTO users (username, email, password_hash, role, is_activate)
|
||
VALUES ($1, $2, $3, $4, $5)
|
||
RETURNING id, username, email, password_hash, role as "role: Role", invite_code_id, is_activate, created_at, updated_at
|
||
"#,
|
||
username,
|
||
email,
|
||
password_hash,
|
||
Role::Admin as Role,
|
||
true
|
||
)
|
||
.fetch_one(&mut *tx)
|
||
.await
|
||
.map_err(|e| {
|
||
if e.to_string().contains("duplicate key") {
|
||
Error::new("Username or email already exists")
|
||
} else {
|
||
Error::new(format!("Failed to create admin user: {}", e))
|
||
}
|
||
})?;
|
||
|
||
// Mark admin as initialized
|
||
system_config_service.set_admin_initialized().await?;
|
||
|
||
// Commit transaction
|
||
tx.commit()
|
||
.await
|
||
.map_err(|e| Error::new(format!("Failed to commit transaction: {}", e)))?;
|
||
|
||
info!("Admin user initialized: {}", user.username);
|
||
Ok(user)
|
||
}
|
||
|
||
fn hash_password(&self, password: &str) -> Result<String> {
|
||
let salt = SaltString::generate(&mut OsRng);
|
||
let argon2 = Argon2::default();
|
||
|
||
argon2
|
||
.hash_password(password.as_bytes(), &salt)
|
||
.map_err(|e| Error::new(format!("Failed to hash password: {}", e)))?
|
||
.to_string()
|
||
.pipe(Ok)
|
||
}
|
||
|
||
fn verify_password(&self, password: &str, hash: &str) -> Result<bool> {
|
||
let parsed_hash = PasswordHash::new(hash)
|
||
.map_err(|e| Error::new(format!("Invalid password hash: {}", e)))?;
|
||
|
||
let argon2 = Argon2::default();
|
||
Ok(argon2
|
||
.verify_password(password.as_bytes(), &parsed_hash)
|
||
.is_ok())
|
||
}
|
||
|
||
pub async fn users_info(
|
||
&self,
|
||
offset: i64,
|
||
limit: i64,
|
||
sort_by: impl Into<String>,
|
||
sort_order: impl Into<String>,
|
||
) -> Result<UserInfoRespnose> {
|
||
let sort_by = sort_by.into();
|
||
let sort_order = sort_order.into();
|
||
|
||
let total_users = sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
let total_active_users =
|
||
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE is_activate = true"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
let total_inactive_users =
|
||
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE is_activate = false"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
let total_admin_users =
|
||
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE role = 'Admin'"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
let total_user_users =
|
||
sqlx::query_scalar!(r#"SELECT COUNT(*) FROM users WHERE role = 'user'"#)
|
||
.fetch_one(&self.pool)
|
||
.await
|
||
.map_err(|e| Error::new(format!("Database error: {}", e)))?;
|
||
|
||
let users = self.get_all_users(offset, limit, sort_by, sort_order).await;
|
||
|
||
Ok(UserInfoRespnose {
|
||
total_users: total_users.unwrap_or(0),
|
||
total_active_users: total_active_users.unwrap_or(0),
|
||
total_inactive_users: total_inactive_users.unwrap_or(0),
|
||
total_admin_users: total_admin_users.unwrap_or(0),
|
||
total_user_users: total_user_users.unwrap_or(0),
|
||
users: users.unwrap_or_default(),
|
||
})
|
||
}
|
||
}
|
||
|
||
/// Postfix function application: `value.pipe(f)` evaluates to `f(value)`.
///
/// Lets a call chain end in a function call (for example `.pipe(Ok)`)
/// instead of wrapping the whole chain in that call.
trait Pipe<T> {
    /// Consumes `self`, passes it to `f`, and returns whatever `f` returns.
    fn pipe<F, R>(self, f: F) -> R
    where
        F: FnOnce(Self) -> R,
        Self: Sized,
    {
        f(self)
    }
}

/// Every sized type gets `pipe` through the provided method above.
impl<T> Pipe<T> for T {}
|