Yuxi-Know Enterprise Deployment Best Practices Report

Executive Summary

Based on Yuxi-Know v0.5.0, this report lays out a complete implementation plan for enterprise production deployment, covering architecture design, infrastructure planning, security strategy, performance optimization, and operations monitoring. Drawing on real project experience, it provides actionable deployment templates and contingency plans to help an organization move smoothly from PoC to production within 30 days.

Core Deliverables

  • Complete enterprise deployment architecture design
  • High-availability and elastic-scaling plan
  • Security hardening and compliance configuration guide
  • Performance tuning and capacity planning recommendations
  • Monitoring, alerting, and operations automation
  • Data backup and disaster recovery plan

Table of Contents

  1. Project Background and Goals
  2. System Architecture Design
  3. Infrastructure Planning
  4. Security and Compliance
  5. Performance Optimization
  6. Monitoring and Operations
  7. Data Management
  8. Implementation Roadmap
  9. Cost Analysis
  10. Risk Management
  11. Appendix

1. Project Background and Goals

1.1 Yuxi-Know at a Glance

Yuxi-Know is an LLM-based platform for building knowledge bases and knowledge-graph agents. Core capabilities:

  • Agent development: LangGraph v1 multi-agent architecture
  • Knowledge base (RAG): multi-format document support; vector, full-text, and hybrid retrieval
  • Knowledge graph: LightRAG graph construction and visualization, feeding agent reasoning
  • Platform engineering: Vue + FastAPI stack, Docker and production-grade deployment

1.2 Enterprise Deployment Goals

Business Goals

| Dimension | Key Metric | Target |
|---|---|---|
| Availability | System uptime | ≥ 99.9% |
| Responsiveness | P95 query latency | ≤ 2 s |
| Concurrency | Concurrent users supported | ≥ 1,000 |
| Data security | Data breach incidents | 0 |
| Compliance | Regulatory requirements met | 100% |

Technical Goals

  • Horizontal scaling that keeps pace with business growth transparently
  • Multi-tenant isolation with on-demand resource allocation
  • Self-healing to minimize manual intervention
  • Full observability for fast problem localization
  • Rolling upgrades with no business interruption

1.3 Typical Application Scenarios

Scenario 1: Enterprise knowledge management

  • Documents: 50,000+
  • Queries per day: 100,000+
  • Users: 500+
  • Core needs: fast retrieval, intelligent Q&A, version management

Scenario 2: Intelligent customer service

  • Concurrent conversations: 200+
  • Queries per day: 500,000+
  • Response time: < 3 s
  • Core needs: real-time responses, accuracy, multi-turn dialogue

Scenario 3: Engineering knowledge base

  • Technical documents: 100,000+ pages
  • Code snippets: 1,000,000+
  • Users: 200+
  • Core needs: code understanding, precise matching, cross-project linking

2. System Architecture Design

2.1 Overall Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Load Balancing Layer                     │
│          (Nginx / HAProxy / Cloud Load Balancer)            │
└──────────────────────────────┬──────────────────────────────┘
                               │
        ┌────────────┬─────────┴──┬─────────────┐
        ▼            ▼            ▼             ▼
┌──────────────┐┌──────────────┐┌──────────────┐┌────────────────┐
│ Frontend #1  ││ Frontend #2  ││ Frontend #N  ││   Static CDN   │
│ (Vue + Nginx)││ (Vue + Nginx)││ (Vue + Nginx)││(OSS/CloudFront)│
└──────────────┘└──────────────┘└──────────────┘└────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│                  Application Service Layer                  │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐       │
│   │ API node 1  │   │ API node 2  │   │ API node N  │       │
│   │  (FastAPI)  │   │  (FastAPI)  │   │  (FastAPI)  │       │
│   └─────────────┘   └─────────────┘   └─────────────┘       │
└─────────────────────────────────────────────────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│                     Message Queue Layer                     │
│             (Redis Cluster / RabbitMQ / Kafka)              │
└─────────────────────────────────────────────────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│                      Data Storage Layer                     │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐         │
│ │PostgreSQL│ │  Redis   │ │  Milvus  │ │  Neo4j   │         │
│ │ cluster  │ │ cluster  │ │ cluster  │ │ cluster  │         │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘         │
│ ┌────────────────┐ ┌────────────────┐                       │
│ │  File storage  │ │ Object storage │                       │
│ │ (local/MinIO)  │ │    (S3/OSS)    │                       │
│ └────────────────┘ └────────────────┘                       │
└─────────────────────────────────────────────────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│                   External Service Layer                    │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐     │
│ │  LLM API  │ │ Embed API │ │Rerank API │ │  OCR API  │     │
│ │(OpenAI...)│ │local/cloud│ │local/cloud│ │local/cloud│     │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘     │
└─────────────────────────────────────────────────────────────┘

2.2 Core Components

2.2.1 Frontend Service Layer

Stack: Vue.js + Vite + Nginx

Deployment

  • Multiple instances (at least 2) for high availability
  • Nginx reverse proxy with HTTPS enabled
  • Static assets served from a CDN

Configuration notes

```nginx
# Frontend Nginx configuration example
server {
    listen 443 ssl http2;
    server_name yuxi-know.example.com;

    ssl_certificate /etc/ssl/certs/yuxi-know.crt;
    ssl_certificate_key /etc/ssl/private/yuxi-know.key;

    # Static assets
    location /static/ {
        alias /app/dist/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    # API proxy
    location /api/ {
        proxy_pass http://backend:8000/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # WebSocket support (if needed)
    location /ws/ {
        proxy_pass http://backend:8000/ws/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```

2.2.2 Application Service Layer

Stack: FastAPI + Gunicorn + Uvicorn

Deployment

  • Containerized via Docker Compose / Kubernetes
  • Multiple instances (at least 3)
  • Autoscaling on CPU / memory / request-rate metrics

Gunicorn configuration

```python
# gunicorn.conf.py
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 120
keepalive = 5
preload_app = True
```

FastAPI configuration

```python
# app/config.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app = FastAPI(
    title="Yuxi-Know API",
    description="Enterprise knowledge base and knowledge graph platform",
    version="0.5.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

# CORS settings
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://yuxi-know.example.com",
        "https://app.yuxi-know.example.com"
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Trusted hosts
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=[
        "yuxi-know.example.com",
        "*.yuxi-know.example.com"
    ]
)
```

2.2.3 Data Storage Layer

PostgreSQL Cluster

Architecture

Primary (Master) ────> Replica 1
        │
        └────────────> Replica 2

Configuration notes

  • Primary/replica replication with read/write splitting
  • Connection pooling to avoid connection leaks
  • Regular VACUUM and index maintenance
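Read/write splitting from the bullets above can be sketched at the application level. The DSNs and the round-robin routing policy below are illustrative assumptions, not the project's actual configuration; the strings would feed SQLAlchemy's `create_engine` as shown in section 5.1.1.

```python
import itertools

# Hypothetical connection strings: one primary for writes, replicas for reads
PRIMARY_DSN = "postgresql://user:pass@pg-primary:5432/yuxiknow"
REPLICA_DSNS = [
    "postgresql://user:pass@pg-replica-1:5432/yuxiknow",
    "postgresql://user:pass@pg-replica-2:5432/yuxiknow",
]

_replica_cycle = itertools.cycle(REPLICA_DSNS)

def route_dsn(readonly: bool) -> str:
    """Send read-only work to replicas (round-robin), all writes to the primary."""
    return next(_replica_cycle) if readonly else PRIMARY_DSN
```

In practice the routing decision usually lives in a session factory, so service code just asks for a "read" or "write" session.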

Configuration example

```ini
# postgresql.conf — tuned for roughly a 16 GB instance
shared_buffers = 4GB
effective_cache_size = 12GB
maintenance_work_mem = 1GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 13107kB
min_wal_size = 2GB
max_wal_size = 8GB
```

```ini
# pgbouncer.ini — connection pool settings
max_client_conn = 1000
default_pool_size = 100
reserve_pool_size = 10
reserve_pool_timeout = 5
max_db_connections = 50
server_idle_timeout = 600
```

Redis Cluster

Architecture

Redis Cluster
├── Master 1 (slots 0–5460)
│   ├── Replica 1
│   └── Replica 2
├── Master 2 (slots 5461–10922)
│   ├── Replica 1
│   └── Replica 2
└── Master 3 (slots 10923–16383)
    ├── Replica 1
    └── Replica 2

Configuration notes

  • Cluster mode with data sharding
  • Primary/replica replication with automatic failover
  • Persistence enabled (AOF + RDB)

Configuration example

```conf
# redis.conf
port 6379
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000

appendonly yes
appendfsync everysec
save 900 1
save 300 10
save 60 10000

maxmemory 2gb
maxmemory-policy allkeys-lru
```

Milvus Cluster

Architecture

Milvus Cluster
├── Root Coordinator (1)
├── Data Nodes (N)
├── Query Nodes (N)
├── Index Nodes (N)
├── Proxies (N)
└── Pulsar / Kafka (message queue)

Configuration notes

  • Sharding strategy: by business domain or data volume
  • Index types: IVF_FLAT / HNSW
  • Parameter tuning: nlist, nprobe

Configuration example

```python
# Milvus connection and index configuration
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType

# Connect to the Milvus proxy first
connections.connect(host="milvus", port="19530")

# Define the fields
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="metadata", dtype=DataType.JSON)
]

schema = CollectionSchema(fields, description="document vector collection")

# Create the collection
collection = Collection(name="documents", schema=schema)

# Build the index
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {
        "M": 16,
        "efConstruction": 256
    }
}

collection.create_index(field_name="embedding", index_params=index_params)
```

Neo4j Cluster

Architecture

Neo4j Fabric
├── Fabric Database 1
│   ├── Primary Server 1
│   ├── Primary Server 2
│   └── Read Replica 1
└── Fabric Database 2
    ├── Primary Server 1
    ├── Primary Server 2
    └── Read Replica 1

Configuration notes

  • Fabric multi-database support, isolated by business domain
  • Read/write splitting for query performance
  • Cypher query optimization with proper indexes

Configuration example

```conf
# neo4j.conf (Neo4j 5.x setting names)

# Memory
server.memory.heap.initial_size=2g
server.memory.heap.max_size=4g
server.memory.pagecache.size=8g

# Connections
server.bolt.thread_pool_max_size=400
server.bolt.thread_pool_min_size=5

# Transactions
db.transaction.timeout=60s
db.transaction.concurrent.maximum=1000

# Debug logging is configured via Log4j in Neo4j 5.x; keep it at INFO in production
```

2.3 Network Architecture

                    ┌─────────────────┐
                    │  Internet / ISP │
                    └────────┬────────┘
                             │
                    ┌────────▼────────┐
                    │ Firewall / WAF  │
                    └────────┬────────┘
                             │
                    ┌────────▼────────┐
                    │  Load Balancer  │
                    │   (SLB / ELB)   │
                    └────────┬────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
┌───────▼──────┐    ┌───────▼──────┐    ┌───────▼──────┐
│   DMZ zone   │    │   App zone   │    │  Data zone   │
│   Frontend   │    │  API service │    │  DB clusters │
│   Nginx      │    │  FastAPI     │    │  PostgreSQL  │
│ Static files │    │  Redis       │    │  Milvus      │
└──────────────┘    └──────────────┘    │  Neo4j       │
                                        └──────────────┘

3. Infrastructure Planning

3.1 Choosing a Cloud Provider

3.1.1 Public Cloud

Recommended: Alibaba Cloud / Tencent Cloud / AWS

| Resource | Alibaba Cloud | Tencent Cloud | AWS |
|---|---|---|---|
| Compute | ECS | CVM | EC2 |
| Database | RDS PostgreSQL | TencentDB for PostgreSQL | RDS PostgreSQL |
| Redis | Redis Enterprise Edition | TencentDB for Redis | ElastiCache Redis |
| Vector DB | self-hosted Milvus | self-hosted Milvus | OpenSearch Vector |
| Object storage | OSS | COS | S3 |
| Load balancing | SLB | CLB | ELB/ALB |
| Containers | ACK / ACK Serverless | TKE / TKE Serverless | EKS / Fargate |

Estimated monthly cost (small/medium scale):

  • Compute: ¥3,000 – ¥8,000
  • Managed databases: ¥2,000 – ¥5,000
  • Storage: ¥500 – ¥1,500
  • Network traffic: ¥500 – ¥2,000
  • Total: ¥6,000 – ¥16,500

3.1.2 Private Cloud

Recommended: OpenShift / Rancher

Hardware requirements

  • Control plane: 3 nodes, 16C/32G/500G each
  • Workers: 6–9 nodes, 32C/64G/1T each
  • Storage: distributed (Ceph/GlusterFS), capacity ≥ 10 TB
  • Network: 10 GbE with redundancy

Estimated cost

  • Hardware: ¥500,000 – ¥800,000
  • Software licenses: ¥100,000 – ¥200,000 (if applicable)
  • Operations: ¥100,000 – ¥200,000 / year
  • First-year total: ¥700,000 – ¥1,200,000

3.1.3 Hybrid Cloud

When to use: highly sensitive data combined with a need for elastic scaling

Architecture

  • Core data and critical services stay in the private cloud
  • Frontend services and non-sensitive data run in the public cloud
  • Connected via VPN / dedicated line

3.2 Containerized Deployment

3.2.1 Docker Compose

When to use: small/medium deployments, quick validation

docker-compose.yml template

```yaml
version: '3.8'

services:
  # Frontend
  frontend:
    image: ghcr.io/xerrors/yuxi-know-frontend:0.5.0
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - backend
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # Backend API
  backend:
    image: ghcr.io/xerrors/yuxi-know-backend:0.5.0
    ports:
      - "8000:8000"
    environment:
      # Replace the default credentials; prefer an .env file or a secrets manager
      - DATABASE_URL=postgresql://user:pass@postgres:5432/yuxiknow
      - REDIS_URL=redis://redis:6379/0
      - MILVUS_HOST=milvus
      - MILVUS_PORT=19530
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USER=neo4j
      - NEO4J_PASSWORD=neo4j_password
    volumes:
      - ./uploads:/app/uploads
      - ./logs:/app/logs
    depends_on:
      - postgres
      - redis
      - milvus
      - neo4j
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # PostgreSQL
  postgres:
    image: postgres:15-alpine
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=yuxiknow
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes --requirepass redis_password
    volumes:
      - redis-data:/data
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # Milvus vector database (single-node "standalone" mode)
  milvus:
    image: milvusdb/milvus:v2.3.0
    command: ["milvus", "run", "standalone"]
    ports:
      - "19530:19530"
      - "9091:9091"
    environment:
      - ETCD_ENDPOINTS=etcd:2379
      - MINIO_ADDRESS=minio:9000
    volumes:
      - milvus-data:/var/lib/milvus
    depends_on:
      - etcd
      - minio
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # Neo4j graph database (5.x environment variable names)
  neo4j:
    image: neo4j:5.12.0-community
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      - NEO4J_AUTH=neo4j/neo4j_password
      - NEO4J_server_memory_heap_max__size=2G
      - NEO4J_server_memory_pagecache_size=2G
    volumes:
      - neo4j-data:/data
      - neo4j-logs:/logs
    restart: unless-stopped
    networks:
      - yuxi-know-network

  # Milvus dependencies
  etcd:
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
    # this image needs an explicit etcd command line
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    volumes:
      - etcd-data:/etcd
    networks:
      - yuxi-know-network

  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio-data:/data
    networks:
      - yuxi-know-network

volumes:
  postgres-data:
  redis-data:
  milvus-data:
  neo4j-data:
  neo4j-logs:
  etcd-data:
  minio-data:

networks:
  yuxi-know-network:
    driver: bridge
```

3.2.2 Kubernetes

When to use: large-scale deployments with autoscaling

Deployment layout

Kubernetes Cluster
├── Namespace: yuxi-know-production
│   ├── Deployment: frontend
│   ├── Deployment: backend
│   ├── StatefulSet: postgres
│   ├── StatefulSet: redis
│   ├── StatefulSet: milvus
│   └── StatefulSet: neo4j
├── ConfigMaps & Secrets
├── Services (ClusterIP / LoadBalancer)
├── Ingress Controller
└── HPA (Horizontal Pod Autoscaler)

Key manifests

backend-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: yuxi-know-backend
  namespace: yuxi-know-production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: yuxi-know-backend
  template:
    metadata:
      labels:
        app: yuxi-know-backend
    spec:
      containers:
      - name: backend
        image: ghcr.io/xerrors/yuxi-know-backend:0.5.0
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: yuxi-know-secrets
              key: database-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: yuxi-know-secrets
              key: redis-url
        - name: MILVUS_HOST
          value: "milvus-service"
        - name: MILVUS_PORT
          value: "19530"
        - name: NEO4J_URI
          value: "bolt://neo4j-service:7687"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
        volumeMounts:
        - name: uploads
          mountPath: /app/uploads
        - name: logs
          mountPath: /app/logs
      volumes:
      - name: uploads
        persistentVolumeClaim:
          claimName: uploads-pvc
      - name: logs
        persistentVolumeClaim:
          claimName: logs-pvc
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: yuxi-know-backend-hpa
  namespace: yuxi-know-production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: yuxi-know-backend
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
```
3.3 CI/CD Pipeline

3.3.1 Pipeline Design

Code push (Git)
  ↓
Trigger build (GitLab CI / GitHub Actions)
  ↓
Unit tests / integration tests
  ↓
Build Docker images
  ↓
Push images to the registry
  ↓
Deploy to staging
  ↓
Automated tests / manual verification
  ↓
Deploy to production (blue-green release)

3.3.2 GitLab CI Example

.gitlab-ci.yml

```yaml
stages:
  - test
  - build
  - deploy-staging
  - deploy-production

variables:
  DOCKER_DRIVER: overlay2
  DOCKER_TLS_CERTDIR: "/certs"
  REGISTRY_URL: registry.example.com
  BACKEND_IMAGE: ${REGISTRY_URL}/yuxi-know-backend
  FRONTEND_IMAGE: ${REGISTRY_URL}/yuxi-know-frontend

# Unit tests
unit-test:
  stage: test
  image: python:3.11-slim
  before_script:
    - pip install -r requirements.txt
  script:
    - pytest tests/unit/ -v --cov
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml
  only:
    - merge_requests
    - main

# Integration tests
integration-test:
  stage: test
  image: docker:24.0
  services:
    - docker:24.0-dind
  before_script:
    - docker compose -f docker-compose.test.yml up -d
    - sleep 30
  script:
    - docker compose -f docker-compose.test.yml exec backend pytest tests/integration/ -v
  after_script:
    - docker compose -f docker-compose.test.yml down
  only:
    - merge_requests
    - main

# Build the backend image
build-backend:
  stage: build
  image: docker:24.0
  services:
    - docker:24.0-dind
  before_script:
    - echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
  script:
    - docker build -t ${BACKEND_IMAGE}:${CI_COMMIT_SHORT_SHA} -t ${BACKEND_IMAGE}:latest ./backend
    - docker push ${BACKEND_IMAGE}:${CI_COMMIT_SHORT_SHA}
    - docker push ${BACKEND_IMAGE}:latest
  only:
    - main

# Build the frontend image (a node image has no Docker daemon, so the npm
# build runs inside the frontend Dockerfile and this job uses docker-in-docker)
build-frontend:
  stage: build
  image: docker:24.0
  services:
    - docker:24.0-dind
  before_script:
    - echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
  script:
    - docker build -t ${FRONTEND_IMAGE}:${CI_COMMIT_SHORT_SHA} -t ${FRONTEND_IMAGE}:latest ./frontend
    - docker push ${FRONTEND_IMAGE}:${CI_COMMIT_SHORT_SHA}
    - docker push ${FRONTEND_IMAGE}:latest
  only:
    - main

# Deploy to staging
deploy-staging:
  stage: deploy-staging
  image: alpine/helm:3.13.0
  environment:
    name: staging
    url: https://staging.yuxi-know.example.com
  before_script:
    - mkdir -p ~/.kube
    - echo $KUBE_CONFIG_STAGING | base64 -d > ~/.kube/config
  script:
    # folded scalar: YAML joins these lines into a single helm command
    - >
      helm upgrade --install yuxi-know ./helm-chart
      --namespace staging
      --set image.tag=${CI_COMMIT_SHORT_SHA}
      --set environment=staging
      --wait --timeout 10m
  only:
    - main

# Deploy to production
deploy-production:
  stage: deploy-production
  image: alpine/helm:3.13.0
  environment:
    name: production
    url: https://yuxi-know.example.com
  before_script:
    - mkdir -p ~/.kube
    - echo $KUBE_CONFIG_PRODUCTION | base64 -d > ~/.kube/config
  script:
    - >
      helm upgrade --install yuxi-know ./helm-chart
      --namespace production
      --set image.tag=${CI_COMMIT_SHORT_SHA}
      --set environment=production
      --wait --timeout 15m
  when: manual
  only:
    - main
```

4. Security and Compliance

4.1 Authentication and Authorization

4.1.1 Authentication

Multi-factor authentication (MFA)

  • TOTP (time-based one-time passwords)
  • WebAuthn / FIDO2 hardware keys
  • SMS codes (fallback)

Single sign-on (SSO) integration

  • SAML 2.0
  • OAuth 2.0 / OpenID Connect
  • LDAP / Active Directory integration

Configuration example (FastAPI)

```python
import hmac

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security.api_key import APIKeyHeader
from jose import JWTError, jwt

# JWT settings — load the secret from an environment variable or a secrets
# manager in production; never hard-code it
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
api_key_header = APIKeyHeader(name="X-API-Key")

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return username
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

def verify_api_key(api_key: str = Depends(api_key_header)):
    # Constant-time comparison against the configured API key
    if not hmac.compare_digest(api_key, "your-secure-api-key"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API Key"
        )
    return api_key

# Protecting a route (`app` is the FastAPI instance from section 2.2.2)
@app.get("/api/documents/")
def read_documents(
    username: str = Depends(verify_token),
    api_key: str = Depends(verify_api_key)
):
    return {"message": f"Hello {username}"}
```
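verify_token above only checks tokens; issuing them is the other half. The service itself would call python-jose's `jwt.encode`, but the stdlib-only sketch below shows what an HS256 token consists of. The secret and the `sub`/`exp` claims mirror the snippet above; everything else is illustrative.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = "your-secret-key-here"  # same caveat: load from the environment

def _b64url(data: bytes) -> str:
    # JWT uses unpadded base64url segments
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def create_access_token(sub: str, expires_minutes: int = 30) -> str:
    """Build header.payload.signature by hand for HS256."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": sub, "exp": int(time.time()) + expires_minutes * 60}
    signing_input = (
        f"{_b64url(json.dumps(header).encode())}."
        f"{_b64url(json.dumps(payload).encode())}"
    )
    sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(sig)}"
```

A token minted this way decodes cleanly with the `jwt.decode` call in verify_token, since HS256 is just HMAC-SHA256 over the first two segments.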

4.1.2 Authorization

Role-based access control (RBAC)

Role hierarchy
├── Super admin
│   ├── User management
│   ├── System configuration
│   └── Full access
├── Admin
│   ├── Knowledge base management
│   ├── Agent management
│   └── Partial access
├── Editor
│   ├── Document upload
│   ├── Content editing
│   └── Knowledge base write access
├── Viewer
│   ├── Document retrieval
│   ├── Q&A queries
│   └── Read-only access
└── Anonymous
    └── Public knowledge base access

Permission mapping example

```python
from enum import Enum
from typing import List

class Role(str, Enum):
    SUPER_ADMIN = "super_admin"
    ADMIN = "admin"
    EDITOR = "editor"
    VIEWER = "viewer"
    ANONYMOUS = "anonymous"

class Permission(str, Enum):
    # User management
    USER_CREATE = "user:create"
    USER_READ = "user:read"
    USER_UPDATE = "user:update"
    USER_DELETE = "user:delete"

    # Knowledge base management
    KB_CREATE = "kb:create"
    KB_READ = "kb:read"
    KB_UPDATE = "kb:update"
    KB_DELETE = "kb:delete"

    # Document management
    DOC_UPLOAD = "doc:upload"
    DOC_READ = "doc:read"
    DOC_UPDATE = "doc:update"
    DOC_DELETE = "doc:delete"

    # Agent management
    AGENT_CREATE = "agent:create"
    AGENT_READ = "agent:read"
    AGENT_UPDATE = "agent:update"
    AGENT_DELETE = "agent:delete"

# Role → permission mapping
ROLE_PERMISSIONS: dict[Role, List[Permission]] = {
    Role.SUPER_ADMIN: [p for p in Permission],  # all permissions
    Role.ADMIN: [
        Permission.USER_READ,
        Permission.KB_CREATE, Permission.KB_READ, Permission.KB_UPDATE, Permission.KB_DELETE,
        Permission.DOC_UPLOAD, Permission.DOC_READ, Permission.DOC_UPDATE, Permission.DOC_DELETE,
        Permission.AGENT_CREATE, Permission.AGENT_READ, Permission.AGENT_UPDATE, Permission.AGENT_DELETE,
    ],
    Role.EDITOR: [
        Permission.KB_READ, Permission.KB_UPDATE,
        Permission.DOC_UPLOAD, Permission.DOC_READ, Permission.DOC_UPDATE,
    ],
    Role.VIEWER: [
        Permission.KB_READ,
        Permission.DOC_READ,
        Permission.AGENT_READ,
    ],
    Role.ANONYMOUS: [
        Permission.KB_READ,
        Permission.DOC_READ,
    ],
}

def has_permission(role: Role, permission: Permission) -> bool:
    return permission in ROLE_PERMISSIONS.get(role, [])
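To enforce the mapping at the route level, a decorator can gate handlers on a single permission. The sketch below re-declares a miniature Role/Permission set so it stands alone; the real application would reuse the enums above and raise an HTTP 403 instead of PermissionError.

```python
from enum import Enum
from functools import wraps

class Role(str, Enum):
    EDITOR = "editor"
    VIEWER = "viewer"

class Permission(str, Enum):
    DOC_UPLOAD = "doc:upload"
    DOC_READ = "doc:read"

ROLE_PERMISSIONS = {
    Role.EDITOR: [Permission.DOC_UPLOAD, Permission.DOC_READ],
    Role.VIEWER: [Permission.DOC_READ],
}

def require_permission(permission: Permission):
    """Wrap a handler so it runs only when the caller's role grants `permission`."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, role: Role, **kwargs):
            if permission not in ROLE_PERMISSIONS.get(role, []):
                raise PermissionError(f"{role.value} lacks {permission.value}")
            return func(*args, role=role, **kwargs)
        return wrapper
    return decorator

@require_permission(Permission.DOC_UPLOAD)
def upload_document(name: str, *, role: Role) -> str:
    return f"uploaded {name}"
```

In FastAPI the same check fits naturally into a dependency so routes declare `Depends(require_permission(...))` instead of wrapping functions by hand.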

4.2 Data Encryption

4.2.1 Encryption in Transit

TLS/SSL

  • Enforce HTTPS (TLS 1.2+)
  • Disable weak cipher suites
  • Enable HSTS (HTTP Strict Transport Security)

Nginx SSL configuration

```nginx
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384';
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;

# HSTS
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
```

4.2.2 Encryption at Rest

Database encryption

  • PostgreSQL: encrypt sensitive columns with the pgcrypto extension
  • Milvus: enable at-rest encryption
  • Neo4j: enable database encryption

pgcrypto example
```sql
-- Enable the pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Encryption helper
CREATE OR REPLACE FUNCTION encrypt_data(text, text)
RETURNS bytea AS $$
    SELECT pgp_sym_encrypt($1, $2)
$$ LANGUAGE sql SECURITY DEFINER;

-- Decryption helper
CREATE OR REPLACE FUNCTION decrypt_data(bytea, text)
RETURNS text AS $$
    SELECT pgp_sym_decrypt($1, $2)
$$ LANGUAGE sql SECURITY DEFINER;

-- Usage
INSERT INTO documents (title, content, encrypted_content)
VALUES (
    'Sensitive document',
    'original content',
    encrypt_data('secret data', 'encryption-key')
);

-- Decrypt on read
SELECT
    title,
    decrypt_data(encrypted_content, 'encryption-key') AS content
FROM documents;
```

File storage encryption

  • Use an encrypted filesystem (e.g. LUKS)
  • Or encrypt at the application layer before writing

Application-layer encryption example (Python)
```python
import base64
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# Derive a key from a password; return the salt as well — it must be stored
# alongside the ciphertext, or the data can never be decrypted
def generate_key(password: str, salt: bytes | None = None) -> tuple[bytes, bytes]:
    if salt is None:
        salt = os.urandom(16)
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
    )
    key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
    return key, salt

# Encrypt a file
def encrypt_file(file_path: str, key: bytes) -> bytes:
    fernet = Fernet(key)
    with open(file_path, 'rb') as file:
        original_data = file.read()
    return fernet.encrypt(original_data)

# Decrypt previously encrypted data
def decrypt_file(encrypted_data: bytes, key: bytes) -> bytes:
    fernet = Fernet(key)
    return fernet.decrypt(encrypted_data)
```

4.3 Network Security

4.3.1 Firewall Rules

Default-deny

  • Deny all inbound connections by default
  • Open only required ports (HTTPS 443, SSH 22)
  • Restrict management ports with an IP allowlist

iptables example

```bash
# Run from a console session — flushing rules over SSH can lock you out
# Flush existing rules
iptables -F
iptables -X

# Default policies
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# Allow loopback
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT

# Allow established connections
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# Allow SSH (restricted to the internal range)
iptables -A INPUT -p tcp -s 10.0.0.0/8 --dport 22 -j ACCEPT

# Allow HTTPS
iptables -A INPUT -p tcp --dport 443 -j ACCEPT

# Allow HTTP (redirected to HTTPS)
iptables -A INPUT -p tcp --dport 80 -j ACCEPT

# Persist the rules
iptables-save > /etc/iptables/rules.v4
```

4.3.2 Web Application Firewall (WAF)

Recommended options

  • ModSecurity + Nginx
  • Cloudflare WAF
  • AWS WAF

ModSecurity example

```nginx
# nginx.conf
modsecurity on;
modsecurity_rules_file /etc/nginx/modsecurity.conf;

# /etc/nginx/modsecurity.conf
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess Off
SecDataDir /tmp/
SecTmpDir /tmp/
SecAuditLog /var/log/nginx/modsec_audit.log

# Baseline rules
SecRule REQUEST_METHOD "^(TRACE|TRACK)$" \
    "id:1001,phase:1,deny,status:403,msg:'TRACE/TRACK not allowed'"

SecRule ARGS "@detectSQLi" \
    "id:1002,phase:2,deny,status:403,msg:'SQL Injection detected'"

SecRule ARGS "@detectXSS" \
    "id:1003,phase:2,deny,status:403,msg:'XSS detected'"
```

4.4 Audit Logging

4.4.1 Log Categories

| Log type | Location | Retention | Purpose |
|---|---|---|---|
| Access logs | Nginx / app logs | 30 days | traffic analysis, anomaly detection |
| Audit logs | dedicated audit store | 180 days | compliance audits, traceability |
| Error logs | app logs | 90 days | troubleshooting, performance tuning |
| Security logs | SIEM | 365 days | security incident response |

4.4.2 Audit Log Design

Audited event types

  • User login/logout
  • Permission changes
  • Data access
  • System configuration changes
  • Document upload/deletion
  • Agent creation/modification

Audit log schema

```sql
CREATE TABLE audit_logs (
    id BIGSERIAL PRIMARY KEY,
    event_type VARCHAR(50) NOT NULL,
    user_id BIGINT,
    username VARCHAR(100),
    ip_address INET,
    user_agent TEXT,
    request_method VARCHAR(10),
    request_path TEXT,
    request_params JSONB,
    response_status INTEGER,
    error_message TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; create them separately
CREATE INDEX idx_audit_event_type ON audit_logs (event_type);
CREATE INDEX idx_audit_user_id ON audit_logs (user_id);
CREATE INDEX idx_audit_created_at ON audit_logs (created_at);

-- Trigger function; current_user_id(), client_ip() etc. are application-defined
-- helpers (e.g. reading session variables set by the API layer)
CREATE OR REPLACE FUNCTION log_audit_event()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit_logs (
        event_type,
        user_id,
        username,
        ip_address,
        user_agent,
        request_method,
        request_path,
        request_params,
        response_status,
        created_at
    ) VALUES (
        TG_ARGV[0],
        current_user_id(),
        current_username(),
        client_ip(),
        user_agent(),
        request_method(),
        request_path(),
        request_params(),
        response_status(),
        NOW()
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
```
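Audit rows can also be written from the application side instead of a trigger. Below is a sketch that shapes one request/response pair into the columns above; the dict-shaped `user`/`request` objects are illustrative assumptions, and the actual database write is elided.

```python
import json
from datetime import datetime, timezone

def build_audit_record(event_type, user, request, response_status, error=None):
    """Map one handled request onto an audit_logs row."""
    return {
        "event_type": event_type,
        "user_id": user.get("id"),
        "username": user.get("name"),
        "ip_address": request.get("client_ip"),
        "user_agent": request.get("user_agent"),
        "request_method": request.get("method"),
        "request_path": request.get("path"),
        # JSONB column: serialize parameters, keeping non-ASCII readable
        "request_params": json.dumps(request.get("params", {}), ensure_ascii=False),
        "response_status": response_status,
        "error_message": error,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
```

In a FastAPI service this would typically run inside an HTTP middleware so every route is audited without per-handler code.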

4.5 Compliance

4.5.1 GDPR

Data subject rights

  • Right of access: users may request access to their personal data
  • Right to rectification: users may have inaccurate personal data corrected
  • Right to erasure: users may have their personal data deleted ("right to be forgotten")
  • Right to data portability: users may request an export of their data

Implementation example

```python
import json

from fastapi import APIRouter, Depends
from fastapi.responses import Response
from sqlalchemy.orm import Session

router = APIRouter()

# get_db and the get_user_* / delete_user_* helpers are application-defined

@router.get("/api/user/data-export")
def export_user_data(
    username: str = Depends(verify_token),
    db: Session = Depends(get_db)
):
    # Collect everything the platform stores about the user
    user_data = {
        "profile": get_user_profile(username, db),
        "documents": get_user_documents(username, db),
        "queries": get_user_queries(username, db),
        "settings": get_user_settings(username, db),
    }

    # json.dump cannot write str into a BytesIO; serialize to a string instead
    payload = json.dumps(user_data, ensure_ascii=False, indent=2)

    return Response(
        content=payload,
        media_type="application/json",
        headers={
            "Content-Disposition": f"attachment; filename=user_data_{username}.json"
        }
    )

@router.delete("/api/user/data-delete")
def delete_user_data(
    username: str = Depends(verify_token),
    db: Session = Depends(get_db)
):
    # Remove all of the user's data
    delete_user_profile(username, db)
    delete_user_documents(username, db)
    delete_user_queries(username, db)
    delete_user_settings(username, db)

    # Record the deletion in the audit log
    log_audit_event(
        event_type="data_deletion",
        username=username
    )

    return {"message": "User data deleted successfully"}
```

4.5.2 China MLPS 2.0 (等保 2.0)

Baseline requirements

  • Identity verification: two-factor authentication
  • Access control: least-privilege principle
  • Security auditing: complete audit trail
  • Data integrity: checksums and backups
  • Data backup: regular backups with restore drills

Mapping to assessment items

| MLPS requirement | Implementation |
|---|---|
| Identity verification | JWT + MFA + password complexity policy |
| Access control | RBAC + IP allowlist + API keys |
| Security auditing | audit logs + SIEM integration |
| Intrusion prevention | WAF + IPS + anomaly detection |
| Data integrity | checksums + backup verification |
| Data backup | daily backups + off-site copies |
| Personal data protection | encryption + access control + auditing |

5. Performance Optimization

5.1 Database Optimization

5.1.1 PostgreSQL

Indexing strategy

```sql
-- Document table indexes (the embedding index requires the pgvector extension)
CREATE INDEX idx_documents_kb_id ON documents(kb_id);
CREATE INDEX idx_documents_created_at ON documents(created_at DESC);
CREATE INDEX idx_documents_embedding ON documents USING ivfflat(embedding vector_cosine_ops) WITH (lists = 100);

-- Query table indexes
CREATE INDEX idx_queries_user_id ON queries(user_id);
CREATE INDEX idx_queries_created_at ON queries(created_at DESC);
CREATE INDEX idx_queries_kb_id ON queries(kb_id);

-- Composite index
CREATE INDEX idx_documents_kb_status ON documents(kb_id, status, created_at DESC);
```

Query tuning

```sql
-- Inspect the plan with EXPLAIN ANALYZE
EXPLAIN ANALYZE
SELECT d.*, e.similarity
FROM documents d
JOIN (
    SELECT id, 1 - (embedding <=> '[0.1,0.2,...]') AS similarity
    FROM documents
    WHERE kb_id = 123
    ORDER BY embedding <=> '[0.1,0.2,...]'
    LIMIT 100
) e ON d.id = e.id
WHERE d.status = 'active';
```

Connection pool tuning

```python
# SQLAlchemy connection pool
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,   # validate connections before use
    pool_recycle=3600,    # recycle connections hourly
    echo=False
)
```

5.1.2 Redis

Memory tuning

```conf
# redis.conf — ziplist thresholds keep small collections compact
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
```

Client configuration and batching

```python
import json
import redis

redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password='password',
    socket_keepalive=True,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
    health_check_interval=30
)

# Batch writes with a pipeline to cut round trips
def cache_documents_batch(documents):
    pipe = redis_client.pipeline()
    for doc in documents:
        pipe.setex(
            f"doc:{doc['id']}",
            3600,  # expire after 1 hour
            json.dumps(doc)
        )
    pipe.execute()
```

5.1.3 Milvus 优化

索引类型选择

| 索引类型 | 特点 | 适用场景 |
| --- | --- | --- |
| IVF_FLAT | 精度高,查询速度中等 | 小规模数据(< 100 万) |
| IVF_SQ8 | 精度中等,查询速度快 | 中等规模数据(100 万 - 1000 万) |
| HNSW | 精度高,查询速度快 | 大规模数据(> 1000 万) |
| ANNOY | 构建快,内存占用低 | 离线场景 |
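上表的选型思路可以写成一个简单的辅助函数(示意代码:规模阈值取自上表,`nlist`、`M` 等参数为经验值,需按实测召回率与延迟调整):

```python
def choose_index_params(num_vectors: int) -> dict:
    """按向量规模给出粗略的 Milvus 索引选型(示意,阈值与参数均为经验值)。"""
    if num_vectors < 1_000_000:
        # 小规模:精度优先
        return {"index_type": "IVF_FLAT", "metric_type": "COSINE",
                "params": {"nlist": 1024}}
    if num_vectors < 10_000_000:
        # 中等规模:标量量化压缩,换取速度与内存
        return {"index_type": "IVF_SQ8", "metric_type": "COSINE",
                "params": {"nlist": 4096}}
    # 大规模:HNSW 图索引
    return {"index_type": "HNSW", "metric_type": "COSINE",
            "params": {"M": 16, "efConstruction": 256}}
```

返回值可直接作为 `collection.create_index` 的 `index_params` 使用。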

Milvus 配置优化

python
from pymilvus import Collection

collection = Collection("documents")

# HNSW 索引配置(推荐用于生产环境)
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {
        "M": 16,  # 每个节点的最大连接数
        "efConstruction": 256  # 构建索引时的搜索范围
    }
}

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

# 搜索参数配置
search_params = {
    "metric_type": "COSINE",
    "params": {
        "ef": 128  # 搜索时的范围,越大精度越高但速度越慢
    }
}

results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr=f"kb_id == {kb_id} AND status == 'active'",
    consistency_level="Strong"
)

5.1.4 Neo4j 优化

索引与约束

cypher
// 创建唯一约束(自动创建索引)
CREATE CONSTRAINT FOR (d:Document) REQUIRE d.id IS UNIQUE;
CREATE CONSTRAINT FOR (u:User) REQUIRE u.id IS UNIQUE;

// 创建索引
CREATE INDEX FOR (d:Document) ON (d.kb_id);
CREATE INDEX FOR (d:Document) ON (d.created_at);

// 全文索引
CREATE FULLTEXT INDEX document_fulltext FOR (d:Document) ON EACH [d.title, d.content];

查询优化

cypher
// 使用 PROFILE 分析查询
PROFILE
MATCH (d:Document {kb_id: $kb_id})
WHERE d.status = 'active'
RETURN d
LIMIT 100;

// 等价写法:将属性过滤移入 WHERE,便于叠加更多过滤条件
PROFILE
MATCH (d:Document)
WHERE d.kb_id = $kb_id
  AND d.status = 'active'
RETURN d
LIMIT 100;

// 使用索引
PROFILE
MATCH (d:Document)
USING INDEX d:Document(kb_id)
WHERE d.kb_id = $kb_id
  AND d.status = 'active'
RETURN d
LIMIT 100;

// 使用参数化查询
MATCH (d:Document {id: $doc_id})
RETURN d

5.2 应用层优化

5.2.1 缓存策略

多级缓存架构

┌─────────────┐
│   应用层     │
└─────────────┘
       │
       ├─ L1: 本地缓存 (LRU)
       ├─ L2: Redis 缓存
       └─ L3: CDN 缓存 (静态资源)

缓存实现

python
from functools import lru_cache
from typing import Optional
import json

import redis

# L2:Redis 缓存
class CacheManager:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_document(self, doc_id: int) -> Optional[dict]:
        cached = self.redis.get(f"doc:{doc_id}")
        if cached:
            return json.loads(cached)
        return None

    def set_document(self, doc_id: int, doc: dict, ttl: int = 3600):
        self.redis.setex(f"doc:{doc_id}", ttl, json.dumps(doc))

    def invalidate_document(self, doc_id: int):
        self.redis.delete(f"doc:{doc_id}")

    def invalidate_kb(self, kb_id: int):
        # 用 scan_iter 代替 keys,避免大键空间下阻塞 Redis
        keys = list(self.redis.scan_iter(f"doc:kb:{kb_id}:*"))
        if keys:
            self.redis.delete(*keys)

# 使用示例
cache_manager = CacheManager(redis_client)

def get_document_uncached(doc_id: int) -> Optional[dict]:
    # 先查 L2 缓存
    doc = cache_manager.get_document(doc_id)
    if doc:
        return doc

    # 缓存未命中,回源数据库并写入缓存
    doc = get_document_from_db(doc_id)
    if doc:
        cache_manager.set_document(doc_id, doc)
    return doc

# L1:在 L2 之上再包一层进程内 LRU 缓存
# 注意:多实例部署时各进程的 L1 相互独立,文档更新后需配合额外的失效机制
get_document = lru_cache(maxsize=1000)(get_document_uncached)
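为避免大量缓存键同时过期引发"缓存雪崩",写入 L2 缓存时可以在基础 TTL 上叠加随机抖动(示意代码,10% 的抖动比例为经验值):

```python
import random

def ttl_with_jitter(base_ttl: int = 3600, jitter_ratio: float = 0.1) -> int:
    """在基础 TTL 上增加 ±jitter_ratio 的随机抖动,错开批量键的过期时间。"""
    jitter = int(base_ttl * jitter_ratio)
    if jitter == 0:
        return base_ttl
    return base_ttl + random.randint(-jitter, jitter)

# 写缓存时使用:cache_manager.set_document(doc_id, doc, ttl=ttl_with_jitter())
```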

5.2.2 异步处理

任务队列设计

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Web 应用    │───▶│  消息队列    │───▶│  Worker     │
│ (FastAPI)   │    │ (Redis/RQ)  │    │  (Celery)   │
└─────────────┘    └─────────────┘    └─────────────┘

Celery 配置

python
# celery_app.py
from celery import Celery

celery_app = Celery(
    "yuxi_know",
    broker="redis://:password@localhost:6379/0",
    backend="redis://:password@localhost:6379/1",
    include=[
        "app.tasks.document_tasks",
        "app.tasks.knowledge_graph_tasks",
        "app.tasks.embedding_tasks"
    ]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_routes={
        "app.tasks.document_tasks.*": {"queue": "documents"},
        "app.tasks.knowledge_graph_tasks.*": {"queue": "graph"},
        "app.tasks.embedding_tasks.*": {"queue": "embeddings"},
    },
    worker_prefetch_multiplier=4,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,
)

# tasks.py
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def process_document_embedding(self, doc_id: int):
    try:
        # 处理文档嵌入
        doc = get_document(doc_id)
        embedding = generate_embedding(doc['content'])

        # 保存到向量数据库
        save_embedding(doc_id, embedding)

        logger.info(f"Document {doc_id} embedding processed successfully")
        return {"status": "success", "doc_id": doc_id}

    except Exception as exc:
        logger.error(f"Error processing document {doc_id}: {str(exc)}")
        # 重试
        raise self.retry(exc=exc, countdown=60)

@shared_task(bind=True)
def build_knowledge_graph(self, kb_id: int):
    try:
        # 构建知识图谱
        documents = get_kb_documents(kb_id)
        graph = build_graph_from_documents(documents)

        # 保存到 Neo4j
        save_graph_to_neo4j(graph)

        logger.info(f"Knowledge graph for KB {kb_id} built successfully")
        return {"status": "success", "kb_id": kb_id}

    except Exception as exc:
        logger.error(f"Error building graph for KB {kb_id}: {str(exc)}")
        raise self.retry(exc=exc, countdown=300)

5.2.3 并发控制

FastAPI 并发配置

python
from fastapi import BackgroundTasks, Depends, FastAPI, Request, UploadFile
from fastapi.concurrency import run_in_threadpool

# 假设 verify_token、search_documents_sync、save_file 等为项目内已有实现
app = FastAPI()

# 限流
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/search")
@limiter.limit("100/minute")  # 每分钟 100 次请求
async def search_documents(
    query: str,
    request: Request
):
    # 异步处理
    results = await run_in_threadpool(
        search_documents_sync,
        query
    )
    return results

# 使用 BackgroundTasks 处理耗时任务
@app.post("/api/documents/upload")
async def upload_document(
    file: UploadFile,
    background_tasks: BackgroundTasks,
    current_user: str = Depends(verify_token)
):
    # 保存文件
    doc_id = save_file(file)

    # 后台处理
    background_tasks.add_task(
        process_document_embedding,
        doc_id
    )
    background_tasks.add_task(
        extract_knowledge_graph,
        doc_id
    )

    return {"doc_id": doc_id, "status": "processing"}

5.3 前端优化

5.3.1 代码分割

Vite 配置

javascript
// vite.config.js
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'

export default defineConfig({
  plugins: [vue()],
  build: {
    rollupOptions: {
      output: {
        // 按项目实际依赖调整分包清单(Yuxi-Know 前端为 Vue 技术栈)
        manualChunks: {
          'vue-vendor': ['vue', 'vue-router', 'pinia'],
          'ui-components': ['ant-design-vue'],
          'charts': ['echarts'],
          'editor': ['monaco-editor'],
          'utils': ['lodash', 'dayjs', 'axios']
        }
      }
    },
    chunkSizeWarningLimit: 1000
  }
})

5.3.2 资源优化

图片优化

vue
<!-- DocumentCard.vue:图片懒加载,优先使用浏览器原生 loading="lazy" -->
<template>
  <div>
    <img :src="document.thumbnail" :alt="document.title" loading="lazy" />
    <h3>{{ document.title }}</h3>
  </div>
</template>

<script setup>
defineProps({ document: Object })
</script>

<!-- OptimizedImage.vue:通过 picture 元素提供 WebP 及降级格式 -->
<template>
  <picture>
    <source :srcset="webp" type="image/webp" />
    <img :src="src" :alt="alt" loading="lazy" />
  </picture>
</template>

<script setup>
defineProps({ src: String, webp: String, alt: String })
</script>

5.3.3 API 请求优化

请求合并与缓存

javascript
import { QueryClient, useQuery } from '@tanstack/vue-query'
import { debounce } from 'lodash'
import { ref } from 'vue'

// 使用 Vue Query(@tanstack/vue-query)统一管理请求缓存
const queryClient = new QueryClient({
  defaultOptions: {
    queries: {
      staleTime: 5 * 60 * 1000,   // 5 分钟内视为新鲜,不重复请求
      cacheTime: 10 * 60 * 1000,  // 10 分钟(v5 起更名为 gcTime)
      refetchOnWindowFocus: false,
    },
  },
})

// 防抖搜索:输入停止 300ms 后才触发查询失效
const searchQuery = ref('')
const debouncedSearch = debounce((query) => {
  queryClient.invalidateQueries({ queryKey: ['documents', { query }] })
}, 300)

function handleSearchInput(value) {
  searchQuery.value = value
  debouncedSearch(value)
}

// 并行请求:详情页的三个接口同时发起,减少串行等待
async function fetchDocumentDetail(docId) {
  const [document, relatedDocs, graph] = await Promise.all([
    fetch(`/api/documents/${docId}`).then(r => r.json()),
    fetch(`/api/documents/${docId}/related`).then(r => r.json()),
    fetch(`/api/graph/${docId}`).then(r => r.json()),
  ])

  return { document, relatedDocs, graph }
}

// 在组件的 setup 中使用
function useDocumentDetail(docId) {
  return useQuery({
    queryKey: ['document', docId],
    queryFn: () => fetchDocumentDetail(docId),
  })
}

6. 监控与运维

6.1 监控体系

6.1.1 指标监控

Prometheus + Grafana 架构

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  应用节点    │───▶│  Prometheus │───▶│   Grafana   │
│ (Exporter)  │    │   (TSDB)    │    │ (Dashboard) │
└─────────────┘    └─────────────┘    └─────────────┘
                          │
                          ▼
                   ┌─────────────┐
                   │ AlertManager│
                   │  (告警管理)  │
                   └─────────────┘

关键监控指标

| 指标类型 | 指标名称 | 告警阈值 |
| --- | --- | --- |
| 系统指标 | CPU 使用率 | > 80% 持续 5 分钟 |
| 系统指标 | 内存使用率 | > 85% 持续 5 分钟 |
| 系统指标 | 磁盘使用率 | > 80% |
| 应用指标 | API 响应时间 P95 | > 2 秒 |
| 应用指标 | API 错误率 | > 1% |
| 应用指标 | 并发连接数 | > 最大值的 80% |
| 数据库指标 | 慢查询数量 | > 10/分钟 |
| 数据库指标 | 连接池使用率 | > 80% |
| Redis 指标 | 内存使用率 | > 80% |
| Redis 指标 | 命中率 | < 90% |

Prometheus 配置示例

yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

scrape_configs:
  - job_name: 'yuxi-know-backend'
    static_configs:
      - targets: ['backend:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres_exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis_exporter:9121']

  - job_name: 'node'
    static_configs:
      - targets: ['node_exporter:9100']
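告警阈值中的 P95 可以按如下方式从延迟样本计算(纯 Python 示意,帮助理解指标含义;生产环境中通常由 Prometheus 的 `histogram_quantile` 完成):

```python
import math

def percentile(samples: list, p: float) -> float:
    """最近邻法计算分位数,p 取 0~1,如 P95 对应 p=0.95。"""
    if not samples:
        raise ValueError("samples 不能为空")
    ordered = sorted(samples)
    idx = max(math.ceil(p * len(ordered)) - 1, 0)
    return ordered[idx]

# 例:100 个 1~100 秒的延迟样本,P95 为 95 秒,已超过 2 秒的告警阈值
latencies = [float(i) for i in range(1, 101)]
p95 = percentile(latencies, 0.95)
```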

6.1.2 日志管理

ELK Stack 架构

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  应用日志    │───▶│  Filebeat   │───▶│  Logstash   │
│  (JSON)     │    │  (采集)     │    │  (处理)     │
└─────────────┘    └─────────────┘    └─────────────┘
                                            │
                                            ▼
                                     ┌──────────────┐
                                     │ Elasticsearch│
                                     │   (存储)     │
                                     └──────────────┘
                                            │
                                            ▼
                                     ┌─────────────┐
                                     │   Kibana    │
                                     │  (可视化)   │
                                     └─────────────┘

日志格式规范

python
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_obj = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # 添加上下文信息
        if hasattr(record, 'user_id'):
            log_obj['user_id'] = record.user_id
        if hasattr(record, 'kb_id'):
            log_obj['kb_id'] = record.kb_id
        if hasattr(record, 'doc_id'):
            log_obj['doc_id'] = record.doc_id

        # 异常信息
        if record.exc_info:
            log_obj['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_obj)

# 配置日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

# 使用示例
logger.info(
    "Document processed",
    extra={
        "user_id": 123,
        "kb_id": 456,
        "doc_id": 789,
    }
)

6.1.3 链路追踪

Jaeger / Zipkin 集成

python
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# 配置 Jaeger 导出器
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)

# 配置 Tracer
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# 仪表化 FastAPI
FastAPIInstrumentor.instrument_app(app)

# 仪表化 SQLAlchemy
SQLAlchemyInstrumentor().instrument(
    engine=engine,
)

# 使用 Tracer
tracer = trace.get_tracer(__name__)

def process_document(doc_id: int):
    with tracer.start_as_current_span("process_document") as span:
        span.set_attribute("doc_id", doc_id)

        # 获取文档
        with tracer.start_as_current_span("fetch_document"):
            doc = fetch_document(doc_id)

        # 生成嵌入
        with tracer.start_as_current_span("generate_embedding"):
            embedding = generate_embedding(doc)

        # 保存
        with tracer.start_as_current_span("save_embedding"):
            save_embedding(doc_id, embedding)

6.2 告警策略

6.2.1 告警规则

alert_rules.yml

yaml
groups:
  - name: yuxi-know-alerts
    interval: 30s
    rules:
      # API 响应时间告警
      - alert: HighAPILatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "API 响应时间过长"
          description: "P95 响应时间为 {{ $value }}s,超过 2s 阈值"

      # API 错误率告警
      - alert: HighAPIErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "API 错误率过高"
          description: "错误率为 {{ $value | humanizePercentage }},超过 1% 阈值"

      # CPU 使用率告警
      - alert: HighCPUUsage
        expr: 100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CPU 使用率过高"
          description: "CPU 使用率为 {{ $value }}%,超过 80% 阈值"

      # 内存使用率告警
      - alert: HighMemoryUsage
        expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "内存使用率过高"
          description: "内存使用率为 {{ $value }}%,超过 85% 阈值"

      # 磁盘使用率告警
      - alert: HighDiskUsage
        expr: (1 - (node_filesystem_avail_bytes{fstype=~"ext4|xfs"} / node_filesystem_size_bytes)) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "磁盘使用率过高"
          description: "磁盘使用率为 {{ $value }}%,超过 80% 阈值"

      # PostgreSQL 慢查询告警(示例表达式,需按 postgres_exporter 实际暴露的指标调整)
      - alert: HighPostgreSQLSlowQueries
        expr: rate(pg_stat_statements_calls_total{datname="yuxiknow"}[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL 慢查询过多"
          description: "慢查询数量为 {{ $value }}/分钟"

      # Redis 内存告警
      - alert: HighRedisMemoryUsage
        expr: (redis_memory_used_bytes / redis_memory_max_bytes) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Redis 内存使用率过高"
          description: "Redis 内存使用率为 {{ $value }}%"

6.2.2 告警通知

AlertManager 配置

yaml
# alertmanager.yml
global:
  resolve_timeout: 5m
  slack_api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'default'

  routes:
    - match:
        severity: critical
      receiver: 'critical-alerts'
    - match:
        severity: warning
      receiver: 'warning-alerts'

receivers:
  - name: 'default'
    slack_configs:
      - channel: '#yuxi-know-alerts'
        send_resolved: true

  - name: 'critical-alerts'
    slack_configs:
      - channel: '#yuxi-know-critical'
        send_resolved: true
    pagerduty_configs:
      - service_key: 'YOUR_PAGERDUTY_SERVICE_KEY'

  - name: 'warning-alerts'
    slack_configs:
      - channel: '#yuxi-know-warnings'
        send_resolved: true
    email_configs:
      - to: '[email protected]'
        headers:
          Subject: '[Yuxi-Know Warning] {{ .GroupLabels.alertname }}'

6.3 自动化运维

6.3.1 自动扩缩容

Kubernetes HPA 配置

yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: yuxi-know-backend-hpa
  namespace: yuxi-know-production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: yuxi-know-backend
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30
      - type: Pods
        value: 2
        periodSeconds: 30
      selectPolicy: Max
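HPA 的期望副本数大致遵循 `ceil(当前副本数 × 当前指标值 / 目标指标值)`,可以用几行代码验证上述配置的扩缩容行为(示意,未涵盖 behavior 策略的限速逻辑):

```python
import math

def desired_replicas(current: int, current_metric: float, target_metric: float,
                     min_replicas: int = 3, max_replicas: int = 20) -> int:
    """按 HPA 基本公式估算期望副本数,并裁剪到 min/max 区间。"""
    desired = math.ceil(current * current_metric / target_metric)
    return max(min_replicas, min(desired, max_replicas))

# 例:3 个副本、CPU 实际利用率 90%、目标 70% → 扩容到 4 个副本
```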

6.3.2 自动备份

备份脚本

bash
#!/bin/bash
# backup.sh

BACKUP_DIR="/backup/yuxi-know"
DATE=$(date +%Y%m%d_%H%M%S)

# 创建备份目录
mkdir -p ${BACKUP_DIR}/${DATE}

# PostgreSQL 备份
echo "Backing up PostgreSQL..."
docker exec postgres-container pg_dump -U user yuxiknow > ${BACKUP_DIR}/${DATE}/postgres_backup.sql
gzip ${BACKUP_DIR}/${DATE}/postgres_backup.sql

# Redis 备份
echo "Backing up Redis..."
docker exec redis-container redis-cli --rdb /data/dump.rdb
docker cp redis-container:/data/dump.rdb ${BACKUP_DIR}/${DATE}/redis_backup.rdb
gzip ${BACKUP_DIR}/${DATE}/redis_backup.rdb

# Neo4j 备份
echo "Backing up Neo4j..."
docker exec neo4j-container neo4j-admin database dump yuxi-know --to-path=/backups
docker cp neo4j-container:/backups/yuxi-know.dump ${BACKUP_DIR}/${DATE}/neo4j_backup.dump
gzip ${BACKUP_DIR}/${DATE}/neo4j_backup.dump

# 上传到 S3
echo "Uploading to S3..."
aws s3 sync ${BACKUP_DIR}/${DATE} s3://yuxi-know-backups/${DATE}/

# 清理旧备份(保留 7 天)
find ${BACKUP_DIR} -maxdepth 1 -type d -mtime +7 -exec rm -rf {} +

echo "Backup completed: ${DATE}"

Crontab 配置

bash
# 每天凌晨 2 点执行备份
0 2 * * * /usr/local/bin/backup.sh >> /var/log/backup.log 2>&1

6.3.3 故障自愈

Kubernetes Liveness Probe

yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3

Readiness Probe

yaml
readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 3

健康检查端点

python
from fastapi import FastAPI, HTTPException
from sqlalchemy import text

# 假设 db、redis_client、milvus_client、neo4j_driver 已在应用启动时初始化
app = FastAPI()

@app.get("/health")
def health_check():
    return {"status": "healthy"}

@app.get("/ready")
def readiness_check():
    # 检查数据库连接
    try:
        db.execute(text("SELECT 1"))
    except Exception as e:
        raise HTTPException(status_code=503, detail="Database unavailable")

    # 检查 Redis 连接
    try:
        redis_client.ping()
    except Exception as e:
        raise HTTPException(status_code=503, detail="Redis unavailable")

    # 检查 Milvus 连接
    try:
        milvus_client.list_collections()
    except Exception as e:
        raise HTTPException(status_code=503, detail="Milvus unavailable")

    # 检查 Neo4j 连接
    try:
        neo4j_driver.verify_connectivity()
    except Exception as e:
        raise HTTPException(status_code=503, detail="Neo4j unavailable")

    return {"status": "ready"}

7. 数据管理

7.1 数据备份

7.1.1 备份策略

3-2-1 备份原则

  • 至少保留 3 份副本
  • 使用 2 种不同存储介质
  • 至少 1 份异地备份
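3-2-1 原则可以在备份巡检脚本中做一个简单校验(示意代码):

```python
def satisfies_3_2_1(copies: int, media_types: int, offsite_copies: int) -> bool:
    """校验备份方案是否满足 3-2-1 原则:3 份副本、2 种介质、1 份异地。"""
    return copies >= 3 and media_types >= 2 and offsite_copies >= 1

# 例:本地磁盘 2 份 + S3 异地 1 份,共两种介质 → 满足
```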

备份类型

| 备份类型 | 频率 | 保留周期 | 用途 |
| --- | --- | --- | --- |
| 全量备份 | 每日 | 7 天 | 完整恢复 |
| 增量备份 | 每小时 | 24 小时 | 快速恢复 |
| 差异备份 | 每 6 小时 | 3 天 | 中速恢复 |
| 日志备份 | 实时 | 30 天 | 时间点恢复 |
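与保留周期对应的清理判断可以抽成一个可单独测试的小函数(示意代码,备份以日期标识为假设,与备份脚本中按日期命名的目录对应):

```python
from datetime import date, timedelta

def expired_backups(backup_dates: list, today: date, keep_days: int = 7) -> list:
    """返回超过保留周期、可以安全清理的备份日期。"""
    cutoff = today - timedelta(days=keep_days)
    return [d for d in backup_dates if d < cutoff]

# 例:3 月 25 日巡检、保留 7 天,则 3 月 18 日之前的备份可清理
```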

7.1.2 自动化备份脚本

PostgreSQL 备份

bash
#!/bin/bash
# postgres_backup.sh

DB_NAME="yuxiknow"
DB_USER="user"
DB_HOST="postgres"
BACKUP_DIR="/backup/postgres"
S3_BUCKET="s3://yuxi-know-backups/postgres"

DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/${DB_NAME}_${DATE}.sql.gz"

# 创建备份
docker exec postgres-container pg_dump -U ${DB_USER} ${DB_NAME} | gzip > ${BACKUP_FILE}

# 上传到 S3
aws s3 cp ${BACKUP_FILE} ${S3_BUCKET}/${DATE}/

# 清理本地旧备份(保留 3 天)
find ${BACKUP_DIR} -name "*.sql.gz" -mtime +3 -delete

# 清理 S3 旧备份(保留 7 天)
aws s3 ls ${S3_BUCKET}/ | awk '{print $2}' | while read dir; do
    dir_date=$(echo $dir | tr -d '/')
    # 目录名形如 20260325_020000,取前 8 位作为日期,避免 date 解析失败
    if [[ $(date -d "${dir_date:0:8}" +%s) -lt $(date -d "7 days ago" +%s) ]]; then
        aws s3 rm ${S3_BUCKET}/${dir} --recursive
    fi
done

7.1.3 恢复测试

恢复脚本

bash
#!/bin/bash
# postgres_restore.sh

BACKUP_FILE=$1
DB_NAME="yuxiknow"
DB_USER="user"
DB_HOST="postgres"

# 下载备份
aws s3 cp ${BACKUP_FILE} /tmp/restore_backup.sql.gz

# 解压
gunzip /tmp/restore_backup.sql.gz

# 停止应用
docker-compose stop backend

# 恢复数据库
docker exec -i postgres-container psql -U ${DB_USER} ${DB_NAME} < /tmp/restore_backup.sql

# 启动应用
docker-compose start backend

# 验证
docker exec postgres-container psql -U ${DB_USER} ${DB_NAME} -c "SELECT COUNT(*) FROM documents;"

7.2 数据迁移

7.2.1 从其他系统迁移到 Yuxi-Know

迁移流程

1. 源系统数据导出

2. 数据格式转换

3. 导入到 Yuxi-Know

4. 生成向量嵌入

5. 构建知识图谱

6. 数据验证

迁移脚本示例

python
import time
from typing import Dict, List

import requests

class DataMigrator:
    def __init__(self, target_api_url: str, api_key: str, kb_id: int = 1):
        self.target_api_url = target_api_url
        self.api_key = api_key
        self.kb_id = kb_id  # 目标知识库 ID,migrate_document 中写入
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def migrate_document(self, source_doc: Dict) -> bool:
        try:
            # 转换数据格式
            target_doc = {
                "title": source_doc.get("title", ""),
                "content": source_doc.get("content", ""),
                "metadata": {
                    "source": "migration",
                    "source_id": source_doc.get("id"),
                    "original_url": source_doc.get("url"),
                    "author": source_doc.get("author"),
                    "created_at": source_doc.get("created_at"),
                },
                "kb_id": self.kb_id
            }

            # 上传文档
            response = requests.post(
                f"{self.target_api_url}/api/documents",
                headers=self.headers,
                json=target_doc
            )

            if response.status_code == 201:
                doc_id = response.json().get("id")
                print(f"Document migrated successfully: {doc_id}")
                return True
            else:
                print(f"Failed to migrate document: {response.text}")
                return False

        except Exception as e:
            print(f"Error migrating document: {str(e)}")
            return False

    def batch_migrate(self, source_docs: List[Dict], batch_size: int = 10):
        success_count = 0
        fail_count = 0

        for i, doc in enumerate(source_docs, 1):
            if self.migrate_document(doc):
                success_count += 1
            else:
                fail_count += 1

            # 每 batch_size 个文档暂停一次,避免请求过快
            if i % batch_size == 0:
                time.sleep(1)

        print(f"\nMigration completed:")
        print(f"  Success: {success_count}")
        print(f"  Failed: {fail_count}")

# 使用示例
migrator = DataMigrator(
    target_api_url="https://yuxi-know.example.com",
    api_key="your-api-key"
)

# 从源系统获取文档
source_docs = fetch_documents_from_source_system()

# 执行批量迁移
migrator.batch_migrate(source_docs, batch_size=10)

7.2.2 数据验证

验证检查清单

  • [ ] 文档总数一致
  • [ ] 文档内容完整
  • [ ] 元数据正确
  • [ ] 向量嵌入生成成功
  • [ ] 知识图谱节点/边数量正确
  • [ ] 搜索功能正常

验证脚本

python
import requests

class DataValidator:
    def __init__(self, target_api_url: str, api_key: str):
        self.target_api_url = target_api_url
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def validate_document_count(self, expected_count: int) -> bool:
        """验证文档总数"""
        response = requests.get(
            f"{self.target_api_url}/api/documents/count",
            headers=self.headers
        )
        actual_count = response.json().get("count", 0)

        if actual_count == expected_count:
            print(f"✓ Document count matches: {actual_count}")
            return True
        else:
            print(f"✗ Document count mismatch: expected {expected_count}, got {actual_count}")
            return False

    def validate_document_content(self, doc_id: int, expected_content: str) -> bool:
        """验证文档内容"""
        response = requests.get(
            f"{self.target_api_url}/api/documents/{doc_id}",
            headers=self.headers
        )
        actual_content = response.json().get("content", "")

        if actual_content == expected_content:
            print(f"✓ Document {doc_id} content matches")
            return True
        else:
            print(f"✗ Document {doc_id} content mismatch")
            return False

    def validate_search(self, query: str, expected_results: int) -> bool:
        """验证搜索功能"""
        response = requests.post(
            f"{self.target_api_url}/api/search",
            headers=self.headers,
            json={"query": query, "limit": 10}
        )
        results = response.json().get("results", [])

        if len(results) >= expected_results:
            print(f"✓ Search for '{query}' returned {len(results)} results")
            return True
        else:
            print(f"✗ Search for '{query}' returned {len(results)} results, expected {expected_results}")
            return False

7.3 数据生命周期管理

7.3.1 数据保留策略

数据保留周期

| 数据类型 | 保留周期 | 归档策略 |
| --- | --- | --- |
| 活跃文档 | 永久保留 | 在线存储 |
| 已删除文档 | 30 天 | 软删除后归档 |
| 历史版本 | 90 天 | 归档到冷存储 |
| 查询日志 | 180 天 | 归档到数据湖 |
| 审计日志 | 365 天 | 归档到合规存储 |
| 临时文件 | 24 小时 | 自动清理 |

7.3.2 数据清理脚本

清理过期数据

python
import os
import time
from datetime import datetime, timedelta

from sqlalchemy.orm import Session

def clean_expired_data(db: Session):
    """清理过期数据(假设 Document、DocumentVersion 为项目内的 ORM 模型)"""

    # 清理已删除超过 30 天的文档
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    deleted_docs = db.query(Document).filter(
        Document.deleted_at < cutoff_date
    ).all()

    for doc in deleted_docs:
        # 删除向量数据
        delete_embedding(doc.id)

        # 删除图谱数据
        delete_graph_node(doc.id)

        # 物理删除
        db.delete(doc)

    db.commit()

    # 清理超过 90 天的历史版本(与保留策略一致)
    version_cutoff = datetime.utcnow() - timedelta(days=90)
    old_versions = db.query(DocumentVersion).filter(
        DocumentVersion.created_at < version_cutoff
    ).all()

    for version in old_versions:
        db.delete(version)

    db.commit()

    # 清理超过 24 小时的临时文件
    temp_dir = "/tmp/yuxi-know"
    cutoff_time = time.time() - 86400  # 24 小时

    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        if os.path.isfile(filepath) and os.path.getmtime(filepath) < cutoff_time:
            os.remove(filepath)
            print(f"Deleted temporary file: {filepath}")

8. 实施路线图

8.1 30 天快速部署计划

第 1 周:基础设施准备

| 任务 | 负责人 | 交付物 |
| --- | --- | --- |
| 云服务采购与配置 | DevOps | 账号、VPC、安全组 |
| Kubernetes 集群搭建 | DevOps | K8s 集群、Ingress |
| 数据库集群部署 | DBA | PostgreSQL、Redis、Milvus、Neo4j |
| 监控系统搭建 | DevOps | Prometheus、Grafana、AlertManager |
| CI/CD 流水线配置 | DevOps | GitLab CI、镜像仓库 |

第 2 周:应用部署

| 任务 | 负责人 | 交付物 |
| --- | --- | --- |
| Yuxi-Know 源码编译 | 开发 | Docker 镜像 |
| Helm Chart 开发 | DevOps | 部署模板 |
| 测试环境部署 | DevOps | 测试环境 |
| 基础功能测试 | QA | 测试报告 |
| 性能基准测试 | QA | 性能报告 |

第 3 周:安全加固

| 任务 | 负责人 | 交付物 |
| --- | --- | --- |
| HTTPS 证书配置 | DevOps | SSL 证书 |
| 身份认证集成 | 开发 | SSO/MFA |
| 权限系统配置 | 开发 | RBAC 配置 |
| 安全审计 | 安全团队 | 安全报告 |
| 渗透测试 | 安全团队 | 渗透测试报告 |

第 4 周:生产上线

| 任务 | 负责人 | 交付物 |
| --- | --- | --- |
| 生产环境部署 | DevOps | 生产环境 |
| 数据迁移 | DBA/开发 | 数据迁移报告 |
| 监控告警配置 | DevOps | 告警规则 |
| 用户培训 | 培训师 | 培训材料 |
| 上线验证 | 全员 | 上线检查清单 |

8.2 持续优化计划

月度优化(第 1-3 个月)

  • 第 1 个月:收集使用反馈,优化核心功能
  • 第 2 个月:性能调优,提升响应速度
  • 第 3 个月:扩展功能,满足业务需求

季度优化(第 4-12 个月)

  • Q2:构建领域知识图谱,提升问答准确率
  • Q3:集成更多外部数据源,丰富知识库
  • Q4:实施多租户架构,支持多业务线

9. 成本分析

9.1 云服务成本

9.1.1 中小规模(500 用户)

| 资源类型 | 规格 | 数量 | 月度成本 |
| --- | --- | --- | --- |
| ECS | 4C/8G | 3 台 | ¥3,600 |
| RDS PostgreSQL | 4C/8G | 主从 2 台 | ¥2,400 |
| Redis 企业版 | 2G | 3 主 3 从 | ¥1,200 |
| Milvus(自建) | 8C/16G | 3 台 | ¥2,400 |
| Neo4j(自建) | 4C/8G | 3 台 | ¥1,800 |
| OSS 存储 | 500G | - | ¥300 |
| SLB 负载均衡 | - | 1 个 | ¥300 |
| CDN 流量 | 1TB | - | ¥600 |
| 合计 | | | ¥12,600 |

9.1.2 大规模(2000 用户)

| 资源类型 | 规格 | 数量 | 月度成本 |
| --- | --- | --- | --- |
| ECS | 8C/16G | 6 台 | ¥14,400 |
| RDS PostgreSQL | 8C/16G | 主从 2 台 | ¥4,800 |
| Redis 企业版 | 4G | 3 主 3 从 | ¥2,400 |
| Milvus(自建) | 16C/32G | 5 台 | ¥12,000 |
| Neo4j(自建) | 8C/16G | 5 台 | ¥6,000 |
| OSS 存储 | 2TB | - | ¥1,200 |
| SLB 负载均衡 | - | 2 个 | ¥600 |
| CDN 流量 | 5TB | - | ¥3,000 |
| 合计 | | | ¥44,400 |

9.2 私有云成本

9.2.1 硬件采购成本

| 设备类型 | 规格 | 数量 | 单价 | 合计 |
| --- | --- | --- | --- | --- |
| 控制节点 | 16C/32G/500G | 3 台 | ¥40,000 | ¥120,000 |
| 工作节点 | 32C/64G/1T | 9 台 | ¥60,000 | ¥540,000 |
| 网络设备 | 万兆交换机 | 4 台 | ¥20,000 | ¥80,000 |
| 存储设备 | 分布式存储 | 1 套 | ¥100,000 | ¥100,000 |
| 合计 | | | | ¥840,000 |

9.2.2 运维成本(年度)

| 成本项 | 年度成本 |
| --- | --- |
| 电费 | ¥60,000 |
| 机房租赁 | ¥120,000 |
| 运维人员 | ¥240,000 |
| 软件授权 | ¥100,000 |
| 备份设备 | ¥50,000 |
| 合计 | ¥570,000 |

9.3 成本优化建议

9.3.1 云服务优化

  • 预留实例:购买预留实例,节省 30%-50% 成本
  • 自动扩缩容:根据业务高峰调整资源
  • 冷数据归档:将不常访问的数据归档到廉价存储
  • 混合部署:核心数据私有云,弹性计算公有云

9.3.2 资源优化

  • 容器化:提高资源利用率
  • 共享服务:多个应用共享数据库、缓存等资源
  • Serverless:使用 Serverless 计算处理突发流量
  • CDN 加速:减少源站带宽消耗
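以预留实例为例,折扣对月度账单的影响可以快速估算(示意代码,折扣率按上文 30%-50% 的区间取值):

```python
def reserved_monthly_cost(on_demand_cost: float, discount: float) -> float:
    """预留实例折扣后的月度成本,discount 取 0.3~0.5。"""
    if not 0.0 <= discount <= 1.0:
        raise ValueError("discount 应在 0~1 之间")
    return on_demand_cost * (1 - discount)

# 例:中小规模月度成本 ¥12,600,按 40% 折扣约为 ¥7,560
```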

10. 风险管理

10.1 技术风险

| 风险项 | 可能性 | 影响 | 应对措施 |
| --- | --- | --- | --- |
| 单点故障 | | | 高可用架构,多实例部署 |
| 数据丢失 | | 极高 | 定期备份,异地存储 |
| 性能瓶颈 | | | 水平扩展,缓存优化 |
| 安全漏洞 | | | 安全审计,渗透测试 |
| 第三方依赖 | | | 多供应商策略 |

10.2 业务风险

| 风险项 | 可能性 | 影响 | 应对措施 |
| --- | --- | --- | --- |
| 用户接受度低 | | | 充分调研,用户培训 |
| 数据质量差 | | | 数据治理,质量控制 |
| 合规风险 | | | 法务审核,合规设计 |
| 成本超支 | | | 成本监控,持续优化 |

10.3 应急预案

10.3.1 服务中断应急

恢复步骤

  1. 确认故障范围
  2. 切换到备用实例/备用数据中心
  3. 通知相关方
  4. 定位并修复问题
  5. 验证恢复
  6. 复盘总结

RTO/RPO 目标

  • RTO(恢复时间目标):≤ 4 小时
  • RPO(恢复点目标):≤ 15 分钟

10.3.2 数据泄露应急

响应步骤

  1. 立即隔离受影响系统
  2. 评估泄露范围与影响
  3. 通知管理层与法务部门
  4. 必要时通知受影响用户
  5. 修复安全漏洞
  6. 加强安全措施
  7. 总结经验教训

11. 附录

11.1 检查清单

部署前检查

  • [ ] 云服务账号已创建并配置
  • [ ] 域名已解析到负载均衡器
  • [ ] SSL 证书已申请并配置
  • [ ] 数据库集群已部署并测试
  • [ ] 监控系统已搭建并配置告警
  • [ ] CI/CD 流水线已配置并测试
  • [ ] 备份策略已制定并测试
  • [ ] 安全策略已制定并实施

部署后检查

  • [ ] 所有服务正常运行
  • [ ] 健康检查端点正常响应
  • [ ] 监控指标正常
  • [ ] 日志正常输出
  • [ ] 备份正常执行
  • [ ] 告警正常触发
  • [ ] 用户能正常访问
  • [ ] 核心功能正常

11.2 常用命令

Kubernetes 常用命令

bash
# 查看所有 Pod
kubectl get pods -n yuxi-know-production

# 查看服务日志
kubectl logs -f deployment/yuxi-know-backend -n yuxi-know-production

# 进入容器
kubectl exec -it deployment/yuxi-know-backend -n yuxi-know-production -- bash

# 扩缩容
kubectl scale deployment/yuxi-know-backend --replicas=5 -n yuxi-know-production

# 查看事件
kubectl get events -n yuxi-know-production --sort-by=.metadata.creationTimestamp

# 查看资源使用
kubectl top pods -n yuxi-know-production

Docker 常用命令

bash
# 查看运行中的容器
docker ps

# 查看容器日志
docker logs -f yuxi-know-backend

# 进入容器
docker exec -it yuxi-know-backend bash

# 查看容器资源使用
docker stats yuxi-know-backend

# 清理未使用的资源(会删除所有未被引用的镜像和数据卷,生产环境谨慎执行)
docker system prune -a --volumes

PostgreSQL 常用命令

bash
# 连接数据库
psql -h localhost -U user -d yuxiknow

-- 以下 SQL 在 psql 会话中执行
-- 查看表大小
SELECT
  schemaname,
  tablename,
  pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

-- 查看慢查询(需启用 pg_stat_statements 扩展)
SELECT
  query,
  mean_exec_time,
  calls,
  total_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;

-- 分析并回收表空间
VACUUM ANALYZE documents;

Redis 常用命令

bash
# 连接 Redis
redis-cli -h localhost -p 6379

# 以下命令在 redis-cli 会话中执行
# 查看内存使用
INFO memory

# 查看慢查询
SLOWLOG GET 10

# 清空当前数据库(生产环境慎用)
FLUSHDB

11.3 故障排查手册

问题一:服务无法启动

症状:容器启动失败

排查步骤

  1. 查看容器日志
  2. 检查环境变量配置
  3. 检查依赖服务连接
  4. 检查端口占用
  5. 检查磁盘空间

常见原因

  • 数据库连接失败
  • 配置文件错误
  • 端口被占用
  • 磁盘空间不足

问题二:查询速度慢

症状:API 响应时间长

排查步骤

  1. 查看数据库慢查询日志
  2. 检查索引是否生效
  3. 检查缓存命中率
  4. 检查网络延迟
  5. 检查资源使用率

优化措施

  • 添加索引
  • 优化查询语句
  • 增加缓存
  • 水平扩展
  • 升级硬件

问题三:内存不足

症状:OOMKilled,服务频繁重启

排查步骤

  1. 查看内存使用情况
  2. 分析内存占用
  3. 检查内存泄漏
  4. 检查缓存配置

优化措施

  • 调整容器内存限制
  • 优化代码内存使用
  • 调整缓存大小
  • 增加物理内存

11.4 参考资源

官方文档

社区资源

相关工具


版本信息

文档版本:v1.0
编制日期:2026 年 3 月 25 日
适用版本:Yuxi-Know v0.5.0
编制单位:企业部署技术团队


免责声明

本报告基于 Yuxi-Know v0.5.0 版本编制,内容仅供参考。实际部署中请根据具体业务需求和技术环境进行调整。本报告不对因使用本报告内容而产生的任何直接或间接损失承担责任。

