Zenoh核心概念
本章将阐释使 Zenoh 独具特色的基础抽象理念。
理解这些概念,将助您设计出具备以下特性的系统:
位置透明(Location Transparent) —— 生产者与消费者完全解耦,无需感知对方在网络中的具体位置。
拓扑无关(Topology Independent) —— 无论您是从单机部署扩展到云边协同架构,应用逻辑代码均无需修改。
极致高效(Efficient) —— 零冗余拷贝,极低的线路开销。
键表达式
键表达式(Key Expression, KE)是 Zenoh 用于标识所有资源的命名系统 —— 它以一套统一的语法,替代了传统系统中的 URL、消息主题(Topic)和资源标识符。
语法
键表达式是正斜杠分隔的路径,可以选择包含通配符。以多个房间中传感器温度为例:
| 表达式 | 含义 |
|---|---|
| sensors/room1/temperature | 具体的资源 |
| sensors/ * /temperature | 在任意房间中传感器的温度(* 表示一个段) |
| sensors/ ** /temperature | 在任意嵌套段中的温度资源(** 表示任意嵌套段) |
| sensors/** | sensors/ 下所有资源 |
| **/temperature | 任意位置的任意温度资源 |
键表达式语言
键表达式支持交集运算与包含关系判定,这使得路由器能够在编译期(注:此处指路由系统的静态分析阶段,非编程语言编译时)确定哪些订阅与哪些发布相匹配。
| 概念 | 说明 |
|---|---|
| 包含测试 (Inclusion Test) | 判断一个键表达式是否完全覆盖另一个。例如:a/b/**包含a/b/c。 |
| 交集运算 (Intersection) | 判断两个键表达式是否存在重叠的资源空间。例如:a/*/c与a/b/*相交于a/b/c。 |
为何强调“编译时”或“静态分析”
这里的“编译时”并非指 C++ 代码的编译,而是指 Zenoh 路由器的声明周期阶段:
声明阶段(静态):当订阅者声明 subscribe(“sensors/temp/*”) 和发布者声明 publish(“sensors/temp/room1”) 时,Zenoh 立即计算二者的关系。
路由表构建:路由器在内存中维护一张表,记录“订阅者 A 对哪些前缀感兴趣”。
零运行时解析:当数据包到达时,路由器无需再解析字符串或计算通配符,直接查表转发。这消除了 CPU 开销,保证了 μs 级延迟。
下面是一个c++示例:
#include <zenoh.hxx> #include <iostream> int main() { // 1. 创建键表达式 zenoh::KeyExpr pub_key = zenoh::KeyExpr::create("sensors/temp/room1").unwrap(); zenoh::KeyExpr sub_key = zenoh::KeyExpr::create("sensors/temp/*").unwrap(); // 2. 测试包含关系 (Intersection Test) // 判断发布者的 Key 是否落在订阅者的 Key 范围内 if (pub_key.intersects(&sub_key)) { std::cout << "Match! The publisher's key is within the subscriber's scope." << std::endl; } else { std::cout << "No match." << std::endl; } // 3. 测试包含关系 (Inclusion Test) // 判断订阅者的 Key 是否包含发布者的 Key if (sub_key.includes(&pub_key)) { std::cout << "Inclusion confirmed: Subscriber covers the publisher." << std::endl; } return 0; }最佳实践
使用层级命名空间:遵循 {领域}/{实体}/{属性} 的结构(例如 robot/42/pose )。
保持分段语义明确:使用 robot/42/pose ,避免使用 r42p 这类晦涩缩写。
慎用 ** 通配符:宽泛的通配符会迫使路由器向大量订阅者分发数据(Fan-out),增加不必要的网络负载和处理延迟。
会话
会话(Session)是您的应用程序接入 Zenoh 网络的连接通道。Zenoh 的所有操作 —— 无论是发布(Publishing)、订阅(Subscribing)还是查询(Querying)—— 都必须经由 Session 执行。
可以将 Session 理解为 “应用程序与 Zenoh 网络之间的握手点”。它封装了所有的网络状态、配置和认证信息。
| 特性 | 说明 |
|---|---|
| 单一入口 | 所有 Zenoh API(发布、订阅、查询)都依附于 Session 对象。 |
| 轻量级 | 创建一个 Session 的开销很小,但它管理着底层的网络连接池。 |
| 配置载体 | 连接模式(对等网络 Peer-to-Peer 还是 客户端-路由器 Client-Router)、端点地址、超时时间等都通过 Session 的配置传入。 |
| 生命周期管理 | 通常遵循 RAII 原则(在 C++ 中尤为重要),Session 析构时自动清理资源。 |
打开Session
#include <zenoh.hxx> #include <iostream> int main() { // 1. 创建配置(决定是否连接路由器,还是作为独立节点) zenoh::Config config = zenoh::Config::create_default(); // 可选:配置连接到特定的路由器 config.insert_json("connect/endpoints", R"(["tcp/192.168.1.100:7447"])"); // 2. 开启 Session(这是最关键的一步) // 此时,应用程序正式加入 Zenoh 网络 zenoh::Session session = zenoh::open(std::move(config)); // 3. 所有后续操作都基于这个 Session 对象 // 3.1 声明发布者 auto publisher = session.declare_publisher("demo/data"); // 3.2 声明订阅者 auto subscriber = session.declare_subscriber("demo/data", [](const zenoh::Sample& s) { /* ... */ }); // 3.3 发起查询 auto replies = session.get("demo/data"); // 4. 使用 Session 发布数据 publisher.put("Hello Zenoh!"); return 0; } // Session 析构,自动断开连接并释放资源Session模式
| 模式 (Mode) | 描述 (Description) |
|---|---|
| peer | 点对点直连模式;适用于局域网(LAN)部署,节点间直接通信。 |
| client | 客户端模式;连接到路由器(zenohd);适用于资源受限设备或广域网(WAN)场景。 |
| router | 路由节点模式;作为路由节点运行,结合了点对点通信与数据路由能力。 |
- 点对点模式配置文件
// config_peer.json5{mode:"peer",// 设置为 peer 模式// 监听其他 peer 的连接listen:{endpoints:["tcp/[::]:7447","udp/[::]:7447"]},// 主动尝试连接的其他 peer 列表connect:{endpoints:["tcp/192.168.1.10:7447",// 连接另一台 peer"tcp/192.168.1.11:7447"]},// 局域网探测(用于自动发现邻居scouting:{multicast:{enabled:true,address:"224.0.0.224:7447"}}}- 客户端模式配置文件
// config_client.json5{mode:"client",// 设置为 client 模式// Client 通常不监听端口,只主动连接 Routerlisten:{endpoints:[]},// 指定要连接的路由器地址connect:{endpoints:["tcp/router.mycompany.com:7447",// 云端的路由器"tls/backup-router:7447"// 备用路由器(TLS加密)]},// 关闭局域网探测以节省资源scouting:{multicast:{enabled:false}}}- Router模式(路由节点)
// config_router.json5{mode:"router",// 设置为 router 模式// 监听所有网络接口,供 client 和 peer 连接listen:{endpoints:["tcp/[::]:7447",// 标准 TCP 端口"tls/[::]:7448",// TLS 加密端口"ws/[::]:80"// WebSocket 端口(供 Web 前端使用)]},// 路由插件配置(示例:启用 REST API)plugins:{"zenoh-plugin-rest":{enabled:true,endpoints:["http/[::]:8000"]},// 示例:启用 DDS 桥接"zenoh-plugin-dds":{enabled:true}},// 后端存储配置(示例:内存存储)backends:{memory:{my_storage:{key_expr:"demo/storage/**"}}}}Session生命周期
会话(Session)在其整个生存期内会持有开放的连接和活跃的订阅。使用完毕后,需要显式关闭以释放资源:
#include <zenoh.hxx> #include <iostream> int main() { try { // 1. 创建并打开会话 zenoh::Config config = zenoh::Config::create_default(); zenoh::Session session = zenoh::open(std::move(config)); std::cout << "Session opened successfully." << std::endl; // 2. 执行一些操作... auto publisher = session.declare_publisher("demo/close_test"); publisher.put("Hello before close"); // 3. 显式关闭会话 session.close(); std::cout << "Session closed successfully." << std::endl; } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; return 1; } return 0; }或者让其自动析构 —— Zenoh 会在作用域结束时优雅地清理所有资源。
Multiple Sessions(多会话)
在 Zenoh C++ 中,您可以同时创建和管理多个独立的 Session,每个 Session 可以连接不同的网络、使用不同的配置(如不同的路由器、不同的认证方式),且它们之间完全隔离。
两个独立的Session
#include <zenoh.hxx> #include <iostream> #include <thread> void run_network_a() { // Session A: 连接生产环境路由器 zenoh::Config config_a = zenoh::Config::create_default(); config_a.insert_json("connect/endpoints", R"(["tcp/prod-router:7447"])"); zenoh::Session session_a = zenoh::open(std::move(config_a)); auto pub_a = session_a.declare_publisher("prod/sensor/data"); pub_a.put("Data for Production Network"); std::cout << "Published to Network A (Production)" << std::endl; } void run_network_b() { // Session B: 连接测试环境路由器 zenoh::Config config_b = zenoh::Config::create_default(); config_b.insert_json("connect/endpoints", R"(["tcp/test-router:7447"])"); zenoh::Session session_b = zenoh::open(std::move(config_b)); auto pub_b = session_b.declare_publisher("test/sensor/data"); pub_b.put("Data for Test Network"); std::cout << "Published to Network B (Testing)" << std::endl; } int main() { // 同时运行两个逻辑 std::thread t1(run_network_a); std::thread t2(run_network_b); t1.join(); t2.join(); return 0; }RAII 包装器管理多 Session
#include <zenoh.hxx> #include <memory> #include <vector> #include <iostream> class MultiSessionManager { private: std::vector<std::unique_ptr<zenoh::Session>> sessions; public: void add_session(const std::string& config_json) { zenoh::Config config = zenoh::Config::create_default(); config.insert_json("connect/endpoints", config_json); // 使用 unique_ptr 管理 Session sessions.push_back( std::make_unique<zenoh::Session>(zenoh::open(std::move(config))) ); } zenoh::Session& get_session(size_t index) { return *sessions.at(index); } ~MultiSessionManager() { // unique_ptr 会自动调用 Session 的析构函数 (close()) sessions.clear(); std::cout << "All sessions closed." << std::endl; } }; int main() { MultiSessionManager manager; // 添加两个不同配置的 Session manager.add_session(R"(["tcp/cloud:7447"])"); // Session 0: Cloud manager.add_session(R"(["tcp/edge:7447"])"); // Session 1: Edge // 分别使用 auto pub1 = manager.get_session(0).declare_publisher("cloud/alerts"); pub1.put("Cloud Alert!"); auto pub2 = manager.get_session(1).declare_publisher("edge/metrics"); pub2.put("Edge Metrics!"); // 离开作用域时,manager 析构,自动关闭所有 Session return 0; }其他
- 线程安全:
zenoh::Session 本身是线程安全的,可以在多线程间共享。
但如果您创建了多个 Session,每个 Session 都可以独立地在不同的线程中使用。
- 资源消耗:
每个 Session 都会维护自己的网络连接和缓冲区。
创建过多 Session 可能会增加内存和 CPU 开销。通常一个进程 1-2 个 Session 足以应对大多数复杂场景。
- 配置隔离:
每个 Session 的配置是独立的。修改 Session A 的配置不会影响 Session B。
- 何时使用多 Session?
跨网络桥接:需要将数据从一个 Zenoh 网络转发到另一个 Zenoh 网络(例如:工厂网络 -> 云端网络)。
多租户应用:一个应用程序需要同时以不同的身份(不同的鉴权配置)连接同一个路由器。
差异化 QoS:一个 Session 用于高可靠的信令传输(TCP),另一个用于低延迟的视频流传输(UDP/QUIC)。