RabbitMQ 是一个流行的消息中间件,它实现了 Advanced Message Queuing Protocol (AMQP),并支持多种编程语言,包括 C++。在这篇文章中,我们将详细探讨如何在 C++ 中使用 RabbitMQ,通过一个简单的示例来说明其基本用法。

RabbitMQ 概述

RabbitMQ 是一个基于队列的消息代理,允许不同的应用程序通过消息进行通信。它的核心组成部分包括:

  • Producer:消息的发送者。
  • Queue:存储消息的地方。
  • Consumer:消息的接收者。

C++ 中使用 RabbitMQ

在 C++ 中与 RabbitMQ 进行交互通常可以使用 rabbitmq-c 这个 C 语言库。首先,我们需要安装这个库,并确保你的开发环境配置好了。

安装 rabbitmq-c

在 Ubuntu 系统中,可以通过以下命令进行安装:

sudo apt-get install librabbitmq-dev

创建 C++ 项目

接下来,我们创建一个简单的 C++ 项目来演示如何发送和接收消息。

  1. 创建 Producer
#include <iostream>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

void send_message(const char* message) {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);

    if (!socket) {
        std::cerr << "创建 TCP socket 失败!" << std::endl;
        return;
    }

    if (amqp_socket_open(socket, "127.0.0.1", 5672)) {
        std::cerr << "连接 RabbitMQ server 失败!" << std::endl;
        return;
    }

    amqp_login(conn, "/", AMQP_VERSION_MAJOR, AMQP_VERSION_MINOR, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    amqp_get_rpc_reply(conn);
    amqp_queue_declare(conn, 1, amqp_bytes_malloc_dup("hello"), 0, 0, 0, 0, nullptr);
    amqp_get_rpc_reply(conn);

    amqp_basic_publish(conn, 1, amqp_bytes_malloc_dup(""), amqp_bytes_malloc_dup("hello"), 0, 0, nullptr, amqp_bytes_malloc_dup(message));
    std::cout << "发送消息: " << message << std::endl;

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}

int main() {
    const char* msg = "Hello RabbitMQ!";
    send_message(msg);
    return 0;
}

在这个代码段中,我们首先建立与 RabbitMQ 服务器的连接,然后声明了一个队列(hello),最后将消息发送到该队列中。

  1. 创建 Consumer
#include <iostream>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

void receive_message() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);

    if (!socket) {
        std::cerr << "创建 TCP socket 失败!" << std::endl;
        return;
    }

    if (amqp_socket_open(socket, "127.0.0.1", 5672)) {
        std::cerr << "连接 RabbitMQ server 失败!" << std::endl;
        return;
    }

    amqp_login(conn, "/", AMQP_VERSION_MAJOR, AMQP_VERSION_MINOR, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    amqp_get_rpc_reply(conn);
    amqp_queue_declare(conn, 1, amqp_bytes_malloc_dup("hello"), 0, 0, 0, 0, nullptr);
    amqp_get_rpc_reply(conn);

    amqp_basic_consume(conn, 1, amqp_bytes_malloc_dup("hello"), amqp_bytes_malloc_dup(""), 0, 1, 0, nullptr);
    amqp_get_rpc_reply(conn);

    while (true) {
        amqp_envelope_t envelope;
        amqp_maybe_release_buffers(conn);
        amqp_simple_wait_frame(conn);
        amqp_simple_wait_frame(conn);
        amqp_bytes_t body = envelope.message.body;
        std::cout << "接收到消息: " << (char *)body.bytes << std::endl;

        amqp_destroy_envelope(&envelope);
    }

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}

int main() {
    receive_message();
    return 0;
}

在这个代码段中,消费者代码连接到 RabbitMQ,再次声明了相同的队列,从中获取消息并打印出来。

总结

通过上述代码,我们展示了如何使用 RabbitMQ 的 C++ 客户端发送和接收消息。虽然这只是一个简单的演示,但 RabbitMQ 提供了丰富的功能,比如消息确认、持久性、路由等,可以根据实际需求进行扩展。结合 RabbitMQ 使用 C++进行异步消息处理,可以极大提升系统的可拓展性与性能。

点赞(0) 打赏

微信小程序

微信扫一扫体验

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部