Yuxi-Know 企业级部署最佳实践报告
执行摘要
本报告基于 Yuxi-Know v0.5.0 版本,系统阐述企业级生产环境部署的完整实施方案,涵盖架构设计、基础设施规划、安全策略、性能优化、运维监控等关键维度。报告结合真实项目经验,提供可操作的部署模板与应急预案,帮助企业在 30 天内完成从 PoC 到生产环境的平滑过渡。
核心交付成果
- 完整的企业级部署架构设计
- 高可用与弹性扩展方案
- 安全加固与合规配置指南
- 性能调优与容量规划建议
- 监控告警与运维自动化体系
- 数据备份与灾难恢复预案
目录
1. 项目背景与目标
1.1 Yuxi-Know 简介回顾
Yuxi-Know 是基于大语言模型的知识库与知识图谱智能体开发平台,核心特性包括:
- 智能体开发:LangGraph v1 多智能体架构
- 知识库(RAG):多格式文档支持,向量/全文/混合检索
- 知识图谱:LightRAG 图谱构建与可视化,参与智能体推理
- 平台工程:Vue + FastAPI 技术栈,Docker 与生产级部署
1.2 企业级部署目标
业务目标
| 目标维度 | 关键指标 | 目标值 |
|---|---|---|
| 可用性 | 系统正常运行时间 | ≥ 99.9% |
| 响应性能 | P95 查询延迟 | ≤ 2 秒 |
| 并发能力 | 支持并发用户数 | ≥ 1000 |
| 数据安全 | 数据泄露事件 | 0 |
| 合规性 | 满足监管要求 | 100% |
技术目标
- 支持水平扩展,业务增长无感知
- 多租户隔离,资源按需分配
- 故障自愈能力,减少人工干预
- 全面可观测性,问题快速定位
- 平滑升级,业务无中断
1.3 典型应用场景
场景一:企业知识管理系统
- 文档数量:50,000+
- 日查询量:100,000+
- 用户数:500+
- 核心需求:快速检索、智能问答、版本管理
场景二:智能客服系统
- 并发对话:200+
- 日查询量:500,000+
- 响应时效:< 3 秒
- 核心需求:实时性、准确性、多轮对话
场景三:研发知识库
- 技术文档:100,000+ 页
- 代码片段:1,000,000+
- 用户数:200+
- 核心需求:代码理解、精准匹配、跨项目关联
2. 系统架构设计
2.1 整体架构
┌─────────────────────────────────────────────────────────────┐
│ 负载均衡层 │
│ (Nginx / HAProxy / Cloud Load Balancer) │
└─────────────────────────────────────────────────────────────┘
│
├──────────┬──────────┬──────────┐
▼ ▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌───────────────┐ ┌──────────────────┐
│ 前端服务实例1 │ │ 前端服务实例2 │ │ 前端服务实例N │ │ 静态资源CDN │
│ (Vue + Nginx) │ │ (Vue + Nginx) │ │ (Vue + Nginx) │ │ (OSS/CloudFront)│
└──────────────────┘ └──────────────────┘ └───────────────┘ └──────────────────┘
│ │ │
└─────────────────────┴─────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ API 实例 1 │ │ API 实例 2 │ │ API 实例 N │ │
│ │ (FastAPI) │ │ (FastAPI) │ │ (FastAPI) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 消息队列层 │
│ (Redis Cluster / RabbitMQ / Kafka) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │PostgreSQL│ │ Redis │ │ Milvus │ │ Neo4j │ │
│ │ 主库集群 │ │ Cluster │ │ Cluster │ │ Cluster │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ ┌──────────┐ ┌──────────┐ │
│ │ 文件存储 │ │ 对象存储 │ │
│ │(本地/MinIO)│ │ (S3/OSS) │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 外部服务层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ LLM API │ │Embed API │ │Rerank API│ │ OCR API │ │
│ │(OpenAI等) │ │(本地/云端)│ │(本地/云端)│ │(本地/云端)│ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘2.2 核心组件说明
2.2.1 前端服务层
技术选型:Vue.js + Vite + Nginx
部署架构:
- 多实例部署,至少 2 个实例实现高可用
- 使用 Nginx 反向代理,启用 HTTPS
- 静态资源部署到 CDN 加速访问
配置要点:
# 前端 Nginx 配置示例
server {
listen 443 ssl http2;
server_name yuxi-know.example.com;
ssl_certificate /etc/ssl/certs/yuxi-know.crt;
ssl_certificate_key /etc/ssl/private/yuxi-know.key;
# 静态资源
location /static/ {
alias /app/dist/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}
# API 代理
location /api/ {
proxy_pass http://backend:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时配置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
# WebSocket 支持(如果需要)
location /ws/ {
proxy_pass http://backend:8000/ws/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}2.2.2 应用服务层
技术选型:FastAPI + Gunicorn + Uvicorn
部署架构:
- 容器化部署,使用 Docker Compose / Kubernetes
- 多实例部署,至少 3 个实例
- 自动扩缩容,基于 CPU/内存/请求量指标
Gunicorn 配置:
# gunicorn.conf.py
import multiprocessing
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 120
keepalive = 5
preload_app = TrueFastAPI 配置:
# app/config.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app = FastAPI(
title="Yuxi-Know API",
description="企业级知识库与知识图谱平台",
version="0.5.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
# CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://yuxi-know.example.com",
"https://app.yuxi-know.example.com"
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 受信任主机配置
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=[
"yuxi-know.example.com",
"*.yuxi-know.example.com"
]
)2.2.3 数据存储层
PostgreSQL 集群
架构设计:
主库(Master) ←─────> 从库1(Replica)
↓ ↑
└──────> 从库2(Replica) ←─┘配置要点:
- 主从复制,读写分离
- 连接池配置,避免连接泄漏
- 定期 VACUUM 与索引优化
配置示例:
-- PostgreSQL 配置优化
-- postgresql.conf
shared_buffers = 4GB
effective_cache_size = 12GB
maintenance_work_mem = 1GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 13107kB
min_wal_size = 2GB
max_wal_size = 8GB
-- 连接池配置(使用 PgBouncer)
max_client_conn = 1000
default_pool_size = 100
reserve_pool_size = 10
reserve_pool_timeout = 5
max_db_connections = 50
server_idle_timeout = 600Redis 集群
架构设计:
Redis Cluster
├── Master 1 (Slot 0-5460)
│ ├── Replica 1
│ └── Replica 2
├── Master 2 (Slot 5461-10922)
│ ├── Replica 1
│ └── Replica 2
└── Master 3 (Slot 10923-16383)
├── Replica 1
└── Replica 2配置要点:
- 集群模式,数据分片
- 主从复制,故障自动转移
- 持久化配置(AOF + RDB)
配置示例:
# redis.conf
port 6379
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes
appendfsync everysec
save 900 1
save 300 10
save 60 10000
maxmemory 2gb
maxmemory-policy allkeys-lruMilvus 集群
架构设计:
Milvus Cluster
├── Root Coordinator (1)
├── Data Node (N)
├── Query Node (N)
├── Index Node (N)
├── Proxy (N)
└── Pulsar / Kafka (消息队列)配置要点:
- 分片策略:按业务域或数据量
- 索引类型:IVF_FLAT / HNSW
- 参数调优:nlist, nprobe
配置示例:
# Milvus 连接与索引配置
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType
# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="metadata", dtype=DataType.JSON)
]
schema = CollectionSchema(fields, description="文档向量集合")
# 创建集合
collection = Collection(name="documents", schema=schema)
# 创建索引
index_params = {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {
"M": 16,
"efConstruction": 256
}
}
collection.create_index(field_name="embedding", index_params=index_params)Neo4j 集群
架构设计:
Neo4j Fabric
├── Fabric Database 1
│ ├── Primary Server 1
│ ├── Primary Server 2
│ └── Read Replica 1
├── Fabric Database 2
│ ├── Primary Server 1
│ ├── Primary Server 2
│ └── Read Replica 1配置要点:
- Fabric 多数据库支持,按业务域隔离
- 读写分离,查询性能优化
- Cypher 查询优化,建立索引
配置示例:
# neo4j.conf
# 内存配置
dbms.memory.heap.initial_size=2g
dbms.memory.heap.max_size=4g
dbms.memory.pagecache.size=8g
# 连接配置
dbms.connector.bolt.thread_pool_max_size=400
dbms.connector.bolt.thread_pool_min_size=5
# 事务配置
dbms.transaction.timeout=60s
dbms.transaction.concurrent.maximum=1000
# 调试配置(生产环境关闭)
dbms.logs.debug.level=INFO2.3 网络架构
┌─────────────────┐
│ Internet/ISP │
└────────┬────────┘
│
┌────────▼────────┐
│ 防火墙/WAF │
└────────┬────────┘
│
┌────────▼────────┐
│ 负载均衡器 │
│ (SLB/ELB) │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
┌───────▼──────┐ ┌───────▼──────┐ ┌───────▼──────┐
│ DMZ 区域 │ │ 应用区域 │ │ 数据区域 │
│ 前端服务 │ │ API 服务 │ │ 数据库集群 │
│ Nginx │ │ FastAPI │ │ PostgreSQL │
│ 静态资源 │ │ Redis │ │ Milvus │
└──────────────┘ └──────────────┘ │ Neo4j │
└──────────────┘3. 基础设施规划
3.1 云服务商选择
3.1.1 公有云部署
推荐方案:阿里云 / 腾讯云 / AWS
| 资源类型 | 阿里云 | 腾讯云 | AWS |
|---|---|---|---|
| 计算 | ECS | CVM | EC2 |
| 数据库 | RDS PostgreSQL | TencentDB for PostgreSQL | RDS PostgreSQL |
| Redis | Redis 企业版 | TencentDB for Redis | ElastiCache Redis |
| 向量数据库 | 自建 Milvus | 自建 Milvus | OpenSearch Vector |
| 对象存储 | OSS | COS | S3 |
| 负载均衡 | SLB | CLB | ELB/ALB |
| 容器服务 | ACK / ACK Serverless | TKE / TKE Serverless | EKS / Fargate |
成本估算(月度,中小规模):
- 计算资源:¥3,000 - ¥8,000
- 数据库服务:¥2,000 - ¥5,000
- 存储服务:¥500 - ¥1,500
- 网络流量:¥500 - ¥2,000
- 合计:¥6,000 - ¥16,500
3.1.2 私有云部署
推荐方案:OpenShift / Rancher
硬件需求:
- 控制节点:3 台,配置 16C/32G/500G
- 工作节点:6-9 台,配置 32C/64G/1T
- 存储:分布式存储(Ceph/GlusterFS),容量 ≥ 10T
- 网络:万兆网络,冗余设计
成本估算:
- 硬件采购:¥500,000 - ¥800,000
- 软件授权:¥100,000 - ¥200,000(如有)
- 运维成本:¥100,000 - ¥200,000/年
- 首年合计:¥700,000 - ¥1,200,000
3.1.3 混合云部署
适用场景:数据敏感度高,同时需要弹性扩展
架构设计:
- 核心数据与关键服务部署在私有云
- 前端服务与非敏感数据部署在公有云
- 通过 VPN / 专线连接
3.2 容器化部署方案
3.2.1 Docker Compose 部署
适用场景:中小规模部署,快速验证
docker-compose.yml 模板:
version: '3.8'
services:
# 前端服务
frontend:
image: ghcr.io/xerrors/yuxi-know-frontend:0.5.0
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- backend
restart: unless-stopped
networks:
- yuxi-know-network
# 后端 API 服务
backend:
image: ghcr.io/xerrors/yuxi-know-backend:0.5.0
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@postgres:5432/yuxiknow
- REDIS_URL=redis://redis:6379/0
- MILVUS_HOST=milvus
- MILVUS_PORT=19530
- NEO4J_URI=bolt://neo4j:7687
- NEO4J_USER=neo4j
- NEO4J_PASSWORD=neo4j_password
volumes:
- ./uploads:/app/uploads
- ./logs:/app/logs
depends_on:
- postgres
- redis
- milvus
- neo4j
restart: unless-stopped
networks:
- yuxi-know-network
# PostgreSQL 数据库
postgres:
image: postgres:15-alpine
ports:
- "5432:5432"
environment:
- POSTGRES_DB=yuxiknow
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres-data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
restart: unless-stopped
networks:
- yuxi-know-network
# Redis 缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --appendonly yes --requirepass redis_password
volumes:
- redis-data:/data
restart: unless-stopped
networks:
- yuxi-know-network
# Milvus 向量数据库
milvus:
image: milvusdb/milvus:v2.3.0
ports:
- "19530:19530"
- "9091:9091"
environment:
- ETCD_ENDPOINTS=etcd:2379
- MINIO_ADDRESS=minio:9000
volumes:
- milvus-data:/var/lib/milvus
depends_on:
- etcd
- minio
restart: unless-stopped
networks:
- yuxi-know-network
# Neo4j 图数据库
neo4j:
image: neo4j:5.12.0-community
ports:
- "7474:7474"
- "7687:7687"
environment:
- NEO4J_AUTH=neo4j/neo4j_password
- NEO4J_dbms_memory_heap_max__size=2G
- NEO4J_dbms_memory_pagecache_size=2G
volumes:
- neo4j-data:/data
- neo4j-logs:/logs
restart: unless-stopped
networks:
- yuxi-know-network
# Milvus 依赖服务
etcd:
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
volumes:
- etcd-data:/etcd
networks:
- yuxi-know-network
minio:
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
volumes:
- minio-data:/data
networks:
- yuxi-know-network
volumes:
postgres-data:
redis-data:
milvus-data:
neo4j-data:
neo4j-logs:
etcd-data:
minio-data:
networks:
yuxi-know-network:
driver: bridge3.2.2 Kubernetes 部署
适用场景:大规模部署,自动扩缩容
部署架构:
Kubernetes Cluster
├── Namespace: yuxi-know-production
│ ├── Deployment: frontend
│ ├── Deployment: backend
│ ├── StatefulSet: postgres
│ ├── StatefulSet: redis
│ ├── StatefulSet: milvus
│ └── StatefulSet: neo4j
├── ConfigMaps & Secrets
├── Services (ClusterIP / LoadBalancer)
├── Ingress Controller
└── HPA (Horizontal Pod Autoscaler)关键配置文件:
backend-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: yuxi-know-backend
namespace: yuxi-know-production
spec:
replicas: 3
selector:
matchLabels:
app: yuxi-know-backend
template:
metadata:
labels:
app: yuxi-know-backend
spec:
containers:
- name: backend
image: ghcr.io/xerrors/yuxi-know-backend:0.5.0
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: yuxi-know-secrets
key: database-url
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: yuxi-know-secrets
key: redis-url
- name: MILVUS_HOST
value: "milvus-service"
- name: MILVUS_PORT
value: "19530"
- name: NEO4J_URI
value: "bolt://neo4j-service:7687"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
volumeMounts:
- name: uploads
mountPath: /app/uploads
- name: logs
mountPath: /app/logs
volumes:
- name: uploads
persistentVolumeClaim:
claimName: uploads-pvc
- name: logs
persistentVolumeClaim:
claimName: logs-pvc
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: yuxi-know-backend-hpa
namespace: yuxi-know-production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: yuxi-know-backend
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 803.3 CI/CD 流程
3.3.1 流水线设计
代码提交 (Git)
↓
触发构建 (GitLab CI / GitHub Actions)
↓
单元测试 / 集成测试
↓
构建 Docker 镜像
↓
推送镜像到镜像仓库
↓
部署到测试环境
↓
自动化测试 / 人工验证
↓
部署到生产环境(蓝绿发布)3.3.2 GitLab CI 配置示例
.gitlab-ci.yml:
stages:
- test
- build
- deploy-staging
- deploy-production
variables:
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: "/certs"
REGISTRY_URL: registry.example.com
BACKEND_IMAGE: ${REGISTRY_URL}/yuxi-know-backend
FRONTEND_IMAGE: ${REGISTRY_URL}/yuxi-know-frontend
# 单元测试
unit-test:
stage: test
image: python:3.11-slim
before_script:
- pip install -r requirements.txt
script:
- pytest tests/unit/ -v --cov
coverage: '/TOTAL.*\s+(\d+%)$/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
only:
- merge_requests
- main
# 集成测试
integration-test:
stage: test
image: docker:24.0
services:
- docker:24.0-dind
before_script:
- docker compose -f docker-compose.test.yml up -d
- sleep 30
script:
- docker compose -f docker-compose.test.yml exec backend pytest tests/integration/ -v
after_script:
- docker compose -f docker-compose.test.yml down
only:
- merge_requests
- main
# 构建后端镜像
build-backend:
stage: build
image: docker:24.0
services:
- docker:24.0-dind
before_script:
- echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
script:
- docker build -t ${BACKEND_IMAGE}:${CI_COMMIT_SHORT_SHA} -t ${BACKEND_IMAGE}:latest ./backend
- docker push ${BACKEND_IMAGE}:${CI_COMMIT_SHORT_SHA}
- docker push ${BACKEND_IMAGE}:latest
only:
- main
# 构建前端镜像
build-frontend:
stage: build
image: node:20-alpine
script:
- cd frontend
- npm ci
- npm run build
- docker build -t ${FRONTEND_IMAGE}:${CI_COMMIT_SHORT_SHA} -t ${FRONTEND_IMAGE}:latest .
- echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
- docker push ${FRONTEND_IMAGE}:${CI_COMMIT_SHORT_SHA}
- docker push ${FRONTEND_IMAGE}:latest
only:
- main
# 部署到测试环境
deploy-staging:
stage: deploy-staging
image: alpine/helm:3.13.0
environment:
name: staging
url: https://staging.yuxi-know.example.com
before_script:
- apk add --no-cache curl
- curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
- chmod +x kubectl
- mkdir -p ~/.kube
- echo $KUBE_CONFIG_STAGING | base64 -d > ~/.kube/config
script:
- helm upgrade --install yuxi-know ./helm-chart \
--namespace staging \
--set image.tag=${CI_COMMIT_SHORT_SHA} \
--set environment=staging \
--wait --timeout 10m
only:
- main
# 部署到生产环境
deploy-production:
stage: deploy-production
image: alpine/helm:3.13.0
environment:
name: production
url: https://yuxi-know.example.com
before_script:
- apk add --no-cache curl
- curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
- chmod +x kubectl
- mkdir -p ~/.kube
- echo $KUBE_CONFIG_PRODUCTION | base64 -d > ~/.kube/config
script:
- helm upgrade --install yuxi-know ./helm-chart \
--namespace production \
--set image.tag=${CI_COMMIT_SHORT_SHA} \
--set environment=production \
--wait --timeout 15m
when: manual
only:
- main4. 安全与合规
4.1 身份认证与授权
4.1.1 认证机制
多因素认证(MFA):
- 支持 TOTP(基于时间的一次性密码)
- 支持 WebAuthn / FIDO2 硬件密钥
- 支持 SMS 验证码(备用方案)
单点登录(SSO)集成:
- SAML 2.0 协议
- OAuth 2.0 / OpenID Connect
- 支持 LDAP / Active Directory 集成
配置示例(FastAPI):
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security.api_key import APIKeyHeader
from jose import JWTError, jwt
import hashlib
import hmac
# JWT 配置
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
api_key_header = APIKeyHeader(name="X-API-Key")
def verify_token(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return username
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def verify_api_key(api_key: str = Depends(api_key_header)):
# 验证 API Key
if not hmac.compare_digest(api_key, "your-secure-api-key"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API Key"
)
return api_key
# 路由保护
@app.get("/api/documents/")
def read_documents(
username: str = Depends(verify_token),
api_key: str = Depends(verify_api_key)
):
return {"message": f"Hello {username}"}4.1.2 权限控制
基于角色的访问控制(RBAC):
角色层级
├── 超级管理员
│ ├── 用户管理
│ ├── 系统配置
│ └── 完全访问权限
├── 管理员
│ ├── 知识库管理
│ ├── 智能体管理
│ └── 部分访问权限
├── 编辑者
│ ├── 文档上传
│ ├── 内容编辑
│ └── 知识库修改权限
├── 查看者
│ ├── 文档检索
│ ├── 问答查询
│ └── 只读权限
└── 匿名用户
└── 公开知识库访问权限配置示例:
from enum import Enum
from typing import List
class Role(str, Enum):
SUPER_ADMIN = "super_admin"
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
ANONYMOUS = "anonymous"
class Permission(str, Enum):
# 用户管理
USER_CREATE = "user:create"
USER_READ = "user:read"
USER_UPDATE = "user:update"
USER_DELETE = "user:delete"
# 知识库管理
KB_CREATE = "kb:create"
KB_READ = "kb:read"
KB_UPDATE = "kb:update"
KB_DELETE = "kb:delete"
# 文档管理
DOC_UPLOAD = "doc:upload"
DOC_READ = "doc:read"
DOC_UPDATE = "doc:update"
DOC_DELETE = "doc:delete"
# 智能体管理
AGENT_CREATE = "agent:create"
AGENT_READ = "agent:read"
AGENT_UPDATE = "agent:update"
AGENT_DELETE = "agent:delete"
# 角色权限映射
ROLE_PERMISSIONS: dict[Role, List[Permission]] = {
Role.SUPER_ADMIN: [p for p in Permission], # 所有权限
Role.ADMIN: [
Permission.USER_READ,
Permission.KB_CREATE, Permission.KB_READ, Permission.KB_UPDATE, Permission.KB_DELETE,
Permission.DOC_UPLOAD, Permission.DOC_READ, Permission.DOC_UPDATE, Permission.DOC_DELETE,
Permission.AGENT_CREATE, Permission.AGENT_READ, Permission.AGENT_UPDATE, Permission.AGENT_DELETE,
],
Role.EDITOR: [
Permission.KB_READ, Permission.KB_UPDATE,
Permission.DOC_UPLOAD, Permission.DOC_READ, Permission.DOC_UPDATE,
],
Role.VIEWER: [
Permission.KB_READ,
Permission.DOC_READ,
Permission.AGENT_READ,
],
Role.ANONYMOUS: [
Permission.KB_READ,
Permission.DOC_READ,
],
}
def has_permission(role: Role, permission: Permission) -> bool:
return permission in ROLE_PERMISSIONS.get(role, [])4.2 数据加密
4.2.1 传输加密
TLS/SSL 配置:
- 强制使用 HTTPS(TLS 1.2+)
- 禁用弱加密套件
- 启用 HSTS(HTTP Strict Transport Security)
Nginx SSL 配置:
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384';
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;
# HSTS
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;4.2.2 存储加密
数据库加密:
- PostgreSQL:使用 pgcrypto 扩展加密敏感字段
- Milvus:启用 at-rest encryption
- Neo4j:启用数据库加密
PostgreSQL 加密示例:
-- 启用 pgcrypto 扩展
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- 加密函数
CREATE OR REPLACE FUNCTION encrypt_data(text, text)
RETURNS bytea AS $$
SELECT pgp_sym_encrypt($1, $2)
$$ LANGUAGE sql SECURITY DEFINER;
-- 解密函数
CREATE OR REPLACE FUNCTION decrypt_data(bytea, text)
RETURNS text AS $$
SELECT pgp_sym_decrypt($1, $2)
$$ LANGUAGE sql SECURITY DEFINER;
-- 使用示例
INSERT INTO documents (title, content, encrypted_content)
VALUES (
'敏感文档',
'原始内容',
encrypt_data('敏感信息', 'encryption-key')
);
-- 查询时解密
SELECT
title,
decrypt_data(encrypted_content, 'encryption-key') AS content
FROM documents;文件存储加密:
- 使用加密文件系统(如 LUKS)
- 或在应用层加密后存储
应用层加密示例(Python):
from cryptography.fernet import Fernet
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
# 生成密钥
def generate_key(password: str, salt: bytes = None) -> bytes:
if salt is None:
salt = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
# 加密文件
def encrypt_file(file_path: str, key: bytes) -> bytes:
fernet = Fernet(key)
with open(file_path, 'rb') as file:
original_data = file.read()
encrypted_data = fernet.encrypt(original_data)
return encrypted_data
# 解密文件
def decrypt_file(encrypted_data: bytes, key: bytes) -> bytes:
fernet = Fernet(key)
decrypted_data = fernet.decrypt(encrypted_data)
return decrypted_data4.3 网络安全
4.3.1 防火墙规则
默认拒绝原则:
- 默认拒绝所有入站连接
- 仅开放必要端口(HTTPS 443, SSH 22)
- 使用 IP 白名单限制管理端口访问
防火墙规则示例:
# 清空现有规则
iptables -F
iptables -X
# 默认策略
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT
# 允许本地回环
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT
# 允许已建立连接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
# 允许 SSH(限制 IP)
iptables -A INPUT -p tcp -s 10.0.0.0/8 --dport 22 -j ACCEPT
# 允许 HTTPS
iptables -A INPUT -p tcp --dport 443 -j ACCEPT
# 允许 HTTP(重定向到 HTTPS)
iptables -A INPUT -p tcp --dport 80 -j ACCEPT
# 保存规则
iptables-save > /etc/iptables/rules.v44.3.2 Web 应用防火墙(WAF)
推荐方案:
- ModSecurity + Nginx
- Cloudflare WAF
- AWS WAF
ModSecurity 配置示例:
# nginx.conf
modsecurity on;
modsecurity_rules_file /etc/nginx/modsecurity.conf;
# /etc/nginx/modsecurity.conf
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess Off
SecDataDir /tmp/
SecTmpDir /tmp/
SecAuditLog /var/log/nginx/modsec_audit.log
# 基础规则
SecRule REQUEST_METHOD "^(TRACE|TRACK)$" \
"id:1001,phase:1,deny,status:403,msg:'TRACE/TRACK not allowed'"
SecRule ARGS "@detectSQLi" \
"id:1002,phase:2,deny,status:403,msg:'SQL Injection detected'"
SecRule ARGS "@detectXSS" \
"id:1003,phase:2,deny,status:403,msg:'XSS detected'"4.4 日志审计
4.4.1 日志分类
| 日志类型 | 存储位置 | 保留周期 | 用途 |
|---|---|---|---|
| 访问日志 | Nginx / 应用日志 | 30 天 | 访问分析、异常检测 |
| 审计日志 | 独立审计库 | 180 天 | 合规审计、追溯 |
| 错误日志 | 应用日志 | 90 天 | 问题排查、性能优化 |
| 安全日志 | SIEM 系统 | 365 天 | 安全事件响应 |
4.4.2 审计日志设计
审计事件类型:
- 用户登录/登出
- 权限变更
- 数据访问
- 系统配置变更
- 文档上传/删除
- 智能体创建/修改
审计日志表结构:
CREATE TABLE audit_logs (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
user_id BIGINT,
username VARCHAR(100),
ip_address INET,
user_agent TEXT,
request_method VARCHAR(10),
request_path TEXT,
request_params JSONB,
response_status INTEGER,
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
INDEX idx_event_type (event_type),
INDEX idx_user_id (user_id),
INDEX idx_created_at (created_at)
);
-- 创建审计函数
CREATE OR REPLACE FUNCTION log_audit_event()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_logs (
event_type,
user_id,
username,
ip_address,
user_agent,
request_method,
request_path,
request_params,
response_status,
created_at
) VALUES (
TG_ARGV[0],
current_user_id(),
current_username(),
client_ip(),
user_agent(),
request_method(),
request_path(),
request_params(),
response_status(),
NOW()
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;4.5 合规性要求
4.5.1 GDPR 合规
数据主体权利:
- 访问权:用户可请求访问其个人数据
- 更正权:用户可要求更正不准确的个人数据
- 删除权:用户可要求删除其个人数据(被遗忘权)
- 数据可携带权:用户可请求导出其数据
实现示例:
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
router = APIRouter()
@router.get("/api/user/data-export")
def export_user_data(
username: str = Depends(verify_token),
db: Session = Depends(get_db)
):
# 查询用户所有数据
user_data = {
"profile": get_user_profile(username, db),
"documents": get_user_documents(username, db),
"queries": get_user_queries(username, db),
"settings": get_user_settings(username, db),
}
# 生成 JSON 导出文件
import json
from io import BytesIO
buffer = BytesIO()
json.dump(user_data, buffer, ensure_ascii=False, indent=2)
buffer.seek(0)
from fastapi.responses import Response
return Response(
content=buffer.getvalue(),
media_type="application/json",
headers={
"Content-Disposition": f"attachment; filename=user_data_{username}.json"
}
)
@router.delete("/api/user/data-delete")
def delete_user_data(
username: str = Depends(verify_token),
db: Session = Depends(get_db)
):
# 删除用户所有数据
delete_user_profile(username, db)
delete_user_documents(username, db)
delete_user_queries(username, db)
delete_user_settings(username, db)
# 记录删除操作
log_audit_event(
event_type="data_deletion",
username=username
)
return {"message": "User data deleted successfully"}4.5.2 等保 2.0 合规
基本要求:
- 身份鉴别:双因素认证
- 访问控制:最小权限原则
- 安全审计:完整的审计日志
- 数据完整性:数据校验与备份
- 数据备份:定期备份与恢复测试
等保测评项对应:
| 等保要求 | 实施措施 |
|---|---|
| 身份鉴别 | JWT + MFA + 密码复杂度策略 |
| 访问控制 | RBAC + IP 白名单 + API Key |
| 安全审计 | 审计日志 + SIEM 集成 |
| 入侵防范 | WAF + IPS + 异常检测 |
| 数据完整性 | 数据校验 + 备份验证 |
| 数据备份 | 每日备份 + 异地备份 |
| 个人信息保护 | 数据加密 + 访问控制 + 审计 |
5. 性能优化
5.1 数据库优化
5.1.1 PostgreSQL 优化
索引策略:
-- 文档表索引
CREATE INDEX idx_documents_kb_id ON documents(kb_id);
CREATE INDEX idx_documents_created_at ON documents(created_at DESC);
CREATE INDEX idx_documents_embedding ON documents USING ivfflat(embedding vector_cosine_ops) WITH (lists = 100);
-- 查询表索引
CREATE INDEX idx_queries_user_id ON queries(user_id);
CREATE INDEX idx_queries_created_at ON queries(created_at DESC);
CREATE INDEX idx_queries_kb_id ON queries(kb_id);
-- 复合索引
CREATE INDEX idx_documents_kb_status ON documents(kb_id, status, created_at DESC);查询优化:
-- 使用 EXPLAIN ANALYZE 分析查询
EXPLAIN ANALYZE
SELECT d.*, e.similarity
FROM documents d
JOIN (
SELECT id, 1 - (embedding <=> '[0.1,0.2,...]') AS similarity
FROM documents
WHERE kb_id = 123
ORDER BY embedding <=> '[0.1,0.2,...]'
LIMIT 100
) e ON d.id = e.id
WHERE d.status = 'active';连接池优化:
# 使用 SQLAlchemy 连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo=False
)5.1.2 Redis 优化
内存优化:
# 使用压缩列表优化内存
# redis.conf
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
# Python Redis 客户端配置
import redis
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
password='password',
socket_keepalive=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30
)
# 使用 Pipeline 批量操作
def cache_documents_batch(documents):
pipe = redis_client.pipeline()
for doc in documents:
pipe.setex(
f"doc:{doc['id']}",
3600, # 1 小时过期
json.dumps(doc)
)
pipe.execute()5.1.3 Milvus 优化
索引类型选择:
| 索引类型 | 特点 | 适用场景 |
|---|---|---|
| IVF_FLAT | 精度高,查询速度中等 | 小规模数据(< 100 万) |
| IVF_SQ8 | 精度中等,查询速度快 | 中等规模数据(100 万 - 1000 万) |
| HNSW | 精度高,查询速度快 | 大规模数据(> 1000 万) |
| ANNOY | 构建快,内存占用低 | 离线场景 |
Milvus 配置优化:
from pymilvus import Collection
collection = Collection("documents")
# HNSW 索引配置(推荐用于生产环境)
index_params = {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {
"M": 16, # 每个节点的最大连接数
"efConstruction": 256 # 构建索引时的搜索范围
}
}
collection.create_index(
field_name="embedding",
index_params=index_params
)
# 搜索参数配置
search_params = {
"metric_type": "COSINE",
"params": {
"ef": 128 # 搜索时的范围,越大精度越高但速度越慢
}
}
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=10,
expr=f"kb_id == {kb_id} AND status == 'active'",
consistency_level="Strong"
)5.1.4 Neo4j 优化
索引与约束:
// 创建唯一约束(自动创建索引)
CREATE CONSTRAINT FOR (d:Document) REQUIRE d.id IS UNIQUE;
CREATE CONSTRAINT FOR (u:User) REQUIRE u.id IS UNIQUE;
// 创建索引
CREATE INDEX FOR (d:Document) ON (d.kb_id);
CREATE INDEX FOR (d:Document) ON (d.created_at);
// 全文索引
CREATE FULLTEXT INDEX document_fulltext FOR (d:Document) ON EACH [d.title, d.content];查询优化:
// 使用 PROFILE 分析查询
PROFILE
MATCH (d:Document {kb_id: $kb_id})
WHERE d.status = 'active'
RETURN d
LIMIT 100;
// 优化后的查询
PROFILE
MATCH (d:Document)
WHERE d.kb_id = $kb_id
AND d.status = 'active'
RETURN d
LIMIT 100;
// 使用索引
PROFILE
MATCH (d:Document)
USING INDEX d:Document(kb_id)
WHERE d.kb_id = $kb_id
AND d.status = 'active'
RETURN d
LIMIT 100;
// 使用参数化查询
MATCH (d:Document {id: $doc_id})
RETURN d5.2 应用层优化
5.2.1 缓存策略
多级缓存架构:
┌─────────────┐
│ 应用层 │
│ (Redis) │
└─────────────┘
│
├─ L1: 本地缓存 (LRU)
├─ L2: Redis 缓存
└─ L3: CDN 缓存 (静态资源)缓存实现:
from functools import lru_cache
from typing import Optional
import hashlib
import json
import redis
# L1: 本地缓存
@lru_cache(maxsize=1000)
def get_document_cached_local(doc_id: int) -> Optional[dict]:
return get_document_from_db(doc_id)
# L2: Redis 缓存
class CacheManager:
def __init__(self, redis_client):
self.redis = redis_client
def get_document(self, doc_id: int) -> Optional[dict]:
cache_key = f"doc:{doc_id}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
return None
def set_document(self, doc_id: int, doc: dict, ttl: int = 3600):
cache_key = f"doc:{doc_id}"
self.redis.setex(cache_key, ttl, json.dumps(doc))
def invalidate_document(self, doc_id: int):
cache_key = f"doc:{doc_id}"
self.redis.delete(cache_key)
def invalidate_kb(self, kb_id: int):
pattern = f"doc:kb:{kb_id}:*"
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
# 使用示例
cache_manager = CacheManager(redis_client)
def get_document(doc_id: int) -> dict:
# 先查 L1 缓存
doc = get_document_cached_local(doc_id)
if doc:
return doc
# 再查 L2 缓存
doc = cache_manager.get_document(doc_id)
if doc:
return doc
# 最后查数据库
doc = get_document_from_db(doc_id)
if doc:
# 写入缓存
cache_manager.set_document(doc_id, doc)
return doc
return None5.2.2 异步处理
任务队列设计:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Web 应用 │───▶│ 消息队列 │───▶│ Worker │
│ (FastAPI) │ │ (Redis/RQ) │ │ (Celery) │
└─────────────┘ └─────────────┘ └─────────────┘Celery 配置:
# celery_app.py
from celery import Celery
celery_app = Celery(
"yuxi_know",
broker="redis://:password@localhost:6379/0",
backend="redis://:password@localhost:6379/1",
include=[
"app.tasks.document_tasks",
"app.tasks.knowledge_graph_tasks",
"app.tasks.embedding_tasks"
]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
task_routes={
"app.tasks.document_tasks.*": {"queue": "documents"},
"app.tasks.knowledge_graph_tasks.*": {"queue": "graph"},
"app.tasks.embedding_tasks.*": {"queue": "embeddings"},
},
worker_prefetch_multiplier=4,
task_acks_late=True,
worker_max_tasks_per_child=1000,
)
# tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def process_document_embedding(self, doc_id: int):
try:
# 处理文档嵌入
doc = get_document(doc_id)
embedding = generate_embedding(doc['content'])
# 保存到向量数据库
save_embedding(doc_id, embedding)
logger.info(f"Document {doc_id} embedding processed successfully")
return {"status": "success", "doc_id": doc_id}
except Exception as exc:
logger.error(f"Error processing document {doc_id}: {str(exc)}")
# 重试
raise self.retry(exc=exc, countdown=60)
@shared_task(bind=True)
def build_knowledge_graph(self, kb_id: int):
try:
# 构建知识图谱
documents = get_kb_documents(kb_id)
graph = build_graph_from_documents(documents)
# 保存到 Neo4j
save_graph_to_neo4j(graph)
logger.info(f"Knowledge graph for KB {kb_id} built successfully")
return {"status": "success", "kb_id": kb_id}
except Exception as exc:
logger.error(f"Error building graph for KB {kb_id}: {str(exc)}")
raise self.retry(exc=exc, countdown=300)5.2.3 并发控制
FastAPI 并发配置:
from fastapi import FastAPI, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
import asyncio
app = FastAPI()
# 限制并发请求数
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
# 限流
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/api/search")
@limiter.limit("100/minute") # 每分钟 100 次请求
async def search_documents(
query: str,
request: Request
):
# 异步处理
results = await run_in_threadpool(
search_documents_sync,
query
)
return results
# 使用 BackgroundTasks 处理耗时任务
@app.post("/api/documents/upload")
async def upload_document(
file: UploadFile,
background_tasks: BackgroundTasks,
current_user: str = Depends(verify_token)
):
# 保存文件
doc_id = save_file(file)
# 后台处理
background_tasks.add_task(
process_document_embedding,
doc_id
)
background_tasks.add_task(
extract_knowledge_graph,
doc_id
)
return {"doc_id": doc_id, "status": "processing"}5.3 前端优化
5.3.1 代码分割
Vite 配置:
// vite.config.js
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
build: {
rollupOptions: {
output: {
manualChunks: {
'react-vendor': ['react', 'react-dom', 'react-router-dom'],
'ui-components': ['antd', '@ant-design/icons'],
'charts': ['echarts', 'react-echarts'],
'editor': ['monaco-editor'],
'utils': ['lodash', 'dayjs', 'axios']
}
}
},
chunkSizeWarningLimit: 1000
}
})5.3.2 资源优化
图片优化:
// 图片懒加载
import { LazyLoadImage } from 'react-lazy-load-image-component'
function DocumentCard({ document }) {
return (
<div>
<LazyLoadImage
src={document.thumbnail}
effect="blur"
threshold={100}
/>
<h3>{document.title}</h3>
</div>
)
}
// WebP 格式
const images = [
{
src: '/image.webp',
type: 'image/webp',
fallback: '/image.jpg'
}
]
function OptimizedImage({ src, webp, alt }) {
return (
<picture>
<source srcSet={webp} type="image/webp" />
<img src={src} alt={alt} loading="lazy" />
</picture>
)
}5.3.3 API 请求优化
请求合并与缓存:
import { useQuery } from '@tanstack/react-query'
import { debounce } from 'lodash'
// 使用 React Query 管理请求
const queryClient = new QueryClient({
defaultOptions: {
queries: {
staleTime: 5 * 60 * 1000, // 5 分钟
cacheTime: 10 * 60 * 1000, // 10 分钟
refetchOnWindowFocus: false,
},
},
})
// 防抖搜索
const debouncedSearch = debounce((query) => {
queryClient.invalidateQueries(['documents', { query }])
}, 300)
function SearchBar() {
const [searchQuery, setSearchQuery] = useState('')
const handleChange = (e) => {
setSearchQuery(e.target.value)
debouncedSearch(e.target.value)
}
return (
<input
type="text"
value={searchQuery}
onChange={handleChange}
placeholder="搜索文档..."
/>
)
}
// 并行请求
async function fetchDocumentDetail(docId) {
const [document, relatedDocs, graph] = await Promise.all([
fetch(`/api/documents/${docId}`).then(r => r.json()),
fetch(`/api/documents/${docId}/related`).then(r => r.json()),
fetch(`/api/graph/${docId}`).then(r => r.json()),
])
return { document, relatedDocs, graph }
}
function DocumentDetail({ docId }) {
const { data, isLoading } = useQuery({
queryKey: ['document', docId],
queryFn: () => fetchDocumentDetail(docId),
})
if (isLoading) return <Loading />
return <DocumentView {...data} />
}6. 监控与运维
6.1 监控体系
6.1.1 指标监控
Prometheus + Grafana 架构:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 应用节点 │───▶│ Prometheus │───▶│ Grafana │
│ (Exporter) │ │ (TSDB) │ │ (Dashboard) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
└─────────────────────────────────────┘
│
▼
┌─────────────┐
│ AlertMgr │
│ (告警管理) │
└─────────────┘关键监控指标:
| 指标类型 | 指标名称 | 告警阈值 |
|---|---|---|
| 系统指标 | CPU 使用率 | > 80% 持续 5 分钟 |
| 系统指标 | 内存使用率 | > 85% 持续 5 分钟 |
| 系统指标 | 磁盘使用率 | > 80% |
| 应用指标 | API 响应时间 P95 | > 2 秒 |
| 应用指标 | API 错误率 | > 1% |
| 应用指标 | 并发连接数 | > 80% 最大值 |
| 数据库指标 | 慢查询数量 | > 10/分钟 |
| 数据库指标 | 连接池使用率 | > 80% |
| Redis 指标 | 内存使用率 | > 80% |
| Redis 指标 | 命中率 | < 90% |
Prometheus 配置示例:
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'yuxi-know-backend'
static_configs:
- targets: ['backend:8000']
metrics_path: '/metrics'
scrape_interval: 10s
- job_name: 'postgres'
static_configs:
- targets: ['postgres_exporter:9187']
- job_name: 'redis'
static_configs:
- targets: ['redis_exporter:9121']
- job_name: 'node'
static_configs:
- targets: ['node_exporter:9100']6.1.2 日志管理
ELK Stack 架构:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 应用日志 │───▶│ Filebeat │───▶│ Logstash │
│ (JSON) │ │ (采集) │ │ (处理) │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Elasticsearch│
│ (存储) │
└─────────────┘
│
▼
┌─────────────┐
│ Kibana │
│ (可视化) │
└─────────────┘日志格式规范:
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_obj = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# 添加上下文信息
if hasattr(record, 'user_id'):
log_obj['user_id'] = record.user_id
if hasattr(record, 'kb_id'):
log_obj['kb_id'] = record.kb_id
if hasattr(record, 'doc_id'):
log_obj['doc_id'] = record.doc_id
# 异常信息
if record.exc_info:
log_obj['exception'] = self.formatException(record.exc_info)
return json.dumps(log_obj)
# 配置日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# 使用示例
logger.info(
"Document processed",
extra={
"user_id": 123,
"kb_id": 456,
"doc_id": 789,
}
)6.1.3 链路追踪
Jaeger / Zipkin 集成:
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# 配置 Jaeger 导出器
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger",
agent_port=6831,
)
# 配置 Tracer
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# 仪表化 FastAPI
FastAPIInstrumentor.instrument_app(app)
# 仪表化 SQLAlchemy
SQLAlchemyInstrumentor().instrument(
engine=engine,
)
# 使用 Tracer
tracer = trace.get_tracer(__name__)
def process_document(doc_id: int):
with tracer.start_as_current_span("process_document") as span:
span.set_attribute("doc_id", doc_id)
# 获取文档
with tracer.start_as_current_span("fetch_document"):
doc = fetch_document(doc_id)
# 生成嵌入
with tracer.start_as_current_span("generate_embedding"):
embedding = generate_embedding(doc)
# 保存
with tracer.start_as_current_span("save_embedding"):
save_embedding(doc_id, embedding)6.2 告警策略
6.2.1 告警规则
alert_rules.yml:
groups:
- name: yuxi-know-alerts
interval: 30s
rules:
# API 响应时间告警
- alert: HighAPILatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "API 响应时间过长"
description: "P95 响应时间为 {{ $value }}s,超过 2s 阈值"
# API 错误率告警
- alert: HighAPIErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.01
for: 5m
labels:
severity: critical
annotations:
summary: "API 错误率过高"
description: "错误率为 {{ $value | humanizePercentage }},超过 1% 阈值"
# CPU 使用率告警
- alert: HighCPUUsage
expr: 100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
for: 5m
labels:
severity: warning
annotations:
summary: "CPU 使用率过高"
description: "CPU 使用率为 {{ $value }}%,超过 80% 阈值"
# 内存使用率告警
- alert: HighMemoryUsage
expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 85
for: 5m
labels:
severity: warning
annotations:
summary: "内存使用率过高"
description: "内存使用率为 {{ $value }}%,超过 85% 阈值"
# 磁盘使用率告警
- alert: HighDiskUsage
expr: (1 - (node_filesystem_avail_bytes{fstype=~"ext4|xfs"} / node_filesystem_size_bytes)) * 100 > 80
for: 5m
labels:
severity: warning
annotations:
summary: "磁盘使用率过高"
description: "磁盘使用率为 {{ $value }}%,超过 80% 阈值"
# PostgreSQL 慢查询告警
- alert: HighPostgreSQLSlowQueries
expr: rate(pg_stat_statements_calls_total{datname="yuxiknow"}[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "PostgreSQL 慢查询过多"
description: "慢查询数量为 {{ $value }}/分钟"
# Redis 内存告警
- alert: HighRedisMemoryUsage
expr: (redis_memory_used_bytes / redis_memory_max_bytes) * 100 > 80
for: 5m
labels:
severity: warning
annotations:
summary: "Redis 内存使用率过高"
description: "Redis 内存使用率为 {{ $value }}%"6.2.2 告警通知
AlertManager 配置:
# alertmanager.yml
global:
resolve_timeout: 5m
slack_api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
route:
group_by: ['alertname', 'severity']
group_wait: 10s
group_interval: 10s
repeat_interval: 12h
receiver: 'default'
routes:
- match:
severity: critical
receiver: 'critical-alerts'
- match:
severity: warning
receiver: 'warning-alerts'
receivers:
- name: 'default'
slack_configs:
- channel: '#yuxi-know-alerts'
send_resolved: true
- name: 'critical-alerts'
slack_configs:
- channel: '#yuxi-know-critical'
send_resolved: true
pagerduty_configs:
- service_key: 'YOUR_PAGERDUTY_SERVICE_KEY'
- name: 'warning-alerts'
slack_configs:
- channel: '#yuxi-know-warnings'
send_resolved: true
email_configs:
- to: '[email protected]'
headers:
Subject: '[Yuxi-Know Warning] {{ .GroupLabels.alertname }}'6.3 自动化运维
6.3.1 自动扩缩容
Kubernetes HPA 配置:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: yuxi-know-backend-hpa
namespace: yuxi-know-production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: yuxi-know-backend
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 30
- type: Pods
value: 2
periodSeconds: 30
selectPolicy: Max6.3.2 自动备份
备份脚本:
#!/bin/bash
# backup.sh
BACKUP_DIR="/backup/yuxi-know"
DATE=$(date +%Y%m%d_%H%M%S)
# 创建备份目录
mkdir -p ${BACKUP_DIR}/${DATE}
# PostgreSQL 备份
echo "Backing up PostgreSQL..."
docker exec postgres-container pg_dump -U user yuxiknow > ${BACKUP_DIR}/${DATE}/postgres_backup.sql
gzip ${BACKUP_DIR}/${DATE}/postgres_backup.sql
# Redis 备份
echo "Backing up Redis..."
docker exec redis-container redis-cli --rdb /data/dump.rdb
docker cp redis-container:/data/dump.rdb ${BACKUP_DIR}/${DATE}/redis_backup.rdb
gzip ${BACKUP_DIR}/${DATE}/redis_backup.rdb
# Neo4j 备份
echo "Backing up Neo4j..."
docker exec neo4j-container neo4j-admin database dump yuxi-know --to-path=/backups
docker cp neo4j-container:/backups/yuxi-know.dump ${BACKUP_DIR}/${DATE}/neo4j_backup.dump
gzip ${BACKUP_DIR}/${DATE}/neo4j_backup.dump
# 上传到 S3
echo "Uploading to S3..."
aws s3 sync ${BACKUP_DIR}/${DATE} s3://yuxi-know-backups/${DATE}/
# 清理旧备份(保留 7 天)
find ${BACKUP_DIR} -type d -mtime +7 -exec rm -rf {} \;
echo "Backup completed: ${DATE}"Crontab 配置:
# 每天凌晨 2 点执行备份
0 2 * * * /usr/local/bin/backup.sh >> /var/log/backup.log 2>&16.3.3 故障自愈
Kubernetes Liveness Probe:
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3Readiness Probe:
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3健康检查端点:
from fastapi import FastAPI, HTTPException
from sqlalchemy import text
import redis
import pymilvus
app = FastAPI()
@app.get("/health")
def health_check():
return {"status": "healthy"}
@app.get("/ready")
def readiness_check():
# 检查数据库连接
try:
db.execute(text("SELECT 1"))
except Exception as e:
raise HTTPException(status_code=503, detail="Database unavailable")
# 检查 Redis 连接
try:
redis_client.ping()
except Exception as e:
raise HTTPException(status_code=503, detail="Redis unavailable")
# 检查 Milvus 连接
try:
milvus_client.list_collections()
except Exception as e:
raise HTTPException(status_code=503, detail="Milvus unavailable")
# 检查 Neo4j 连接
try:
neo4j_driver.verify_connectivity()
except Exception as e:
raise HTTPException(status_code=503, detail="Neo4j unavailable")
return {"status": "ready"}7. 数据管理
7.1 数据备份
7.1.1 备份策略
3-2-1 备份原则:
- 至少保留 3 份副本
- 使用 2 种不同存储介质
- 至少 1 份异地备份
备份类型:
| 备份类型 | 频率 | 保留周期 | 用途 |
|---|---|---|---|
| 全量备份 | 每日 | 7 天 | 完整恢复 |
| 增量备份 | 每小时 | 24 小时 | 快速恢复 |
| 差异备份 | 每 6 小时 | 3 天 | 中速恢复 |
| 日志备份 | 实时 | 30 天 | 点时间恢复 |
7.1.2 自动化备份脚本
PostgreSQL 备份:
#!/bin/bash
# postgres_backup.sh
DB_NAME="yuxiknow"
DB_USER="user"
DB_HOST="postgres"
BACKUP_DIR="/backup/postgres"
S3_BUCKET="s3://yuxi-know-backups/postgres"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/${DB_NAME}_${DATE}.sql.gz"
# 创建备份
docker exec postgres-container pg_dump -U ${DB_USER} ${DB_NAME} | gzip > ${BACKUP_FILE}
# 上传到 S3
aws s3 cp ${BACKUP_FILE} ${S3_BUCKET}/${DATE}/
# 清理本地旧备份(保留 3 天)
find ${BACKUP_DIR} -name "*.sql.gz" -mtime +3 -delete
# 清理 S3 旧备份(保留 7 天)
aws s3 ls ${S3_BUCKET}/ | awk '{print $2}' | while read dir; do
dir_date=$(echo $dir | tr -d '/')
if [[ $(date -d "$dir_date" +%s) -lt $(date -d "7 days ago" +%s) ]]; then
aws s3 rm ${S3_BUCKET}/${dir} --recursive
fi
done7.1.3 恢复测试
恢复脚本:
#!/bin/bash
# postgres_restore.sh
BACKUP_FILE=$1
DB_NAME="yuxiknow"
DB_USER="user"
DB_HOST="postgres"
# 下载备份
aws s3 cp ${BACKUP_FILE} /tmp/restore_backup.sql.gz
# 解压
gunzip /tmp/restore_backup.sql.gz
# 停止应用
docker-compose stop backend
# 恢复数据库
docker exec -i postgres-container psql -U ${DB_USER} ${DB_NAME} < /tmp/restore_backup.sql
# 启动应用
docker-compose start backend
# 验证
docker exec postgres-container psql -U ${DB_USER} ${DB_NAME} -c "SELECT COUNT(*) FROM documents;"7.2 数据迁移
7.2.1 从其他系统迁移到 Yuxi-Know
迁移流程:
1. 源系统数据导出
↓
2. 数据格式转换
↓
3. 导入到 Yuxi-Know
↓
4. 生成向量嵌入
↓
5. 构建知识图谱
↓
6. 数据验证迁移脚本示例:
import requests
import hashlib
import json
from typing import List, Dict
import time
class DataMigrator:
def __init__(self, target_api_url: str, api_key: str):
self.target_api_url = target_api_url
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def migrate_document(self, source_doc: Dict) -> bool:
try:
# 转换数据格式
target_doc = {
"title": source_doc.get("title", ""),
"content": source_doc.get("content", ""),
"metadata": {
"source": "migration",
"source_id": source_doc.get("id"),
"original_url": source_doc.get("url"),
"author": source_doc.get("author"),
"created_at": source_doc.get("created_at"),
},
"kb_id": self.kb_id
}
# 上传文档
response = requests.post(
f"{self.target_api_url}/api/documents",
headers=self.headers,
json=target_doc
)
if response.status_code == 201:
doc_id = response.json().get("id")
print(f"Document migrated successfully: {doc_id}")
return True
else:
print(f"Failed to migrate document: {response.text}")
return False
except Exception as e:
print(f"Error migrating document: {str(e)}")
return False
def batch_migrate(self, source_docs: List[Dict], batch_size: int = 10):
success_count = 0
fail_count = 0
for i, doc in enumerate(source_docs, 1):
if self.migrate_document(doc):
success_count += 1
else:
fail_count += 1
# 每 batch_size 个文档暂停一次,避免请求过快
if i % batch_size == 0:
time.sleep(1)
print(f"\nMigration completed:")
print(f" Success: {success_count}")
print(f" Failed: {fail_count}")
# 使用示例
migrator = DataMigrator(
target_api_url="https://yuxi-know.example.com",
api_key="your-api-key"
)
# 从源系统获取文档
source_docs = fetch_documents_from_source_system()
# 执行批量迁移
migrator.batch_migrate(source_docs, batch_size=10)7.2.2 数据验证
验证检查清单:
- [ ] 文档总数一致
- [ ] 文档内容完整
- [ ] 元数据正确
- [ ] 向量嵌入生成成功
- [ ] 知识图谱节点/边数量正确
- [ ] 搜索功能正常
验证脚本:
class DataValidator:
def __init__(self, target_api_url: str, api_key: str):
self.target_api_url = target_api_url
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def validate_document_count(self, expected_count: int) -> bool:
"""验证文档总数"""
response = requests.get(
f"{self.target_api_url}/api/documents/count",
headers=self.headers
)
actual_count = response.json().get("count", 0)
if actual_count == expected_count:
print(f"✓ Document count matches: {actual_count}")
return True
else:
print(f"✗ Document count mismatch: expected {expected_count}, got {actual_count}")
return False
def validate_document_content(self, doc_id: int, expected_content: str) -> bool:
"""验证文档内容"""
response = requests.get(
f"{self.target_api_url}/api/documents/{doc_id}",
headers=self.headers
)
actual_content = response.json().get("content", "")
if actual_content == expected_content:
print(f"✓ Document {doc_id} content matches")
return True
else:
print(f"✗ Document {doc_id} content mismatch")
return False
def validate_search(self, query: str, expected_results: int) -> bool:
"""验证搜索功能"""
response = requests.post(
f"{self.target_api_url}/api/search",
headers=self.headers,
json={"query": query, "limit": 10}
)
results = response.json().get("results", [])
if len(results) >= expected_results:
print(f"✓ Search for '{query}' returned {len(results)} results")
return True
else:
print(f"✗ Search for '{query}' returned {len(results)} results, expected {expected_results}")
return False7.3 数据生命周期管理
7.3.1 数据保留策略
数据保留周期:
| 数据类型 | 保留周期 | 归档策略 |
|---|---|---|
| 活跃文档 | 永久保留 | 在线存储 |
| 已删除文档 | 30 天 | 软删除后归档 |
| 历史版本 | 90 天 | 归档到冷存储 |
| 查询日志 | 180 天 | 归档到数据湖 |
| 审计日志 | 365 天 | 归档到合规存储 |
| 临时文件 | 24 小时 | 自动清理 |
7.3.2 数据清理脚本
清理过期数据:
import asyncio
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
async def clean_expired_data(db: Session):
"""清理过期数据"""
# 清理已删除超过 30 天的文档
cutoff_date = datetime.utcnow() - timedelta(days=30)
deleted_docs = db.query(Document).filter(
Document.deleted_at < cutoff_date
).all()
for doc in deleted_docs:
# 删除向量数据
delete_embedding(doc.id)
# 删除图谱数据
delete_graph_node(doc.id)
# 物理删除
db.delete(doc)
db.commit()
# 清理历史版本
old_versions = db.query(DocumentVersion).filter(
DocumentVersion.created_at < cutoff_date
).all()
for version in old_versions:
db.delete(version)
db.commit()
# 清理临时文件
import os
temp_dir = "/tmp/yuxi-know"
cutoff_time = time.time() - 86400 # 24 小时
for filename in os.listdir(temp_dir):
filepath = os.path.join(temp_dir, filename)
if os.path.isfile(filepath):
if os.path.getmtime(filepath) < cutoff_time:
os.remove(filepath)
print(f"Deleted temporary file: {filename}")8. 实施路线图
8.1 30 天快速部署计划
第 1 周:基础设施准备
| 任务 | 负责人 | 交付物 |
|---|---|---|
| 云服务采购与配置 | DevOps | 账号、VPC、安全组 |
| Kubernetes 集群搭建 | DevOps | K8s 集群、Ingress |
| 数据库集群部署 | DBA | PostgreSQL、Redis、Milvus、Neo4j |
| 监控系统搭建 | DevOps | Prometheus、Grafana、AlertManager |
| CI/CD 流水线配置 | DevOps | GitLab CI、镜像仓库 |
第 2 周:应用部署
| 任务 | 负责人 | 交付物 |
|---|---|---|
| Yuxi-Know 源码编译 | 开发 | Docker 镜像 |
| Helm Chart 开发 | DevOps | 部署模板 |
| 测试环境部署 | DevOps | 测试环境 |
| 基础功能测试 | QA | 测试报告 |
| 性能基准测试 | QA | 性能报告 |
第 3 周:安全加固
| 任务 | 负责人 | 交付物 |
|---|---|---|
| HTTPS 证书配置 | DevOps | SSL 证书 |
| 身份认证集成 | 开发 | SSO/MFA |
| 权限系统配置 | 开发 | RBAC 配置 |
| 安全审计 | 安全团队 | 安全报告 |
| 渗透测试 | 安全团队 | 渗透测试报告 |
第 4 周:生产上线
| 任务 | 负责人 | 交付物 |
|---|---|---|
| 生产环境部署 | DevOps | 生产环境 |
| 数据迁移 | DBA/开发 | 数据迁移报告 |
| 监控告警配置 | DevOps | 告警规则 |
| 用户培训 | 培训师 | 培训材料 |
| 上线验证 | 全员 | 上线检查清单 |
8.2 持续优化计划
月度优化(第 1-3 个月)
- 第 1 个月:收集使用反馈,优化核心功能
- 第 2 个月:性能调优,提升响应速度
- 第 3 个月:扩展功能,满足业务需求
季度优化(第 4-12 个月)
- Q2:构建领域知识图谱,提升问答准确率
- Q3:集成更多外部数据源,丰富知识库
- Q4:实施多租户架构,支持多业务线
9. 成本分析
9.1 云服务成本
9.1.1 中小规模(500 用户)
| 资源类型 | 规格 | 数量 | 月度成本 |
|---|---|---|---|
| ECS | 4C/8G | 3 台 | ¥3,600 |
| RDS PostgreSQL | 4C/8G | 主从 2 台 | ¥2,400 |
| Redis 企业版 | 2G | 3 主 3 从 | ¥1,200 |
| Milvus(自建) | 8C/16G | 3 台 | ¥2,400 |
| Neo4j(自建) | 4C/8G | 3 台 | ¥1,800 |
| OSS 存储 | 500G | - | ¥300 |
| SLB 负载均衡 | - | 1 个 | ¥300 |
| CDN 流量 | 1TB | - | ¥600 |
| 合计 | ¥12,600 |
9.1.2 大规模(2000 用户)
| 资源类型 | 规格 | 数量 | 月度成本 |
|---|---|---|---|
| ECS | 8C/16G | 6 台 | ¥14,400 |
| RDS PostgreSQL | 8C/16G | 主从 2 台 | ¥4,800 |
| Redis 企业版 | 4G | 3 主 3 从 | ¥2,400 |
| Milvus(自建) | 16C/32G | 5 台 | ¥12,000 |
| Neo4j(自建) | 8C/16G | 5 台 | ¥6,000 |
| OSS 存储 | 2TB | - | ¥1,200 |
| SLB 负载均衡 | - | 2 个 | ¥600 |
| CDN 流量 | 5TB | - | ¥3,000 |
| 合计 | ¥44,400 |
9.2 私有云成本
9.2.1 硬件采购成本
| 设备类型 | 规格 | 数量 | 单价 | 合计 |
|---|---|---|---|---|
| 控制节点 | 16C/32G/500G | 3 台 | ¥40,000 | ¥120,000 |
| 工作节点 | 32C/64G/1T | 9 台 | ¥60,000 | ¥540,000 |
| 网络设备 | 万兆交换机 | 4 台 | ¥20,000 | ¥80,000 |
| 存储设备 | 分布式存储 | 1 套 | ¥100,000 | ¥100,000 |
| 合计 | ¥840,000 |
9.2.2 运维成本(年度)
| 成本项 | 年度成本 |
|---|---|
| 电费 | ¥60,000 |
| 机房租赁 | ¥120,000 |
| 运维人员 | ¥240,000 |
| 软件授权 | ¥100,000 |
| 备份设备 | ¥50,000 |
| 合计 | ¥570,000 |
9.3 成本优化建议
9.3.1 云服务优化
- 预留实例:购买预留实例,节省 30%-50% 成本
- 自动扩缩容:根据业务高峰调整资源
- 冷数据归档:将不常访问的数据归档到廉价存储
- 混合部署:核心数据私有云,弹性计算公有云
9.3.2 资源优化
- 容器化:提高资源利用率
- 共享服务:多个应用共享数据库、缓存等资源
- Serverless:使用 Serverless 计算处理突发流量
- CDN 加速:减少源站带宽消耗
10. 风险管理
10.1 技术风险
| 风险项 | 可能性 | 影响 | 应对措施 |
|---|---|---|---|
| 单点故障 | 中 | 高 | 高可用架构,多实例部署 |
| 数据丢失 | 低 | 极高 | 定期备份,异地存储 |
| 性能瓶颈 | 高 | 中 | 水平扩展,缓存优化 |
| 安全漏洞 | 中 | 高 | 安全审计,渗透测试 |
| 第三方依赖 | 中 | 中 | 多供应商策略 |
10.2 业务风险
| 风险项 | 可能性 | 影响 | 应对措施 |
|---|---|---|---|
| 用户接受度低 | 中 | 高 | 充分调研,用户培训 |
| 数据质量差 | 高 | 中 | 数据治理,质量控制 |
| 合规风险 | 中 | 高 | 法务审核,合规设计 |
| 成本超支 | 中 | 中 | 成本监控,持续优化 |
10.3 应急预案
10.3.1 服务中断应急
恢复步骤:
- 确认故障范围
- 切换到备用实例/备用数据中心
- 通知相关方
- 定位并修复问题
- 验证恢复
- 复盘总结
RTO/RPO 目标:
- RTO(恢复时间目标):≤ 4 小时
- RPO(恢复点目标):≤ 15 分钟
10.3.2 数据泄露应急
响应步骤:
- 立即隔离受影响系统
- 评估泄露范围与影响
- 通知管理层与法务部门
- 必要时通知受影响用户
- 修复安全漏洞
- 加强安全措施
- 总结经验教训
11. 附录
11.1 检查清单
部署前检查
- [ ] 云服务账号已创建并配置
- [ ] 域名已解析到负载均衡器
- [ ] SSL 证书已申请并配置
- [ ] 数据库集群已部署并测试
- [ ] 监控系统已搭建并配置告警
- [ ] CI/CD 流水线已配置并测试
- [ ] 备份策略已制定并测试
- [ ] 安全策略已制定并实施
部署后检查
- [ ] 所有服务正常运行
- [ ] 健康检查端点正常响应
- [ ] 监控指标正常
- [ ] 日志正常输出
- [ ] 备份正常执行
- [ ] 告警正常触发
- [ ] 用户能正常访问
- [ ] 核心功能正常
11.2 常用命令
Kubernetes 常用命令
# 查看所有 Pod
kubectl get pods -n yuxi-know-production
# 查看服务日志
kubectl logs -f deployment/yuxi-know-backend -n yuxi-know-production
# 进入容器
kubectl exec -it deployment/yuxi-know-backend -n yuxi-know-production -- bash
# 扩缩容
kubectl scale deployment/yuxi-know-backend --replicas=5 -n yuxi-know-production
# 查看事件
kubectl get events -n yuxi-know-production --sort-by=.metadata.creationTimestamp
# 查看资源使用
kubectl top pods -n yuxi-know-productionDocker 常用命令
# 查看运行中的容器
docker ps
# 查看容器日志
docker logs -f yuxi-know-backend
# 进入容器
docker exec -it yuxi-know-backend bash
# 查看容器资源使用
docker stats yuxi-know-backend
# 清理未使用的资源
docker system prune -a --volumesPostgreSQL 常用命令
# 连接数据库
psql -h localhost -U user -d yuxiknow
# 查看表大小
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
# 查看慢查询
SELECT
query,
mean_exec_time,
calls,
total_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
# 分析表
VACUUM ANALYZE documents;Redis 常用命令
# 连接 Redis
redis-cli -h localhost -p 6379
# 查看内存使用
INFO memory
# 查看慢查询
SLOWLOG GET 10
# 清空缓存
FLUSHDB11.3 故障排查手册
问题一:服务无法启动
症状:容器启动失败
排查步骤:
- 查看容器日志
- 检查环境变量配置
- 检查依赖服务连接
- 检查端口占用
- 检查磁盘空间
常见原因:
- 数据库连接失败
- 配置文件错误
- 端口被占用
- 磁盘空间不足
问题二:查询速度慢
症状:API 响应时间长
排查步骤:
- 查看数据库慢查询日志
- 检查索引是否生效
- 检查缓存命中率
- 检查网络延迟
- 检查资源使用率
优化措施:
- 添加索引
- 优化查询语句
- 增加缓存
- 水平扩展
- 升级硬件
问题三:内存不足
症状:OOMKilled,服务频繁重启
排查步骤:
- 查看内存使用情况
- 分析内存占用
- 检查内存泄漏
- 检查缓存配置
优化措施:
- 调整容器内存限制
- 优化代码内存使用
- 调整缓存大小
- 增加物理内存
11.4 参考资源
官方文档
社区资源
相关工具
版本信息
文档版本:v1.0 编制日期:2026 年 3 月 25 日 适用版本:Yuxi-Know v0.5.0 编制单位:企业部署技术团队
免责声明
本报告基于 Yuxi-Know v0.5.0 版本编制,内容仅供参考。实际部署中请根据具体业务需求和技术环境进行调整。本报告不对因使用本报告内容而产生的任何直接或间接损失承担责任。
结束