五金门店70万商品标签系统架构方案
一、数据库选型分析
1.1 方案对比总览
| 方案 | 推荐度 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|---|
| MySQL + ES 混合 | ⭐⭐⭐⭐⭐ | 成熟稳定、事务支持、ES搜索强 | 需维护两套系统、数据同步 | 首选方案,适合业务+搜索双需求 |
| MongoDB | ⭐⭐⭐ | 文档灵活、JSON存储、扩展性好 | 关联查询弱、事务支持弱 | 标签结构不固定、需快速迭代 |
| Neo4j | ⭐⭐ | 标签关联关系强、可视化 | 数据量大性能差、配套弱 | 标签关系复杂、推荐系统 |
| ClickHouse | ⭐⭐⭐ | OLAP分析强、压缩率高 | 不适合实时写入、事务差 | 标签统计分析、BI报表 |
| PostgreSQL + ES | ⭐⭐⭐⭐ | JSON支持强、全文检索、扩展性好 | - | 中等规模、更适合新项目 |
1.2 推荐方案:MySQL + ES 混合架构
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
├─────────────────────────────────────────────────────────────┤
│ 商品管理 │ 标签搜索 │ 批量标注 │ 智能推荐 │ 报表分析 │
└────────────┴───────────┴───────────┴───────────┴────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ MySQL │ │ Elasticsearch │ │ Redis缓存 │
│ (主库/事务) │ │ (标签搜索) │ │ (热点数据) │
│ │ │ │ │ │
│ - 商品主数据 │ │ - 标签倒排索引 │ │ - 标签热度 │
│ - 标签维度表 │ │ - 商品标签文档 │ │ - 搜索缓存 │
│ - 关联关系表 │ │ - 复杂查询 │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
1.3 为什么 MySQL + ES 是最佳选择
- 业务连续性:爱优五金收银系统基于MySQL,零迁移风险
- 标签搜索需求强:多维度组合查询、模糊匹配、权重排序
- 成熟度高:业界最成熟方案,配套工具完善
- 数据一致性:MySQL事务保证商品与标签关联的ACID
二、标签数据模型设计
2.1 核心表结构设计(MySQL DDL)
-- 1. 商品主表(扩展现有表)
CREATE TABLE `product` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`name` VARCHAR(200) NOT NULL COMMENT '商品名称',
`spec` VARCHAR(200) COMMENT '规格型号',
`brand` VARCHAR(100) COMMENT '品牌',
`category_id` INT COMMENT '分类ID',
`purchase_price` DECIMAL(10,2) COMMENT '进货价',
`retail_price` DECIMAL(10,2) COMMENT '零售价',
`stock_qty` INT DEFAULT 0 COMMENT '库存量',
`supplier_id` INT COMMENT '供应商ID',
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_category (category_id),
INDEX idx_brand (brand),
INDEX idx_price (retail_price)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 2. 标签维度表(定义有哪些维度)
CREATE TABLE `tag_dimension` (
`id` INT PRIMARY KEY AUTO_INCREMENT,
`code` VARCHAR(50) NOT NULL UNIQUE COMMENT '维度编码',
`name` VARCHAR(50) NOT NULL COMMENT '维度名称',
`description` VARCHAR(200) COMMENT '维度描述',
`is_tree` TINYINT DEFAULT 0 COMMENT '是否树形结构',
`parent_id` INT DEFAULT NULL COMMENT '父维度ID',
`weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '默认权重',
`sort_order` INT DEFAULT 0,
`status` TINYINT DEFAULT 1,
FOREIGN KEY (`parent_id`) REFERENCES `tag_dimension`(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 3. 标签值表(每个维度的具体标签)
CREATE TABLE `tag_value` (
`id` INT PRIMARY KEY AUTO_INCREMENT,
`dimension_id` INT NOT NULL COMMENT '所属维度ID',
`code` VARCHAR(50) NOT NULL COMMENT '标签编码',
`name` VARCHAR(100) NOT NULL COMMENT '标签名称',
`parent_id` INT DEFAULT NULL COMMENT '父标签ID(树形)',
`level` INT DEFAULT 1 COMMENT '层级深度',
`weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '标签权重',
`mutual_exclusive` TINYINT DEFAULT 0 COMMENT '是否互斥(同类只能选一个)',
`icon` VARCHAR(100) COMMENT '图标',
`sort_order` INT DEFAULT 0,
`status` TINYINT DEFAULT 1,
FOREIGN KEY (`dimension_id`) REFERENCES `tag_dimension`(`id`),
UNIQUE KEY `uk_dim_code` (`dimension_id`, `code`),
INDEX idx_parent (parent_id),
INDEX idx_level (level)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 4. 商品标签关联表(核心多对多关系)
CREATE TABLE `product_tag` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`product_id` BIGINT NOT NULL COMMENT '商品ID',
`dimension_id` INT NOT NULL COMMENT '维度ID',
`tag_value_id` INT NOT NULL COMMENT '标签值ID',
`weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '该标签对本商品的重要程度',
`source` VARCHAR(20) DEFAULT 'manual' COMMENT '标注来源:manual/rule/ai',
`confidence` DECIMAL(5,4) DEFAULT 1.0000 COMMENT '置信度',
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (`product_id`) REFERENCES `product`(`id`) ON DELETE CASCADE,
FOREIGN KEY (`dimension_id`) REFERENCES `tag_dimension`(`id`),
FOREIGN KEY (`tag_value_id`) REFERENCES `tag_value`(`id`),
UNIQUE KEY `uk_product_dim_tag` (`product_id`, `dimension_id`, `tag_value_id`),
INDEX idx_product (product_id),
INDEX idx_tag_value (tag_value_id),
INDEX idx_source (source)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 5. 标注任务表(支持批量标注任务管理)
CREATE TABLE `tag_task` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`task_name` VARCHAR(200) NOT NULL,
`task_type` VARCHAR(20) NOT NULL COMMENT 'rule/ai/manual',
`dimension_ids` VARCHAR(200) COMMENT '标注的维度,多个逗号分隔',
`status` VARCHAR(20) DEFAULT 'pending' COMMENT 'pending/running/completed/failed',
`total_count` INT DEFAULT 0,
`success_count` INT DEFAULT 0,
`fail_count` INT DEFAULT 0,
`started_at` DATETIME,
`completed_at` DATETIME,
`error_msg` TEXT,
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2.2 MongoDB Schema(可选方案B)
// MongoDB: 商品集合(嵌入标签)
{
"_id": ObjectId,
"name": "六角螺栓 M10*50",
"spec": "M10*50 镀锌",
"brand": "国产优质",
"category_id": 101,
"purchase_price": 0.85,
"retail_price": 1.50,
"stock_qty": 5000,
"supplier_id": 2001,
// 标签嵌入
"tags": [
{
"dimension": "scene",
"dimension_name": "应用场景",
"value": "engineering",
"value_name": "工程场景",
"weight": 0.9,
"source": "rule",
"confidence": 1.0
},
{
"dimension": "frequency",
"dimension_name": "使用频率",
"value": "high",
"value_name": "高频",
"weight": 1.0,
"source": "ai",
"confidence": 0.95
}
],
// 便于按标签筛选的冗余字段
"tag_codes": ["scene_engineering", "frequency_high", "difficulty_easy"],
"updated_at": ISODate
}
2.3 Elasticsearch Mapping
{
"mappings": {
"properties": {
"product_id": { "type": "long" },
"name": {
"type": "text",
"analyzer": "ik_max_word",
"fields": { "keyword": { "type": "keyword" } }
},
"spec": { "type": "text", "analyzer": "ik_max_word" },
"brand": { "type": "keyword" },
"category_id": { "type": "integer" },
"retail_price": { "type": "float" },
"stock_qty": { "type": "integer" },
// 标签字段(扁平化,便于搜索)
"tags": {
"type": "nested",
"properties": {
"dimension_id": { "type": "integer" },
"dimension_code": { "type": "keyword" },
"dimension_name": { "type": "keyword" },
"tag_value_id": { "type": "integer" },
"tag_code": { "type": "keyword" },
"tag_name": { "type": "keyword" },
"weight": { "type": "float" },
"confidence": { "type": "float" }
}
},
// 常用筛选字段(denormalized)
"scene_tags": { "type": "keyword" },
"customer_tags": { "type": "keyword" },
"difficulty_tags": { "type": "keyword" },
"season_tags": { "type": "keyword" }
}
}
}
三、标签体系设计(五金门店场景)
3.1 15个标签维度定义
| 序号 | 维度编码 | 维度名称 | 类型 | 说明 |
|---|---|---|---|---|
| 1 | scene | 应用场景 | 单选互斥 | 工程/家居/工业/农业 |
| 2 | customer_type | 客户类型 | 多选 | 装修队/物业/工厂/个人 |
| 3 | frequency | 使用频率 | 单选互斥 | 高频/中频/低频/偶发 |
| 4 | difficulty | 难度等级 | 单选互斥 | 专业级/普通级/入门级 |
| 5 | price_range | 价格区间 | 单选互斥 | 0-5元/5-20元/20-100元/100元+ |
| 6 | profession | 专业领域 | 多选 | 水电/木工/泥瓦/油漆/焊接 |
| 7 | season | 季节性 | 多选 | 春季/夏季/秋季/冬季/全年 |
| 8 | project_type | 工程类型 | 多选 | 家装/公装/厂房/市政/农业基建 |
| 9 | urgency | 紧急程度 | 单选互斥 | 常规/急需/紧急 |
| 10 | quality_level | 品质档次 | 单选互斥 | 高档/中档/经济 |
| 11 | brand_level | 品牌档次 | 单选互斥 | 知名品牌/普通品牌/无品牌 |
| 12 | install_complexity | 安装复杂度 | 单选互斥 | 简单/中等/复杂/需专业 |
| 13 | maintenance | 维护需求 | 单选互斥 | 无需维护/简单维护/定期维护 |
| 14 | area | 适用面积 | 多选 | 小型(<100㎡)/中型(100-500㎡)/大型(500㎡+) |
| 15 | certification | 认证要求 | 多选 | 3C认证/ISO/消防认证/环保认证 |
3.2 各维度标签值详细定义
维度1: scene(应用场景)- 单选互斥
scene
├── engineering (工程场景)
│ ├── 描述: 各类工程项目使用
│ └── 常见商品: 钢筋、水泥、模板、脚手架
├── household (家居场景)
│ ├── 描述: 家庭日常维修装修
│ └── 常见商品: 螺丝刀、扳手、灯泡、水管
├── industrial (工业场景)
│ ├── 描述: 工厂生产设备维护
│ └── 常见商品: 工业皮带、轴承、阀门、电机
└── agricultural (农业场景)
└── 描述: 农业生产设施使用
维度2: customer_type(客户类型)- 多选
| 标签编码 | 标签名称 | 说明 |
|---|---|---|
| deco_team | 装修队 | 专业装修施工团队 |
| property | 物业公司 | 负责小区维护的物业 |
| factory | 工厂 | 各类生产制造企业 |
| individual | 个人用户 | 家庭个人购买 |
| government | 政府单位 | 机关事业单位采购 |
| real_estate | 房地产 | 开发商及建筑商 |
维度3: frequency(使用频率)- 单选互斥
| 标签编码 | 标签名称 | 权重建议 |
|---|---|---|
| high | 高频 | 1.0 |
| medium | 中频 | 0.8 |
| low | 低频 | 0.5 |
| occasional | 偶发 | 0.3 |
维度4: difficulty(难度等级)- 单选互斥
| 标签编码 | 标签名称 | 适用商品 |
|---|---|---|
| professional | 专业级 | 电焊机、切割机、电动机 |
| normal | 普通级 | 扳手、螺丝刀、水平尺 |
| beginner | 入门级 | 锤子、钉子、胶带 |
维度5: price_range(价格区间)- 单选互斥
| 标签编码 | 标签名称 | 价格区间 | 权重 |
|---|---|---|---|
| price_a | 0-5元 | 0 < x ≤ 5 | 0.5 |
| price_b | 5-20元 | 5 < x ≤ 20 | 0.8 |
| price_c | 20-100元 | 20 < x ≤ 100 | 1.0 |
| price_d | 100元+ | x > 100 | 1.2 |
维度6: profession(专业领域)- 多选
| 标签编码 | 标签名称 | 关联商品 |
|---|---|---|
| electrician | 水电 | 电线、电缆、开关、插座、水管 |
| carpenter | 木工 | 钉子、螺丝、合页、木工锯 |
| mason | 泥瓦 | 水泥、砂石、瓷砖、填缝剂 |
| painter | 油漆 | 油漆、涂料、滚筒、刷子 |
| welder | 焊接 | 电焊条、焊机、氧气瓶 |
| plumber | 管工 | 阀门、管道、接头、生料带 |
维度7: season(季节性)- 多选
| 标签编码 | 标签名称 | 旺季 |
|---|---|---|
| spring | 春季 | 3-5月 |
| summer | 夏季 | 6-8月 |
| autumn | 秋季 | 9-11月 |
| winter | 冬季 | 12-2月 |
| all_year | 全年 | 无季节性 |
维度8: project_type(工程类型)- 多选
| 标签编码 | 标签名称 |
|---|---|
| residential | 家装 |
| commercial | 公装 |
| factory | 厂房建设 |
| municipal | 市政工程 |
| agricultural | 农业基建 |
| renovation | 旧改翻新 |
维度9: urgency(紧急程度)- 单选互斥
| 标签编码 | 标签名称 |
|---|---|
| normal | 常规 |
| urgent | 急需 |
| emergency | 紧急 |
维度10: quality_level(品质档次)- 单选互斥
| 标签编码 | 标签名称 | 价格系数 |
|---|---|---|
| high | 高档 | 1.5x |
| medium | 中档 | 1.0x |
| economy | 经济 | 0.7x |
维度11: brand_level(品牌档次)- 单选互斥
| 标签编码 | 标签名称 | 代表品牌 |
|---|---|---|
| famous | 知名品牌 | 史丹利、博世、3M |
| normal | 普通品牌 | 国产优质 |
| none | 无品牌 | 白牌/散装 |
维度12: install_complexity(安装复杂度)- 单选互斥
| 标签编码 | 标签名称 |
|---|---|
| simple | 简单(自行安装) |
| medium | 中等(需工具) |
| complex | 复杂(需多人) |
| professional | 需专业人员 |
维度13: maintenance(维护需求)- 单选互斥
| 标签编码 | 标签名称 |
|---|---|
| none | 无需维护 |
| simple | 简单维护 |
| regular | 定期维护 |
| specialized | 专业维护 |
维度14: area(适用面积)- 多选
| 标签编码 | 标签名称 |
|---|---|
| small | 小型(<100㎡) |
| medium | 中型(100-500㎡) |
| large | 大型(500㎡+) |
| any | 不限面积 |
维度15: certification(认证要求)- 多选
| 标签编码 | 标签名称 |
|---|---|
| ccc | 3C认证 |
| iso | ISO认证 |
| fire | 消防认证 |
| environmental | 环保认证 |
| none | 无特殊认证 |
3.3 标签互斥与共现关系
互斥关系(同一商品同一维度只能选一个)
互斥维度:
- scene: { max_select: 1, description: "应用场景单一" }
- frequency: { max_select: 1 }
- difficulty: { max_select: 1 }
- price_range: { max_select: 1 }
- urgency: { max_select: 1 }
- quality_level: { max_select: 1 }
- brand_level: { max_select: 1 }
- install_complexity: { max_select: 1 }
- maintenance: { max_select: 1 }
共现关系(标签之间的关联性)
| 标签组合 | 共现说明 | 应用场景 |
|---|---|---|
| 家装 + 个人用户 | 高概率共现 | 面向散客推荐 |
| 工程场景 + 装修队 | 高概率共现 | 工程项目采购 |
| 专业级 + 工厂 | 高概率共现 | 工业客户采购 |
| 入门级 + 个人用户 | 高概率共现 | DIY市场 |
| 紧急 + 高频 | 建议关联 | 常用库存预警 |
| 市政工程 + 知名品牌 | 高概率共现 | 招标要求 |
| 厂房建设 + 工业场景 | 高概率共现 | 工业物资采购 |
四、批量标注策略
4.1 标注流程总览
┌──────────────────────────────────────────────────────────────────┐
│ 批量标注流程 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据准备 │ -> │ 规则标注 │ -> │ AI语义标注 │ -> │ 人工复核 │ │
│ │ │ │ (并发) │ │ (批量) │ │ (抽样) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │ │
│ v v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 商品清洗 │ │ 关键词匹配 │ │ Embedding │ │ 置信度 │ │
│ │ 标准化 │ │ 分类映射 │ │ 相似度 │ │ 过滤 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
4.2 规则匹配标注
# 规则匹配引擎伪代码
TAG_RULES = {
# 场景标签规则
"scene": {
"engineering": {
"keywords": ["工程", "建筑", "施工", "项目", "工地", "钢筋", "水泥", "模板"],
"categories": [101, 102, 103], # 对应分类ID
"price_range": (50, 100000) # 价格区间
},
"household": {
"keywords": ["家用", "家庭", "维修", "日常", "DIY"],
"categories": [201, 202, 203],
"price_range": (0, 500)
},
"industrial": {
"keywords": ["工业", "工厂", "机械", "设备", "电机", "轴承"],
"categories": [301, 302, 303],
"price_range": (100, 100000)
}
},
# 难度等级规则
"difficulty": {
"professional": {
"keywords": ["电焊", "切割", "大型", "重型", "专业设备"],
"price_min": 500
},
"normal": {
"keywords": ["扳手", "螺丝刀", "锤", "锯"],
"price_range": (5, 500)
},
"beginner": {
"keywords": ["钉子", "胶带", "粘钩", "小型"],
"price_max": 50
}
},
# 价格区间规则
"price_range": {
"price_a": (0, 5),
"price_b": (5, 20),
"price_c": (20, 100),
"price_d": (100, 999999)
},
# 专业领域规则
"profession": {
"electrician": {
"keywords": ["电线", "电缆", "开关", "插座", "配电", "空开", "漏保"]
},
"plumber": {
"keywords": ["水管", "阀门", "龙头", "生料带", "接头", "PVC", "PPR"]
},
"painter": {
"keywords": ["油漆", "涂料", "乳胶漆", "色浆", "滚筒", "刷子", "砂纸"]
}
}
}
def match_rules(product):
"""规则匹配标注"""
matched_tags = []
for dimension, rules in TAG_RULES.items():
for tag_code, rule in rules.items():
score = 0
# 关键词匹配打分
if rule.get("keywords"):
for kw in rule["keywords"]:
if kw in product["name"] or kw in product.get("spec", ""):
score += 1
# 分类匹配
if rule.get("categories"):
if product.get("category_id") in rule["categories"]:
score += 3
# 价格区间匹配
price = product.get("retail_price", 0)
if "price_range" in rule:
pmin, pmax = rule["price_range"]
if pmin <= price < pmax:
score += 2
if "price_min" in rule and price >= rule["price_min"]:
score += 2
if "price_max" in rule and price <= rule["price_max"]:
score += 2
# 阈值判断
if score >= 2:
matched_tags.append({
"dimension": dimension,
"tag_code": tag_code,
"score": score,
"source": "rule"
})
return matched_tags
4.3 AI语义标注
# AI语义标注方案
import openai
from sentence_transformers import SentenceTransformer
import numpy as np
class AISemanticTagger:
def __init__(self):
# 本地Embedding模型(推荐)
self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 或者使用OpenAI API
# self.openai_client = openai.OpenAI()
# 预定义标签的向量表示
self.tag_embeddings = self._build_tag_embeddings()
def _build_tag_embeddings(self):
"""构建标签向量库"""
tag_texts = []
tag_info = []
# 为每个标签构建描述文本
for dimension, tags in TAG_DEFINITIONS.items():
for tag in tags:
# 构建描述性文本
desc = f"{dimension['name']}-{tag['name']}: {tag.get('description', '')}"
tag_texts.append(desc)
tag_info.append({
"dimension": dimension["code"],
"tag_code": tag["code"],
"name": tag["name"]
})
# 批量计算向量
embeddings = self.model.encode(tag_texts, batch_size=64)
return {"embeddings": embeddings, "tags": tag_info}
def tag_product(self, product):
"""为单个商品生成标签"""
# 构建商品描述
product_text = f"{product['name']} {product.get('spec', '')} {product.get('brand', '')}"
# 计算商品向量
product_vec = self.model.encode([product_text])[0]
# 计算与所有标签的相似度
similarities = np.dot(
self.tag_embeddings["embeddings"],
product_vec
)
# 选取每个维度最相关的标签
result = {}
for i, sim in enumerate(similarities):
tag = self.tag_embeddings["tags"][i]
dim = tag["dimension"]
if dim not in result or sim > result[dim]["score"]:
result[dim] = {
"tag_code": tag["tag_code"],
"score": float(sim),
"confidence": float(sim)
}
return result
def batch_tag(self, products, batch_size=1000):
"""批量标注"""
results = []
total = len(products)
for i in range(0, total, batch_size):
batch = products[i:i+batch_size]
# 批量构建商品描述
texts = [f"{p['name']} {p.get('spec', '')} {p.get('brand', '')}"
for p in batch]
# 批量计算向量
product_vecs = self.model.encode(texts, batch_size=64)
# 批量计算相似度
for j, product_vec in enumerate(product_vecs):
similarities = np.dot(
self.tag_embeddings["embeddings"],
product_vec
)
# 解析每个维度的top结果
product_result = {"product_id": batch[j]["id"], "tags": []}
dim_scores = {}
for k, sim in enumerate(similarities):
tag = self.tag_embeddings["tags"][k]
dim = tag["dimension"]
if dim not in dim_scores or sim > dim_scores[dim]["score"]:
dim_scores[dim] = {"tag_code": tag["tag_code"], "score": float(sim)}
# 转换为标签列表
for dim, info in dim_scores.items():
if info["score"] > 0.5: # 置信度阈值
product_result["tags"].append({
"dimension": dim,
"tag_code": info["tag_code"],
"source": "ai",
"confidence": info["score"]
})
results.append(product_result)
print(f"进度: {min(i+batch_size, total)}/{total}")
return results
4.4 人工复核流程
-- 抽样复核任务生成
CREATE PROCEDURE review_sampling()
BEGIN
-- 低置信度样本抽检(置信度 < 0.8)
INSERT INTO review_tasks (product_id, dimension_id, tag_value_id,
ai_tag_value_id, ai_confidence, priority)
SELECT
pt.product_id,
pt.dimension_id,
pt.tag_value_id,
pt.tag_value_id as ai_tag_value_id,
pt.confidence,
CASE
WHEN pt.confidence < 0.5 THEN 'high'
WHEN pt.confidence < 0.7 THEN 'medium'
ELSE 'low'
END as priority
FROM product_tag pt
WHERE pt.source = 'ai'
AND pt.confidence < 0.8
AND pt.reviewed = 0
ORDER BY pt.confidence ASC
LIMIT 10000;
-- 随机抽样(总量1%)
INSERT INTO review_tasks (product_id, dimension_id, tag_value_id, priority)
SELECT
pt.product_id,
pt.dimension_id,
pt.tag_value_id,
'random'
FROM product_tag pt
WHERE pt.source IN ('rule', 'ai')
AND pt.reviewed = 0
AND pt.review_task_id IS NULL
ORDER BY RAND()
LIMIT 7000; -- 70万的1%
END;
4.5 增量标注(新商品自动标注)
# 增量标注触发器(伪代码)
def on_new_product(product_id):
"""新商品入库自动标注"""
product = db.get_product(product_id)
# 1. 规则匹配(同步,实时)
rule_tags = match_rules(product)
# 2. AI标注(异步,延迟执行)
queue.enqueue("ai_tag_task", {
"product_id": product_id,
"priority": "normal"
})
# 3. 记录标注结果
save_tags(product_id, rule_tags)
# 4. 同步到ES
sync_to_elasticsearch(product_id)
def scheduled_full_recalculation():
"""定期全量重算(每周一次)"""
# 1. 暂停增量标注
pause_incremental_tagging()
# 2. 重新运行AI标注
run_batch_ai_tagging(
product_ids=get_all_product_ids(),
batch_size=5000,
priority="low"
)
# 3. 重新计算权重
recalculate_weights()
# 4. 全量同步ES
full_sync_to_elasticsearch()
# 5. 恢复增量标注
resume_incremental_tagging()
五、性能与扩展性设计
5.1 70万商品批量标注性能估算
| 阶段 | 规则匹配 | AI语义标注 |
|---|---|---|
| 单条耗时 | ~1ms | ~50ms (含向量计算) |
| 70万总耗时 | ~12分钟 | ~10小时 |
| 推荐批次 | 10000条/批 | 1000条/批 |
| 并发数 | 4-8核 | 1-2核(GPU) |
| 预估时间(4核并发) | 3-5分钟 | 5-8小时 |
5.2 分批处理策略
# 分批处理配置
BATCH_CONFIG = {
"rule_matching": {
"batch_size": 10000,
"concurrent_batches": 8,
"estimated_time": "3-5分钟",
"total_products": 700000
},
"ai_semantic": {
"batch_size": 1000,
"concurrent_batches": 2,
"estimated_time": "5-8小时",
"embedding_model": "paraphrase-multilingual-MiniLM-L12-v2",
"gpu_memory": "4GB"
}
}
def process_batch_rule_matching(product_ids, batch_size=10000):
"""规则匹配批量处理"""
total = len(product_ids)
results = []
for i in range(0, total, batch_size):
batch_ids = product_ids[i:i+batch_size]
batch_products = get_products(batch_ids)
# 并行处理
batch_results = parallel_map(
match_rules,
batch_products,
workers=8
)
results.extend(batch_results)
# 每批处理完写入数据库
batch_insert_product_tags(results)
# 更新任务进度
update_task_progress(min(i+batch_size, total), total)
return results
def process_batch_ai_semantic(product_ids, batch_size=1000):
"""AI语义批量处理(GPU加速)"""
total = len(product_ids)
results = []
for i in range(0, total, batch_size):
batch_ids = product_ids[i:i+batch_size]
batch_products = get_products(batch_ids)
# 调用AI标注
batch_results = ai_tagger.batch_tag(batch_products, batch_size=1000)
results.extend(batch_results)
# 写入数据库(低置信度标记待复核)
batch_insert_product_tags(results, set_review_flag=True)
print(f"AI标注进度: {min(i+batch_size, total)}/{total}")
return results
5.3 数据库索引优化
-- 核心查询索引优化
-- 1. 商品标签关联表索引(最关键)
ALTER TABLE product_tag ADD INDEX idx_product_dim (product_id, dimension_id);
ALTER TABLE product_tag ADD INDEX idx_tag_value_product (tag_value_id, product_id);
ALTER TABLE product_tag ADD INDEX idx_source_confidence (source, confidence);
-- 2. 组合查询优化(维度+标签值)
ALTER TABLE product_tag ADD INDEX idx_dim_tag (dimension_id, tag_value_id);
-- 3. 高频筛选字段
ALTER TABLE product_tag ADD INDEX idx_dimension_weight (dimension_id, weight DESC);
-- 4. ES同步状态追踪
ALTER TABLE product ADD INDEX idx_es_sync (es_synced, updated_at);
-- 5. 标签值表层级索引
ALTER TABLE tag_value ADD INDEX idx_dim_parent (dimension_id, parent_id);
-- 6. 标签维度表层级索引
ALTER TABLE tag_dimension ADD INDEX idx_parent_sort (parent_id, sort_order);
5.4 数据更新机制
# 数据同步策略
class DataSyncManager:
"""MySQL与ES数据同步管理"""
def __init__(self):
self.mysql_client = MySQLClient()
self.es_client = ElasticsearchClient()
self.redis_client = RedisClient()
def sync_single_product(self, product_id):
"""单个商品变更同步"""
# 1. 获取商品完整信息
product = self.mysql_client.get_product_with_tags(product_id)
# 2. 构建ES文档
es_doc = self._build_es_document(product)
# 3. 更新ES
self.es_client.index("products", es_doc, id=product_id)
# 4. 清除缓存
self.redis_client.delete(f"product:{product_id}")
def sync_batch_products(self, product_ids, batch_size=5000):
"""批量同步"""
for i in range(0, len(product_ids), batch_size):
batch_ids = product_ids[i:i+batch_size]
# 批量获取
products = self.mysql_client.get_products_with_tags(batch_ids)
# 批量构建ES文档
actions = []
for product in products:
doc = self._build_es_document(product)
actions.append({
"index": {"_index": "products", "_id": product["id"]}
})
actions.append(doc)
# 批量写入ES
self.es_client.bulk(actions)
# 更新同步状态
self.mysql_client.mark_es_synced(batch_ids)
print(f"ES同步进度: {min(i+batch_size, len(product_ids))}/{len(product_ids)}")
def full_sync(self):
"""全量同步(建议凌晨执行)"""
# 1. 创建全量同步任务
task_id = self.mysql_client.create_sync_task("full")
# 2. 切换ES别名(零停机)
self.es_client.switch_alias("products", "products_v2", "products_v1")
# 3. 全量重建索引
self.rebuild_index()
# 4. 切换完成
self.es_client.switch_alias("products", "products_v1", "products_v2")
# 5. 删除旧索引
self.es_client.delete_index("products_v2")
def incremental_sync(self, interval_seconds=60):
"""增量同步(定时任务)"""
while True:
# 获取最近更新的商品
recent_products = self.mysql_client.get_recent_updated(
since_minutes=interval_seconds/60
)
if recent_products:
self.sync_batch_products(recent_products)
time.sleep(interval_seconds)
5.5 完整技术架构图
┌─────────────────────────────────────────────────────────────────────────┐
│ 整体架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 爱优五金 │ │ 标签管理 │ │ 数据分析 │ │
│ │ 收银系统 │ │ Web │ │ Dashboard │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ API 网关层 │ │
│ │ /api/products/tags /api/tags/search /api/tags/stats │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 标注服务 │ │ 搜索服务 │ │ 统计服务 │ │
│ │ │ │ │ │ │ │
│ │ - 规则引擎 │ │ - ES查询 │ │ - OLAP聚合 │ │
│ │ - AI标注 │ │ - 缓存 │ │ - 报表生成 │ │
│ │ - 任务调度 │ │ - 高亮 │ │ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ MySQL │ │ Elasticsearch│ │ ClickHouse │ │
│ │ (主库) │ │ (搜索) │ │ (分析) │ │
│ │ │ │ │ │ │ │
│ │ - 商品主数据 │ │ - 标签倒排 │ │ - 标签统计 │ │
│ │ - 标签数据 │ │ - 商品文档 │ │ - 趋势分析 │ │
│ │ - 关联关系 │ │ - 复杂查询 │ │ - BI报表 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Redis │ │
│ │ (缓存) │ │
│ │ │ │
│ │ - 热点商品 │ │
│ │ - 搜索缓存 │ │
│ │ - 标签热度 │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
六、标签搜索使用示例
6.1 MySQL查询示例
-- 查询同时满足多个条件的商品
SELECT p.id, p.name, p.retail_price, GROUP_CONCAT(tv.name) as tags
FROM product p
JOIN product_tag pt ON p.id = pt.product_id
JOIN tag_value tv ON pt.tag_value_id = tv.id
WHERE pt.dimension_id = 1 AND pt.tag_value_id IN (1, 2) -- 工程场景
AND pt.dimension_id = 4 AND pt.tag_value_id = 10 -- 专业级
AND p.retail_price BETWEEN 50 AND 500
GROUP BY p.id
HAVING COUNT(DISTINCT pt.dimension_id) >= 2;
-- 查找推荐商品(相似标签权重高)
SELECT p.*, SUM(pt.weight * pt.confidence) as relevance_score
FROM product p
JOIN product_tag pt ON p.id = pt.product_id
WHERE pt.tag_value_id IN (
SELECT tag_value_id FROM product_tag WHERE product_id = ? -- 参照商品
)
AND p.id != ? -- 排除自身
GROUP BY p.id
ORDER BY relevance_score DESC
LIMIT 20;
6.2 Elasticsearch查询示例
// 多维度组合搜索
POST /products/_search
{
"query": {
"bool": {
"must": [
{ "term": { "scene_tags": "engineering" }},
{ "term": { "difficulty_tags": "professional" }}
],
"should": [
{ "term": { "customer_tags": "factory" }},
{ "term": { "customer_tags": "property" }}
],
"filter": [
{ "range": { "retail_price": { "gte": 50, "lte": 500 }}},
{ "term": { "stock_qty": { "gt": 0 }}}
]
}
},
"aggs": {
"by_scene": { "terms": { "field": "scene_tags" }},
"by_difficulty": { "terms": { "field": "difficulty_tags" }}
}
}
// 智能推荐(基于标签权重)
POST /products/_search
{
"query": {
"function_score": {
"query": { "match_all": {} },
"functions": [
{
"filter": { "term": { "scene_tags": "engineering" }},
"weight": 1.5
},
{
"filter": { "range": { "retail_price": { "lte": 100 }}},
"weight": 1.2
}
],
"score_mode": "sum"
}
}
}
七、实施建议与里程碑
7.1 实施阶段划分
| 阶段 | 时间 | 内容 | 交付物 |
|---|---|---|---|
| Phase 1 | 第1-2周 | 数据库表设计、基础框架搭建 | MySQL表结构、ES索引 |
| Phase 2 | 第3-4周 | 规则引擎开发、基础标注 | 规则引擎、10+维度标注 |
| Phase 3 | 第5-6周 | AI标注集成、批量处理 | AI标注服务、70万商品标注 |
| Phase 4 | 第7-8周 | 人工复核、搜索优化 | 复核系统、搜索API |
| Phase 5 | 第9-10周 | 性能优化、全量上线 | 优化报告、正式环境 |
7.2 硬件配置建议
| 组件 | 开发测试 | 生产环境(70万商品) |
|---|---|---|
| MySQL | 4核8G | 8核32G, SSD 500G |
| Elasticsearch | 2核4G | 4核16G, SSD 1T |
| Redis | 1核2G | 2核4G |
| AI标注服务器 | - | GPU 4GB+ (如RTX 3060) |
文档版本: v1.0
创建时间: 2026-05-03
适用场景: 五金建材门店70万商品标签系统