Skip to content

Pub/Sub使用指南

简介

Pub/Sub(发布订阅)适合“一个发布者对多个订阅者”的广播场景。
在 OLA 中可使用 TCP(跨进程/跨主机)或 PRO(进程内)两种模式。

典型使用流程

  1. 创建 instanceCreateCOLAPlugInterFace)。
  2. 创建 Pub/Sub 句柄(PubSubNew),指定角色与连接类型。
  3. 订阅主题(PubSubSub)或直接发布消息(PubSubPubText/PubSubPubBytes)。
  4. 需要查看状态时,调用 PubSubGetNetStatusPubSubGetMyTopics
  5. 结束时取消订阅(可选)并释放句柄(PubSubFree)。

角色与连接类型

  • 角色:
    • OLA_PUBSUB_TYPE_CLIENT = 1
    • OLA_PUBSUB_TYPE_SERVER = 2
  • 连接类型:
    • OLA_PUBSUB_CONNECT_TCP = 1
    • OLA_PUBSUB_CONNECT_PRO = 2

回调函数定义

c
typedef void(OLA_CALL_TYPE* PubSubCallback)(int64_t client, char* topic,
                                            int64_t data_ptr, int32_t data_len, int32_t is_text);

参数说明:

  • client:客户端句柄
  • topic:主题名
  • data_ptr:数据指针(仅回调期间有效)
  • data_len:数据长度(字节)
  • is_text:是否文本(1=文本0=字节流

最小示例(TCP)

c
// 1) B端:启动 Pub/Sub 服务端
int64_t server = PubSubNew(instance, OLA_PUBSUB_TYPE_SERVER, OLA_PUBSUB_CONNECT_TCP,
                           "0.0.0.0", 18990, OnPubSubMessage);

// 2) A端:启动客户端并订阅主题
int64_t client = PubSubNew(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
                           "127.0.0.1", 18990, OnPubSubMessage);
PubSubSub(instance, client, "demo.topic");

// 3) 发布消息
PubSubPubText(instance, client, "demo.topic", "hello pubsub");

常见使用方式

  • 仅发布不消费PubSubNewon_message 可传 NULL
  • 批量取消订阅:使用 PubSubUnsubAll
  • 查看连接与订阅信息
    • PubSubGetNetStatus 返回连接状态 JSON(记得释放)。
    • PubSubGetMyTopics 返回当前订阅 JSON(记得释放)。
  • 服务端统计主题订阅数PubSubGetTopicSubCount(推荐服务端调用)。

注意事项

项目说明
窗口绑定typeconnect_type 的数值与 ABI 绑定,必须按 1/2 传值,不能自定义为 0/1
PRO 模式建议 ip="" 且 `poPRO 模式建议 ip=""port=0TCP 模式需确保地址和端口可达。
主题名 topic 必须非空主题名 topic 必须非空,建议统一命名(如 module.action)避免冲突。
回调中避免长时间阻塞;如需重计算回调中避免长时间阻塞;如需重计算,建议转交工作线程。
释放内存PubSubGetNetStatus / PubSubGetMyTopics 返回的字符串需要调用 FreeStringPtr 释放。
使用完成后务必 PubSubFree使用完成后务必 PubSubFree,避免句柄泄漏。

排障建议

  • 连接失败:先检查 type/connect_type 数值、IP/端口、防火墙和监听状态。
  • 发布成功但没回调:检查是否已成功订阅该主题、主题名是否完全一致。
  • 间歇性异常:检查回调线程是否阻塞,或是否重复释放句柄。