项目结构Fan-In 模式本质多数据源 → 统一接收器完美匹配珠宝全流程「多环节、单数据中心」场景架构设计核心分层5 层架构配置层config全局配置、常量消息层message统一消息结构体、数据格式业务层process各业务模块单一职责核心层coreFan-In 队列、Sink 接收器、线程管理启动层main项目入口、编排启动# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py 全局配置文件 import queue # 队列最大容量 QUEUE_MAX_SIZE 100 # 全局 Fan-In 队列单例 FAN_IN_QUEUE queue.Queue(maxsizeQUEUE_MAX_SIZE) # 业务处理模拟时间范围秒 PROCESS_MIN_TIME 0.3 PROCESS_MAX_TIME 1.2 # 结束标识 END_SIGNAL SYSTEM_END # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : business_msg.py 统一业务消息结构体企业级数据格式 from dataclasses import dataclass from typing import Any dataclass class BusinessMessage: 珠宝全流程统一消息体 严格结构化所有业务环节必须使用此格式发送数据 process_name: str # 业务环节名称 task_name: str # 任务名称 task_details: Any # 任务详情 timestamp: float # 时间戳 status: str success # 状态 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:48 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : fan_in_queue.py Fan-In 核心队列线程安全、单例、全局唯一 from FanInPattern.config.settings import FAN_IN_QUEUE, END_SIGNAL from FanInPattern.message.business_msg import BusinessMessage class FanInQueue: Fan-In 队列封装企业级安全调用 staticmethod def send_msg(msg: BusinessMessage) - None: 发送业务消息 :param msg: :return: FAN_IN_QUEUE.put(msg) staticmethod def get_msg() - BusinessMessage | str: 获取消息阻塞 :return: return FAN_IN_QUEUE.get() staticmethod def send_end_signal() - None: 发送结束信号 :return: FAN_IN_QUEUE.put(END_SIGNAL) staticmethod def task_done() - None: 任务完成标识 :return: FAN_IN_QUEUE.task_done() staticmethod def join() - None: 等待队列清空 :return: FAN_IN_QUEUE.join() # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:50 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : work_sink.py 工作接收器数据中心唯一汇聚点 import threading import time from FanInPattern.config.settings import END_SIGNAL from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.message.business_msg import BusinessMessage class WorkSink: 珠宝业务数据中心Fan-In Sink def __init__(self): self.total_count 0 self.process_stats {} # 各环节统计 def _update_stats(self, process_name: str) - None: 更新业务统计 :param process_name: :return: self.process_stats[process_name] self.process_stats.get(process_name, 0) 1 def run(self) - None: 启动接收器 :return: print( * 90) print( 【企业级】珠宝业务数据中心已启动 - Fan-In 工作接收器) print( * 90) while True: data FanInQueue.get_msg() # 结束信号 if data END_SIGNAL: self._print_final_report() FanInQueue.task_done() break # 处理业务消息 if isinstance(data, BusinessMessage): self._handle_message(data) FanInQueue.task_done() time.sleep(0.1) def _handle_message(self, msg: BusinessMessage) - None: 处理单条消息 :param msg: :return: self.total_count 1 self._update_stats(msg.process_name) print( f 接收 | {msg.process_name:10s} | f任务{msg.task_name:15s} | 状态{msg.status} ) def _print_final_report(self) - None: 打印企业级汇总报告 :return: print(\n * 90) print( 珠宝全流程业务汇总报告) print( * 90) print(f✅ 总处理任务数{self.total_count}) for process, count in self.process_stats.items(): print(f 「{process}」{count} 项) print( * 90) # 单例接收器全局唯一 SINK_INSTANCE WorkSink()# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:52 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : base_process.py 业务基类所有业务环节必须继承此类 import abc import time import random import threading from FanInPattern.config.settings import PROCESS_MIN_TIME, PROCESS_MAX_TIME from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.message.business_msg import BusinessMessage class BaseProcess(abc.ABC): 抽象业务基类 职责统一执行逻辑、发送消息、模拟耗时 def __init__(self, process_name: str): self.process_name process_name self.task_list [] abc.abstractmethod def init_tasks(self) - None: 初始化任务子类必须实现 :return: pass def _simulate_process(self) - None: 模拟业务处理耗时 :return: sleep_time random.uniform(PROCESS_MIN_TIME, PROCESS_MAX_TIME) time.sleep(sleep_time) def _send_task_msg(self, task: str) - None: 构造并发送消息 :param task: :return: msg BusinessMessage( process_nameself.process_name, task_nametask, task_detailstask, timestamptime.time() ) FanInQueue.send_msg(msg) def execute(self) - None: 执行业务流程 :return: self.init_tasks() for task in self.task_list: self._simulate_process() self._send_task_msg(task) def start_thread(self) - threading.Thread: 启动独立线程执行 :return: thread threading.Thread(targetself.execute, nameself.process_name) thread.start() return thread # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:09 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : design.py from FanInPattern.process.base_process import BaseProcess class DesignProcess(BaseProcess): 珠宝设计 def __init__(self): super().__init__(珠宝设计) def init_tasks(self): :return: self.task_list [钻戒款式设计, 项链3D建模, 手镯图纸审核] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:11 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : purchase.py from FanInPattern.process.base_process import BaseProcess class PurchaseProcess(BaseProcess): 原料采购 def __init__(self): super().__init__(原料采购) def init_tasks(self): :return: self.task_list [ 采购1克拉南非钻石, 采购999足金500g, 采购红宝石10颗 ] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:23 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : produce.py from FanInPattern.process.base_process import BaseProcess class ProduceProcess(BaseProcess): 生产加工 def __init__(self): super().__init__(生产加工) def init_tasks(self): :return: self.task_list [钻石镶嵌加工, 黄金手镯抛光, 金饰铸造成型] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:26 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : quality.py from FanInPattern.process.base_process import BaseProcess class QualityProcess(BaseProcess): 质量检测 def __init__(self): super().__init__(质量检测) def init_tasks(self): self.task_list [黄金纯度检测, 钻石工艺检测, 珠宝鉴定证书出具] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : warehouse.py from FanInPattern.process.base_process import BaseProcess class WarehouseProcess(BaseProcess): 仓储管理 def __init__(self): super().__init__(仓储管理) def init_tasks(self): self.task_list [钻戒入库登记, 黄金库存盘点, 宝石库存预警] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : sale.py from FanInPattern.process.base_process import BaseProcess class SaleProcess(BaseProcess): 销售环节 def __init__(self): super().__init__(销售环节) def init_tasks(self): self.task_list [线上钻戒售出, 门店黄金手镯售出, 珠宝批发订单发货] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : after_sale.py from FanInPattern.process.base_process import BaseProcess class AfterSaleProcess(BaseProcess): 售后服务 def __init__(self): super().__init__(售后服务) def init_tasks(self): self.task_list [钻戒免费清洗, 项链维修, 黄金首饰保养]调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述 Fan-In Pattern Fan-In扇入模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:51 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : FanInBll.py 企业级编排 import threading from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.core.work_sink import SINK_INSTANCE # 导入所有业务流程 from FanInPattern.process.purchase import PurchaseProcess from FanInPattern.process.design import DesignProcess from FanInPattern.process.produce import ProduceProcess from FanInPattern.process.quality import QualityProcess from FanInPattern.process.warehouse import WarehouseProcess from FanInPattern.process.sale import SaleProcess from FanInPattern.process.after_sale import AfterSaleProcess class JewelryBusinessOrchestration(object): 业务流程编排器 staticmethod def get_all_processes() - list: 获取所有业务环节扩展只需在此添加 :return: return [ PurchaseProcess(), DesignProcess(), ProduceProcess(), QualityProcess(), WarehouseProcess(), SaleProcess(), AfterSaleProcess() ] staticmethod def run(): 启动全流程 # 1. 启动接收器线程 sink_thread threading.Thread(targetSINK_INSTANCE.run, nameWorkSink) sink_thread.start() # 2. 启动所有业务线程 processes JewelryBusinessOrchestration.get_all_processes() threads [p.start_thread() for p in processes] # 3. 等待所有业务完成 for t in threads: t.join() # 4. 发送结束信号 FanInQueue.send_end_signal() # 5. 等待队列处理完成 FanInQueue.join() sink_thread.join() class FanInBll(object): def demo(self): :return: print( 企业级珠宝 Fan-In 业务系统启动...\n) JewelryBusinessOrchestration.run() print(\n 系统全部执行完成)