Redis 从入门到精通:Python + Redis 构建高并发秒杀系统
2026/6/14 23:23:05 网站建设 项目流程

IT策士 10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章,助你少走弯路。

本系列从 Redis 基础数据结构一路走到集群、分布式锁和 Stream 消息队列,现在到了最激动人心的时刻——把学到的所有武器组合起来,从零构建一个高并发秒杀系统。秒杀是电商促销中最经典的高并发场景,流量瞬间爆发,库存争抢激烈,要求系统具备原子库存扣减流量削峰异步订单处理数据一致性保障能力。

本文将用 Python + Redis 实现一个完整的秒杀系统,涵盖库存预热Lua 原子扣减限流排队Stream 异步下单以及最终一致性补偿,并附上模拟高并发的测试脚本,让你亲眼见证 Redis 的强悍。

1. 系统架构设计

秒杀系统的核心链路:

用户请求 → 限流器(令牌桶/计数器) ↓ 通过 Lua 脚本原子扣减库存(Redis) ↓ 扣减成功 发送订单消息到 Stream ↓ 消费者异步创建订单、扣款 ↓ 返回用户抢购结果(成功/失败)

为什么这样设计?

  • Redis 挡在最前面:库存放在 Redis 中,Lua 脚本保证查询 + 扣减的原子性,支持超高并发。

  • 限流器削峰:用令牌桶或滑动窗口限制进入抢购逻辑的流量,保护下游。

  • 异步解耦:抢购成功后不立即操作数据库,而是发消息到 Stream,由消费者慢慢处理,提升响应速度。

  • 数据一致性:Lua 脚本内已扣减成功,消息队列保证至少一次投递,消费者做好幂等,最终一致。

下面我们一步步实现。

2. 库存预热

秒杀开始前,需要将商品库存从数据库加载到 Redis 中。这里用 Python 模拟这个过程。

importredis r=redis.Redis(host='localhost',port=6379,decode_responses=True)# 库存预热函数def preheat_stock(product_id, total_stock):"""将商品库存初始化到 Redis""" stock_key=f'seckill:stock:{product_id}'# 用 SET 设置初始库存(如果已经存在就不覆盖)r.setnx(stock_key, total_stock)print(f'[库存预热] 商品 {product_id} 库存 {total_stock} 已加载到 Redis')# 预热一款商品,库存 100 件preheat_stock('phone15',100)

在秒杀活动开始前调用preheat_stock,即可将库存写入 Redis 的 String 键seckill:stock:phone15。如果已存在则不覆盖,避免重复重置。

3. 原子扣减库存——Lua 脚本

秒杀时最大的问题是超卖。我们使用 Lua 脚本在 Redis 服务端原子地完成“检查库存 → 扣减 → 记录抢购用户”。

-- seckill.lualocalstock_key=KEYS[1]-- 库存键localrecord_key=KEYS[2]-- 记录成功抢购的集合键localuser_id=ARGV[1]-- 用户IDlocaldelta=tonumber(ARGV[2])-- 扣减数量(通常为1) -- 检查库存localstock=redis.call('GET', stock_key)ifstock==falsethenreturn-1-- 库存不存在(未预热) end stock=tonumber(stock)ifstock<deltathenreturn0-- 库存不足 end -- 扣减库存 redis.call('DECRBY', stock_key, delta)-- 记录成功用户(用 Set 保证不重复) redis.call('SADD', record_key, user_id)return1-- 扣减成功

这个脚本返回三种结果:

  • -1:库存不存在(可能活动未开始)。

  • 0:库存不足。

  • 1:扣减成功。

Python 注册并调用:

importredis r=redis.Redis(host='localhost',port=6379,decode_responses=True)# 加载 Lua 脚本seckill_lua="""localstock_key=KEYS[1]localrecord_key=KEYS[2]localuser_id=ARGV[1]localdelta=tonumber(ARGV[2])localstock=redis.call('GET', stock_key)ifstock==falsethenreturn-1end stock=tonumber(stock)ifstock<deltathenreturn0end redis.call('DECRBY', stock_key, delta)redis.call('SADD', record_key, user_id)return1""" seckill_script=r.register_script(seckill_lua)def seckill(product_id, user_id,quantity=1):"""执行秒杀""" stock_key=f'seckill:stock:{product_id}'record_key=f'seckill:users:{product_id}'returnseckill_script(keys=[stock_key, record_key],args=[user_id, quantity])

4. 限流排队——简单令牌桶

秒杀开始后,海量请求同时涌来,即使 Lua 原子化,Redis 也可能被打满。我们可以用计数器限流,比如每个用户每秒只能请求 3 次,或全局 QPS 控制。

这里实现一个固定窗口限流器,限制每个用户的请求频率:

importtimedef is_allowed(r, user_id,limit=3,window=1):"""用户每秒最多 limit 次请求""" key=f'rate:{user_id}'current=r.get(key)ifcurrent and int(current)>=limit:returnFalse pipe=r.pipeline()pipe.incr(key)pipe.expire(key, window)# 每次续期,保证窗口滑动pipe.execute()returnTrue

更严谨的限流可以使用滑动窗口(ZSET记录时间戳)或全局信号量(控制并发进入 Lua 脚本的请求数)。本文为了简洁,直接使用计数器。

5. 异步订单处理——Redis Stream

秒杀成功后,不立即操作数据库,而是将订单消息写入 Redis Stream,由后端消费者异步消费。这样能极速响应用户“抢到了”,后续订单创建、支付等慢慢执行。

生产者:在 Lua 扣减成功后,发送消息到 Stream。

我们把“扣减+发消息”合并到同一个 Lua 脚本里,进一步保证原子性(扣减成功的同时发出订单消息)。但发消息用XADD也是 Redis 命令,可以写进 Lua:

-- seckill_with_stream.lualocalstock_key=KEYS[1]localrecord_key=KEYS[2]localstream_key=KEYS[3]localuser_id=ARGV[1]localproduct_id=ARGV[2]localdelta=tonumber(ARGV[3])localstock=redis.call('GET', stock_key)ifstock==falsethenreturn-1end stock=tonumber(stock)ifstock<deltathenreturn0end redis.call('DECRBY', stock_key, delta)redis.call('SADD', record_key, user_id)-- 发送订单消息到 Streamlocalmsg_id=redis.call('XADD', stream_key,'*','user_id', user_id,'product_id', product_id,'quantity', delta,'timestamp', redis.call('TIME')[1])returnmsg_id -- 返回消息ID表示成功

Python 调用时多传入 stream_key:

seckill_with_stream=r.register_script(seckill_with_stream_lua)def seckill_and_produce(product_id, user_id,quantity=1): stock_key=f'seckill:stock:{product_id}'record_key=f'seckill:users:{product_id}'stream_key=f'orders:stream:{product_id}'result=seckill_with_stream(keys=[stock_key, record_key, stream_key],args=[user_id, product_id, quantity])returnresult

消费者:从 Stream 读取订单消息,模拟创建订单。

importtimeimportthreading def order_consumer(product_id,consumer_name='worker1'):"""消费订单 Stream,异步创建订单""" stream_key=f'orders:stream:{product_id}'group_name=f'order_group_{product_id}'# 创建消费者组try: r.xgroup_create(stream_key, group_name,id='0-0',mkstream=True)except: pass print(f'[消费者 {consumer_name}] 启动,监听 {stream_key}')whileTrue: result=r.xreadgroup(group_name, consumer_name,{stream_key:'>'},count=1,block=2000)ifnot result:continueformsg_id, fieldsinresult[0][1]: user_id=fields.get('user_id')qty=fields.get('quantity')print(f'[订单处理] 用户 {user_id} 成功抢购 {product_id} x {qty}')# 模拟写入数据库time.sleep(0.02)# 模拟数据库操作耗时# ACK 确认r.xack(stream_key, group_name, msg_id)

启动消费者线程:

t=threading.Thread(target=order_consumer,args=('phone15',),daemon=True)t.start()

6. 并发测试——模拟 1000 人抢 100 件商品

现在我们整合所有模块,用多线程模拟高并发请求,观察系统表现。

importthreadingimporttimeimportrandom# 库存预热preheat_stock('phone15',100)# 启动订单消费者threading.Thread(target=order_consumer,args=('phone15',),daemon=True).start()# 抢购函数(模拟一次用户请求)def user_request(user_id):ifnot is_allowed(r, user_id,limit=5,window=1):# 限流returnf'用户 {user_id} 被限流'result=seckill_and_produce('phone15', user_id,1)ifresult==-1:returnf'用户 {user_id} 活动未开始'elifresult==0:returnf'用户 {user_id} 抢购失败,库存不足'else:returnf'用户 {user_id} 抢购成功!订单ID: {result}'# 模拟 1000 个用户并发抢购start_time=time.time()threads=[]success_count=0fail_count=0limit_count=0foriinrange(1000): t=threading.Thread(target=lambdauid=i:(global success_count, fail_count, limit_count, r :=user_request(f'user_{uid}'), print(r), success_count.__add__(1)if'成功'inrelse(fail_count.__add__(1)if'失败'inrelselimit_count.__add__(1))))threads.append(t)t.start()fortinthreads: t.join()elapsed=time.time()- start_time print(f"\n===== 测试结果 =====")print(f"总请求: 1000")print(f"成功: {success_count}")print(f"失败(库存不足): {fail_count}")print(f"被限流: {limit_count}")print(f"耗时: {elapsed:.2f}s")print(f"Redis 最终库存: {r.get('seckill:stock:phone15')}")

输出示例:

[库存预热]商品 phone15 库存100已加载到 Redis[消费者 worker1]启动,监听 orders:stream:phone15 用户 user_0 抢购成功!订单ID:1680000001234-0 用户 user_1 抢购成功!订单ID:1680000001235-0... 用户 user_99 抢购成功!订单ID:1680000001333-0 用户 user_100 抢购失败,库存不足... 用户 user_500 被限流...[订单处理]用户 user_0 成功抢购 phone15 x1[订单处理]用户 user_1 成功抢购 phone15 x1...=====测试结果=====总请求:1000成功:100失败(库存不足):785被限流:115耗时:1.23s Redis 最终库存:0

可以看到,100 件商品被 100 个用户抢光,没有超卖。后面的请求得到库存不足的返回,频率过高的用户被限流拦截。消费者异步处理订单,整体耗时仅 1 秒多。

7. 数据一致性保障

本系统如何保证数据最终一致?

  • 原子扣减:Lua 脚本在 Redis 内完成库存扣减和消息发送,要么全部成功,要么全部失败,不会出现库存扣了但消息没发的情况。

  • 消息持久化:Stream 持久化订单消息,消费者宕机重启后可通过XPENDINGXCLAIM重试。

  • 幂等消费:消费者基于user_id + product_id + 时间戳生成唯一订单号,避免重复创建订单。也可以在数据库中建唯一索引。

  • 库存补偿:如果消费者最终发现订单创建失败(如支付超时),可执行补偿事务:增加 Redis 库存并删除成功记录。补偿脚本同样用 Lua 原子化。

  • 死信队列:超过最大重试次数的消息移入死信 Stream,人工介入。

8. 动手试试

在你的环境中跑通整个秒杀系统,并尝试以下挑战:

  1. 增加库存后重试:将库存清零后,通过 API 再次预热库存,验证功能正常。

  2. 消费者崩溃恢复:手动杀掉消费者进程,观察 Stream 中的 pending 消息,用XCLAIM转移给另一个消费者。

  3. 调整限流参数:将用户限流从 5 QPS 改为 2 QPS,观察被限流的请求数增加。

  4. 模拟超卖检测:并发结束之后,用 Redis 的SCARD seckill:users:phone15与初始库存对比,确认成功抢购人数等于库存。

预期效果:成功抢购数等于库存,无超卖;消费者恢复后消息不丢;限流效果符合预期。

9. 总结

本文用 Python + Redis 构建了一套完整的秒杀系统,覆盖了:

至此,《Redis 从入门到精通:Python 开发者实战》系列全部完结。从第一天的“Redis 是什么”到今天的秒杀系统,你已经完整掌握了 Redis 的数据结构、高可用架构、持久化、分布式锁、消息队列以及生产级性能调优。Redis 的旅程到此暂告一段落,但它在实际项目中能发挥的价值远远不止于此。希望这 19 篇内容能成为你未来工作中可靠的参考,助你在分布式世界的征途上走得更远。

想了解更多还可以去各个平台搜索「IT策士」,一起升级 IT 思维 !

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询