MessageQueue使用指南
简介
MessageQueue 适用于“生产者-消费者”队列模型。
同一 queue_name 支持多个消费者竞争消费(非广播)。
典型流程
- 创建
instance。 - 声明队列并创建发布端句柄:
MessageQueueDeclare。 - 创建消费者:
MessageQueueConsume(回调模式)或MessageQueueConsume + MessageQueuePull(主动拉取模式)。 - 发布消息:
MessageQueuePublishText/MessageQueuePublishBytes。 - 消费后确认:
MessageQueueAck/MessageQueueNack。 - 结束时释放:
MessageQueueCancel、MessageQueueClose。
回调函数定义
typedef void(OLA_CALL_TYPE* MessageQueueCallback)(int64_t consumer, char* topic,
int64_t data_ptr, int32_t data_len,
int32_t is_text, int64_t ack_token);
参数说明:
consumer:消费者句柄topic:主题名(队列名)data_ptr:数据指针(仅回调期间有效)data_len:数据长度(字节)is_text:是否文本(1=文本,0=字节流)ack_token:确认令牌(>0时可MessageQueueAck/Nack)
最小示例(回调消费)
int64_t producer = MessageQueueDeclare(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
"127.0.0.1", 18990, "order.created");
int64_t consumer = MessageQueueConsume(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
"127.0.0.1", 18990, "order.created", OnQueueMessage);
MessageQueuePublishText(instance, producer, "order=1001");
Pull 模式说明
- 当
MessageQueueConsume的on_message=NULL时,可用MessageQueuePull主动拉取。 timeout_ms:0:立即返回>0:最多等待 N 毫秒<0:无限等待
MessageQueuePull返回 JSON 字符串,使用后需FreeStringPtr释放。
注意事项
type推荐使用CLIENT(消费者侧必须为CLIENT)。connect_type固定使用1/2(TCP/PRO),不要自行改值。- 回调模式与 Pull 模式建议二选一,避免并发竞争同一消费者句柄。
data_ptr仅在回调期间有效,异步处理前请拷贝数据。- 业务处理后建议显式 Ack/Nack,避免消息堆积或重复投递。
