创建并启动队列消费者 - MessageQueueConsume
函数简介
创建并启动消息队列消费者。
接口名称
MessageQueueConsume
DLL调用
int64_t MessageQueueConsume(int64_t instance, int32_t type, int32_t connect_type,
char* ip, int32_t port, char* queue_name,
MessageQueueCallback on_message);
参数说明
| 参数名 | 类型 | 说明 |
|---|---|---|
| instance | 长整数型 | OLA 实例句柄。 |
| type | 整数型 | 角色:必须为 CLIENT。 |
| connect_type | 整数型 | 连接类型:TCP/PRO。 |
| ip | 字符串 | TCP 模式服务地址;PRO 模式可为空。 |
| port | 整数型 | TCP 端口;PRO 模式为 0。 |
| queue_name | 字符串 | 队列名(非空)。 |
| on_message | MessageQueueCallback | 消费回调;可为 NULL(为 NULL 时可使用 MessageQueuePull 主动拉取)。 |
MessageQueueCallback 回调函数定义
typedef void(OLA_CALL_TYPE* MessageQueueCallback)(int64_t consumer, char* topic,
int64_t data_ptr, int32_t data_len,
int32_t is_text, int64_t ack_token);
| 参数名 | 类型 | 说明 |
|---|---|---|
| consumer | 长整数型 | 消费者句柄。 |
| topic | 字符串 | 主题名。 |
| data_ptr | 长整数型 | 数据指针(仅回调期间有效)。 |
| data_len | 整数型 | 数据长度(字节)。 |
| is_text | 整数型 | 是否文本:1=文本,0=字节流。 |
| ack_token | 长整数型 | 确认令牌,>0 时可 Ack/Nack。 |
示例
void OLA_CALL_TYPE OnQueueMessage(int64_t consumer, char* topic,
int64_t data_ptr, int32_t data_len,
int32_t is_text, int64_t ack_token) {
if (is_text == 1) {
printf("[MQ] %s -> %.*s\n", topic, data_len, (char*)data_ptr);
}
if (ack_token > 0) {
MessageQueueAck(instance, consumer, ack_token);
}
}
int64_t consumer = MessageQueueConsume(instance, OLA_PUBSUB_TYPE_CLIENT, OLA_PUBSUB_CONNECT_TCP,
"127.0.0.1", 18990, "order.created", OnQueueMessage);
返回值
大于 0 为消费者句柄;<=0 为错误码。
注意事项
- 回调模式与 Pull 模式互斥使用,建议二选一。
