Skip to content

创建并启动队列消费者 - MessageQueueConsume

函数简介

创建并启动消息队列消费者。

接口名称

MessageQueueConsume

DLL调用

c
int64_t MessageQueueConsume(int64_t instance, int32_t type, int32_t connect_type,
                            char* ip, int32_t port, char* queue_name,
                            MessageQueueCallback on_message);

参数说明

参数名类型说明
instance长整数型OLA 实例句柄。
type整数型角色:必须为 CLIENT
connect_type整数型连接类型:TCP/PRO
ip字符串TCP 模式服务地址;PRO 模式可为空。
port整数型TCP 端口;PRO 模式为 0
queue_name字符串队列名(非空)。
on_messageMessageQueueCallback消费回调;可为 NULL(为 NULL 时可使用 MessageQueuePull 主动拉取)。

MessageQueueCallback 回调函数定义

c
typedef void(OLA_CALL_TYPE* MessageQueueCallback)(int64_t consumer, char* topic,
                                                  int64_t data_ptr, int32_t data_len,
                                                  int32_t is_text, int64_t ack_token);
参数名类型说明
consumer长整数型消费者句柄。
topic字符串主题名。
data_ptr长整数型数据指针(仅回调期间有效)。
data_len整数型数据长度(字节)。
is_text整数型是否文本:1=文本0=字节流
ack_token长整数型确认令牌,>0 时可 Ack/Nack

示例

SDK 调用

cpp
#include "OLAPlugServer.h"

OLAPlugServer ola;
// on_message=nullptr 时可配合 MessageQueuePull 主动拉取
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer);
}
csharp
using OLAPlug;

var ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0)
{
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer);
}
python
from OLAPlugServer import OLAPlugServer

ola = OLAPlugServer()
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", None)
if consumer > 0:
    # consumer 为消费者句柄
    ola.MessageQueueCancel(consumer)
java
import com.olaplug.OLAPlugServer;

OLAPlugServer ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0) {
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer);
}
cpp
var ola = com("OlaPlug.OlaSoft")
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
if(consumer > 0) {
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer)
}
vbscript
Set ola = CreateObject("OlaPlug.OlaSoft")
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
If consumer > 0 Then
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer)
End If
text
.局部变量 ola, OLAPlug
ola.创建 ()
consumer = ola.MessageQueueConsume (1, 1, "127.0.0.1", 18890, "task_queue", 0)
.如果真 (consumer > 0)
    // consumer 为消费者句柄
    ola.MessageQueueCancel (consumer)
.如果真结束
aardio
import OLAPlugServer;
var ola = OLAPlugServer();
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if(consumer > 0){
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer);
}
text
变量 ola <类型 = OLAPlugServer>
ola = 新建 OLAPlugServer
长整数 consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
如果真 (consumer > 0)
{
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer)
}
cpp
#include "OLAPlugServer.h"

OLAPlugServer ola;
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
    // consumer 为消费者句柄
    ola.MessageQueueCancel(consumer);
}

原生 DLL 调用

cpp
long instance = CreateCOLAPlugInterFace();
long consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
    // 可 MessageQueuePull 或注册回调
    MessageQueueCancel(instance, consumer);
}
csharp
using System.Runtime.InteropServices;
using System.Text;

[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long CreateCOLAPlugInterFace();
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long MessageQueueConsume(long ola, int type, int connect_type, string ip, int port, string queue_name, long callback);
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern int MessageQueueCancel(long ola, long consumer);

long instance = CreateCOLAPlugInterFace();
long consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
    // 可 MessageQueuePull 或注册回调
    MessageQueueCancel(instance, consumer);
}
python
from ctypes import CDLL, c_int, c_int64

ola = CDLL("OLAPlug_x64.dll")
ola.CreateCOLAPlugInterFace.restype = c_int64
instance = ola.CreateCOLAPlugInterFace()
consumer = ola.MessageQueueConsume(instance, 1, 1, b"127.0.0.1", 18890, b"task_queue", 0)
if consumer:
    // 可 MessageQueuePull 或注册回调
    ola.MessageQueueCancel(instance, consumer)

返回值

返回值说明
(返回值)大于 0 为消费者句柄;<=0 为错误码。

注意事项

项目说明
回调模式与 Pull 模式互斥使用回调模式与 Pull 模式互斥使用,建议二选一。