python: Worker Pool Pattern
2026/6/25 14:41:54 网站建设 项目流程

项目经构:

Worker Pool(工作池)是控制并发的经典模式:
固定数量的「工人」(线程 / 进程),不随任务数量无限创建
工人从「任务队列」中领取任务执行
核心价值:限制并发数、优化资源、稳定可控、优雅关闭

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py """ 全局配置文件:所有可变参数统一管理 符合企业级:配置与代码分离 """ import os # 工作池配置 WORKER_COUNT = 3 # 固定生产线数量(可动态修改) TASK_QUEUE_MAXSIZE = 100 # 任务队列最大长度 # 业务模拟配置 SIMULATE_TASK_DELAY = True # 是否模拟任务耗时 RANDOM_DELAY_RANGE = (0.5, 2.5) # 模拟IO延迟 # 日志配置 LOG_LEVEL = "INFO" LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : exceptions.py """ 统一异常体系:职责单一 专门处理工作池 + 业务异常 """ class WorkerPoolException(Exception): """ 工作池基础异常 """ pass class TaskExecutionException(WorkerPoolException): """ 任务执行失败异常 """ pass class TaskNotFoundException(WorkerPoolException): """ 任务不存在异常 """ pass # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:21 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py """ 日志工具:全局唯一日志实例 职责单一:只做日志输出 """ import logging from WorkerPoolPattern.config.settings import LOG_LEVEL, LOG_FORMAT def get_logger(name: str = "jewelry_production") -> logging.Logger: logger = logging.getLogger(name) logger.setLevel(LOG_LEVEL) if not logger.handlers: handler = logging.StreamHandler() # Windows 必加:解决中文/编码报错 handler.stream.reconfigure(encoding='utf-8') formatter = logging.Formatter(LOG_FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) return logger # 全局可用的日志对象 logger = get_logger() # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:23 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : worker.py """ 核心工作池:通用、可扩展、线程安全 职责单一:只管理工人 + 任务队列 不耦合任何珠宝业务,可直接复用于其他项目 """ import queue import threading from typing import Callable, Any from WorkerPoolPattern.utils.logger import logger from WorkerPoolPattern.core.exceptions import TaskExecutionException class Worker(threading.Thread): """ 单个工作者:无限从队列取任务执行 """ def __init__(self, task_queue: queue.Queue, worker_id: int): super().__init__(daemon=True) self.queue = task_queue self.worker_id = worker_id self.name = f"Worker-{worker_id}" def run(self): """ :return: """ logger.info(f"工人【{self.worker_id}】已启动,等待任务...") while True: try: task_func, task_id, args, kwargs = self.queue.get() logger.info(f"工人【{self.worker_id}】领取任务【{task_id}】") try: task_func(*args, **kwargs) logger.info(f"工人【{self.worker_id}】完成任务【{task_id}】") except Exception as e: logger.error(f"任务【{task_id}】执行失败:{str(e)}") raise TaskExecutionException(f"任务执行异常:{e}") from e finally: self.queue.task_done() except Exception as e: logger.error(f"工人【{self.worker_id}】异常:{str(e)}") class WorkerPool: """ 工作池管理器:职责单一,只负责启动/管理/等待工人 """ def __init__(self, worker_count: int, queue_maxsize: int = 0): self.worker_count = worker_count self.task_queue = queue.Queue(maxsize=queue_maxsize) self.workers: list[Worker] = [] def start(self): """ 启动所有工作者 :return: """ logger.info(f"启动工作池,共 {self.worker_count} 个工作者") for i in range(1, self.worker_count + 1): worker = Worker(self.task_queue, i) worker.start() self.workers.append(worker) def submit(self, task_id: str, task_func: Callable, *args, **kwargs) -> None: """ 提交任务到队列(通用接口) :param task_id: :param task_func: :param args: :param kwargs: :return: """ self.task_queue.put((task_func, task_id, args, kwargs)) def wait_completion(self): """ 等待所有任务执行完成(优雅关闭) :return: """ self.task_queue.join() logger.info("所有任务已完成,工作池优雅关闭")
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:25 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : check_tasks.py """原料质检 / 成品质检""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def raw_material_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 原料质检:钻石4C、真伪、纯度检测") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def finished_goods_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 成品质检:工艺、成色、尺寸、镶口精度全检") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:27 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : process_tasks.py """首饰加工、设计、镶嵌""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def jewelry_process(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 首饰加工:设计 → 执模 → 镶嵌 → 抛光 → 电金") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logistics_tasks.py """库存录入、物流发货""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def inventory_record(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 库存录入:生成唯一防伪码、ERP入库、标签打印") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def order_delivery(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 订单发货:物流下单、保价、短信通知") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : production_service.py """ 珠宝生产业务服务 只负责:接收订单 → 编排任务 → 提交到工作池 """ from WorkerPoolPattern.tasks import JEWELRY_FULL_PROCESS from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.utils.logger import logger class JewelryProductionService(object): """ 业务编排层(职责单一) """ def __init__(self, pool: WorkerPool): self.pool = pool # 依赖注入:解耦 def create_order_task(self, order_id: str): """ 为单个订单创建全流程任务链 :param order_id: :return: """ logger.info(f"📦 开始创建订单【{order_id}】的全流程任务") for task_name, task_func in JEWELRY_FULL_PROCESS: task_id = f"{order_id}-{task_name}" self.pool.submit(task_id, task_func, order_id) def batch_create_orders(self, order_count: int): """ 批量创建订单任务 :param order_count: :return: """ logger.info(f"📋 开始批量创建 {order_count} 个珠宝生产订单") for i in range(1, order_count + 1): order_id = f"珠宝订单-{i:03d}" self.create_order_task(order_id)

调用:

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : WorkerPoolBll.py """ 项目主入口 职责单一:只负责启动 → 执行业务 → 等待结束 """ from WorkerPoolPattern.config.settings import WORKER_COUNT, TASK_QUEUE_MAXSIZE from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.service.production_service import JewelryProductionService from WorkerPoolPattern.utils.logger import logger class WorkerPoolBll(object): """ """ def demo(self): """ :return: """ logger.info("=" * 60) logger.info("珠宝企业级生产系统启动(Worker Pool 模式)") logger.info("=" * 60) pool = WorkerPool( worker_count=WORKER_COUNT, queue_maxsize=TASK_QUEUE_MAXSIZE ) pool.start() production_service = JewelryProductionService(pool) production_service.batch_create_orders(order_count=10) pool.wait_completion() logger.info("珠宝生产系统全部执行完成")

输出:

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询