Skip to content

推荐系统实战开发

在掌握了推荐系统的架构和算法后,本章将带你进入实战开发阶段。我们将从技术环境搭建开始,逐步实现一个完整的推荐系统,包括特征工程、向量检索、模型训练等核心环节。

1. 推荐系统开发技术环境

推荐系统开发涉及多个技术栈,需要根据项目规模和团队技术选型灵活选择。

1.1 在线服务层

技术选型

Python:适合快速原型开发

  • Web 框架:Flask、FastAPI
  • 通信协议:gRPC、HTTP
  • 容器化:Docker、Kubernetes

Java:适合大规模生产环境

  • Web 框架:Spring Boot
  • 通信协议:gRPC、REST API
  • 微服务:Spring Cloud

示例:Flask 推荐服务

python
from flask import Flask, jsonify, request
import redis
import numpy as np

app = Flask(__name__)

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

@app.route('/api/v1/recommend', methods=['GET'])
def recommend():
    user_id = request.args.get('user_id')
    limit = int(request.args.get('limit', 20))

    # 获取推荐结果
    recommendations = get_recommendations(user_id, limit)

    return jsonify({
        'code': 0,
        'data': recommendations
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

1.2 机器学习层

技术选型

传统机器学习

  • scikit-learn:传统算法(随机森林、GBDT)
  • Spark MLlib:分布式机器学习

深度学习

  • TensorFlow:工业级深度学习框架
  • PyTorch:研究和快速原型开发

示例:使用 scikit-learn 训练推荐模型

python
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import pandas as pd

# 1. 加载数据
data = pd.read_csv('user_item_features.csv')

# 2. 特征工程
features = ['user_age', 'user_gender', 'item_category', 'item_price']
X = data[features]
y = data['rating']

# 3. 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# 4. 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 5. 评估
score = model.score(X_test, y_test)
print(f"R² Score: {score}")

# 6. 预测
predictions = model.predict(X_test)

1.3 数据存储层

技术选型

关系型数据库

  • MySQL:存储物品元数据、用户信息
  • PostgreSQL:复杂查询、JSON 数据

NoSQL 数据库

  • Redis:内存数据库,用于缓存和实时推荐
  • Cassandra:分布式 NoSQL,存储用户-物品数据
  • HBase:大数据场景,存储海量用户行为

数据仓库

  • Hive:离线数据仓库
  • Spark SQL:实时数据分析

示例:Redis 存储推荐结果

python
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

# 存储用户推荐列表
def save_user_recommendations(user_id, recommendations):
    key = f"recommend:{user_id}"
    value = json.dumps(recommendations)
    r.setex(key, 3600, value)  # 1小时过期

# 获取用户推荐列表
def get_user_recommendations(user_id):
    key = f"recommend:{user_id}"
    value = r.get(key)
    if value:
        return json.loads(value)
    return None

1.4 数据处理层

技术选型

数据处理

  • NumPy / Pandas:小规模数据处理
  • Spark:大规模分布式数据处理

数据仓库

  • Hive:离线数据查询和分析
  • Presto:实时数据分析

示例:使用 Spark 处理用户行为数据

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("UserBehaviorAnalysis") \
    .getOrCreate()

# 读取数据
user_behavior = spark.read.csv('user_behavior.csv', header=True)

# 统计每个用户的互动次数
user_stats = user_behavior.groupBy('user_id') \
    .agg(
        count('*').alias('interaction_count')
    ) \
    .orderBy(desc('interaction_count'))

# 显示结果
user_stats.show(10)

# 保存结果
user_stats.write.mode('overwrite').parquet('user_stats.parquet')

# 停止 Spark
spark.stop()

1.5 编程语言选择

Python

  • 优势:生态丰富、开发效率高、适合算法研究
  • 劣势:性能相对较低
  • 适用场景:算法开发、快速原型、中小规模系统

Java

  • 优势:性能高、生态成熟、适合大规模系统
  • 劣势:开发效率较低、学习曲线陡
  • 适用场景:大规模生产系统、高并发服务

C++

  • 优势:性能最高、适合计算密集型任务
  • 劣势:开发复杂度高、生态相对较少
  • 适用场景:实时计算、高性能推理引擎

1.6 推荐技术栈组合

小型团队 / 快速原型

前端:React / Vue
后端:Python + Flask
数据库:MySQL + Redis
机器学习:scikit-learn + TensorFlow
数据处理:Pandas

中型团队 / 工业级系统

前端:React / Vue
后端:Java + Spring Boot + Python Flask
数据库:MySQL + Redis + Cassandra
机器学习:TensorFlow + PyTorch
数据处理:Spark
部署:Docker + Kubernetes

大型团队 / 超大规模系统

前端:React / Vue
后端:Java + Go
数据库:MySQL + Redis + Cassandra + HBase
机器学习:TensorFlow Serving + ONNX Runtime
数据处理:Spark + Flink
部署:Kubernetes + Istio

2. 数据源与特征工程

特征工程是推荐系统的核心环节,直接决定推荐效果。本节将详细介绍数据源、特征类型和特征处理方法。

2.1 数据源

2.1.1 业务数据库

用途:离线、在线处理,用于机器学习预估模型训练

MySQL

  • 存储物品数据(商品信息、内容元数据)
  • 存储用户列表(用户基本信息)
  • 存储推荐结果(预计算的推荐列表)

示例:MySQL 表设计

sql
-- 物品表
CREATE TABLE items (
    item_id VARCHAR(50) PRIMARY KEY,
    title VARCHAR(200),
    description TEXT,
    category VARCHAR(50),
    price DECIMAL(10, 2),
    tags JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 用户表
CREATE TABLE users (
    user_id VARCHAR(50) PRIMARY KEY,
    age INT,
    gender VARCHAR(10),
    city VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 用户行为表
CREATE TABLE user_behaviors (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(50),
    item_id VARCHAR(50),
    behavior_type VARCHAR(20),  -- click, view, purchase, favorite
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_item_id (item_id)
);

2.1.2 前端埋点日志

用途:在线实时请求,用于在线机器学习预估模型训练

Kafka 流

  • 实时收集用户行为日志
  • 流式处理,支持实时推荐

HDFS 表

  • 存储历史日志数据
  • 用于离线分析和模型训练

日志格式

json
{
  "user_id": "user_123456",
  "item_id": "item_789012",
  "behavior_type": "click",
  "timestamp": 1700000000,
  "context": {
    "device": "mobile",
    "location": "beijing",
    "page": "homepage"
  }
}

示例:读取 Kafka 日志

python
from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer(
    'user_behavior',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest'
)

# 消费消息
for message in consumer:
    log = json.loads(message.value)
    print(f"User {log['user_id']} clicked item {log['item_id']}")

2.1.3 处理数据

用途:统计报表输出,用于查看效果报表、AB Test效果

输出格式

  • CSV / TXT:本地测试
  • API:实时查询
  • Hive 表:离线分析

示例:生成推荐效果报表

python
import pandas as pd
from datetime import datetime, timedelta

# 读取推荐日志
recommend_log = pd.read_csv('recommend_log.csv')

# 计算今日推荐效果
today = datetime.now().date()
today_log = recommend_log[pd.to_datetime(recommend_log['timestamp']).dt.date == today]

# 计算指标
impression_count = len(today_log)
click_count = len(today_log[today_log['behavior_type'] == 'click'])
ctr = click_count / impression_count

print(f"Today's CTR: {ctr:.2%}")

2.2 特征工程

2.2.1 数值类特征

特征类型

  • 年龄、价格、评分、时长等连续值
  • 需要进行归一化或分箱处理

归一化处理(Normalization)

将特征缩放到 [0, 1] 区间

python
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# 示例数据
ages = np.array([[25], [30], [35], [40], [45]])

# 创建 MinMaxScaler
scaler = MinMaxScaler()

# 归一化
normalized_ages = scaler.fit_transform(ages)

print(f"Original ages: {ages.flatten()}")
print(f"Normalized ages: {normalized_ages.flatten()}")
# 输出:Original ages: [25 30 35 40 45]
#       Normalized ages: [0.   0.25 0.5  0.75 1.  ]

标准化处理(Standardization)

将特征缩放到均值为 0、标准差为 1 的分布

python
from sklearn.preprocessing import StandardScaler

# 创建 StandardScaler
scaler = StandardScaler()

# 标准化
standardized_ages = scaler.fit_transform(ages)

print(f"Standardized ages: {standardized_ages.flatten()}")

分箱处理(Binning)

将连续值转换为离散的区间

python
import pandas as pd

# 示例数据
df = pd.DataFrame({
    'age': [18, 22, 28, 35, 45, 55, 65]
})

# 分箱
bins = [0, 18, 30, 50, 100]
labels = ['<18', '18-30', '30-50', '50+']

df['age_bin'] = pd.cut(df['age'], bins=bins, labels=labels)

print(df)
# 输出:
#    age age_bin
# 0   18    <18
# 1   22  18-30
# 2   28  18-30
# 3   35  30-50
# 4   45  30-50
# 5   55    50+
# 6   65    50+

2.2.2 文本类特征

处理步骤

  1. 中文分词:使用 jieba 等工具分词
  2. 关键词提取:提取 TF-IDF 或 TextRank 关键词
  3. 编码:One-Hot / Multi-Hot 编码或 Word Embedding

One-Hot 编码

python
from sklearn.preprocessing import MultiLabelBinarizer

# 示例:用户兴趣标签
user_interests = [
    ['Python', '编程', 'AI'],
    ['Java', '编程'],
    ['JavaScript', '前端'],
    ['Python', '深度学习']
]

# Multi-Hot 编码
mlb = MultiLabelBinarizer()
encoded = mlb.fit_transform(user_interests)

print("Classes:", mlb.classes_)
print("Encoded:")
print(encoded)

# 输出:
# Classes: ['AI' 'Java' 'JavaScript' 'Python' '编程' '深度学习' '前端']
# Encoded:
# [[1 0 0 1 1 0 0]
#  [0 1 0 0 1 0 0]
#  [0 0 1 0 0 0 1]
#  [0 0 0 1 0 1 0]]

TF-IDF 编码

python
from sklearn.feature_extraction.text import TfidfVectorizer

# 示例:商品描述
descriptions = [
    "Python编程从入门到实践",
    "Java核心技术卷一",
    "JavaScript高级程序设计"
]

# TF-IDF 编码
vectorizer = TfidfVectorizer(max_features=10)
tfidf_matrix = vectorizer.fit_transform(descriptions)

print("Feature names:", vectorizer.get_feature_names_out())
print("TF-IDF matrix:")
print(tfidf_matrix.toarray())

Word Embedding

python
from gensim.models import Word2Vec
import numpy as np

# 训练 Word2Vec
sentences = [
    ['Python', '编程', '入门', '实践'],
    ['Java', '编程', '核心', '技术'],
    ['JavaScript', '前端', '开发']
]

model = Word2Vec(sentences, vector_size=100, window=5, min_count=1)

# 文本向量化
def text_to_vector(text):
    words = jieba.lcut(text)
    vectors = [model.wv[word] for word in words if word in model.wv]
    if vectors:
        return np.mean(vectors, axis=0)
    else:
        return np.zeros(100)

# 示例
text = "Python编程入门"
vector = text_to_vector(text)
print(f"Vector shape: {vector.shape}")

2.2.3 ID 类特征

处理方法:Embedding 向量化

示例:用户 ID Embedding

python
import tensorflow as tf
from tensorflow.keras import layers

# 用户 ID Embedding 层
user_id_input = layers.Input(shape=(1,), name='user_id')
user_embedding = layers.Embedding(
    input_dim=1000000,  # 100万用户
    output_dim=64,      # 64维向量
    embeddings_initializer='he_normal'
)(user_id_input)

# 展平
user_vector = layers.Flatten()(user_embedding)

# 查看 Embedding 形状
print(f"User embedding shape: {user_vector.shape}")

学习到的 Embedding 可以用于

  1. 相似用户推荐
  2. 相似物品推荐
  3. 特征补充

2.3 特征工程最佳实践

✅ 应该做的

  1. 特征选择:选择与目标任务相关的特征
  2. 特征变换:归一化、分箱、编码等
  3. 特征组合:创造更有意义的组合特征
  4. 特征监控:监控特征分布变化
  5. 特征版本管理:跟踪特征的变化历史

❌ 不应该做的

  1. 数据泄露:使用未来信息训练模型
  2. 过度工程:创建过多无用特征
  3. 忽略缺失值:不处理缺失值直接训练
  4. 不区分训练/测试集:测试集信息混入训练集

3. Python 使用 Faiss 实现向量近邻搜索

Faiss(Facebook AI Similarity Search)是 Facebook 开源的高效向量相似度搜索库,适用于大规模向量检索场景。

3.1 Faiss 简介

优势

  • ⚡ 性能极高(10亿向量,毫秒级检索)
  • 📦 支持 GPU 加速
  • 🎯 支持多种索引结构
  • 🔧 易于使用

应用场景

  • 大规模物品相似度检索
  • 用户向量聚类
  • 向量数据库

3.2 安装 Faiss

bash
# CPU 版本
pip install faiss-cpu

# GPU 版本(需要 CUDA)
pip install faiss-gpu

3.3 完整实现流程

3.3.1 读取训练好的 Embedding 数据

假设我们有一个电影数据集,每部电影都有一个 100 维的向量表示:

python
import numpy as np

# 模拟数据:1000部电影,每部100维
num_movies = 1000
embedding_dim = 100

# 生成随机向量(实际应用中从文件或数据库读取)
movie_embeddings = np.random.rand(num_movies, embedding_dim).astype('float32')

# 电影 ID 列表
movie_ids = [f"movie_{i}" for i in range(num_movies)]

print(f"Movie embeddings shape: {movie_embeddings.shape}")
print(f"First 5 movie IDs: {movie_ids[:5]}")

3.3.2 构建 Faiss 索引

python
import faiss

# 创建索引
# IndexFlatL2 是精确的 L2 距离索引
index = faiss.IndexFlatL2(embedding_dim)

# 将嵌入向量添加到索引
index.add(movie_embeddings)

print(f"Index contains {index.ntotal} vectors")

其他索引类型

python
# 1. IndexIVFFlat(倒排索引,适合大规模)
nlist = 100  # 聚类中心数
quantizer = faiss.IndexFlatL2(embedding_dim)
index_ivf = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist)

# 需要先训练(聚类)
index_ivf.train(movie_embeddings)
index_ivf.add(movie_embeddings)

# 2. IndexIVFPQ(量化压缩,节省内存)
m = 16  # 每个向量的子向量数量
nbits = 8  # 每个子向量的比特数
index_ivfpq = faiss.IndexIVFPQ(quantizer, embedding_dim, nlist, m, nbits)
index_ivfpq.train(movie_embeddings)
index_ivfpq.add(movie_embeddings)

# 3. GPU 加速(需要 GPU 版本)
# res = faiss.StandardGpuResources()
# index_gpu = faiss.index_cpu_to_gpu(res, 0, index)

3.3.3 目标向量搜索

假设我们有一个目标电影,想找最相似的 10 部电影:

python
# 目标向量(例如:movie_100 的向量)
target_movie_id = "movie_100"
target_idx = movie_ids.index(target_movie_id)
target_vector = movie_embeddings[target_idx:target_idx+1]

# 搜索最相似的 K 个向量
k = 10  # 返回 Top10
distances, indices = index.search(target_vector, k)

print(f"Top {k} similar movies to {target_movie_id}:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
    similar_movie_id = movie_ids[idx]
    print(f"{i+1}. {similar_movie_id} (distance: {dist:.4f})")

输出示例

Top 10 similar movies to movie_100:
1. movie_456 (distance: 0.1234)
2. movie_789 (distance: 0.2345)
3. movie_321 (distance: 0.3456)
...

3.3.4 根据 ID 获取电影标题

假设我们有一个电影信息字典:

python
# 模拟电影信息
movie_info = {
    f"movie_{i}": {
        "title": f"Movie {i}",
        "category": ["Action", "Drama", "Comedy"][i % 3],
        "year": 2000 + i
    }
    for i in range(num_movies)
}

# 完整的推荐函数
def recommend_similar_movies(target_movie_id, k=10):
    """推荐相似的电影"""

    # 1. 获取目标向量
    target_idx = movie_ids.index(target_movie_id)
    target_vector = movie_embeddings[target_idx:target_idx+1]

    # 2. 搜索相似向量
    distances, indices = index.search(target_vector, k)

    # 3. 构建推荐结果
    recommendations = []
    for idx, dist in zip(indices[0], distances[0]):
        similar_movie_id = movie_ids[idx]
        movie_data = movie_info[similar_movie_id]
        recommendations.append({
            "movie_id": similar_movie_id,
            "title": movie_data["title"],
            "category": movie_data["category"],
            "year": movie_data["year"],
            "similarity": 1 - dist  # 距离转换为相似度
        })

    return recommendations

# 使用
target_movie = "movie_100"
recommendations = recommend_similar_movies(target_movie, k=10)

print(f"\nRecommendations for {target_movie}:")
for i, rec in enumerate(recommendations):
    print(f"{i+1}. {rec['title']} ({rec['category']}, {rec['year']}) - Similarity: {rec['similarity']:.4f}")

3.4 保存和加载索引

python
# 保存索引
faiss.write_index(index, "movie_index.faiss")

# 加载索引
loaded_index = faiss.read_index("movie_index.faiss")
print(f"Loaded index contains {loaded_index.ntotal} vectors")

3.5 性能优化

批量查询

python
# 批量查询多个目标向量
target_vectors = movie_embeddings[:10]  # 查询前10部电影的相似电影
k = 5
distances, indices = loaded_index.search(target_vectors, k)

使用倒排索引加速

python
# IndexIVFFlat 需要先训练
nlist = 100  # 聚类中心数
quantizer = faiss.IndexFlatL2(embedding_dim)
index_ivf = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist)

# 训练(聚类)
index_ivf.train(movie_embeddings)

# 添加向量
index_ivf.add(movie_embeddings)

# 搜索
index_ivf.nprobe = 10  # 搜索的聚类中心数(越大越准确,但越慢)
distances, indices = index_ivf.search(target_vector, k)

3.6 完整代码示例

python
import faiss
import numpy as np

# 1. 准备数据
num_movies = 1000
embedding_dim = 100
movie_embeddings = np.random.rand(num_movies, embedding_dim).astype('float32')
movie_ids = [f"movie_{i}" for i in range(num_movies)]

# 2. 构建索引
index = faiss.IndexFlatL2(embedding_dim)
index.add(movie_embeddings)

# 3. 搜索
target_movie_id = "movie_100"
target_idx = movie_ids.index(target_movie_id)
target_vector = movie_embeddings[target_idx:target_idx+1]

k = 10
distances, indices = index.search(target_vector, k)

# 4. 输出结果
print(f"Top {k} similar movies to {target_movie_id}:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
    print(f"{i+1}. {movie_ids[idx]} (distance: {dist:.4f})")

# 5. 保存索引
faiss.write_index(index, "movie_index.faiss")
print("\nIndex saved to movie_index.faiss")

4. 使用腾讯开源 Word2Vec 实现内容相似推荐

当文本内容较少时,腾讯开源的 Word2Vec 是一个很好的选择,它提供了高质量的预训练词向量。

4.1 腾讯 Word2Vec 简介

特点

  • 基于大规模中文语料训练
  • 覆盖 800 万中文词汇
  • 词向量维度:200维
  • 支持词相似度计算

下载地址

4.2 准备工作

4.2.1 下载词向量

bash
# 下载词向量文件(约 5GB)
wget https://ai.tencent.com/ailab/nlp/data/zh_wiki_word2vec.txt

4.2.2 加载词向量

python
from gensim.models import KeyedVectors

# 加载词向量(需要一些时间)
print("Loading Tencent word vectors...")
word_vectors = KeyedVectors.load_word2vec_format(
    'zh_wiki_word2vec.txt',
    binary=False  # 文本格式
)
print(f"Loaded {len(word_vectors.key_to_index)} word vectors")

# 测试
print("\nExample: '编程' similar words:")
similar_words = word_vectors.most_similar('编程', topn=10)
for word, score in similar_words:
    print(f"  {word}: {score:.4f}")

4.3 获取文章列表数据

假设我们有一个文章数据集:

python
# 模拟文章数据
articles = [
    {
        "id": "article_001",
        "title": "Python 编程从入门到实践",
        "content": "本书是 Python 入门经典,适合初学者..."
    },
    {
        "id": "article_002",
        "title": "Java 核心技术卷一",
        "content": "深入理解 Java 核心技术..."
    },
    {
        "id": "article_003",
        "title": "JavaScript 高级程序设计",
        "content": "JavaScript 前端开发必备..."
    },
    {
        "id": "article_004",
        "title": "Python 深度学习实战",
        "content": "使用 Python 进行深度学习..."
    }
]

4.4 使用 jieba 实现关键词提取

python
import jieba
import jieba.analyse

# 提取关键词
def extract_keywords(text, topK=10):
    """提取文本关键词"""
    keywords = jieba.analyse.extract_tags(text, topK=topK, withWeight=True)
    return keywords

# 测试
for article in articles[:2]:
    text = article['title'] + ' ' + article['content']
    keywords = extract_keywords(text, topK=5)
    print(f"\nArticle: {article['title']}")
    print("Keywords:", keywords)

4.5 查询腾讯 Word2Vec,平均法得到文档 Embedding

python
import numpy as np

def text_to_embedding(text, word_vectors, use_tfidf=False):
    """将文本转换为向量(平均法)"""

    # 1. 分词
    words = jieba.lcut(text)

    # 2. 提取词向量
    vectors = []
    for word in words:
        if word in word_vectors:
            vectors.append(word_vectors[word])

    # 3. 如果没有词向量,返回零向量
    if not vectors:
        return np.zeros(200)

    # 4. 平均法
    if use_tfidf:
        # 可以使用 TF-IDF 加权平均
        weights = [tfidf_weights.get(word, 1.0) for word in words if word in word_vectors]
        doc_embedding = np.average(vectors, axis=0, weights=weights)
    else:
        # 简单平均
        doc_embedding = np.mean(vectors, axis=0)

    return doc_embedding

# 为所有文章生成 Embedding
print("Generating article embeddings...")
article_embeddings = {}
for article in articles:
    text = article['title'] + ' ' + article['content']
    embedding = text_to_embedding(text, word_vectors)
    article_embeddings[article['id']] = embedding

print(f"Generated {len(article_embeddings)} article embeddings")

4.6 计算相似文章列表

python
from sklearn.metrics.pairwise import cosine_similarity

def find_similar_articles(target_article_id, k=5):
    """查找相似的文章"""

    # 1. 获取目标文章的向量
    target_embedding = article_embeddings[target_article_id]

    # 2. 计算与所有文章的相似度
    similarities = []
    for article_id, embedding in article_embeddings.items():
        if article_id == target_article_id:
            continue

        # 计算余弦相似度
        similarity = cosine_similarity(
            [target_embedding],
            [embedding]
        )[0][0]

        similarities.append((article_id, similarity))

    # 3. 排序并返回 Top K
    similarities.sort(key=lambda x: x[1], reverse=True)
    return similarities[:k]

# 测试
target_article = "article_001"
similar_articles = find_similar_articles(target_article, k=3)

print(f"\nSimilar articles to '{target_article}':")
for i, (article_id, score) in enumerate(similar_articles):
    article = next(a for a in articles if a['id'] == article_id)
    print(f"{i+1}. {article['title']} (similarity: {score:.4f})")

4.7 完整代码示例

python
import jieba
import jieba.analyse
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# 1. 加载腾讯 Word2Vec
print("Loading Tencent word vectors...")
from gensim.models import KeyedVectors
word_vectors = KeyedVectors.load_word2vec_format('zh_wiki_word2vec.txt', binary=False)

# 2. 文章数据
articles = [
    {"id": "001", "title": "Python 编程入门", "content": "Python 是一门易学的编程语言"},
    {"id": "002", "title": "Java 核心技术", "content": "Java 是企业级应用的首选"},
    {"id": "003", "title": "Python 深度学习", "content": "使用 Python 进行深度学习"}
]

# 3. 文本向量化
def text_to_embedding(text):
    words = jieba.lcut(text)
    vectors = [word_vectors[w] for w in words if w in word_vectors]
    return np.mean(vectors, axis=0) if vectors else np.zeros(200)

# 4. 生成文章 Embedding
article_embeddings = {
    a['id']: text_to_embedding(a['title'] + ' ' + a['content'])
    for a in articles
}

# 5. 查找相似文章
target_id = "001"
target_emb = article_embeddings[target_id]

similarities = [
    (aid, cosine_similarity([target_emb], [emb])[0][0])
    for aid, emb in article_embeddings.items()
    if aid != target_id
]

similarities.sort(key=lambda x: x[1], reverse=True)

print(f"\nSimilar to {target_id}:")
for aid, score in similarities:
    print(f"  {aid}: {score:.4f}")

5. Python 训练 Item2Vec 实现电影相关推荐

Item2Vec 是基于 Word2Vec 的推荐算法,将物品序列看作"句子",物品看作"词",学习物品的向量表示。

5.1 Item2Vec 原理

核心思想

  • 用户的物品浏览/购买序列 = 文本中的句子
  • 物品 = 文本中的词
  • 学习物品向量,相似物品的向量距离较近

优势

  • 捕捉物品之间的共现关系
  • 不需要显式评分数据
  • 适用于隐式反馈场景

5.2 准备数据

假设我们有 MovieLens 数据集:

csv
user_id,movie_id,rating,timestamp
1,1193,5,978300760
1,661,3,978302109
1,914,3,978301968
...
python
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Item2Vec") \
    .getOrCreate()

# 读取数据
ratings = spark.read.csv(
    'ratings.csv',
    header=True,
    inferSchema=True
)

# 查看数据
ratings.show(5)

5.3 数据预处理

将用户-物品数据转换为物品序列:

python
from pyspark.sql.functions import collect_list, col, sort_array

# 按用户分组,收集物品列表(按时间排序)
user_sequences = ratings.groupBy('user_id') \
    .agg(
        sort_array(collect_list('movie_id')).alias('movie_sequence')
    )

user_sequences.show(5)
# 输出:
# +--------+------------------+
# |user_id |    movie_sequence |
# +--------+------------------+
# |   1    | [1193, 661, ...] |
# |   2    | [1357, 3068, ...] |
# +--------+------------------+

5.4 使用 PySpark 训练 Item2Vec

python
from pyspark.ml.feature import Word2Vec

# 将物品 ID 转换为字符串
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

to_string_udf = udf(str, StringType())
user_sequences = user_sequences.withColumn(
    'movie_sequence_str',
    to_string_udf('movie_sequence')
)

# 训练 Word2Vec
word2vec = Word2Vec(
    vectorSize=100,      # 向量维度
    minCount=5,         # 最小出现次数
    inputCol='movie_sequence',
    outputCol='movie_vector'
)

# 训练模型
model = word2vec.fit(user_sequences)

# 获取物品向量
item_vectors = model.getVectors()
item_vectors.show(5)

# 输出格式:
# +---------+--------------------+
# |    word |              vector |
# +---------+--------------------+
# |     1193| [0.1, 0.2, 0.3, ...]|
# |      661| [0.2, 0.1, 0.4, ...]|
# +---------+--------------------+

5.5 查找相似电影

python
# 查找相似电影的函数
def find_similar_movies(movie_id, k=10):
    """查找与指定电影相似的电影"""

    # 1. 获取目标电影的向量
    target_row = item_vectors.filter(item_vectors['word'] == str(movie_id)).first()

    if not target_row:
        print(f"Movie {movie_id} not found")
        return []

    target_vector = target_row['vector']

    # 2. 计算与所有电影的相似度
    def compute_similarity(row):
        vector = row['vector']
        similarity = sum([a * b for a, b in zip(target_vector, vector)])
        return float(similarity)

    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType

    similarity_udf = udf(compute_similarity, FloatType())

    item_vectors_with_sim = item_vectors.withColumn(
        'similarity',
        similarity_udf(item_vectors['word'], item_vectors['vector'])
    )

    # 3. 排序并返回 Top K
    similar_movies = item_vectors_with_sim \
        .filter(item_vectors['word'] != str(movie_id)) \
        .orderBy(col('similarity').desc()) \
        .limit(k)

    return similar_movies

# 测试
target_movie = 1193
similar_movies = find_similar_movies(target_movie, k=10)

print(f"\nTop 10 similar movies to {target_movie}:")
similar_movies.show()

5.6 保存和加载模型

python
# 保存模型
model.save("item2vec_model")

# 加载模型
from pyspark.ml.feature import Word2VecModel
loaded_model = Word2VecModel.load("item2vec_model")

5.7 完整代码示例

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, sort_array, col, udf
from pyspark.sql.types import StringType, FloatType
from pyspark.ml.feature import Word2Vec

# 1. 创建 SparkSession
spark = SparkSession.builder.appName("Item2Vec").getOrCreate()

# 2. 读取数据
ratings = spark.read.csv('ratings.csv', header=True, inferSchema=True)

# 3. 构建物品序列
user_sequences = ratings.groupBy('user_id') \
    .agg(sort_array(collect_list('movie_id')).alias('movie_sequence'))

# 4. 训练 Item2Vec
word2vec = Word2Vec(vectorSize=100, minCount=5, inputCol='movie_sequence', outputCol='movie_vector')
model = word2vec.fit(user_sequences)

# 5. 获取物品向量
item_vectors = model.getVectors()

# 6. 查找相似电影
target_movie = 1193
target_row = item_vectors.filter(item_vectors['word'] == str(target_movie)).first()

target_vector = target_row['vector']

def compute_similarity(vector):
    return float(sum([a * b for a, b in zip(target_vector, vector)]))

similarity_udf = udf(compute_similarity, FloatType())

similar_movies = item_vectors \
    .filter(item_vectors['word'] != str(target_movie)) \
    .withColumn('similarity', similarity_udf('vector')) \
    .orderBy(col('similarity').desc()) \
    .limit(10)

print(f"\nSimilar movies to {target_movie}:")
similar_movies.show()

# 7. 保存模型
model.save("item2vec_model")

spark.stop()

6. PySpark 训练 Word2Vec 实现内容相似推荐

对于有丰富文本内容的物品(如文章、商品描述),可以使用 PySpark 的 Word2Vec 训练文档向量,实现内容相似推荐。

6.1 准备文章数据

假设我们有一个文章数据集:

csv
article_id,title,content
001,Python编程入门,Python是一门易学的编程语言
002,Java核心技术,Java是企业级应用的首选
003,JavaScript高级程序设计,JavaScript前端开发必备
...
python
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("ContentSimilarity") \
    .getOrCreate()

# 读取数据
articles = spark.read.csv(
    'articles.csv',
    header=True,
    inferSchema=True
)

articles.show(5)

6.2 使用 jieba 实现中文分词

python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import jieba

# 中文分词 UDF
def tokenize_chinese(text):
    if text is None:
        return []
    return list(jieba.lcut(text))

tokenize_udf = udf(tokenize_chinese, ArrayType(StringType()))

# 对标题和内容分词
articles = articles.withColumn('title_words', tokenize_udf('title'))
articles = articles.withColumn('content_words', tokenize_udf('content'))

articles.show(5)
# 输出:
# +-----------+--------------+-------------------+-------------+------------------+
# |article_id|    title     |      content      | title_words |  content_words   |
# +-----------+--------------+-------------------+-------------+------------------+
# |      001  |Python编程入门|Python是一门...   |[Python, 编程]| [Python, 是...]|
# +-----------+--------------+-------------------+-------------+------------------+

6.3 送入 PySpark 实现 Word2Vec 训练

python
from pyspark.ml.feature import Word2Vec

# 合并标题和内容的词
from pyspark.sql.functions import concat, col

articles = articles.withColumn(
    'all_words',
    concat(col('title_words'), col('content_words'))
)

# 训练 Word2Vec
word2vec = Word2Vec(
    vectorSize=100,      # 向量维度
    minCount=3,         # 最小出现次数
    inputCol='all_words',
    outputCol='article_vector'
)

# 训练模型
model = word2vec.fit(articles)

# 获取文章向量
article_vectors = model.transform(articles)

article_vectors.select('article_id', 'title', 'article_vector').show(5)

6.4 对于输入 ID,计算相似的文章列表

python
# 计算相似文章的函数
def find_similar_articles(article_id, k=10):
    """查找与指定文章相似的文章"""

    # 1. 获取目标文章的向量
    target_row = article_vectors.filter(col('article_id') == article_id).first()

    if not target_row:
        print(f"Article {article_id} not found")
        return []

    target_vector = target_row['article_vector']

    # 2. 计算与所有文章的相似度
    def compute_similarity(vector):
        similarity = sum([a * b for a, b in zip(target_vector, vector)])
        return float(similarity)

    similarity_udf = udf(compute_similarity, FloatType())

    articles_with_sim = article_vectors \
        .filter(col('article_id') != article_id) \
        .withColumn('similarity', similarity_udf('article_vector'))

    # 3. 排序并返回 Top K
    similar_articles = articles_with_sim \
        .orderBy(col('similarity').desc()) \
        .limit(k)

    return similar_articles

# 测试
target_article = "001"
similar_articles = find_similar_articles(target_article, k=5)

print(f"\nSimilar articles to {target_article}:")
similar_articles.select('article_id', 'title', 'similarity').show()

6.5 完整代码示例

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, concat
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.feature import Word2Vec
import jieba

# 1. 创建 SparkSession
spark = SparkSession.builder.appName("ContentSimilarity").getOrCreate()

# 2. 读取数据
articles = spark.read.csv('articles.csv', header=True, inferSchema=True)

# 3. 中文分词
def tokenize_chinese(text):
    return list(jieba.lcut(text)) if text else []

tokenize_udf = udf(tokenize_chinese, ArrayType(StringType()))

articles = articles.withColumn('title_words', tokenize_udf('title'))
articles = articles.withColumn('content_words', tokenize_udf('content'))

# 4. 合并词列表
articles = articles.withColumn('all_words', concat(col('title_words'), col('content_words')))

# 5. 训练 Word2Vec
word2vec = Word2Vec(vectorSize=100, minCount=3, inputCol='all_words', outputCol='article_vector')
model = word2vec.fit(articles)

# 6. 获取文章向量
article_vectors = model.transform(articles)

# 7. 查找相似文章
target_article = "001"
target_row = article_vectors.filter(col('article_id') == target_article).first()
target_vector = target_row['article_vector']

def compute_similarity(vector):
    return float(sum([a * b for a, b in zip(target_vector, vector)]))

similarity_udf = udf(compute_similarity, FloatType())

similar_articles = article_vectors \
    .filter(col('article_id') != target_article) \
    .withColumn('similarity', similarity_udf('article_vector')) \
    .orderBy(col('similarity').desc()) \
    .limit(5)

print(f"\nSimilar articles to {target_article}:")
similar_articles.select('article_id', 'title', 'similarity').show()

# 8. 保存模型
model.save("content_word2vec_model")

spark.stop()

7. 总结

本章通过实战案例,详细介绍了推荐系统的开发技术栈和实现方法:

  1. 技术环境:Python/Java、scikit-learn/TensorFlow、MySQL/Redis、Spark
  2. 特征工程:数值特征、文本特征、ID 特征的处理方法
  3. Faiss 向量检索:高效的相似度搜索,适合大规模场景
  4. Word2Vec 内容推荐:腾讯 Word2Vec 实现,适合文本内容少的场景
  5. Item2Vec 电影推荐:基于物品序列的向量学习
  6. PySpark 内容相似:分布式 Word2Vec 训练,适合大规模数据

下一步建议

  • 动手实现一个完整的推荐系统(如电影推荐、商品推荐)
  • 学习深度学习推荐模型(Wide & Deep、DeepFM、DIN)
  • 关注推荐系统的性能优化和工程实践
  • 研究最新的推荐算法和技术趋势

参考资源


💡 实战建议

  • 从小规模数据开始,逐步扩展
  • 关注数据质量,比算法更重要
  • 持续监控推荐效果,不断优化
  • 结合业务场景,灵活调整算法

MIT