redis的数据类型和使用
2026/6/6 10:37:55 网站建设 项目流程

一、redis的常用数据类型对比

数据类型

像什么适合场景不适合
String一个值缓存、计数器、状态值、分布式锁多字段结构、大量消息队列
Hash一个对象/字典用户信息、设备状态、配置对象有顺序的消息流
List简单队列简单先进先出任务队列多消费者组、消息确认、失败恢复
Set不重复集合去重、标签、集合交并差排序、时间顺序消息
Sorted Set带分数排序集合排行榜、延迟队列、按时间排序复杂可靠消息消费
Pub/Sub广播通知在线实时通知、即时广播消息可靠性、离线补消费
Stream消息日志/可靠队列事件流、任务同步、消费者组、可回放消息超大规模 Kafka 级场景

二、redisStream的消费者组代码示例

import redis import time r = redis.Redis( host="localhost", port=6379, db=0, decode_responses=True ) stream_key = "sync_stream" group_name = "sync_group" consumer_name = "consumer_1" # 创建消费者组 # 如果已经存在,会报错,所以这里捕获异常 try: r.xgroup_create( name=stream_key, groupname=group_name, id="0", mkstream=True ) print("消费者组创建成功") except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): print("消费者组已存在") else: raise while True: # 使用消费者组读取消息 # > 表示只读取还没有投递给任何消费者的新消息 result = r.xreadgroup( groupname=group_name, consumername=consumer_name, streams={stream_key: ">"}, count=1, block=5000 ) if not result: print("暂无新消息") continue for stream, messages in result: for message_id, data in messages: print("收到消息:", message_id, data) try: # 这里模拟你的业务处理 # 比如: # 1. 根据 edge_bucket / edge_object_key 下载文件 # 2. 上传到中心站 # 3. 中心站返回 success print("开始处理任务:", data["sync_id"]) time.sleep(1) print("处理成功:", data["sync_id"]) # 处理成功后 ACK r.xack(stream_key, group_name, message_id) print("ACK 成功:", message_id) except Exception as e: # 如果处理失败,不 ACK # 这条消息会留在 pending 里 print("处理失败,不 ACK:", e)

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询