从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析
2026/5/25 1:45:32 网站建设 项目流程

从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析

场景:在做 VLA 机器人项目时需要一套高效的传感器数据传输方案——机器人端发送传感器数据,推理端接收后模型推理,再将结果以 chunk 流式返回。本文以此为背景,把 TCP 通信 + msgpack 序列化涉及的每个知识点都讲清楚。


一、为什么用 msgpack 而不是 JSON

TCP 传输的是字节流,任何数据发送前都需要序列化成字节。

JSONmsgpack
格式文本,可读二进制,不可读
体积较大比 JSON 小 20–50%
速度较慢序列化/反序列化更快
适合场景对外 API、配置文件传感器数据流、服务间通信

机器人传感器数据高频(100Hz+)、数据量大,msgpack 是更合适的选择。


二、msgpack 序列化原理:逐字节拆解

机器人端发送一帧传感器数据:

sensor_request={'type':'sensor','joint':[0.1,-0.2,0.3],# 关节角度,单位 rad'ts':1700000000# 时间戳,Unix 秒}packed=msgpack.packb(sensor_request,use_bin_type=True)# 共 43 字节

用 hex 查看原始字节:

83 a4 74797065 a6 73656e736f72 a5 6a6f696e74 93 ca 3dcccccd ca be4ccccd ca 3e99999a a2 7473 ce 6553f100

逐字节对照表

偏移字节(hex)含义
[00]83fixmap,3 个键值对(0x80 + 3
[01]a4fixstr,长度 4(0xa0 + 4
[02-05]74 79 70 65"type"ASCII
[06]a6fixstr,长度 6(0xa0 + 6
[07-12]73 65 6e 73 6f 72"sensor"ASCII
[13]a5fixstr,长度 5(0xa0 + 5
[14-18]6a 6f 69 6e 74"joint"ASCII
[19]93fixarray,3 个元素(0x90 + 3
[20]cafloat32 类型标记
[21-24]3d cc cc cd0.1的 IEEE 754 float32
[25]cafloat32 类型标记
[26-29]be 4c cc cd-0.2的 IEEE 754 float32
[30]cafloat32 类型标记
[31-34]3e 99 99 9a0.3的 IEEE 754 float32
[35]a2fixstr,长度 2(0xa0 + 2
[36-37]74 73"ts"ASCII
[38]ceuint32 类型标记
[39-42]65 53 f1 001700000000大端 uint32

字节数验证:

1 ← fixmap 头 + (1+4) + (1+6) ← 'type': 'sensor' + (1+5) ← 'joint' 键 + 1 + 3×(1+4) ← fixarray 头 + 3个float32(每个1字节标记+4字节数据) + (1+2) ← 'ts' 键 + (1+4) ← uint32 值 = 43 字节 ✓

msgpack 类型编码规律

前缀规则范围
0x80 + nfixmap(字典)n ≤ 15
0x90 + nfixarray(列表)n ≤ 15
0xa0 + nfixstr(字符串)n ≤ 31 字节
0x00~0x7f正整数直接存,单字节0–127
0xce+ 4字节uint320 – 4,294,967,295
0xca+ 4字节float32(IEEE 754)

Python 打印bytes时,能表示为 ASCII 的字节会直接显示成字母,所以看到的是\x83\xa4type\xa6sensor...而不是全十六进制。


三、TCP 粘包问题与长度前缀协议

什么是粘包

TCP 是流式协议,没有消息边界。sendall一次发出 47 字节(4 字节头 + 43 字节体),接收方可能:

第一次 recv → 20 字节 第二次 recv → 27 字节

如果推理端连续推理多帧并返回,接收方甚至可能一次收到多条消息粘在一起。

解决方案:4 字节长度前缀

协议约定:每条消息前加固定 4 字节,存储消息体的字节长度。

发送的 47 字节: ┌──────────────────┬────────────────────────────────────────────┐ │ 00 00 00 2b │ 83 a4 74 79 70 65 a6 73 65 6e ... │ │ (4字节,值=43) │ (43字节 msgpack 消息体) │ └──────────────────┴────────────────────────────────────────────┘

接收方先读 4 字节知道长度(43),再精确读 43 字节,完全消除粘包。


四、struct.pack / unpack:字节与整数互转

发送端:整数 → 字节

struct.pack('>I',43)# b'\x00\x00\x00\x2b'

格式字符串'>I'

字符含义
>大端序(Big-endian),高位字节在前,即网络字节序
Iunsigned int,4 字节无符号整数
43 = 0x0000002b 大端序: 00 00 00 2b ← 高位在前(标准网络传输顺序) 小端序: 2b 00 00 00 ← x86 CPU 本地字节序

接收端:字节 → 整数

msg_length=struct.unpack('>I',raw_length)[0]# b'\x00\x00\x00\x2b' → (43,) → 43

struct.unpack固定返回元组(支持一次解多个值),[0]取第一个元素:

struct.unpack('>I',...)# → (43,) 一个值也是元组struct.unpack('>II',...)# → (43, 7) 解两个值

五、recv_all:确保读满指定字节数

defrecv_all(conn,length):data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNone# 对端关闭连接,recv 返回 b''data+=packetreturndata

conn.recv(n)语义是"最多读 n 字节",不保证一次读满。循环示例(目标读 43 字节,TCP 分两次到达):

初始:data = b'' 第 1 次循环:len(data)=0 < 43,recv(43) → 实际到了 20 字节,data = 20字节 第 2 次循环:len(data)=20 < 43,recv(23) → 实际到了 23 字节,data = 43字节 第 3 次循环:len(data)=43 == 43,退出,return data

if not packet处理对端正常关闭的情况,此时recv返回b'',不判断会死循环。


六、Socket 对象解读

conn,addr=server_sock.accept()# <socket.socket fd=4, family=2, type=1, proto=0,# laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 49724)>
字段含义
fd=44文件描述符,Linux 中 socket 也是文件
family=2AF_INETIPv4
type=1SOCK_STREAMTCP
laddr('127.0.0.1', 9999)推理服务端地址和监听端口
raddr('127.0.0.1', 49724)机器人端地址和临时端口

server_sock 与 conn 的区别

server_sock.listen(5)# 只负责监听,等待机器人连接conn,addr=server_sock.accept()# 每来一个连接,新建 conn 专门通信
  • server_sock:守着 9999 端口,不做数据收发
  • conn:和某个具体机器人节点通信的 socket

关于客户端临时端口

机器人端connect()时,OS 随机分配一个空闲端口(Ephemeral Port,通常 49152–65535),用完即释放。TCP 连接由四元组唯一标识:

机器人端 IP : 临时端口 → 推理端 IP : 监听端口 127.0.0.1 : 49724 → 127.0.0.1 : 9999

七、完整通信流程

机器人端(client) 推理端(server) │ │ │ connect(推理端 IP:9999) │ │──────────────────────────────────────────>│ │ │ accept() → conn │ │ │ sendall(4字节长度头 + 43字节传感器数据) │ │──────────────────────────────────────────>│ │ │ recv_all(conn, 4) 读长度 → 43 │ │ recv_all(conn, 43) 读消息体 │ │ msgpack.unpackb() 还原字典 │ │ VLA 模型推理 │ │ msgpack.packb() 序列化 action │ │ sendall(4字节头 + 41字节响应) │ │ │ recv_all(4) 读长度 → 41 │ │ recv_all(41) 读响应体 │ │ msgpack.unpackb() 还原 action 字典 │ │ │ │ close() │ close(conn)

八、完整代码

server.py(推理端)

""" 推理端 TCP 服务 - 接收机器人传感器数据(msgpack 序列化) - 模拟 VLA 模型推理,返回控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(conn,length):"""循环读取,确保收满 length 字节,解决 TCP 流式拆包问题"""data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefinfer(sensor_data:dict)->dict:"""模拟 VLA 推理:输入传感器数据,返回控制 action"""joint=sensor_data.get('joint',[])# 实际场景替换为模型前向推理action=[round(j*0.5,4)forjinjoint]return{'status':'ok','action':action,'chunk':1,}defhandle_client(conn,addr):print(f"[推理端] 机器人已连接:{addr}")try:# 1. 读 4 字节长度前缀raw_length=recv_all(conn,4)ifnotraw_length:return# b'\x00\x00\x00\x2b' → 43msg_length=struct.unpack('>I',raw_length)[0]print(f"[推理端] 消息体长度:{msg_length}字节")# 2. 按长度读消息体raw_data=recv_all(conn,msg_length)ifnotraw_data:return# 3. msgpack 反序列化sensor_data=msgpack.unpackb(raw_data,raw=False)print(f"[推理端] 收到传感器数据:{sensor_data}")# 4. 推理response=infer(sensor_data)print(f"[推理端] 推理结果:{response}")# 5. 序列化响应 + 长度前缀,发送packed=msgpack.packb(response,use_bin_type=True)conn.sendall(struct.pack('>I',len(packed))+packed)exceptExceptionase:print(f"[推理端] 出错:{e}")finally:conn.close()print(f"[推理端] 关闭连接:{addr}\n")defmain():withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asserver_sock:server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server_sock.bind((HOST,PORT))server_sock.listen(5)print(f"[推理端] 监听{HOST}:{PORT}...")whileTrue:conn,addr=server_sock.accept()handle_client(conn,addr)# 并发版:# import threading# threading.Thread(target=handle_client, args=(conn, addr)).start()if__name__=='__main__':try:main()exceptKeyboardInterrupt:print("\n[推理端] 已退出")

client.py(机器人端)

""" 机器人端 TCP 客户端 - 采集传感器数据,msgpack 序列化后发送给推理端 - 接收推理端返回的控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(sock,length):data=b''whilelen(data)<length:packet=sock.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefsend_sensor(sensor_data:dict)->dict:"""发送一帧传感器数据,返回推理端的 action"""withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)assock:sock.connect((HOST,PORT))# msgpack 序列化# {'type': 'sensor', 'joint': [0.1, -0.2, 0.3], 'ts': 1700000000}# → 43 字节,hex: 83 a4 74797065 a6 73656e736f72 ...packed=msgpack.packb(sensor_data,use_bin_type=True)print(f"[机器人端] 序列化后{len(packed)}字节:{packed.hex()}")# 加 4 字节长度前缀后发送# 43 → struct.pack('>I', 43) = b'\x00\x00\x00\x2b'sock.sendall(struct.pack('>I',len(packed))+packed)print(f"[机器人端] 已发送:{sensor_data}")# 接收推理结果raw_length=recv_all(sock,4)msg_length=struct.unpack('>I',raw_length)[0]raw_data=recv_all(sock,msg_length)response=msgpack.unpackb(raw_data,raw=False)print(f"[机器人端] 收到 action:{response}\n")returnresponsedefmain():# 模拟多帧传感器数据frames=[{'type':'sensor','joint':[0.1,-0.2,0.3],'ts':1700000000},{'type':'sensor','joint':[0.2,-0.15,0.25],'ts':1700000001},{'type':'sensor','joint':[0.0,0.0,0.0],'ts':1700000002},]forframeinframes:try:send_sensor(frame)exceptConnectionRefusedError:print("[机器人端] 无法连接推理端,请先启动 server.py")breakif__name__=='__main__':main()

九、运行

pipinstallmsgpack# 终端 1:启动推理端python server.py# 终端 2:启动机器人端python client.py

十、扩展方向

本文 demo 是单帧一问一答,实际 VLA 场景可在此基础上扩展:

  • 长连接多帧:一次connect后循环发送多帧传感器数据,避免频繁建连开销,需在handle_client中加while True循环读包
  • 推理 chunk 流式返回:推理端每推理出一个 token/chunk 就发一帧响应,机器人端循环接收,同样用长度前缀帧封装每个 chunk
  • 并发多机器人handle_client改为threading.Threadasyncio,同时服务多个机器人节点
  • 消息类型扩展:在请求字典中加type字段区分关节角度、图像帧(base64)、力传感器等不同数据,服务端按type分发处理

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询