x

五金门店70万商品标签系统架构方案

一、数据库选型分析

1.1 方案对比总览

方案 推荐度 优势 劣势 适用场景
MySQL + ES 混合 ⭐⭐⭐⭐⭐ 成熟稳定、事务支持、ES搜索强 需维护两套系统、数据同步 首选方案,适合业务+搜索双需求
MongoDB ⭐⭐⭐ 文档灵活、JSON存储、扩展性好 关联查询弱、事务支持弱 标签结构不固定、需快速迭代
Neo4j ⭐⭐ 标签关联关系强、可视化 数据量大性能差、配套弱 标签关系复杂、推荐系统
ClickHouse ⭐⭐⭐ OLAP分析强、压缩率高 不适合实时写入、事务差 标签统计分析、BI报表
PostgreSQL + ES ⭐⭐⭐⭐ JSON支持强、全文检索、扩展性好 - 中等规模、更适合新项目

1.2 推荐方案:MySQL + ES 混合架构

┌─────────────────────────────────────────────────────────────┐
│                        应用层                                 │
├─────────────────────────────────────────────────────────────┤
│  商品管理  │  标签搜索  │  批量标注  │  智能推荐  │  报表分析  │
└────────────┴───────────┴───────────┴───────────┴────────────┘
        │               │               │               │
        ▼               ▼               ▼               ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│     MySQL      │ │  Elasticsearch │ │  Redis缓存    │
│  (主库/事务)   │ │  (标签搜索)    │ │  (热点数据)   │
│               │ │               │ │               │
│ - 商品主数据   │ │ - 标签倒排索引 │ │ - 标签热度    │
│ - 标签维度表   │ │ - 商品标签文档 │ │ - 搜索缓存    │
│ - 关联关系表   │ │ - 复杂查询     │ │               │
└───────────────┘ └───────────────┘ └───────────────┘

1.3 为什么 MySQL + ES 是最佳选择

  1. 业务连续性:爱优五金收银系统基于MySQL,零迁移风险
  2. 标签搜索需求强:多维度组合查询、模糊匹配、权重排序
  3. 成熟度高:业界最成熟方案,配套工具完善
  4. 数据一致性:MySQL事务保证商品与标签关联的ACID

二、标签数据模型设计

2.1 核心表结构设计(MySQL DDL)

-- 1. 商品主表(扩展现有表)
CREATE TABLE `product` (
    `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
    `name` VARCHAR(200) NOT NULL COMMENT '商品名称',
    `spec` VARCHAR(200) COMMENT '规格型号',
    `brand` VARCHAR(100) COMMENT '品牌',
    `category_id` INT COMMENT '分类ID',
    `purchase_price` DECIMAL(10,2) COMMENT '进货价',
    `retail_price` DECIMAL(10,2) COMMENT '零售价',
    `stock_qty` INT DEFAULT 0 COMMENT '库存量',
    `supplier_id` INT COMMENT '供应商ID',
    `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
    `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_category (category_id),
    INDEX idx_brand (brand),
    INDEX idx_price (retail_price)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 2. 标签维度表(定义有哪些维度)
CREATE TABLE `tag_dimension` (
    `id` INT PRIMARY KEY AUTO_INCREMENT,
    `code` VARCHAR(50) NOT NULL UNIQUE COMMENT '维度编码',
    `name` VARCHAR(50) NOT NULL COMMENT '维度名称',
    `description` VARCHAR(200) COMMENT '维度描述',
    `is_tree` TINYINT DEFAULT 0 COMMENT '是否树形结构',
    `parent_id` INT DEFAULT NULL COMMENT '父维度ID',
    `weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '默认权重',
    `sort_order` INT DEFAULT 0,
    `status` TINYINT DEFAULT 1,
    FOREIGN KEY (`parent_id`) REFERENCES `tag_dimension`(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 3. 标签值表(每个维度的具体标签)
CREATE TABLE `tag_value` (
    `id` INT PRIMARY KEY AUTO_INCREMENT,
    `dimension_id` INT NOT NULL COMMENT '所属维度ID',
    `code` VARCHAR(50) NOT NULL COMMENT '标签编码',
    `name` VARCHAR(100) NOT NULL COMMENT '标签名称',
    `parent_id` INT DEFAULT NULL COMMENT '父标签ID(树形)',
    `level` INT DEFAULT 1 COMMENT '层级深度',
    `weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '标签权重',
    `mutual_exclusive` TINYINT DEFAULT 0 COMMENT '是否互斥(同类只能选一个)',
    `icon` VARCHAR(100) COMMENT '图标',
    `sort_order` INT DEFAULT 0,
    `status` TINYINT DEFAULT 1,
    FOREIGN KEY (`dimension_id`) REFERENCES `tag_dimension`(`id`),
    UNIQUE KEY `uk_dim_code` (`dimension_id`, `code`),
    INDEX idx_parent (parent_id),
    INDEX idx_level (level)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 4. 商品标签关联表(核心多对多关系)
CREATE TABLE `product_tag` (
    `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
    `product_id` BIGINT NOT NULL COMMENT '商品ID',
    `dimension_id` INT NOT NULL COMMENT '维度ID',
    `tag_value_id` INT NOT NULL COMMENT '标签值ID',
    `weight` DECIMAL(3,2) DEFAULT 1.00 COMMENT '该标签对本商品的重要程度',
    `source` VARCHAR(20) DEFAULT 'manual' COMMENT '标注来源:manual/rule/ai',
    `confidence` DECIMAL(5,4) DEFAULT 1.0000 COMMENT '置信度',
    `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
    `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (`product_id`) REFERENCES `product`(`id`) ON DELETE CASCADE,
    FOREIGN KEY (`dimension_id`) REFERENCES `tag_dimension`(`id`),
    FOREIGN KEY (`tag_value_id`) REFERENCES `tag_value`(`id`),
    UNIQUE KEY `uk_product_dim_tag` (`product_id`, `dimension_id`, `tag_value_id`),
    INDEX idx_product (product_id),
    INDEX idx_tag_value (tag_value_id),
    INDEX idx_source (source)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 5. 标注任务表(支持批量标注任务管理)
CREATE TABLE `tag_task` (
    `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
    `task_name` VARCHAR(200) NOT NULL,
    `task_type` VARCHAR(20) NOT NULL COMMENT 'rule/ai/manual',
    `dimension_ids` VARCHAR(200) COMMENT '标注的维度,多个逗号分隔',
    `status` VARCHAR(20) DEFAULT 'pending' COMMENT 'pending/running/completed/failed',
    `total_count` INT DEFAULT 0,
    `success_count` INT DEFAULT 0,
    `fail_count` INT DEFAULT 0,
    `started_at` DATETIME,
    `completed_at` DATETIME,
    `error_msg` TEXT,
    `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2.2 MongoDB Schema(可选方案B)

// MongoDB: 商品集合(嵌入标签)
{
  "_id": ObjectId,
  "name": "六角螺栓 M10*50",
  "spec": "M10*50 镀锌",
  "brand": "国产优质",
  "category_id": 101,
  "purchase_price": 0.85,
  "retail_price": 1.50,
  "stock_qty": 5000,
  "supplier_id": 2001,

  // 标签嵌入
  "tags": [
    {
      "dimension": "scene",
      "dimension_name": "应用场景",
      "value": "engineering",
      "value_name": "工程场景",
      "weight": 0.9,
      "source": "rule",
      "confidence": 1.0
    },
    {
      "dimension": "frequency",
      "dimension_name": "使用频率",
      "value": "high",
      "value_name": "高频",
      "weight": 1.0,
      "source": "ai",
      "confidence": 0.95
    }
  ],

  // 便于按标签筛选的冗余字段
  "tag_codes": ["scene_engineering", "frequency_high", "difficulty_easy"],
  "updated_at": ISODate
}

2.3 Elasticsearch Mapping

{
  "mappings": {
    "properties": {
      "product_id": { "type": "long" },
      "name": { 
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "spec": { "type": "text", "analyzer": "ik_max_word" },
      "brand": { "type": "keyword" },
      "category_id": { "type": "integer" },
      "retail_price": { "type": "float" },
      "stock_qty": { "type": "integer" },

      // 标签字段(扁平化,便于搜索)
      "tags": {
        "type": "nested",
        "properties": {
          "dimension_id": { "type": "integer" },
          "dimension_code": { "type": "keyword" },
          "dimension_name": { "type": "keyword" },
          "tag_value_id": { "type": "integer" },
          "tag_code": { "type": "keyword" },
          "tag_name": { "type": "keyword" },
          "weight": { "type": "float" },
          "confidence": { "type": "float" }
        }
      },

      // 常用筛选字段(denormalized)
      "scene_tags": { "type": "keyword" },
      "customer_tags": { "type": "keyword" },
      "difficulty_tags": { "type": "keyword" },
      "season_tags": { "type": "keyword" }
    }
  }
}

三、标签体系设计(五金门店场景)

3.1 15个标签维度定义

序号 维度编码 维度名称 类型 说明
1 scene 应用场景 单选互斥 工程/家居/工业/农业
2 customer_type 客户类型 多选 装修队/物业/工厂/个人
3 frequency 使用频率 单选互斥 高频/中频/低频/偶发
4 difficulty 难度等级 单选互斥 专业级/普通级/入门级
5 price_range 价格区间 单选互斥 0-5元/5-20元/20-100元/100元+
6 profession 专业领域 多选 水电/木工/泥瓦/油漆/焊接
7 season 季节性 多选 春季/夏季/秋季/冬季/全年
8 project_type 工程类型 多选 家装/公装/厂房/市政/农业基建
9 urgency 紧急程度 单选互斥 常规/急需/紧急
10 quality_level 品质档次 单选互斥 高档/中档/经济
11 brand_level 品牌档次 单选互斥 知名品牌/普通品牌/无品牌
12 install_complexity 安装复杂度 单选互斥 简单/中等/复杂/需专业
13 maintenance 维护需求 单选互斥 无需维护/简单维护/定期维护
14 area 适用面积 多选 小型(<100㎡)/中型(100-500㎡)/大型(500㎡+)
15 certification 认证要求 多选 3C认证/ISO/消防认证/环保认证

3.2 各维度标签值详细定义

维度1: scene(应用场景)- 单选互斥

scene
├── engineering  (工程场景)
│   ├── 描述: 各类工程项目使用
│   └── 常见商品: 钢筋、水泥、模板、脚手架
├── household    (家居场景)
│   ├── 描述: 家庭日常维修装修
│   └── 常见商品: 螺丝刀、扳手、灯泡、水管
├── industrial   (工业场景)
│   ├── 描述: 工厂生产设备维护
│   └── 常见商品: 工业皮带、轴承、阀门、电机
└── agricultural (农业场景)
    └── 描述: 农业生产设施使用

维度2: customer_type(客户类型)- 多选

标签编码 标签名称 说明
deco_team 装修队 专业装修施工团队
property 物业公司 负责小区维护的物业
factory 工厂 各类生产制造企业
individual 个人用户 家庭个人购买
government 政府单位 机关事业单位采购
real_estate 房地产 开发商及建筑商

维度3: frequency(使用频率)- 单选互斥

标签编码 标签名称 权重建议
high 高频 1.0
medium 中频 0.8
low 低频 0.5
occasional 偶发 0.3

维度4: difficulty(难度等级)- 单选互斥

标签编码 标签名称 适用商品
professional 专业级 电焊机、切割机、电动机
normal 普通级 扳手、螺丝刀、水平尺
beginner 入门级 锤子、钉子、胶带

维度5: price_range(价格区间)- 单选互斥

标签编码 标签名称 价格区间 权重
price_a 0-5元 0 < x ≤ 5 0.5
price_b 5-20元 5 < x ≤ 20 0.8
price_c 20-100元 20 < x ≤ 100 1.0
price_d 100元+ x > 100 1.2

维度6: profession(专业领域)- 多选

标签编码 标签名称 关联商品
electrician 水电 电线、电缆、开关、插座、水管
carpenter 木工 钉子、螺丝、合页、木工锯
mason 泥瓦 水泥、砂石、瓷砖、填缝剂
painter 油漆 油漆、涂料、滚筒、刷子
welder 焊接 电焊条、焊机、氧气瓶
plumber 管工 阀门、管道、接头、生料带

维度7: season(季节性)- 多选

标签编码 标签名称 旺季
spring 春季 3-5月
summer 夏季 6-8月
autumn 秋季 9-11月
winter 冬季 12-2月
all_year 全年 无季节性

维度8: project_type(工程类型)- 多选

标签编码 标签名称
residential 家装
commercial 公装
factory 厂房建设
municipal 市政工程
agricultural 农业基建
renovation 旧改翻新

维度9: urgency(紧急程度)- 单选互斥

标签编码 标签名称
normal 常规
urgent 急需
emergency 紧急

维度10: quality_level(品质档次)- 单选互斥

标签编码 标签名称 价格系数
high 高档 1.5x
medium 中档 1.0x
economy 经济 0.7x

维度11: brand_level(品牌档次)- 单选互斥

标签编码 标签名称 代表品牌
famous 知名品牌 史丹利、博世、3M
normal 普通品牌 国产优质
none 无品牌 白牌/散装

维度12: install_complexity(安装复杂度)- 单选互斥

标签编码 标签名称
simple 简单(自行安装)
medium 中等(需工具)
complex 复杂(需多人)
professional 需专业人员

维度13: maintenance(维护需求)- 单选互斥

标签编码 标签名称
none 无需维护
simple 简单维护
regular 定期维护
specialized 专业维护

维度14: area(适用面积)- 多选

标签编码 标签名称
small 小型(<100㎡)
medium 中型(100-500㎡)
large 大型(500㎡+)
any 不限面积

维度15: certification(认证要求)- 多选

标签编码 标签名称
ccc 3C认证
iso ISO认证
fire 消防认证
environmental 环保认证
none 无特殊认证

3.3 标签互斥与共现关系

互斥关系(同一商品同一维度只能选一个)

互斥维度:
  - scene: { max_select: 1, description: "应用场景单一" }
  - frequency: { max_select: 1 }
  - difficulty: { max_select: 1 }
  - price_range: { max_select: 1 }
  - urgency: { max_select: 1 }
  - quality_level: { max_select: 1 }
  - brand_level: { max_select: 1 }
  - install_complexity: { max_select: 1 }
  - maintenance: { max_select: 1 }

共现关系(标签之间的关联性)

标签组合 共现说明 应用场景
家装 + 个人用户 高概率共现 面向散客推荐
工程场景 + 装修队 高概率共现 工程项目采购
专业级 + 工厂 高概率共现 工业客户采购
入门级 + 个人用户 高概率共现 DIY市场
紧急 + 高频 建议关联 常用库存预警
市政工程 + 知名品牌 高概率共现 招标要求
厂房建设 + 工业场景 高概率共现 工业物资采购

四、批量标注策略

4.1 标注流程总览

┌──────────────────────────────────────────────────────────────────┐
                         批量标注流程                              
├──────────────────────────────────────────────────────────────────┤
                                                                  
  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   
   数据准备   ->  规则标注   ->  AI语义标注  ->  人工复核     
                 (并发)         (批量)         (抽样)       
  └──────────┘    └──────────┘    └──────────┘    └──────────┘   
                                                             
       v              v                v                v        
  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   
   商品清洗       关键词匹配      Embedding      置信度      
   标准化        分类映射        相似度        过滤        
  └──────────┘    └──────────┘    └──────────┘    └──────────┘   
                                                                  
└──────────────────────────────────────────────────────────────────┘

4.2 规则匹配标注

# 规则匹配引擎伪代码

TAG_RULES = {
    # 场景标签规则
    "scene": {
        "engineering": {
            "keywords": ["工程", "建筑", "施工", "项目", "工地", "钢筋", "水泥", "模板"],
            "categories": [101, 102, 103],  # 对应分类ID
            "price_range": (50, 100000)  # 价格区间
        },
        "household": {
            "keywords": ["家用", "家庭", "维修", "日常", "DIY"],
            "categories": [201, 202, 203],
            "price_range": (0, 500)
        },
        "industrial": {
            "keywords": ["工业", "工厂", "机械", "设备", "电机", "轴承"],
            "categories": [301, 302, 303],
            "price_range": (100, 100000)
        }
    },

    # 难度等级规则
    "difficulty": {
        "professional": {
            "keywords": ["电焊", "切割", "大型", "重型", "专业设备"],
            "price_min": 500
        },
        "normal": {
            "keywords": ["扳手", "螺丝刀", "锤", "锯"],
            "price_range": (5, 500)
        },
        "beginner": {
            "keywords": ["钉子", "胶带", "粘钩", "小型"],
            "price_max": 50
        }
    },

    # 价格区间规则
    "price_range": {
        "price_a": (0, 5),
        "price_b": (5, 20),
        "price_c": (20, 100),
        "price_d": (100, 999999)
    },

    # 专业领域规则
    "profession": {
        "electrician": {
            "keywords": ["电线", "电缆", "开关", "插座", "配电", "空开", "漏保"]
        },
        "plumber": {
            "keywords": ["水管", "阀门", "龙头", "生料带", "接头", "PVC", "PPR"]
        },
        "painter": {
            "keywords": ["油漆", "涂料", "乳胶漆", "色浆", "滚筒", "刷子", "砂纸"]
        }
    }
}

def match_rules(product):
    """规则匹配标注"""
    matched_tags = []

    for dimension, rules in TAG_RULES.items():
        for tag_code, rule in rules.items():
            score = 0

            # 关键词匹配打分
            if rule.get("keywords"):
                for kw in rule["keywords"]:
                    if kw in product["name"] or kw in product.get("spec", ""):
                        score += 1

            # 分类匹配
            if rule.get("categories"):
                if product.get("category_id") in rule["categories"]:
                    score += 3

            # 价格区间匹配
            price = product.get("retail_price", 0)
            if "price_range" in rule:
                pmin, pmax = rule["price_range"]
                if pmin <= price < pmax:
                    score += 2
            if "price_min" in rule and price >= rule["price_min"]:
                score += 2
            if "price_max" in rule and price <= rule["price_max"]:
                score += 2

            # 阈值判断
            if score >= 2:
                matched_tags.append({
                    "dimension": dimension,
                    "tag_code": tag_code,
                    "score": score,
                    "source": "rule"
                })

    return matched_tags

4.3 AI语义标注

# AI语义标注方案

import openai
from sentence_transformers import SentenceTransformer
import numpy as np

class AISemanticTagger:
    def __init__(self):
        # 本地Embedding模型(推荐)
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

        # 或者使用OpenAI API
        # self.openai_client = openai.OpenAI()

        # 预定义标签的向量表示
        self.tag_embeddings = self._build_tag_embeddings()

    def _build_tag_embeddings(self):
        """构建标签向量库"""
        tag_texts = []
        tag_info = []

        # 为每个标签构建描述文本
        for dimension, tags in TAG_DEFINITIONS.items():
            for tag in tags:
                # 构建描述性文本
                desc = f"{dimension['name']}-{tag['name']}: {tag.get('description', '')}"
                tag_texts.append(desc)
                tag_info.append({
                    "dimension": dimension["code"],
                    "tag_code": tag["code"],
                    "name": tag["name"]
                })

        # 批量计算向量
        embeddings = self.model.encode(tag_texts, batch_size=64)
        return {"embeddings": embeddings, "tags": tag_info}

    def tag_product(self, product):
        """为单个商品生成标签"""
        # 构建商品描述
        product_text = f"{product['name']} {product.get('spec', '')} {product.get('brand', '')}"

        # 计算商品向量
        product_vec = self.model.encode([product_text])[0]

        # 计算与所有标签的相似度
        similarities = np.dot(
            self.tag_embeddings["embeddings"], 
            product_vec
        )

        # 选取每个维度最相关的标签
        result = {}
        for i, sim in enumerate(similarities):
            tag = self.tag_embeddings["tags"][i]
            dim = tag["dimension"]

            if dim not in result or sim > result[dim]["score"]:
                result[dim] = {
                    "tag_code": tag["tag_code"],
                    "score": float(sim),
                    "confidence": float(sim)
                }

        return result

    def batch_tag(self, products, batch_size=1000):
        """批量标注"""
        results = []
        total = len(products)

        for i in range(0, total, batch_size):
            batch = products[i:i+batch_size]

            # 批量构建商品描述
            texts = [f"{p['name']} {p.get('spec', '')} {p.get('brand', '')}" 
                     for p in batch]

            # 批量计算向量
            product_vecs = self.model.encode(texts, batch_size=64)

            # 批量计算相似度
            for j, product_vec in enumerate(product_vecs):
                similarities = np.dot(
                    self.tag_embeddings["embeddings"], 
                    product_vec
                )

                # 解析每个维度的top结果
                product_result = {"product_id": batch[j]["id"], "tags": []}
                dim_scores = {}

                for k, sim in enumerate(similarities):
                    tag = self.tag_embeddings["tags"][k]
                    dim = tag["dimension"]

                    if dim not in dim_scores or sim > dim_scores[dim]["score"]:
                        dim_scores[dim] = {"tag_code": tag["tag_code"], "score": float(sim)}

                # 转换为标签列表
                for dim, info in dim_scores.items():
                    if info["score"] > 0.5:  # 置信度阈值
                        product_result["tags"].append({
                            "dimension": dim,
                            "tag_code": info["tag_code"],
                            "source": "ai",
                            "confidence": info["score"]
                        })

                results.append(product_result)

            print(f"进度: {min(i+batch_size, total)}/{total}")

        return results

4.4 人工复核流程

-- 抽样复核任务生成
CREATE PROCEDURE review_sampling()
BEGIN
    -- 低置信度样本抽检(置信度 < 0.8)
    INSERT INTO review_tasks (product_id, dimension_id, tag_value_id, 
                              ai_tag_value_id, ai_confidence, priority)
    SELECT 
        pt.product_id,
        pt.dimension_id,
        pt.tag_value_id,
        pt.tag_value_id as ai_tag_value_id,
        pt.confidence,
        CASE 
            WHEN pt.confidence < 0.5 THEN 'high'
            WHEN pt.confidence < 0.7 THEN 'medium'
            ELSE 'low'
        END as priority
    FROM product_tag pt
    WHERE pt.source = 'ai'
      AND pt.confidence < 0.8
      AND pt.reviewed = 0
    ORDER BY pt.confidence ASC
    LIMIT 10000;

    -- 随机抽样(总量1%)
    INSERT INTO review_tasks (product_id, dimension_id, tag_value_id, priority)
    SELECT 
        pt.product_id,
        pt.dimension_id,
        pt.tag_value_id,
        'random'
    FROM product_tag pt
    WHERE pt.source IN ('rule', 'ai')
      AND pt.reviewed = 0
      AND pt.review_task_id IS NULL
    ORDER BY RAND()
    LIMIT 7000;  -- 70万的1%
END;

4.5 增量标注(新商品自动标注)

# 增量标注触发器(伪代码)

def on_new_product(product_id):
    """新商品入库自动标注"""
    product = db.get_product(product_id)

    # 1. 规则匹配(同步,实时)
    rule_tags = match_rules(product)

    # 2. AI标注(异步,延迟执行)
    queue.enqueue("ai_tag_task", {
        "product_id": product_id,
        "priority": "normal"
    })

    # 3. 记录标注结果
    save_tags(product_id, rule_tags)

    # 4. 同步到ES
    sync_to_elasticsearch(product_id)

def scheduled_full_recalculation():
    """定期全量重算(每周一次)"""
    # 1. 暂停增量标注
    pause_incremental_tagging()

    # 2. 重新运行AI标注
    run_batch_ai_tagging(
        product_ids=get_all_product_ids(),
        batch_size=5000,
        priority="low"
    )

    # 3. 重新计算权重
    recalculate_weights()

    # 4. 全量同步ES
    full_sync_to_elasticsearch()

    # 5. 恢复增量标注
    resume_incremental_tagging()

五、性能与扩展性设计

5.1 70万商品批量标注性能估算

阶段 规则匹配 AI语义标注
单条耗时 ~1ms ~50ms (含向量计算)
70万总耗时 ~12分钟 ~10小时
推荐批次 10000条/批 1000条/批
并发数 4-8核 1-2核(GPU)
预估时间(4核并发) 3-5分钟 5-8小时

5.2 分批处理策略

# 分批处理配置
BATCH_CONFIG = {
    "rule_matching": {
        "batch_size": 10000,
        "concurrent_batches": 8,
        "estimated_time": "3-5分钟",
        "total_products": 700000
    },

    "ai_semantic": {
        "batch_size": 1000,
        "concurrent_batches": 2,
        "estimated_time": "5-8小时",
        "embedding_model": "paraphrase-multilingual-MiniLM-L12-v2",
        "gpu_memory": "4GB"
    }
}

def process_batch_rule_matching(product_ids, batch_size=10000):
    """规则匹配批量处理"""
    total = len(product_ids)
    results = []

    for i in range(0, total, batch_size):
        batch_ids = product_ids[i:i+batch_size]
        batch_products = get_products(batch_ids)

        # 并行处理
        batch_results = parallel_map(
            match_rules, 
            batch_products, 
            workers=8
        )

        results.extend(batch_results)

        # 每批处理完写入数据库
        batch_insert_product_tags(results)

        # 更新任务进度
        update_task_progress(min(i+batch_size, total), total)

    return results

def process_batch_ai_semantic(product_ids, batch_size=1000):
    """AI语义批量处理(GPU加速)"""
    total = len(product_ids)
    results = []

    for i in range(0, total, batch_size):
        batch_ids = product_ids[i:i+batch_size]
        batch_products = get_products(batch_ids)

        # 调用AI标注
        batch_results = ai_tagger.batch_tag(batch_products, batch_size=1000)

        results.extend(batch_results)

        # 写入数据库(低置信度标记待复核)
        batch_insert_product_tags(results, set_review_flag=True)

        print(f"AI标注进度: {min(i+batch_size, total)}/{total}")

    return results

5.3 数据库索引优化

-- 核心查询索引优化

-- 1. 商品标签关联表索引(最关键)
ALTER TABLE product_tag ADD INDEX idx_product_dim (product_id, dimension_id);
ALTER TABLE product_tag ADD INDEX idx_tag_value_product (tag_value_id, product_id);
ALTER TABLE product_tag ADD INDEX idx_source_confidence (source, confidence);

-- 2. 组合查询优化(维度+标签值)
ALTER TABLE product_tag ADD INDEX idx_dim_tag (dimension_id, tag_value_id);

-- 3. 高频筛选字段
ALTER TABLE product_tag ADD INDEX idx_dimension_weight (dimension_id, weight DESC);

-- 4. ES同步状态追踪
ALTER TABLE product ADD INDEX idx_es_sync (es_synced, updated_at);

-- 5. 标签值表层级索引
ALTER TABLE tag_value ADD INDEX idx_dim_parent (dimension_id, parent_id);

-- 6. 标签维度表层级索引
ALTER TABLE tag_dimension ADD INDEX idx_parent_sort (parent_id, sort_order);

5.4 数据更新机制

# 数据同步策略

class DataSyncManager:
    """MySQL与ES数据同步管理"""

    def __init__(self):
        self.mysql_client = MySQLClient()
        self.es_client = ElasticsearchClient()
        self.redis_client = RedisClient()

    def sync_single_product(self, product_id):
        """单个商品变更同步"""
        # 1. 获取商品完整信息
        product = self.mysql_client.get_product_with_tags(product_id)

        # 2. 构建ES文档
        es_doc = self._build_es_document(product)

        # 3. 更新ES
        self.es_client.index("products", es_doc, id=product_id)

        # 4. 清除缓存
        self.redis_client.delete(f"product:{product_id}")

    def sync_batch_products(self, product_ids, batch_size=5000):
        """批量同步"""
        for i in range(0, len(product_ids), batch_size):
            batch_ids = product_ids[i:i+batch_size]

            # 批量获取
            products = self.mysql_client.get_products_with_tags(batch_ids)

            # 批量构建ES文档
            actions = []
            for product in products:
                doc = self._build_es_document(product)
                actions.append({
                    "index": {"_index": "products", "_id": product["id"]}
                })
                actions.append(doc)

            # 批量写入ES
            self.es_client.bulk(actions)

            # 更新同步状态
            self.mysql_client.mark_es_synced(batch_ids)

            print(f"ES同步进度: {min(i+batch_size, len(product_ids))}/{len(product_ids)}")

    def full_sync(self):
        """全量同步(建议凌晨执行)"""
        # 1. 创建全量同步任务
        task_id = self.mysql_client.create_sync_task("full")

        # 2. 切换ES别名(零停机)
        self.es_client.switch_alias("products", "products_v2", "products_v1")

        # 3. 全量重建索引
        self.rebuild_index()

        # 4. 切换完成
        self.es_client.switch_alias("products", "products_v1", "products_v2")

        # 5. 删除旧索引
        self.es_client.delete_index("products_v2")

    def incremental_sync(self, interval_seconds=60):
        """增量同步(定时任务)"""
        while True:
            # 获取最近更新的商品
            recent_products = self.mysql_client.get_recent_updated(
                since_minutes=interval_seconds/60
            )

            if recent_products:
                self.sync_batch_products(recent_products)

            time.sleep(interval_seconds)

5.5 完整技术架构图

┌─────────────────────────────────────────────────────────────────────────┐
│                           整体架构                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                │
│  │  爱优五金    │    │   标签管理   │    │   数据分析   │                │
│  │  收银系统    │    │    Web      │    │   Dashboard │                │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘                │
│         │                  │                  │                        │
│         └──────────────────┼──────────────────┘                        │
│                            │                                           │
│                            ▼                                           │
│  ┌─────────────────────────────────────────────────────────────┐      │
│  │                      API 网关层                               │      │
│  │  /api/products/tags    /api/tags/search    /api/tags/stats  │      │
│  └─────────────────────────────────────────────────────────────┘      │
│                            │                                           │
│         ┌──────────────────┼──────────────────┐                       │
│         │                  │                  │                        │
│         ▼                  ▼                  ▼                        │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                │
│  │   标注服务   │    │   搜索服务   │    │   统计服务   │                │
│  │             │    │             │    │             │                │
│  │ - 规则引擎   │    │ - ES查询    │    │ - OLAP聚合  │                │
│  │ - AI标注    │    │ - 缓存      │    │ - 报表生成  │                │
│  │ - 任务调度   │    │ - 高亮      │    │             │                │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘                │
│         │                  │                  │                        │
│         └──────────────────┼──────────────────┘                       │
│                            │                                           │
│         ┌──────────────────┼──────────────────┐                       │
│         │                  │                  │                        │
│         ▼                  ▼                  ▼                        │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                │
│  │    MySQL     │    │ Elasticsearch│    │  ClickHouse  │                │
│  │  (主库)      │    │  (搜索)     │    │  (分析)     │                │
│  │             │    │             │    │             │                │
│  │ - 商品主数据  │    │ - 标签倒排  │    │ - 标签统计  │                │
│  │ - 标签数据   │    │ - 商品文档  │    │ - 趋势分析  │                │
│  │ - 关联关系   │    │ - 复杂查询  │    │ - BI报表   │                │
│  └─────────────┘    └─────────────┘    └─────────────┘                │
│         │                  │                                           │
│         └──────────────────┼──────────────────┘                       │
│                            │                                           │
│                            ▼                                           │
│                    ┌─────────────┐                                    │
│                    │    Redis     │                                    │
│                    │   (缓存)     │                                    │
│                    │             │                                    │
│                    │ - 热点商品   │                                    │
│                    │ - 搜索缓存   │                                    │
│                    │ - 标签热度   │                                    │
│                    └─────────────┘                                    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

六、标签搜索使用示例

6.1 MySQL查询示例

-- 查询同时满足多个条件的商品
SELECT p.id, p.name, p.retail_price, GROUP_CONCAT(tv.name) as tags
FROM product p
JOIN product_tag pt ON p.id = pt.product_id
JOIN tag_value tv ON pt.tag_value_id = tv.id
WHERE pt.dimension_id = 1 AND pt.tag_value_id IN (1, 2)  -- 工程场景
  AND pt.dimension_id = 4 AND pt.tag_value_id = 10      -- 专业级
  AND p.retail_price BETWEEN 50 AND 500
GROUP BY p.id
HAVING COUNT(DISTINCT pt.dimension_id) >= 2;

-- 查找推荐商品(相似标签权重高)
SELECT p.*, SUM(pt.weight * pt.confidence) as relevance_score
FROM product p
JOIN product_tag pt ON p.id = pt.product_id
WHERE pt.tag_value_id IN (
    SELECT tag_value_id FROM product_tag WHERE product_id = ?  -- 参照商品
)
AND p.id != ?  -- 排除自身
GROUP BY p.id
ORDER BY relevance_score DESC
LIMIT 20;

6.2 Elasticsearch查询示例

// 多维度组合搜索
POST /products/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "scene_tags": "engineering" }},
        { "term": { "difficulty_tags": "professional" }}
      ],
      "should": [
        { "term": { "customer_tags": "factory" }},
        { "term": { "customer_tags": "property" }}
      ],
      "filter": [
        { "range": { "retail_price": { "gte": 50, "lte": 500 }}},
        { "term": { "stock_qty": { "gt": 0 }}}
      ]
    }
  },
  "aggs": {
    "by_scene": { "terms": { "field": "scene_tags" }},
    "by_difficulty": { "terms": { "field": "difficulty_tags" }}
  }
}

// 智能推荐(基于标签权重)
POST /products/_search
{
  "query": {
    "function_score": {
      "query": { "match_all": {} },
      "functions": [
        {
          "filter": { "term": { "scene_tags": "engineering" }},
          "weight": 1.5
        },
        {
          "filter": { "range": { "retail_price": { "lte": 100 }}},
          "weight": 1.2
        }
      ],
      "score_mode": "sum"
    }
  }
}

七、实施建议与里程碑

7.1 实施阶段划分

阶段 时间 内容 交付物
Phase 1 第1-2周 数据库表设计、基础框架搭建 MySQL表结构、ES索引
Phase 2 第3-4周 规则引擎开发、基础标注 规则引擎、10+维度标注
Phase 3 第5-6周 AI标注集成、批量处理 AI标注服务、70万商品标注
Phase 4 第7-8周 人工复核、搜索优化 复核系统、搜索API
Phase 5 第9-10周 性能优化、全量上线 优化报告、正式环境

7.2 硬件配置建议

组件 开发测试 生产环境(70万商品)
MySQL 4核8G 8核32G, SSD 500G
Elasticsearch 2核4G 4核16G, SSD 1T
Redis 1核2G 2核4G
AI标注服务器 - GPU 4GB+ (如RTX 3060)

文档版本: v1.0
创建时间: 2026-05-03
适用场景: 五金建材门店70万商品标签系统

Left-click: follow link, Right-click: select node, Scroll: zoom
x