推荐系统实战开发
在掌握了推荐系统的架构和算法后,本章将带你进入实战开发阶段。我们将从技术环境搭建开始,逐步实现一个完整的推荐系统,包括特征工程、向量检索、模型训练等核心环节。
1. 推荐系统开发技术环境
推荐系统开发涉及多个技术栈,需要根据项目规模和团队技术选型灵活选择。
1.1 在线服务层
技术选型:
Python:适合快速原型开发
- Web 框架:Flask、FastAPI
- 通信协议:gRPC、HTTP
- 容器化:Docker、Kubernetes
Java:适合大规模生产环境
- Web 框架:Spring Boot
- 通信协议:gRPC、REST API
- 微服务:Spring Cloud
示例:Flask 推荐服务
from flask import Flask, jsonify, request
import redis
import numpy as np
app = Flask(__name__)
# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)
@app.route('/api/v1/recommend', methods=['GET'])
def recommend():
user_id = request.args.get('user_id')
limit = int(request.args.get('limit', 20))
# 获取推荐结果
recommendations = get_recommendations(user_id, limit)
return jsonify({
'code': 0,
'data': recommendations
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)1.2 机器学习层
技术选型:
传统机器学习:
- scikit-learn:传统算法(随机森林、GBDT)
- Spark MLlib:分布式机器学习
深度学习:
- TensorFlow:工业级深度学习框架
- PyTorch:研究和快速原型开发
示例:使用 scikit-learn 训练推荐模型
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import pandas as pd
# 1. 加载数据
data = pd.read_csv('user_item_features.csv')
# 2. 特征工程
features = ['user_age', 'user_gender', 'item_category', 'item_price']
X = data[features]
y = data['rating']
# 3. 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# 4. 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 5. 评估
score = model.score(X_test, y_test)
print(f"R² Score: {score}")
# 6. 预测
predictions = model.predict(X_test)1.3 数据存储层
技术选型:
关系型数据库:
- MySQL:存储物品元数据、用户信息
- PostgreSQL:复杂查询、JSON 数据
NoSQL 数据库:
- Redis:内存数据库,用于缓存和实时推荐
- Cassandra:分布式 NoSQL,存储用户-物品数据
- HBase:大数据场景,存储海量用户行为
数据仓库:
- Hive:离线数据仓库
- Spark SQL:实时数据分析
示例:Redis 存储推荐结果
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
# 存储用户推荐列表
def save_user_recommendations(user_id, recommendations):
key = f"recommend:{user_id}"
value = json.dumps(recommendations)
r.setex(key, 3600, value) # 1小时过期
# 获取用户推荐列表
def get_user_recommendations(user_id):
key = f"recommend:{user_id}"
value = r.get(key)
if value:
return json.loads(value)
return None1.4 数据处理层
技术选型:
数据处理:
- NumPy / Pandas:小规模数据处理
- Spark:大规模分布式数据处理
数据仓库:
- Hive:离线数据查询和分析
- Presto:实时数据分析
示例:使用 Spark 处理用户行为数据
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc
# 创建 SparkSession
spark = SparkSession.builder \
.appName("UserBehaviorAnalysis") \
.getOrCreate()
# 读取数据
user_behavior = spark.read.csv('user_behavior.csv', header=True)
# 统计每个用户的互动次数
user_stats = user_behavior.groupBy('user_id') \
.agg(
count('*').alias('interaction_count')
) \
.orderBy(desc('interaction_count'))
# 显示结果
user_stats.show(10)
# 保存结果
user_stats.write.mode('overwrite').parquet('user_stats.parquet')
# 停止 Spark
spark.stop()1.5 编程语言选择
Python:
- 优势:生态丰富、开发效率高、适合算法研究
- 劣势:性能相对较低
- 适用场景:算法开发、快速原型、中小规模系统
Java:
- 优势:性能高、生态成熟、适合大规模系统
- 劣势:开发效率较低、学习曲线陡
- 适用场景:大规模生产系统、高并发服务
C++:
- 优势:性能最高、适合计算密集型任务
- 劣势:开发复杂度高、生态相对较少
- 适用场景:实时计算、高性能推理引擎
1.6 推荐技术栈组合
小型团队 / 快速原型:
前端:React / Vue
后端:Python + Flask
数据库:MySQL + Redis
机器学习:scikit-learn + TensorFlow
数据处理:Pandas中型团队 / 工业级系统:
前端:React / Vue
后端:Java + Spring Boot + Python Flask
数据库:MySQL + Redis + Cassandra
机器学习:TensorFlow + PyTorch
数据处理:Spark
部署:Docker + Kubernetes大型团队 / 超大规模系统:
前端:React / Vue
后端:Java + Go
数据库:MySQL + Redis + Cassandra + HBase
机器学习:TensorFlow Serving + ONNX Runtime
数据处理:Spark + Flink
部署:Kubernetes + Istio2. 数据源与特征工程
特征工程是推荐系统的核心环节,直接决定推荐效果。本节将详细介绍数据源、特征类型和特征处理方法。
2.1 数据源
2.1.1 业务数据库
用途:离线、在线处理,用于机器学习预估模型训练
MySQL:
- 存储物品数据(商品信息、内容元数据)
- 存储用户列表(用户基本信息)
- 存储推荐结果(预计算的推荐列表)
示例:MySQL 表设计
-- 物品表
CREATE TABLE items (
item_id VARCHAR(50) PRIMARY KEY,
title VARCHAR(200),
description TEXT,
category VARCHAR(50),
price DECIMAL(10, 2),
tags JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 用户表
CREATE TABLE users (
user_id VARCHAR(50) PRIMARY KEY,
age INT,
gender VARCHAR(10),
city VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 用户行为表
CREATE TABLE user_behaviors (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(50),
item_id VARCHAR(50),
behavior_type VARCHAR(20), -- click, view, purchase, favorite
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_user_id (user_id),
INDEX idx_item_id (item_id)
);2.1.2 前端埋点日志
用途:在线实时请求,用于在线机器学习预估模型训练
Kafka 流:
- 实时收集用户行为日志
- 流式处理,支持实时推荐
HDFS 表:
- 存储历史日志数据
- 用于离线分析和模型训练
日志格式:
{
"user_id": "user_123456",
"item_id": "item_789012",
"behavior_type": "click",
"timestamp": 1700000000,
"context": {
"device": "mobile",
"location": "beijing",
"page": "homepage"
}
}示例:读取 Kafka 日志
from kafka import KafkaConsumer
# 创建消费者
consumer = KafkaConsumer(
'user_behavior',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest'
)
# 消费消息
for message in consumer:
log = json.loads(message.value)
print(f"User {log['user_id']} clicked item {log['item_id']}")2.1.3 处理数据
用途:统计报表输出,用于查看效果报表、AB Test效果
输出格式:
- CSV / TXT:本地测试
- API:实时查询
- Hive 表:离线分析
示例:生成推荐效果报表
import pandas as pd
from datetime import datetime, timedelta
# 读取推荐日志
recommend_log = pd.read_csv('recommend_log.csv')
# 计算今日推荐效果
today = datetime.now().date()
today_log = recommend_log[pd.to_datetime(recommend_log['timestamp']).dt.date == today]
# 计算指标
impression_count = len(today_log)
click_count = len(today_log[today_log['behavior_type'] == 'click'])
ctr = click_count / impression_count
print(f"Today's CTR: {ctr:.2%}")2.2 特征工程
2.2.1 数值类特征
特征类型:
- 年龄、价格、评分、时长等连续值
- 需要进行归一化或分箱处理
归一化处理(Normalization):
将特征缩放到 [0, 1] 区间
from sklearn.preprocessing import MinMaxScaler
import numpy as np
# 示例数据
ages = np.array([[25], [30], [35], [40], [45]])
# 创建 MinMaxScaler
scaler = MinMaxScaler()
# 归一化
normalized_ages = scaler.fit_transform(ages)
print(f"Original ages: {ages.flatten()}")
print(f"Normalized ages: {normalized_ages.flatten()}")
# 输出:Original ages: [25 30 35 40 45]
# Normalized ages: [0. 0.25 0.5 0.75 1. ]标准化处理(Standardization):
将特征缩放到均值为 0、标准差为 1 的分布
from sklearn.preprocessing import StandardScaler
# 创建 StandardScaler
scaler = StandardScaler()
# 标准化
standardized_ages = scaler.fit_transform(ages)
print(f"Standardized ages: {standardized_ages.flatten()}")分箱处理(Binning):
将连续值转换为离散的区间
import pandas as pd
# 示例数据
df = pd.DataFrame({
'age': [18, 22, 28, 35, 45, 55, 65]
})
# 分箱
bins = [0, 18, 30, 50, 100]
labels = ['<18', '18-30', '30-50', '50+']
df['age_bin'] = pd.cut(df['age'], bins=bins, labels=labels)
print(df)
# 输出:
# age age_bin
# 0 18 <18
# 1 22 18-30
# 2 28 18-30
# 3 35 30-50
# 4 45 30-50
# 5 55 50+
# 6 65 50+2.2.2 文本类特征
处理步骤:
- 中文分词:使用 jieba 等工具分词
- 关键词提取:提取 TF-IDF 或 TextRank 关键词
- 编码:One-Hot / Multi-Hot 编码或 Word Embedding
One-Hot 编码:
from sklearn.preprocessing import MultiLabelBinarizer
# 示例:用户兴趣标签
user_interests = [
['Python', '编程', 'AI'],
['Java', '编程'],
['JavaScript', '前端'],
['Python', '深度学习']
]
# Multi-Hot 编码
mlb = MultiLabelBinarizer()
encoded = mlb.fit_transform(user_interests)
print("Classes:", mlb.classes_)
print("Encoded:")
print(encoded)
# 输出:
# Classes: ['AI' 'Java' 'JavaScript' 'Python' '编程' '深度学习' '前端']
# Encoded:
# [[1 0 0 1 1 0 0]
# [0 1 0 0 1 0 0]
# [0 0 1 0 0 0 1]
# [0 0 0 1 0 1 0]]TF-IDF 编码:
from sklearn.feature_extraction.text import TfidfVectorizer
# 示例:商品描述
descriptions = [
"Python编程从入门到实践",
"Java核心技术卷一",
"JavaScript高级程序设计"
]
# TF-IDF 编码
vectorizer = TfidfVectorizer(max_features=10)
tfidf_matrix = vectorizer.fit_transform(descriptions)
print("Feature names:", vectorizer.get_feature_names_out())
print("TF-IDF matrix:")
print(tfidf_matrix.toarray())Word Embedding:
from gensim.models import Word2Vec
import numpy as np
# 训练 Word2Vec
sentences = [
['Python', '编程', '入门', '实践'],
['Java', '编程', '核心', '技术'],
['JavaScript', '前端', '开发']
]
model = Word2Vec(sentences, vector_size=100, window=5, min_count=1)
# 文本向量化
def text_to_vector(text):
words = jieba.lcut(text)
vectors = [model.wv[word] for word in words if word in model.wv]
if vectors:
return np.mean(vectors, axis=0)
else:
return np.zeros(100)
# 示例
text = "Python编程入门"
vector = text_to_vector(text)
print(f"Vector shape: {vector.shape}")2.2.3 ID 类特征
处理方法:Embedding 向量化
示例:用户 ID Embedding
import tensorflow as tf
from tensorflow.keras import layers
# 用户 ID Embedding 层
user_id_input = layers.Input(shape=(1,), name='user_id')
user_embedding = layers.Embedding(
input_dim=1000000, # 100万用户
output_dim=64, # 64维向量
embeddings_initializer='he_normal'
)(user_id_input)
# 展平
user_vector = layers.Flatten()(user_embedding)
# 查看 Embedding 形状
print(f"User embedding shape: {user_vector.shape}")学习到的 Embedding 可以用于:
- 相似用户推荐
- 相似物品推荐
- 特征补充
2.3 特征工程最佳实践
✅ 应该做的:
- 特征选择:选择与目标任务相关的特征
- 特征变换:归一化、分箱、编码等
- 特征组合:创造更有意义的组合特征
- 特征监控:监控特征分布变化
- 特征版本管理:跟踪特征的变化历史
❌ 不应该做的:
- 数据泄露:使用未来信息训练模型
- 过度工程:创建过多无用特征
- 忽略缺失值:不处理缺失值直接训练
- 不区分训练/测试集:测试集信息混入训练集
3. Python 使用 Faiss 实现向量近邻搜索
Faiss(Facebook AI Similarity Search)是 Facebook 开源的高效向量相似度搜索库,适用于大规模向量检索场景。
3.1 Faiss 简介
优势:
- ⚡ 性能极高(10亿向量,毫秒级检索)
- 📦 支持 GPU 加速
- 🎯 支持多种索引结构
- 🔧 易于使用
应用场景:
- 大规模物品相似度检索
- 用户向量聚类
- 向量数据库
3.2 安装 Faiss
# CPU 版本
pip install faiss-cpu
# GPU 版本(需要 CUDA)
pip install faiss-gpu3.3 完整实现流程
3.3.1 读取训练好的 Embedding 数据
假设我们有一个电影数据集,每部电影都有一个 100 维的向量表示:
import numpy as np
# 模拟数据:1000部电影,每部100维
num_movies = 1000
embedding_dim = 100
# 生成随机向量(实际应用中从文件或数据库读取)
movie_embeddings = np.random.rand(num_movies, embedding_dim).astype('float32')
# 电影 ID 列表
movie_ids = [f"movie_{i}" for i in range(num_movies)]
print(f"Movie embeddings shape: {movie_embeddings.shape}")
print(f"First 5 movie IDs: {movie_ids[:5]}")3.3.2 构建 Faiss 索引
import faiss
# 创建索引
# IndexFlatL2 是精确的 L2 距离索引
index = faiss.IndexFlatL2(embedding_dim)
# 将嵌入向量添加到索引
index.add(movie_embeddings)
print(f"Index contains {index.ntotal} vectors")其他索引类型:
# 1. IndexIVFFlat(倒排索引,适合大规模)
nlist = 100 # 聚类中心数
quantizer = faiss.IndexFlatL2(embedding_dim)
index_ivf = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist)
# 需要先训练(聚类)
index_ivf.train(movie_embeddings)
index_ivf.add(movie_embeddings)
# 2. IndexIVFPQ(量化压缩,节省内存)
m = 16 # 每个向量的子向量数量
nbits = 8 # 每个子向量的比特数
index_ivfpq = faiss.IndexIVFPQ(quantizer, embedding_dim, nlist, m, nbits)
index_ivfpq.train(movie_embeddings)
index_ivfpq.add(movie_embeddings)
# 3. GPU 加速(需要 GPU 版本)
# res = faiss.StandardGpuResources()
# index_gpu = faiss.index_cpu_to_gpu(res, 0, index)3.3.3 目标向量搜索
假设我们有一个目标电影,想找最相似的 10 部电影:
# 目标向量(例如:movie_100 的向量)
target_movie_id = "movie_100"
target_idx = movie_ids.index(target_movie_id)
target_vector = movie_embeddings[target_idx:target_idx+1]
# 搜索最相似的 K 个向量
k = 10 # 返回 Top10
distances, indices = index.search(target_vector, k)
print(f"Top {k} similar movies to {target_movie_id}:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
similar_movie_id = movie_ids[idx]
print(f"{i+1}. {similar_movie_id} (distance: {dist:.4f})")输出示例:
Top 10 similar movies to movie_100:
1. movie_456 (distance: 0.1234)
2. movie_789 (distance: 0.2345)
3. movie_321 (distance: 0.3456)
...3.3.4 根据 ID 获取电影标题
假设我们有一个电影信息字典:
# 模拟电影信息
movie_info = {
f"movie_{i}": {
"title": f"Movie {i}",
"category": ["Action", "Drama", "Comedy"][i % 3],
"year": 2000 + i
}
for i in range(num_movies)
}
# 完整的推荐函数
def recommend_similar_movies(target_movie_id, k=10):
"""推荐相似的电影"""
# 1. 获取目标向量
target_idx = movie_ids.index(target_movie_id)
target_vector = movie_embeddings[target_idx:target_idx+1]
# 2. 搜索相似向量
distances, indices = index.search(target_vector, k)
# 3. 构建推荐结果
recommendations = []
for idx, dist in zip(indices[0], distances[0]):
similar_movie_id = movie_ids[idx]
movie_data = movie_info[similar_movie_id]
recommendations.append({
"movie_id": similar_movie_id,
"title": movie_data["title"],
"category": movie_data["category"],
"year": movie_data["year"],
"similarity": 1 - dist # 距离转换为相似度
})
return recommendations
# 使用
target_movie = "movie_100"
recommendations = recommend_similar_movies(target_movie, k=10)
print(f"\nRecommendations for {target_movie}:")
for i, rec in enumerate(recommendations):
print(f"{i+1}. {rec['title']} ({rec['category']}, {rec['year']}) - Similarity: {rec['similarity']:.4f}")3.4 保存和加载索引
# 保存索引
faiss.write_index(index, "movie_index.faiss")
# 加载索引
loaded_index = faiss.read_index("movie_index.faiss")
print(f"Loaded index contains {loaded_index.ntotal} vectors")3.5 性能优化
批量查询:
# 批量查询多个目标向量
target_vectors = movie_embeddings[:10] # 查询前10部电影的相似电影
k = 5
distances, indices = loaded_index.search(target_vectors, k)使用倒排索引加速:
# IndexIVFFlat 需要先训练
nlist = 100 # 聚类中心数
quantizer = faiss.IndexFlatL2(embedding_dim)
index_ivf = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist)
# 训练(聚类)
index_ivf.train(movie_embeddings)
# 添加向量
index_ivf.add(movie_embeddings)
# 搜索
index_ivf.nprobe = 10 # 搜索的聚类中心数(越大越准确,但越慢)
distances, indices = index_ivf.search(target_vector, k)3.6 完整代码示例
import faiss
import numpy as np
# 1. 准备数据
num_movies = 1000
embedding_dim = 100
movie_embeddings = np.random.rand(num_movies, embedding_dim).astype('float32')
movie_ids = [f"movie_{i}" for i in range(num_movies)]
# 2. 构建索引
index = faiss.IndexFlatL2(embedding_dim)
index.add(movie_embeddings)
# 3. 搜索
target_movie_id = "movie_100"
target_idx = movie_ids.index(target_movie_id)
target_vector = movie_embeddings[target_idx:target_idx+1]
k = 10
distances, indices = index.search(target_vector, k)
# 4. 输出结果
print(f"Top {k} similar movies to {target_movie_id}:")
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])):
print(f"{i+1}. {movie_ids[idx]} (distance: {dist:.4f})")
# 5. 保存索引
faiss.write_index(index, "movie_index.faiss")
print("\nIndex saved to movie_index.faiss")4. 使用腾讯开源 Word2Vec 实现内容相似推荐
当文本内容较少时,腾讯开源的 Word2Vec 是一个很好的选择,它提供了高质量的预训练词向量。
4.1 腾讯 Word2Vec 简介
特点:
- 基于大规模中文语料训练
- 覆盖 800 万中文词汇
- 词向量维度:200维
- 支持词相似度计算
下载地址:
4.2 准备工作
4.2.1 下载词向量
# 下载词向量文件(约 5GB)
wget https://ai.tencent.com/ailab/nlp/data/zh_wiki_word2vec.txt4.2.2 加载词向量
from gensim.models import KeyedVectors
# 加载词向量(需要一些时间)
print("Loading Tencent word vectors...")
word_vectors = KeyedVectors.load_word2vec_format(
'zh_wiki_word2vec.txt',
binary=False # 文本格式
)
print(f"Loaded {len(word_vectors.key_to_index)} word vectors")
# 测试
print("\nExample: '编程' similar words:")
similar_words = word_vectors.most_similar('编程', topn=10)
for word, score in similar_words:
print(f" {word}: {score:.4f}")4.3 获取文章列表数据
假设我们有一个文章数据集:
# 模拟文章数据
articles = [
{
"id": "article_001",
"title": "Python 编程从入门到实践",
"content": "本书是 Python 入门经典,适合初学者..."
},
{
"id": "article_002",
"title": "Java 核心技术卷一",
"content": "深入理解 Java 核心技术..."
},
{
"id": "article_003",
"title": "JavaScript 高级程序设计",
"content": "JavaScript 前端开发必备..."
},
{
"id": "article_004",
"title": "Python 深度学习实战",
"content": "使用 Python 进行深度学习..."
}
]4.4 使用 jieba 实现关键词提取
import jieba
import jieba.analyse
# 提取关键词
def extract_keywords(text, topK=10):
"""提取文本关键词"""
keywords = jieba.analyse.extract_tags(text, topK=topK, withWeight=True)
return keywords
# 测试
for article in articles[:2]:
text = article['title'] + ' ' + article['content']
keywords = extract_keywords(text, topK=5)
print(f"\nArticle: {article['title']}")
print("Keywords:", keywords)4.5 查询腾讯 Word2Vec,平均法得到文档 Embedding
import numpy as np
def text_to_embedding(text, word_vectors, use_tfidf=False):
"""将文本转换为向量(平均法)"""
# 1. 分词
words = jieba.lcut(text)
# 2. 提取词向量
vectors = []
for word in words:
if word in word_vectors:
vectors.append(word_vectors[word])
# 3. 如果没有词向量,返回零向量
if not vectors:
return np.zeros(200)
# 4. 平均法
if use_tfidf:
# 可以使用 TF-IDF 加权平均
weights = [tfidf_weights.get(word, 1.0) for word in words if word in word_vectors]
doc_embedding = np.average(vectors, axis=0, weights=weights)
else:
# 简单平均
doc_embedding = np.mean(vectors, axis=0)
return doc_embedding
# 为所有文章生成 Embedding
print("Generating article embeddings...")
article_embeddings = {}
for article in articles:
text = article['title'] + ' ' + article['content']
embedding = text_to_embedding(text, word_vectors)
article_embeddings[article['id']] = embedding
print(f"Generated {len(article_embeddings)} article embeddings")4.6 计算相似文章列表
from sklearn.metrics.pairwise import cosine_similarity
def find_similar_articles(target_article_id, k=5):
"""查找相似的文章"""
# 1. 获取目标文章的向量
target_embedding = article_embeddings[target_article_id]
# 2. 计算与所有文章的相似度
similarities = []
for article_id, embedding in article_embeddings.items():
if article_id == target_article_id:
continue
# 计算余弦相似度
similarity = cosine_similarity(
[target_embedding],
[embedding]
)[0][0]
similarities.append((article_id, similarity))
# 3. 排序并返回 Top K
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:k]
# 测试
target_article = "article_001"
similar_articles = find_similar_articles(target_article, k=3)
print(f"\nSimilar articles to '{target_article}':")
for i, (article_id, score) in enumerate(similar_articles):
article = next(a for a in articles if a['id'] == article_id)
print(f"{i+1}. {article['title']} (similarity: {score:.4f})")4.7 完整代码示例
import jieba
import jieba.analyse
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# 1. 加载腾讯 Word2Vec
print("Loading Tencent word vectors...")
from gensim.models import KeyedVectors
word_vectors = KeyedVectors.load_word2vec_format('zh_wiki_word2vec.txt', binary=False)
# 2. 文章数据
articles = [
{"id": "001", "title": "Python 编程入门", "content": "Python 是一门易学的编程语言"},
{"id": "002", "title": "Java 核心技术", "content": "Java 是企业级应用的首选"},
{"id": "003", "title": "Python 深度学习", "content": "使用 Python 进行深度学习"}
]
# 3. 文本向量化
def text_to_embedding(text):
words = jieba.lcut(text)
vectors = [word_vectors[w] for w in words if w in word_vectors]
return np.mean(vectors, axis=0) if vectors else np.zeros(200)
# 4. 生成文章 Embedding
article_embeddings = {
a['id']: text_to_embedding(a['title'] + ' ' + a['content'])
for a in articles
}
# 5. 查找相似文章
target_id = "001"
target_emb = article_embeddings[target_id]
similarities = [
(aid, cosine_similarity([target_emb], [emb])[0][0])
for aid, emb in article_embeddings.items()
if aid != target_id
]
similarities.sort(key=lambda x: x[1], reverse=True)
print(f"\nSimilar to {target_id}:")
for aid, score in similarities:
print(f" {aid}: {score:.4f}")5. Python 训练 Item2Vec 实现电影相关推荐
Item2Vec 是基于 Word2Vec 的推荐算法,将物品序列看作"句子",物品看作"词",学习物品的向量表示。
5.1 Item2Vec 原理
核心思想:
- 用户的物品浏览/购买序列 = 文本中的句子
- 物品 = 文本中的词
- 学习物品向量,相似物品的向量距离较近
优势:
- 捕捉物品之间的共现关系
- 不需要显式评分数据
- 适用于隐式反馈场景
5.2 准备数据
假设我们有 MovieLens 数据集:
user_id,movie_id,rating,timestamp
1,1193,5,978300760
1,661,3,978302109
1,914,3,978301968
...from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
# 创建 SparkSession
spark = SparkSession.builder \
.appName("Item2Vec") \
.getOrCreate()
# 读取数据
ratings = spark.read.csv(
'ratings.csv',
header=True,
inferSchema=True
)
# 查看数据
ratings.show(5)5.3 数据预处理
将用户-物品数据转换为物品序列:
from pyspark.sql.functions import collect_list, col, sort_array
# 按用户分组,收集物品列表(按时间排序)
user_sequences = ratings.groupBy('user_id') \
.agg(
sort_array(collect_list('movie_id')).alias('movie_sequence')
)
user_sequences.show(5)
# 输出:
# +--------+------------------+
# |user_id | movie_sequence |
# +--------+------------------+
# | 1 | [1193, 661, ...] |
# | 2 | [1357, 3068, ...] |
# +--------+------------------+5.4 使用 PySpark 训练 Item2Vec
from pyspark.ml.feature import Word2Vec
# 将物品 ID 转换为字符串
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
to_string_udf = udf(str, StringType())
user_sequences = user_sequences.withColumn(
'movie_sequence_str',
to_string_udf('movie_sequence')
)
# 训练 Word2Vec
word2vec = Word2Vec(
vectorSize=100, # 向量维度
minCount=5, # 最小出现次数
inputCol='movie_sequence',
outputCol='movie_vector'
)
# 训练模型
model = word2vec.fit(user_sequences)
# 获取物品向量
item_vectors = model.getVectors()
item_vectors.show(5)
# 输出格式:
# +---------+--------------------+
# | word | vector |
# +---------+--------------------+
# | 1193| [0.1, 0.2, 0.3, ...]|
# | 661| [0.2, 0.1, 0.4, ...]|
# +---------+--------------------+5.5 查找相似电影
# 查找相似电影的函数
def find_similar_movies(movie_id, k=10):
"""查找与指定电影相似的电影"""
# 1. 获取目标电影的向量
target_row = item_vectors.filter(item_vectors['word'] == str(movie_id)).first()
if not target_row:
print(f"Movie {movie_id} not found")
return []
target_vector = target_row['vector']
# 2. 计算与所有电影的相似度
def compute_similarity(row):
vector = row['vector']
similarity = sum([a * b for a, b in zip(target_vector, vector)])
return float(similarity)
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
similarity_udf = udf(compute_similarity, FloatType())
item_vectors_with_sim = item_vectors.withColumn(
'similarity',
similarity_udf(item_vectors['word'], item_vectors['vector'])
)
# 3. 排序并返回 Top K
similar_movies = item_vectors_with_sim \
.filter(item_vectors['word'] != str(movie_id)) \
.orderBy(col('similarity').desc()) \
.limit(k)
return similar_movies
# 测试
target_movie = 1193
similar_movies = find_similar_movies(target_movie, k=10)
print(f"\nTop 10 similar movies to {target_movie}:")
similar_movies.show()5.6 保存和加载模型
# 保存模型
model.save("item2vec_model")
# 加载模型
from pyspark.ml.feature import Word2VecModel
loaded_model = Word2VecModel.load("item2vec_model")5.7 完整代码示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, sort_array, col, udf
from pyspark.sql.types import StringType, FloatType
from pyspark.ml.feature import Word2Vec
# 1. 创建 SparkSession
spark = SparkSession.builder.appName("Item2Vec").getOrCreate()
# 2. 读取数据
ratings = spark.read.csv('ratings.csv', header=True, inferSchema=True)
# 3. 构建物品序列
user_sequences = ratings.groupBy('user_id') \
.agg(sort_array(collect_list('movie_id')).alias('movie_sequence'))
# 4. 训练 Item2Vec
word2vec = Word2Vec(vectorSize=100, minCount=5, inputCol='movie_sequence', outputCol='movie_vector')
model = word2vec.fit(user_sequences)
# 5. 获取物品向量
item_vectors = model.getVectors()
# 6. 查找相似电影
target_movie = 1193
target_row = item_vectors.filter(item_vectors['word'] == str(target_movie)).first()
target_vector = target_row['vector']
def compute_similarity(vector):
return float(sum([a * b for a, b in zip(target_vector, vector)]))
similarity_udf = udf(compute_similarity, FloatType())
similar_movies = item_vectors \
.filter(item_vectors['word'] != str(target_movie)) \
.withColumn('similarity', similarity_udf('vector')) \
.orderBy(col('similarity').desc()) \
.limit(10)
print(f"\nSimilar movies to {target_movie}:")
similar_movies.show()
# 7. 保存模型
model.save("item2vec_model")
spark.stop()6. PySpark 训练 Word2Vec 实现内容相似推荐
对于有丰富文本内容的物品(如文章、商品描述),可以使用 PySpark 的 Word2Vec 训练文档向量,实现内容相似推荐。
6.1 准备文章数据
假设我们有一个文章数据集:
article_id,title,content
001,Python编程入门,Python是一门易学的编程语言
002,Java核心技术,Java是企业级应用的首选
003,JavaScript高级程序设计,JavaScript前端开发必备
...from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder \
.appName("ContentSimilarity") \
.getOrCreate()
# 读取数据
articles = spark.read.csv(
'articles.csv',
header=True,
inferSchema=True
)
articles.show(5)6.2 使用 jieba 实现中文分词
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import jieba
# 中文分词 UDF
def tokenize_chinese(text):
if text is None:
return []
return list(jieba.lcut(text))
tokenize_udf = udf(tokenize_chinese, ArrayType(StringType()))
# 对标题和内容分词
articles = articles.withColumn('title_words', tokenize_udf('title'))
articles = articles.withColumn('content_words', tokenize_udf('content'))
articles.show(5)
# 输出:
# +-----------+--------------+-------------------+-------------+------------------+
# |article_id| title | content | title_words | content_words |
# +-----------+--------------+-------------------+-------------+------------------+
# | 001 |Python编程入门|Python是一门... |[Python, 编程]| [Python, 是...]|
# +-----------+--------------+-------------------+-------------+------------------+6.3 送入 PySpark 实现 Word2Vec 训练
from pyspark.ml.feature import Word2Vec
# 合并标题和内容的词
from pyspark.sql.functions import concat, col
articles = articles.withColumn(
'all_words',
concat(col('title_words'), col('content_words'))
)
# 训练 Word2Vec
word2vec = Word2Vec(
vectorSize=100, # 向量维度
minCount=3, # 最小出现次数
inputCol='all_words',
outputCol='article_vector'
)
# 训练模型
model = word2vec.fit(articles)
# 获取文章向量
article_vectors = model.transform(articles)
article_vectors.select('article_id', 'title', 'article_vector').show(5)6.4 对于输入 ID,计算相似的文章列表
# 计算相似文章的函数
def find_similar_articles(article_id, k=10):
"""查找与指定文章相似的文章"""
# 1. 获取目标文章的向量
target_row = article_vectors.filter(col('article_id') == article_id).first()
if not target_row:
print(f"Article {article_id} not found")
return []
target_vector = target_row['article_vector']
# 2. 计算与所有文章的相似度
def compute_similarity(vector):
similarity = sum([a * b for a, b in zip(target_vector, vector)])
return float(similarity)
similarity_udf = udf(compute_similarity, FloatType())
articles_with_sim = article_vectors \
.filter(col('article_id') != article_id) \
.withColumn('similarity', similarity_udf('article_vector'))
# 3. 排序并返回 Top K
similar_articles = articles_with_sim \
.orderBy(col('similarity').desc()) \
.limit(k)
return similar_articles
# 测试
target_article = "001"
similar_articles = find_similar_articles(target_article, k=5)
print(f"\nSimilar articles to {target_article}:")
similar_articles.select('article_id', 'title', 'similarity').show()6.5 完整代码示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, concat
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.feature import Word2Vec
import jieba
# 1. 创建 SparkSession
spark = SparkSession.builder.appName("ContentSimilarity").getOrCreate()
# 2. 读取数据
articles = spark.read.csv('articles.csv', header=True, inferSchema=True)
# 3. 中文分词
def tokenize_chinese(text):
return list(jieba.lcut(text)) if text else []
tokenize_udf = udf(tokenize_chinese, ArrayType(StringType()))
articles = articles.withColumn('title_words', tokenize_udf('title'))
articles = articles.withColumn('content_words', tokenize_udf('content'))
# 4. 合并词列表
articles = articles.withColumn('all_words', concat(col('title_words'), col('content_words')))
# 5. 训练 Word2Vec
word2vec = Word2Vec(vectorSize=100, minCount=3, inputCol='all_words', outputCol='article_vector')
model = word2vec.fit(articles)
# 6. 获取文章向量
article_vectors = model.transform(articles)
# 7. 查找相似文章
target_article = "001"
target_row = article_vectors.filter(col('article_id') == target_article).first()
target_vector = target_row['article_vector']
def compute_similarity(vector):
return float(sum([a * b for a, b in zip(target_vector, vector)]))
similarity_udf = udf(compute_similarity, FloatType())
similar_articles = article_vectors \
.filter(col('article_id') != target_article) \
.withColumn('similarity', similarity_udf('article_vector')) \
.orderBy(col('similarity').desc()) \
.limit(5)
print(f"\nSimilar articles to {target_article}:")
similar_articles.select('article_id', 'title', 'similarity').show()
# 8. 保存模型
model.save("content_word2vec_model")
spark.stop()7. 总结
本章通过实战案例,详细介绍了推荐系统的开发技术栈和实现方法:
- 技术环境:Python/Java、scikit-learn/TensorFlow、MySQL/Redis、Spark
- 特征工程:数值特征、文本特征、ID 特征的处理方法
- Faiss 向量检索:高效的相似度搜索,适合大规模场景
- Word2Vec 内容推荐:腾讯 Word2Vec 实现,适合文本内容少的场景
- Item2Vec 电影推荐:基于物品序列的向量学习
- PySpark 内容相似:分布式 Word2Vec 训练,适合大规模数据
下一步建议:
- 动手实现一个完整的推荐系统(如电影推荐、商品推荐)
- 学习深度学习推荐模型(Wide & Deep、DeepFM、DIN)
- 关注推荐系统的性能优化和工程实践
- 研究最新的推荐算法和技术趋势
参考资源:
💡 实战建议:
- 从小规模数据开始,逐步扩展
- 关注数据质量,比算法更重要
- 持续监控推荐效果,不断优化
- 结合业务场景,灵活调整算法