Skip to content

MessageQueue使用指南

简介

MessageQueue 适用于“生产者-消费者”队列模型。
同一 queue_name 支持多个消费者竞争消费(非广播)。

典型流程

  1. 创建 instance
  2. 声明队列并创建发布端句柄:MessageQueueDeclare
  3. 创建消费者:MessageQueueConsume(回调模式)或 MessageQueueConsume + MessageQueuePull(主动拉取模式)。
  4. 发布消息:MessageQueuePublishText/MessageQueuePublishBytes
  5. 消费后确认:MessageQueueAck/MessageQueueNack
  6. 结束时释放:MessageQueueCancelMessageQueueClose

回调函数定义

c
typedef void(OLA_CALL_TYPE* MessageQueueCallback)(int64_t consumer, char* topic,
                                                  int64_t data_ptr, int32_t data_len,
                                                  int32_t is_text, int64_t ack_token);

参数说明:

  • consumer:消费者句柄
  • topic:主题名(队列名)
  • data_ptr:数据指针(仅回调期间有效)
  • data_len:数据长度(字节)
  • is_text:是否文本(1=文本0=字节流
  • ack_token:确认令牌(>0 时可 MessageQueueAck/Nack

最小示例(回调消费)

c
int64_t producer = MessageQueueDeclare(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
                                       "127.0.0.1", 18990, "order.created");

int64_t consumer = MessageQueueConsume(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
                                       "127.0.0.1", 18990, "order.created", OnQueueMessage);

MessageQueuePublishText(instance, producer, "order=1001");

Pull 模式说明

  • MessageQueueConsumeon_message=NULL 时,可用 MessageQueuePull 主动拉取。
  • timeout_ms
    • 0:立即返回
    • >0:最多等待 N 毫秒
    • <0:无限等待
  • MessageQueuePull 返回 JSON 字符串,使用后需 FreeStringPtr 释放。

注意事项

项目说明
type 推荐使用 CLIENT(消费者type 推荐使用 CLIENT(消费者侧必须为 CLIENT)。
connect_type 固定使用 `1/2connect_type 固定使用 1/2(TCP/PRO),不要自行改值。
回调模式与 Pull 模式建议二选一回调模式与 Pull 模式建议二选一,避免并发竞争同一消费者句柄。
data_ptr 仅在回调期间有效data_ptr 仅在回调期间有效,异步处理前请拷贝数据。
业务处理后建议显式 Ack/Nack业务处理后建议显式 Ack/Nack,避免消息堆积或重复投递。