MuleSoft驱动的企业级AI编排:构建可审计、可治理的LLM工作流
2026/6/6 12:03:12
数据类型 | 像什么 | 适合场景 | 不适合 |
|---|---|---|---|
| String | 一个值 | 缓存、计数器、状态值、分布式锁 | 多字段结构、大量消息队列 |
| Hash | 一个对象/字典 | 用户信息、设备状态、配置对象 | 有顺序的消息流 |
| List | 简单队列 | 简单先进先出任务队列 | 多消费者组、消息确认、失败恢复 |
| Set | 不重复集合 | 去重、标签、集合交并差 | 排序、时间顺序消息 |
| Sorted Set | 带分数排序集合 | 排行榜、延迟队列、按时间排序 | 复杂可靠消息消费 |
| Pub/Sub | 广播通知 | 在线实时通知、即时广播 | 消息可靠性、离线补消费 |
| Stream | 消息日志/可靠队列 | 事件流、任务同步、消费者组、可回放消息 | 超大规模 Kafka 级场景 |
import redis import time r = redis.Redis( host="localhost", port=6379, db=0, decode_responses=True ) stream_key = "sync_stream" group_name = "sync_group" consumer_name = "consumer_1" # 创建消费者组 # 如果已经存在,会报错,所以这里捕获异常 try: r.xgroup_create( name=stream_key, groupname=group_name, id="0", mkstream=True ) print("消费者组创建成功") except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): print("消费者组已存在") else: raise while True: # 使用消费者组读取消息 # > 表示只读取还没有投递给任何消费者的新消息 result = r.xreadgroup( groupname=group_name, consumername=consumer_name, streams={stream_key: ">"}, count=1, block=5000 ) if not result: print("暂无新消息") continue for stream, messages in result: for message_id, data in messages: print("收到消息:", message_id, data) try: # 这里模拟你的业务处理 # 比如: # 1. 根据 edge_bucket / edge_object_key 下载文件 # 2. 上传到中心站 # 3. 中心站返回 success print("开始处理任务:", data["sync_id"]) time.sleep(1) print("处理成功:", data["sync_id"]) # 处理成功后 ACK r.xack(stream_key, group_name, message_id) print("ACK 成功:", message_id) except Exception as e: # 如果处理失败,不 ACK # 这条消息会留在 pending 里 print("处理失败,不 ACK:", e)