主题
创建并启动队列消费者 - MessageQueueConsume
函数简介
创建并启动消息队列消费者。
接口名称
MessageQueueConsumeDLL调用
c
int64_t MessageQueueConsume(int64_t instance, int32_t type, int32_t connect_type,
char* ip, int32_t port, char* queue_name,
MessageQueueCallback on_message);参数说明
| 参数名 | 类型 | 说明 |
|---|---|---|
| instance | 长整数型 | OLA 实例句柄。 |
| type | 整数型 | 角色:必须为 CLIENT。 |
| connect_type | 整数型 | 连接类型:TCP/PRO。 |
| ip | 字符串 | TCP 模式服务地址;PRO 模式可为空。 |
| port | 整数型 | TCP 端口;PRO 模式为 0。 |
| queue_name | 字符串 | 队列名(非空)。 |
| on_message | MessageQueueCallback | 消费回调;可为 NULL(为 NULL 时可使用 MessageQueuePull 主动拉取)。 |
MessageQueueCallback 回调函数定义
c
typedef void(OLA_CALL_TYPE* MessageQueueCallback)(int64_t consumer, char* topic,
int64_t data_ptr, int32_t data_len,
int32_t is_text, int64_t ack_token);| 参数名 | 类型 | 说明 |
|---|---|---|
| consumer | 长整数型 | 消费者句柄。 |
| topic | 字符串 | 主题名。 |
| data_ptr | 长整数型 | 数据指针(仅回调期间有效)。 |
| data_len | 整数型 | 数据长度(字节)。 |
| is_text | 整数型 | 是否文本:1=文本,0=字节流。 |
| ack_token | 长整数型 | 确认令牌,>0 时可 Ack/Nack。 |
示例
SDK 调用
cpp
#include "OLAPlugServer.h"
OLAPlugServer ola;
// on_message=nullptr 时可配合 MessageQueuePull 主动拉取
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer);
}csharp
using OLAPlug;
var ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0)
{
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer);
}python
from OLAPlugServer import OLAPlugServer
ola = OLAPlugServer()
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", None)
if consumer > 0:
# consumer 为消费者句柄
ola.MessageQueueCancel(consumer)java
import com.olaplug.OLAPlugServer;
OLAPlugServer ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0) {
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer);
}cpp
var ola = com("OlaPlug.OlaSoft")
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
if(consumer > 0) {
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer)
}vbscript
Set ola = CreateObject("OlaPlug.OlaSoft")
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
If consumer > 0 Then
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer)
End Iftext
.局部变量 ola, OLAPlug
ola.创建 ()
consumer = ola.MessageQueueConsume (1, 1, "127.0.0.1", 18890, "task_queue", 0)
.如果真 (consumer > 0)
// consumer 为消费者句柄
ola.MessageQueueCancel (consumer)
.如果真结束aardio
import OLAPlugServer;
var ola = OLAPlugServer();
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if(consumer > 0){
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer);
}text
变量 ola <类型 = OLAPlugServer>
ola = 新建 OLAPlugServer
长整数 consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
如果真 (consumer > 0)
{
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer)
}cpp
#include "OLAPlugServer.h"
OLAPlugServer ola;
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
// consumer 为消费者句柄
ola.MessageQueueCancel(consumer);
}原生 DLL 调用
cpp
long instance = CreateCOLAPlugInterFace();
long consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
// 可 MessageQueuePull 或注册回调
MessageQueueCancel(instance, consumer);
}csharp
using System.Runtime.InteropServices;
using System.Text;
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long CreateCOLAPlugInterFace();
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long MessageQueueConsume(long ola, int type, int connect_type, string ip, int port, string queue_name, long callback);
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern int MessageQueueCancel(long ola, long consumer);
long instance = CreateCOLAPlugInterFace();
long consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
// 可 MessageQueuePull 或注册回调
MessageQueueCancel(instance, consumer);
}python
from ctypes import CDLL, c_int, c_int64
ola = CDLL("OLAPlug_x64.dll")
ola.CreateCOLAPlugInterFace.restype = c_int64
instance = ola.CreateCOLAPlugInterFace()
consumer = ola.MessageQueueConsume(instance, 1, 1, b"127.0.0.1", 18890, b"task_queue", 0)
if consumer:
// 可 MessageQueuePull 或注册回调
ola.MessageQueueCancel(instance, consumer)返回值
| 返回值 | 说明 |
|---|---|
| (返回值) | 大于 0 为消费者句柄;<=0 为错误码。 |
注意事项
| 项目 | 说明 |
|---|---|
| 回调模式与 Pull 模式互斥使用 | 回调模式与 Pull 模式互斥使用,建议二选一。 |
