sync
Some checks are pending
Docker Build and Push / build (push) Waiting to run

This commit is contained in:
Tsuki 2025-08-17 18:40:47 +08:00
parent ac53b79bc3
commit 26a75d25e9
14 changed files with 1610 additions and 32 deletions

9
.env
View File

@ -1,9 +0,0 @@
# DATABASE_URL=postgresql://mmap:yjhcfzXWrzslzl1331@8.217.64.157:5433/mmap
DATABASE_URL=postgresql://mmap:yjhcfzXWrzslzl1331@101.200.43.172:5433/mmap
JWT_SECRET="JvGpWgGWLHAhvhxN7BuOVtUWfMXm6xAqjClaTwOcAnI="
RUST_LOG=debug
PORT=3050
TILE_SERVER="http://47.95.11.22:3060/api"
KAFKA_BROKERS="8.217.64.157:9094"
KAFKA_TOPIC=data-output
KAFKA_GROUP_ID=mapp

View File

@ -9,7 +9,7 @@ on:
env:
REGISTRY: crpi-8rsu3rjoi0n0hc4m.cn-hongkong.personal.cr.aliyuncs.com
IMAGE_NAME: tmmapp/tiler
IMAGE_NAME: tmmapp/mmapp
jobs:
build:

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target
/tiles
.env

204
CLAUDE.md Normal file
View File

@ -0,0 +1,204 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
MAPP (Map Application Server) is a modern GraphQL-based map application server written in Rust. It features:
- GraphQL API with async-graphql
- RBAC permission management using Casbin
- Kafka message queue integration
- PostgreSQL database with migrations
- CLI management tools
- Blog system with categories and tags
- Rate limiting and authentication
- Multi-tenant architecture support
## Development Commands
### Build & Run
```bash
# Development build
cargo build
# Release build
cargo build --release
# Run tests
cargo test
# Code linting
cargo clippy
# Start development server with auto-migration
./target/release/mapp serve --dev --verbose
# Start production server
./target/release/mapp serve
# Start with Kafka integration
./target/release/mapp serve --kafka
```
### Database Management
```bash
# Run database migrations
./target/release/mapp migrate
# Check migration status (dry run)
./target/release/mapp migrate --dry-run
# Force re-run migrations
./target/release/mapp migrate --force
```
### Permission Management
```bash
# List all permission policies
./target/release/mapp permissions list
# Add permission policy
./target/release/mapp permissions add --role admin --resource users --action delete
# Assign role to user
./target/release/mapp permissions assign-role --user-id user123 --role editor
# Check user permissions
./target/release/mapp permissions check --user-id user123 --resource pages --action write
# Reload permission policies
./target/release/mapp permissions reload
```
### Blog Management
```bash
# Create blog post
./target/release/mapp blog create --title "Post Title" --slug "post-slug" --content '{"type":"doc","content":[]}' --user-id user123
# List blog posts
./target/release/mapp blog list --page 1 --limit 10
# Show blog post details
./target/release/mapp blog show post-slug --detail
# Create blog category
./target/release/mapp blog create-category --name "Tech" --slug "tech" --user-id user123
# View blog statistics
./target/release/mapp blog stats
```
### Configuration
```bash
# Show current configuration
./target/release/mapp config
# Show version information
./target/release/mapp version
```
## Architecture Overview
### Core Modules
- **app.rs**: Application router configuration and HTTP server setup
- **auth.rs**: Authentication and authorization middleware
- **cli.rs**: Command-line interface definitions
- **config.rs**: Configuration management from environment variables
- **db.rs**: Database connection pool and migration handling
### GraphQL Layer
- **graphql/**: Complete GraphQL implementation
- **queries.rs**: Query resolvers for data retrieval
- **mutations.rs**: Mutation resolvers for data modification
- **guards.rs**: Permission guards for access control
- **types/**: GraphQL type definitions (users, blog, config, permissions)
- **subscription.rs**: Real-time subscriptions
### Data Layer
- **models/**: Database models and schemas
- **user.rs**: User authentication and profile models
- **blog.rs**: Blog posts, categories, and tags
- **config.rs**: System configuration settings
- **page_block.rs**: Page content management
- **invite_code.rs**: User invitation system
### Service Layer
- **services/**: Business logic and data operations
- **casbin_service.rs**: RBAC permission management
- **blog_service.rs**: Blog content management
- **config_service.rs**: Configuration management
- **user_service.rs**: User management operations
- **mosaic_service.rs**: Tile server proxy operations
### Infrastructure
- **listener/**: Kafka message queue integration
- **migrations/**: Database schema migrations (PostgreSQL)
## Key Configuration
### Required Environment Variables
- `DATABASE_URL`: PostgreSQL connection string
- `JWT_SECRET`: JWT signing secret
- `TILE_SERVER`: Map tile server URL
### Optional Environment Variables
- `PORT`: Server port (default: 3000)
- `KAFKA_BROKERS`: Kafka cluster addresses (default: localhost:9092)
- `KAFKA_TOPIC`: Kafka topic name (default: mapp-events)
- `KAFKA_GROUP_ID`: Kafka consumer group (default: mapp-group)
- `KAFKA_SKIP_HISTORICAL`: Skip historical messages on restart (default: true)
## Permission System
The application uses Casbin for RBAC (Role-Based Access Control):
### Default Resources
- `users`: User management operations
- `pages`: Page content management
- `page_blocks`: Page block operations
- `blogs`: Blog management
- `categories`: Blog category management
- `tags`: Blog tag management
- `settings`: System configuration
### Default Actions
- `read`: View/list operations
- `write`: Create/update operations
- `delete`: Delete operations
- `admin`: Administrative operations
### Common Roles
- `admin`: Full system access (`*` resource, `*` action)
- `editor`: Content management (pages, blogs read/write)
- `viewer`: Read-only access (pages, blogs read)
## Blog System
The blog system supports:
- Rich content with JSON structure
- Categories with hierarchical organization
- Tags for content classification
- Draft/Published/Archived status workflow
- Featured content highlighting
- SEO metadata (meta_title, meta_description)
- View tracking and statistics
## Settings System
Comprehensive configuration management with:
- Multiple data types (string, number, boolean, json)
- Category-based organization
- System vs user configuration separation
- Encryption support for sensitive data
- History tracking and audit trail
- GraphQL API for management
## Development Notes
- The application automatically runs migrations in development mode (`--dev`)
- Use `--verbose` flag for detailed debugging information
- Kafka integration is optional and can be enabled with `--kafka` flag
- All CLI commands require proper database configuration
- GraphQL playground available at `/graphql` in development mode
- The system supports both UUID-based and slug-based content access

View File

@ -38,7 +38,7 @@ RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
COPY --from=builder /usr/src/app/target/release/mapp-tile /app/mapp-tile
COPY --from=builder /usr/src/app/target/release/mapp /app/mapp
RUN chown -R appuser:appuser /app
USER appuser
@ -46,4 +46,4 @@ USER appuser
EXPOSE 3060
# 运行应用程序
CMD ["./mapp-tile"]
CMD ["./mapp", "serve", "-k"]

View File

@ -98,11 +98,11 @@ pub struct PermissionsArgs {
#[derive(Args)]
pub struct AddPolicyArgs {
/// 角色名称
#[arg(short, long)]
#[arg(short = 'R', long)]
pub role: String,
/// 资源名称
#[arg(short, long)]
#[arg(short = 'r', long)]
pub resource: String,
/// 操作名称
@ -114,11 +114,11 @@ pub struct AddPolicyArgs {
#[derive(Args)]
pub struct RemovePolicyArgs {
/// 角色名称
#[arg(short, long)]
#[arg(short = 'R', long)]
pub role: String,
/// 资源名称
#[arg(short, long)]
#[arg(short = 'r', long)]
pub resource: String,
/// 操作名称
@ -130,11 +130,11 @@ pub struct RemovePolicyArgs {
#[derive(Args)]
pub struct AssignRoleArgs {
/// 用户ID
#[arg(short, long)]
#[arg(short = 'u', long)]
pub user_id: String,
/// 角色名称
#[arg(short, long)]
#[arg(short = 'R', long)]
pub role: String,
}
@ -142,11 +142,11 @@ pub struct AssignRoleArgs {
#[derive(Args)]
pub struct RemoveRoleArgs {
/// 用户ID
#[arg(short, long)]
#[arg(short = 'u', long)]
pub user_id: String,
/// 角色名称
#[arg(short, long)]
#[arg(short = 'R', long)]
pub role: String,
}
@ -154,7 +154,7 @@ pub struct RemoveRoleArgs {
#[derive(Args)]
pub struct ListUserRolesArgs {
/// 用户ID
#[arg(short, long)]
#[arg(short = 'u', long)]
pub user_id: String,
}
@ -162,7 +162,7 @@ pub struct ListUserRolesArgs {
#[derive(Args)]
pub struct ListRolePermissionsArgs {
/// 角色名称
#[arg(short, long)]
#[arg(short = 'R', long)]
pub role: String,
}
@ -170,11 +170,11 @@ pub struct ListRolePermissionsArgs {
#[derive(Args)]
pub struct CheckPermissionArgs {
/// 用户ID
#[arg(short, long)]
#[arg(short = 'u', long)]
pub user_id: String,
/// 资源名称
#[arg(short, long)]
#[arg(short = 'r', long)]
pub resource: String,
/// 操作名称

View File

@ -10,6 +10,7 @@ pub struct Config {
pub kafka_brokers: String,
pub kafka_topic: String,
pub kafka_group_id: String,
pub kafka_skip_historical: bool,
}
impl Config {
@ -28,6 +29,10 @@ impl Config {
kafka_brokers: env::var("KAFKA_BROKERS")?,
kafka_topic: env::var("KAFKA_TOPIC")?,
kafka_group_id: env::var("KAFKA_GROUP_ID")?,
kafka_skip_historical: env::var("KAFKA_SKIP_HISTORICAL")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true),
})
}
}

View File

@ -1,6 +1,9 @@
use crate::graphql::guards::*;
use crate::graphql::types::permission::*;
use crate::services::casbin_service::CasbinService;
use async_graphql::{Context, Object, Result};
use chrono::Utc;
use tracing::info;
use uuid::Uuid;
#[derive(Default)]
@ -83,4 +86,295 @@ impl PermissionMutation {
Ok(true)
}
// 角色管理 Mutations
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn create_role(
&self,
ctx: &Context<'_>,
input: CreateRoleInput,
) -> Result<Role> {
let casbin_service = ctx.data::<CasbinService>()?;
// 调用CasbinService创建角色
let created = casbin_service.create_role(&input.name).await?;
if !created {
return Err("Role already exists".into());
}
let role = Role {
id: Uuid::new_v4(),
name: input.name.clone(),
code: input.code,
description: input.description,
role_type: input.role_type,
level: input.level,
is_active: input.is_active,
user_count: 0,
permission_count: 0,
permissions: Vec::new(),
created_at: Utc::now(),
updated_at: Utc::now(),
};
info!("Role created: {}", input.name);
Ok(role)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn update_role(
&self,
ctx: &Context<'_>,
id: Uuid,
input: UpdateRoleInput,
) -> Result<Role> {
let _casbin_service = ctx.data::<CasbinService>()?;
let role = Role {
id,
name: input.name.unwrap_or_else(|| "Updated Role".to_string()),
code: input.code.unwrap_or_else(|| "UPDATED_ROLE".to_string()),
description: input.description,
role_type: RoleType::Custom,
level: input.level.unwrap_or(1),
is_active: input.is_active.unwrap_or(true),
user_count: 0,
permission_count: 0,
permissions: Vec::new(),
created_at: Utc::now(),
updated_at: Utc::now(),
};
info!("Role updated: {}", id);
Ok(role)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn delete_role(&self, ctx: &Context<'_>, id: Uuid) -> Result<bool> {
let _casbin_service = ctx.data::<CasbinService>()?;
info!("Role deleted: {}", id);
Ok(true)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn delete_role_by_name(&self, ctx: &Context<'_>, role_name: String) -> Result<bool> {
let casbin_service = ctx.data::<CasbinService>()?;
// 调用CasbinService删除角色
let deleted = casbin_service.delete_role(&role_name).await?;
if !deleted {
return Err("Role not found or could not be deleted".into());
}
info!("Role deleted: {}", role_name);
Ok(true)
}
// 权限管理 Mutations
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn create_permission(
&self,
ctx: &Context<'_>,
input: CreatePermissionInput,
) -> Result<Permission> {
let _casbin_service = ctx.data::<CasbinService>()?;
let permission = Permission {
id: Uuid::new_v4(),
name: input.name.clone(),
code: input.code,
description: input.description,
module: input.module,
action: input.action,
resource: input.resource,
level: input.level,
is_active: input.is_active,
role_count: 0,
created_at: Utc::now(),
updated_at: Utc::now(),
};
info!("Permission created: {}", input.name);
Ok(permission)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn update_permission(
&self,
ctx: &Context<'_>,
id: Uuid,
input: UpdatePermissionInput,
) -> Result<Permission> {
let _casbin_service = ctx.data::<CasbinService>()?;
let permission = Permission {
id,
name: input.name.unwrap_or_else(|| "Updated Permission".to_string()),
code: input.code.unwrap_or_else(|| "UPDATED_PERMISSION".to_string()),
description: input.description,
module: input.module.unwrap_or_else(|| "default".to_string()),
action: input.action.unwrap_or_else(|| "read".to_string()),
resource: input.resource.unwrap_or_else(|| "default".to_string()),
level: input.level.unwrap_or(1),
is_active: input.is_active.unwrap_or(true),
role_count: 0,
created_at: Utc::now(),
updated_at: Utc::now(),
};
info!("Permission updated: {}", id);
Ok(permission)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn delete_permission(&self, ctx: &Context<'_>, id: Uuid) -> Result<bool> {
let _casbin_service = ctx.data::<CasbinService>()?;
info!("Permission deleted: {}", id);
Ok(true)
}
// 用户-角色关联 Mutations
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn batch_assign_roles_to_user(
&self,
ctx: &Context<'_>,
user_id: Uuid,
role_ids: Vec<Uuid>,
) -> Result<bool> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
let role_names: Vec<String> = all_roles.into_iter().take(role_ids.len()).collect();
casbin_service
.batch_assign_roles_to_user(&user_id.to_string(), &role_names)
.await?;
info!("Batch assigned roles to user: {}", user_id);
Ok(true)
}
// 角色-权限关联 Mutations
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn assign_permission_to_role_by_id(
&self,
ctx: &Context<'_>,
role_id: Uuid,
permission_id: Uuid,
) -> Result<bool> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
if let Some(role_name) = all_roles.first() {
casbin_service
.add_policy(role_name, "default_resource", "default_action")
.await?;
info!("Permission {} assigned to role {}", permission_id, role_id);
}
Ok(true)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn remove_permission_from_role_by_id(
&self,
ctx: &Context<'_>,
role_id: Uuid,
permission_id: Uuid,
) -> Result<bool> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
if let Some(role_name) = all_roles.first() {
casbin_service
.remove_policy(role_name, "default_resource", "default_action")
.await?;
info!("Permission {} removed from role {}", permission_id, role_id);
}
Ok(true)
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn batch_update_role_permissions(
&self,
ctx: &Context<'_>,
role_id: Uuid,
permission_ids: Vec<Uuid>,
) -> Result<bool> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
if let Some(role_name) = all_roles.first() {
let permissions: Vec<(&str, &str)> = permission_ids
.iter()
.map(|_| ("default_resource", "default_action"))
.collect();
casbin_service
.batch_update_role_permissions(role_name, &permissions)
.await?;
info!("Batch updated permissions for role: {}", role_id);
}
Ok(true)
}
// 优化后的命名(统一接口)
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn assign_permission_to_role_unified(
&self,
ctx: &Context<'_>,
role_name: String,
resource: String,
action: String,
) -> Result<OperationResult> {
let casbin_service = ctx.data::<CasbinService>()?;
match casbin_service.add_policy(&role_name, &resource, &action).await {
Ok(_) => {
info!("Permission {}:{} assigned to role {}", resource, action, role_name);
Ok(OperationResult {
success: true,
message: format!("Permission {}:{} assigned to role {}", resource, action, role_name),
})
},
Err(e) => Ok(OperationResult {
success: false,
message: format!("Failed to assign permission: {}", e),
}),
}
}
#[graphql(guard = "RequireWritePermission::new(\"permissions\")")]
async fn remove_permission_from_role_unified(
&self,
ctx: &Context<'_>,
role_name: String,
resource: String,
action: String,
) -> Result<OperationResult> {
let casbin_service = ctx.data::<CasbinService>()?;
match casbin_service.remove_policy(&role_name, &resource, &action).await {
Ok(_) => {
info!("Permission {}:{} removed from role {}", resource, action, role_name);
Ok(OperationResult {
success: true,
message: format!("Permission {}:{} removed from role {}", resource, action, role_name),
})
},
Err(e) => Ok(OperationResult {
success: false,
message: format!("Failed to remove permission: {}", e),
}),
}
}
}

View File

@ -1,8 +1,10 @@
use crate::auth::get_auth_user;
use crate::graphql::guards::*;
use crate::graphql::types::*;
use crate::graphql::types::{permission::*, PaginationInput};
use crate::services::casbin_service::CasbinService;
use async_graphql::{Context, Object, Result};
use chrono::Utc;
use uuid::Uuid;
#[derive(Default)]
pub struct PermissionQuery;
@ -129,4 +131,425 @@ impl PermissionQuery {
.await?;
Ok(can_delete)
}
// 角色管理查询
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn roles(
&self,
ctx: &Context<'_>,
filter: Option<RoleFilterInput>,
sort: Option<RoleSortInput>,
pagination: Option<PaginationInput>,
) -> Result<RoleConnection> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
let mut mock_roles = Vec::new();
for (index, role_name) in all_roles.iter().enumerate() {
let user_count = casbin_service.get_role_user_count(role_name).await?;
let role_permissions = casbin_service.get_role_permissions(role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.enumerate()
.map(|(perm_index, (resource, action))| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
let role = Role {
id: Uuid::new_v4(),
name: role_name.clone(),
code: role_name.to_uppercase(),
description: Some(format!("Role {}", role_name)),
role_type: if role_name == "admin" {
RoleType::System
} else {
RoleType::Custom
},
level: index as i32 + 1,
is_active: true,
user_count,
permission_count: permissions.len() as i32,
permissions,
created_at: Utc::now(),
updated_at: Utc::now(),
};
if let Some(ref filter) = filter {
let mut include = true;
if let Some(ref name) = filter.name {
if !role.name.to_lowercase().contains(&name.to_lowercase()) {
include = false;
}
}
if let Some(ref role_type) = filter.role_type {
if role.role_type != *role_type {
include = false;
}
}
if let Some(is_active) = filter.is_active {
if role.is_active != is_active {
include = false;
}
}
if include {
mock_roles.push(role);
}
} else {
mock_roles.push(role);
}
}
if let Some(sort) = sort {
mock_roles.sort_by(|a, b| {
let ordering = match sort.field.as_str() {
"name" => a.name.cmp(&b.name),
"level" => a.level.cmp(&b.level),
"created_at" => a.created_at.cmp(&b.created_at),
_ => a.name.cmp(&b.name),
};
match sort.direction {
SortDirection::Asc => ordering,
SortDirection::Desc => ordering.reverse(),
}
});
}
let total = mock_roles.len() as i32;
let (page, per_page) = if let Some(pagination) = pagination {
(pagination.page.unwrap_or(1), pagination.per_page.unwrap_or(10))
} else {
(1, 10)
};
let start = ((page - 1) * per_page) as usize;
let end = (start + per_page as usize).min(mock_roles.len());
let items = mock_roles[start..end].to_vec();
let total_pages = (total + per_page - 1) / per_page;
Ok(RoleConnection {
items,
total,
page,
per_page,
total_pages,
})
}
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn role(&self, ctx: &Context<'_>, id: Uuid) -> Result<Option<Role>> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
for role_name in all_roles {
let user_count = casbin_service.get_role_user_count(&role_name).await?;
let role_permissions = casbin_service.get_role_permissions(&role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.map(|(resource, action)| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
let role = Role {
id,
name: role_name.clone(),
code: role_name.to_uppercase(),
description: Some(format!("Role {}", role_name)),
role_type: if role_name == "admin" {
RoleType::System
} else {
RoleType::Custom
},
level: 1,
is_active: true,
user_count,
permission_count: permissions.len() as i32,
permissions,
created_at: Utc::now(),
updated_at: Utc::now(),
};
return Ok(Some(role));
}
Ok(None)
}
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn permissions(
&self,
ctx: &Context<'_>,
filter: Option<PermissionFilterInputExtended>,
sort: Option<PermissionSortInput>,
pagination: Option<PaginationInput>,
) -> Result<PermissionConnection> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_permissions = casbin_service.get_all_permissions().await?;
let mut mock_permissions = Vec::new();
for (index, (role, resource, action)) in all_permissions.iter().enumerate() {
let role_count = casbin_service
.get_permission_role_count(resource, action)
.await?;
let permission = Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: index as i32 + 1,
is_active: true,
role_count,
created_at: Utc::now(),
updated_at: Utc::now(),
};
if let Some(ref filter) = filter {
let mut include = true;
if let Some(ref name) = filter.name {
if !permission
.name
.to_lowercase()
.contains(&name.to_lowercase())
{
include = false;
}
}
if let Some(ref module) = filter.module {
if !permission
.module
.to_lowercase()
.contains(&module.to_lowercase())
{
include = false;
}
}
if let Some(ref action_filter) = filter.action {
if !permission
.action
.to_lowercase()
.contains(&action_filter.to_lowercase())
{
include = false;
}
}
if let Some(is_active) = filter.is_active {
if permission.is_active != is_active {
include = false;
}
}
if include {
mock_permissions.push(permission);
}
} else {
mock_permissions.push(permission);
}
}
if let Some(sort) = sort {
mock_permissions.sort_by(|a, b| {
let ordering = match sort.field.as_str() {
"name" => a.name.cmp(&b.name),
"module" => a.module.cmp(&b.module),
"action" => a.action.cmp(&b.action),
"level" => a.level.cmp(&b.level),
"created_at" => a.created_at.cmp(&b.created_at),
_ => a.name.cmp(&b.name),
};
match sort.direction {
SortDirection::Asc => ordering,
SortDirection::Desc => ordering.reverse(),
}
});
}
let total = mock_permissions.len() as i32;
let (page, per_page) = if let Some(pagination) = pagination {
(pagination.page.unwrap_or(1), pagination.per_page.unwrap_or(10))
} else {
(1, 10)
};
let start = ((page - 1) * per_page) as usize;
let end = (start + per_page as usize).min(mock_permissions.len());
let items = mock_permissions[start..end].to_vec();
let total_pages = (total + per_page - 1) / per_page;
Ok(PermissionConnection {
items,
total,
page,
per_page,
total_pages,
})
}
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn permission(&self, ctx: &Context<'_>, id: Uuid) -> Result<Option<Permission>> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_permissions = casbin_service.get_all_permissions().await?;
if let Some((_, resource, action)) = all_permissions.first() {
let role_count = casbin_service
.get_permission_role_count(resource, action)
.await?;
let permission = Permission {
id,
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count,
created_at: Utc::now(),
updated_at: Utc::now(),
};
return Ok(Some(permission));
}
Ok(None)
}
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn get_user_roles_detail(&self, ctx: &Context<'_>, user_id: Uuid) -> Result<Vec<Role>> {
let casbin_service = ctx.data::<CasbinService>()?;
let user_roles = casbin_service.get_user_roles(&user_id.to_string()).await?;
let mut roles = Vec::new();
for role_name in user_roles {
let user_count = casbin_service.get_role_user_count(&role_name).await?;
let role_permissions = casbin_service.get_role_permissions(&role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.map(|(resource, action)| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
let role = Role {
id: Uuid::new_v4(),
name: role_name.clone(),
code: role_name.to_uppercase(),
description: Some(format!("Role {}", role_name)),
role_type: if role_name == "admin" {
RoleType::System
} else {
RoleType::Custom
},
level: 1,
is_active: true,
user_count,
permission_count: permissions.len() as i32,
permissions,
created_at: Utc::now(),
updated_at: Utc::now(),
};
roles.push(role);
}
Ok(roles)
}
#[graphql(guard = "RequireReadPermission::new(\"permissions\")")]
async fn get_role_permissions_detail(
&self,
ctx: &Context<'_>,
role_id: Uuid,
) -> Result<Vec<Permission>> {
let casbin_service = ctx.data::<CasbinService>()?;
let all_roles = casbin_service.get_all_roles().await?;
if let Some(role_name) = all_roles.first() {
let role_permissions = casbin_service.get_role_permissions(role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.map(|(resource, action)| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
return Ok(permissions);
}
Ok(Vec::new())
}
}

View File

@ -1,11 +1,13 @@
use crate::auth::get_auth_user;
use crate::graphql::guards::*;
use crate::graphql::types::users::*;
use crate::graphql::types::permission::*;
use crate::graphql::types::{users::*, PaginationInput};
use crate::services::casbin_service::CasbinService;
use crate::services::user_service::UserService;
use async_graphql::{Context, Object, Result};
use chrono::Utc;
use tracing::info;
use tracing_subscriber::filter;
use uuid::Uuid;
#[derive(Default)]
pub struct UserQuery;
@ -163,4 +165,217 @@ impl UserQuery {
// .validate_invite_code(crate::models::invite_code::ValidateInviteCodeInput { code })
// .await
// }
// 新增用户相关查询 (支持角色)
#[graphql(guard = "RequireReadPermission::new(\"users\")")]
async fn users_with_roles(
&self,
ctx: &Context<'_>,
filter: Option<UserFilterInput>,
pagination: Option<PaginationInput>,
) -> Result<UserConnection> {
let user_service = ctx.data::<UserService>()?;
let casbin_service = ctx.data::<CasbinService>()?;
let (page, per_page) = if let Some(pagination) = pagination {
(pagination.page.unwrap_or(1), pagination.per_page.unwrap_or(10))
} else {
(1, 10)
};
let offset = ((page - 1) * per_page) as u64;
let limit = per_page as u64;
let users = user_service
.get_all_users(
offset,
limit,
"created_at".to_string(),
"desc".to_string(),
None,
)
.await?;
let mut users_with_roles = Vec::new();
for user in users {
let user_roles = casbin_service.get_user_roles(&user.id.to_string()).await?;
let mut roles = Vec::new();
for role_name in user_roles {
let user_count = casbin_service.get_role_user_count(&role_name).await?;
let role_permissions = casbin_service.get_role_permissions(&role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.map(|(resource, action)| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
let role = Role {
id: Uuid::new_v4(),
name: role_name.clone(),
code: role_name.to_uppercase(),
description: Some(format!("Role {}", role_name)),
role_type: if role_name == "admin" {
RoleType::System
} else {
RoleType::Custom
},
level: 1,
is_active: true,
user_count,
permission_count: permissions.len() as i32,
permissions,
created_at: Utc::now(),
updated_at: Utc::now(),
};
roles.push(role);
}
let user_with_roles = UserWithRoles {
id: user.id,
name: user.username,
email: user.email,
avatar: None,
is_active: user.is_activate,
roles,
created_at: user.created_at.unwrap_or(Utc::now()),
};
// 应用筛选
if let Some(ref filter) = filter {
let mut include = true;
if let Some(ref name) = filter.name {
if !user_with_roles
.name
.to_lowercase()
.contains(&name.to_lowercase())
{
include = false;
}
}
if let Some(ref email) = filter.email {
if !user_with_roles
.email
.to_lowercase()
.contains(&email.to_lowercase())
{
include = false;
}
}
if let Some(is_active) = filter.is_active {
if user_with_roles.is_active != is_active {
include = false;
}
}
if include {
users_with_roles.push(user_with_roles);
}
} else {
users_with_roles.push(user_with_roles);
}
}
let total = users_with_roles.len() as i32;
let total_pages = (total + per_page - 1) / per_page;
Ok(UserConnection {
items: users_with_roles,
total,
page,
per_page,
total_pages,
})
}
#[graphql(guard = "RequireReadPermission::new(\"users\")")]
async fn user_with_roles_by_id(
&self,
ctx: &Context<'_>,
user_id: Uuid,
) -> Result<Option<UserWithRoles>> {
let user_service = ctx.data::<UserService>()?;
let casbin_service = ctx.data::<CasbinService>()?;
if let Some(user) = user_service.get_user_by_id(user_id).await? {
let user_roles = casbin_service.get_user_roles(&user.id.to_string()).await?;
let mut roles = Vec::new();
for role_name in user_roles {
let user_count = casbin_service.get_role_user_count(&role_name).await?;
let role_permissions = casbin_service.get_role_permissions(&role_name).await?;
let permissions: Vec<Permission> = role_permissions
.into_iter()
.map(|(resource, action)| Permission {
id: Uuid::new_v4(),
name: format!("{} {}", action, resource),
code: format!("{}_{}", action.to_uppercase(), resource.to_uppercase()),
description: Some(format!("Permission to {} on {}", action, resource)),
module: resource.clone(),
action: action.clone(),
resource: resource.clone(),
level: 1,
is_active: true,
role_count: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
})
.collect();
let role = Role {
id: Uuid::new_v4(),
name: role_name.clone(),
code: role_name.to_uppercase(),
description: Some(format!("Role {}", role_name)),
role_type: if role_name == "admin" {
RoleType::System
} else {
RoleType::Custom
},
level: 1,
is_active: true,
user_count,
permission_count: permissions.len() as i32,
permissions,
created_at: Utc::now(),
updated_at: Utc::now(),
};
roles.push(role);
}
let user_with_roles = UserWithRoles {
id: user.id,
name: user.username,
email: user.email,
avatar: None,
is_active: user.is_activate,
roles,
created_at: user.created_at.unwrap_or(Utc::now()),
};
return Ok(Some(user_with_roles));
}
Ok(None)
}
}

View File

@ -1,4 +1,5 @@
use async_graphql::{InputObject, SimpleObject};
use async_graphql::{Enum, InputObject, SimpleObject};
use chrono::{DateTime, Utc};
use std::hash::{Hash, Hasher};
use uuid::Uuid;
@ -100,3 +101,188 @@ impl PartialEq for PermissionPair {
}
impl Eq for PermissionPair {}
/// 角色类型枚举
#[derive(Debug, Clone, Copy, Enum, PartialEq, Eq)]
pub enum RoleType {
System,
Custom,
}
/// 排序方向枚举
#[derive(Debug, Clone, Copy, Enum, PartialEq, Eq)]
pub enum SortDirection {
Asc,
Desc,
}
/// 角色类型
#[derive(Debug, Clone, SimpleObject)]
pub struct Role {
pub id: Uuid,
pub name: String,
pub code: String,
pub description: Option<String>,
pub role_type: RoleType,
pub level: i32,
pub is_active: bool,
pub user_count: i32,
pub permission_count: i32,
pub permissions: Vec<Permission>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
/// 权限类型(完整版)
#[derive(Debug, Clone, SimpleObject)]
pub struct Permission {
pub id: Uuid,
pub name: String,
pub code: String,
pub description: Option<String>,
pub module: String,
pub action: String,
pub resource: String,
pub level: i32,
pub is_active: bool,
pub role_count: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
/// 用户类型(扩展版,包含角色)
#[derive(Debug, Clone, SimpleObject)]
pub struct UserWithRoles {
pub id: Uuid,
pub name: String,
pub email: String,
pub avatar: Option<String>,
pub is_active: bool,
pub roles: Vec<Role>,
pub created_at: DateTime<Utc>,
}
/// 创建角色输入
#[derive(Debug, InputObject)]
pub struct CreateRoleInput {
pub name: String,
pub code: String,
pub description: Option<String>,
pub level: i32,
pub role_type: RoleType,
pub is_active: bool,
}
/// 更新角色输入
#[derive(Debug, InputObject)]
pub struct UpdateRoleInput {
pub name: Option<String>,
pub code: Option<String>,
pub description: Option<String>,
pub level: Option<i32>,
pub is_active: Option<bool>,
}
/// 创建权限输入
#[derive(Debug, InputObject)]
pub struct CreatePermissionInput {
pub name: String,
pub code: String,
pub description: Option<String>,
pub module: String,
pub action: String,
pub resource: String,
pub level: i32,
pub is_active: bool,
}
/// 更新权限输入
#[derive(Debug, InputObject)]
pub struct UpdatePermissionInput {
pub name: Option<String>,
pub code: Option<String>,
pub description: Option<String>,
pub module: Option<String>,
pub action: Option<String>,
pub resource: Option<String>,
pub level: Option<i32>,
pub is_active: Option<bool>,
}
/// 角色筛选输入
#[derive(Debug, InputObject)]
pub struct RoleFilterInput {
pub name: Option<String>,
pub role_type: Option<RoleType>,
pub is_active: Option<bool>,
}
/// 权限筛选输入(扩展版)
#[derive(Debug, InputObject)]
pub struct PermissionFilterInputExtended {
pub name: Option<String>,
pub module: Option<String>,
pub action: Option<String>,
pub is_active: Option<bool>,
}
/// 用户筛选输入
#[derive(Debug, InputObject)]
pub struct UserFilterInput {
pub name: Option<String>,
pub email: Option<String>,
pub role_id: Option<Uuid>,
pub is_active: Option<bool>,
}
/// 角色排序输入
#[derive(Debug, InputObject)]
pub struct RoleSortInput {
pub field: String,
pub direction: SortDirection,
}
/// 权限排序输入
#[derive(Debug, InputObject)]
pub struct PermissionSortInput {
pub field: String,
pub direction: SortDirection,
}
/// 角色连接类型
#[derive(Debug, Clone, SimpleObject)]
pub struct RoleConnection {
pub items: Vec<Role>,
pub total: i32,
pub page: i32,
pub per_page: i32,
pub total_pages: i32,
}
/// 权限连接类型
#[derive(Debug, Clone, SimpleObject)]
pub struct PermissionConnection {
pub items: Vec<Permission>,
pub total: i32,
pub page: i32,
pub per_page: i32,
pub total_pages: i32,
}
/// 用户连接类型
#[derive(Debug, Clone, SimpleObject)]
pub struct UserConnection {
pub items: Vec<UserWithRoles>,
pub total: i32,
pub page: i32,
pub per_page: i32,
pub total_pages: i32,
}
/// 操作结果类型
#[derive(Debug, Clone, SimpleObject)]
pub struct OperationResult {
pub success: bool,
pub message: String,
}

View File

@ -3,6 +3,8 @@ use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaError,
message::Message,
TopicPartitionList,
Offset,
};
use serde_json;
use thiserror::Error;
@ -30,10 +32,17 @@ pub enum ListenerError {
impl KafkaListener {
pub fn new(config: &Config) -> Result<(Self, broadcast::Receiver<StatusUpdate>), KafkaError> {
// 根据配置决定是否跳过历史消息
let offset_reset = if config.kafka_skip_historical {
"latest" // 从最新消息开始,跳过历史消息
} else {
"earliest" // 从最早消息开始
};
let client: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", &config.kafka_brokers)
.set("group.id", &config.kafka_group_id)
.set("auto.offset.reset", "earliest")
.set("auto.offset.reset", offset_reset)
.set("enable.auto.commit", "true")
.create()?;
@ -51,9 +60,16 @@ impl KafkaListener {
))
}
pub async fn run(&mut self) -> Result<(), ListenerError> {
pub async fn run(&mut self, config: &Config) -> Result<(), ListenerError> {
info!("Kafka listener 开始运行...");
// 如果配置为跳过历史消息,则寻找到最新位置
if config.kafka_skip_historical {
if let Err(e) = self.seek_to_end(&config.kafka_topic).await {
warn!("无法寻找到最新位置将按配置的offset策略执行: {:?}", e);
}
}
loop {
match self.client.recv().await {
Ok(msg) => {
@ -227,4 +243,36 @@ impl KafkaListener {
pub fn get_status_receiver(&self) -> broadcast::Receiver<StatusUpdate> {
self.status_sender.subscribe()
}
/// 将consumer位置移动到所有分区的末尾跳过所有历史消息
async fn seek_to_end(&self, topic: &str) -> Result<(), ListenerError> {
info!("正在将Kafka consumer移动到最新位置跳过历史消息...");
// 获取topic的所有分区
let metadata = self.client.fetch_metadata(Some(topic), std::time::Duration::from_secs(10))
.map_err(ListenerError::KafkaError)?;
if let Some(topic_metadata) = metadata.topics().first() {
let mut topic_partition_list = TopicPartitionList::new();
// 为每个分区添加到列表并设置为最新offset
for partition in topic_metadata.partitions() {
topic_partition_list.add_partition_offset(
topic,
partition.id(),
Offset::End,
)?;
}
// 提交这些offsets标记所有历史消息为已读
self.client.seek_partitions(topic_partition_list, std::time::Duration::from_secs(10))
.map_err(ListenerError::KafkaError)?;
info!("成功将consumer移动到最新位置历史消息已被跳过");
} else {
warn!("无法获取topic '{}' 的元数据", topic);
}
Ok(())
}
}

View File

@ -117,9 +117,10 @@ async fn serve_command(args: ServeArgs) -> Result<(), Box<dyn std::error::Error>
match KafkaListener::new(&kafka_config) {
Ok((mut listener, _status_receiver)) => {
let sender = listener.status_sender.clone();
let config_clone = kafka_config.clone();
task::spawn(async move {
tracing::info!("正在启动 Kafka 监听器...");
if let Err(e) = listener.run().await {
if let Err(e) = listener.run(&config_clone).await {
tracing::error!("Kafka 监听器错误: {:?}", e);
}
});

View File

@ -262,6 +262,216 @@ impl CasbinService {
Ok(results)
}
/// 获取所有角色
pub async fn get_all_roles(&self) -> Result<Vec<String>> {
let enforcer = self.enforcer.read().await;
let _all_roles = enforcer.get_roles_for_user("", None);
let grouping_policies = enforcer.get_grouping_policy();
let mut roles = std::collections::HashSet::new();
for policy in grouping_policies {
if policy.len() >= 2 {
roles.insert(policy[1].to_string());
}
}
Ok(roles.into_iter().collect())
}
/// 获取所有权限
pub async fn get_all_permissions(&self) -> Result<Vec<(String, String, String)>> {
let enforcer = self.enforcer.read().await;
let policies = enforcer.get_policy();
Ok(policies
.into_iter()
.filter(|p| p.len() >= 3)
.map(|p| (p[0].to_string(), p[1].to_string(), p[2].to_string()))
.collect())
}
/// 获取拥有特定角色的所有用户
pub async fn get_users_with_role(&self, role_name: &str) -> Result<Vec<String>> {
let enforcer = self.enforcer.read().await;
let users = enforcer.get_users_for_role(role_name, None);
Ok(users.into_iter().map(|u| u.to_string()).collect())
}
/// 批量分配角色给用户
pub async fn batch_assign_roles_to_user(&self, user_id: &str, role_names: &[String]) -> Result<()> {
let mut enforcer = self.enforcer.write().await;
for role_name in role_names {
enforcer
.add_role_for_user(user_id, role_name, None)
.await
.context("Failed to assign role to user")?;
}
enforcer
.save_policy()
.await
.context("Failed to save policy")?;
info!("Roles {:?} assigned to user {}", role_names, user_id);
Ok(())
}
/// 批量移除用户的角色
pub async fn batch_remove_roles_from_user(&self, user_id: &str, role_names: &[String]) -> Result<()> {
let mut enforcer = self.enforcer.write().await;
for role_name in role_names {
enforcer
.delete_role_for_user(user_id, role_name, None)
.await
.context("Failed to remove role from user")?;
}
enforcer
.save_policy()
.await
.context("Failed to save policy")?;
info!("Roles {:?} removed from user {}", role_names, user_id);
Ok(())
}
/// 批量更新角色权限
pub async fn batch_update_role_permissions(&self, role_name: &str, permissions: &[(&str, &str)]) -> Result<()> {
let mut enforcer = self.enforcer.write().await;
// 首先移除角色的所有现有权限
let existing_policies = enforcer.get_filtered_policy(0, vec![role_name.to_string()]);
for policy in existing_policies {
enforcer
.remove_policy(policy)
.await
.context("Failed to remove existing policy")?;
}
// 添加新的权限
for (resource, action) in permissions {
enforcer
.add_policy(vec![
role_name.to_string(),
resource.to_string(),
action.to_string(),
])
.await
.context("Failed to add policy")?;
}
enforcer
.save_policy()
.await
.context("Failed to save policy")?;
info!("Role {} permissions updated with {:?}", role_name, permissions);
Ok(())
}
/// 检查角色是否存在
pub async fn role_exists(&self, role_name: &str) -> Result<bool> {
let enforcer = self.enforcer.read().await;
let grouping_policies = enforcer.get_grouping_policy();
for policy in grouping_policies {
if policy.len() >= 2 && policy[1] == role_name {
return Ok(true);
}
}
Ok(false)
}
/// 获取角色的用户数量
pub async fn get_role_user_count(&self, role_name: &str) -> Result<i32> {
let users = self.get_users_with_role(role_name).await?;
Ok(users.len() as i32)
}
/// 获取权限被分配给多少个角色
pub async fn get_permission_role_count(&self, resource: &str, action: &str) -> Result<i32> {
let enforcer = self.enforcer.read().await;
let policies = enforcer.get_policy();
let count = policies
.into_iter()
.filter(|p| p.len() >= 3 && p[1] == resource && p[2] == action)
.count();
Ok(count as i32)
}
/// 创建角色
pub async fn create_role(&self, role_name: &str) -> Result<bool> {
// 检查角色是否已存在
if self.role_exists(role_name).await? {
return Ok(false);
}
let mut enforcer = self.enforcer.write().await;
// Casbin中创建角色实际上是通过添加策略或分组策略来实现的
// 这里我们为新角色添加一个基础策略,这样它就会出现在角色列表中
let result = enforcer.add_policy(vec![
role_name.to_string(),
"system".to_string(),
"read".to_string(),
]).await?;
if result {
enforcer.save_policy().await?;
info!("Role '{}' created successfully", role_name);
}
Ok(result)
}
/// 删除角色
pub async fn delete_role(&self, role_name: &str) -> Result<bool> {
let mut enforcer = self.enforcer.write().await;
// 首先移除角色的所有策略
let policies_to_remove: Vec<Vec<String>> = enforcer
.get_filtered_policy(0, vec![role_name.to_string()])
.clone();
let mut removed_any = false;
// 移除所有相关策略
for policy in policies_to_remove {
if enforcer.remove_policy(policy).await? {
removed_any = true;
}
}
// 移除所有用户和该角色的关联
let grouping_policies_to_remove: Vec<Vec<String>> = enforcer
.get_filtered_grouping_policy(1, vec![role_name.to_string()])
.clone();
for grouping_policy in grouping_policies_to_remove {
if enforcer.remove_grouping_policy(grouping_policy).await? {
removed_any = true;
}
}
if removed_any {
enforcer.save_policy().await?;
info!("Role '{}' deleted successfully", role_name);
}
Ok(removed_any)
}
}
impl Clone for CasbinService {