主题
主动拉取队列消息 - MessageQueuePull
函数简介
主动拉取一条消息,返回 JSON 字符串。
接口名称
MessageQueuePull
DLL调用
c
char* MessageQueuePull(int64_t instance, int64_t consumer, int32_t timeout_ms);
参数说明
| 参数名 | 类型 | 说明 |
|---|---|---|
| instance | 长整数型 | OLA 实例句柄。 |
| consumer | 长整数型 | 消费者句柄(由 MessageQueueConsume 创建,且 on_message 可为 NULL)。 |
| timeout_ms | 整数型 | 超时策略:0=立即返回,>0=最多等待 N 毫秒,<0=无限等待。 |
返回 JSON 字段
- topic(string):队列名
- is_text(int):1=文本,0=二进制
- ack_token(int64):消息确认令牌,可用于 MessageQueueAck/Nack
- data_len(int):负载字节长度
- data(string):文本内容(仅 is_text=1)
- data_hex(string):二进制十六进制(仅 is_text=0)
示例
SDK 调用
cpp
#include "OLAPlugServer.h"
OLAPlugServer ola;
// on_message=nullptr 时可配合 MessageQueuePull 主动拉取
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
std::string msg = ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer);
}
csharp
using OLAPlug;
var ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0)
{
string msg = ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer);
}
python
from OLAPlugServer import OLAPlugServer
ola = OLAPlugServer()
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", None)
if consumer > 0:
msg = ola.MessageQueuePull(consumer, 1000)
ola.MessageQueueCancel(consumer)
java
import com.olaplug.OLAPlugServer;
OLAPlugServer ola = new OLAPlugServer();
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if (consumer > 0) {
ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer);
}
cpp
var ola = com("OlaPlug.OlaSoft")
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
if(consumer > 0) {
ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer)
}
vbscript
Set ola = CreateObject("OlaPlug.OlaSoft")
consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
If consumer > 0 Then
msg = ola.MessageQueuePull(consumer, 1000)
ola.MessageQueueCancel(consumer)
End If
text
.局部变量 ola, OLAPlug
ola.创建 ()
consumer = ola.MessageQueueConsume (1, 1, "127.0.0.1", 18890, "task_queue", 0)
.如果真 (consumer > 0)
ola.MessageQueuePull (consumer, 1000)
ola.MessageQueueCancel (consumer)
.如果真结束
aardio
import OLAPlugServer;
var ola = OLAPlugServer();
var consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", null);
if(consumer > 0){
ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer);
}
text
变量 ola <类型 = OLAPlugServer>
ola = 新建 OLAPlugServer
长整数 consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", 0)
如果真 (consumer > 0)
{
ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer)
}
cpp
#include "OLAPlugServer.h"
OLAPlugServer ola;
long consumer = ola.MessageQueueConsume(1, 1, "127.0.0.1", 18890, "task_queue", nullptr);
if (consumer > 0) {
ola.MessageQueuePull(consumer, 1000);
ola.MessageQueueCancel(consumer);
}
原生 DLL 调用
cpp
int64_t instance = CreateCOLAPlugInterFace();
int64_t consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
char* ptr = MessageQueuePull(instance, consumer, 1000);
MessageQueueCancel(instance, consumer);
}
csharp
using System.Runtime.InteropServices;
using System.Text;
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long CreateCOLAPlugInterFace();
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long MessageQueueConsume(long ola, int type, int connect_type, string ip, int port, string queue_name, long callback);
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern int MessageQueueCancel(long ola, long consumer);
[DllImport("OLAPlug_x64.dll", CallingConvention = CallingConvention.StdCall)]
static extern long MessageQueuePull(long ola, long consumer, int timeout_ms);
long instance = CreateCOLAPlugInterFace();
long consumer = MessageQueueConsume(instance, 1, 1, "127.0.0.1", 18890, "task_queue", 0);
if (consumer > 0) {
long ptr = MessageQueuePull(instance, consumer, 1000);
MessageQueueCancel(instance, consumer);
}
python
from ctypes import CDLL, c_int, c_int64
ola = CDLL("OLAPlug_x64.dll")
ola.CreateCOLAPlugInterFace.restype = c_int64
instance = ola.CreateCOLAPlugInterFace()
consumer = ola.MessageQueueConsume(instance, 1, 1, b"127.0.0.1", 18890, b"task_queue", 0)
ola.MessageQueuePull.restype = c_int64
if consumer:
ptr = ola.MessageQueuePull(instance, consumer, 1000)
ola.MessageQueueCancel(instance, consumer)
返回值
| 返回值 | 说明 |
|---|---|
| (返回值) | JSON 字符串指针;无消息或失败返回 0。 |
注意事项
| 项目 | 说明 |
|---|---|
| 释放内存 | 返回值需调用 FreeStringPtr 释放。 |
