RocketMQ快速入门:集成Java客户端实现各类消息发送

RocketMQ 是阿里巴巴开源的一款分布式消息队列,它具有高吞吐量、高可用性、可扩展性等特点,广泛应用于分布式应用程序中。本文将介绍如何使用 Java 客户端与 RocketMQ 集成,实现异步、同步、顺序、单向、延迟和事务消息发送,并附上相应的代码示例。

1. 环境准备

1.1 Maven 依赖

在项目的 pom.xml 中添加 RocketMQ 的 Maven 依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.0.0</version>
    </dependency>
</dependencies>

1.2 启动 RocketMQ

确保 RocketMQ 服务已经启动,您可以在本地环境中启动 RocketMQ,或者使用云服务。

2. 创建消息发送代码

接下来,我们实现不同类型的消息发送功能。

2.1 同步消息

同步发送是最常用的消息发送方式,发送者会等待消息发送结果。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
        producer.setNamesrvAddr("localhost:9876"); // 设置 NameServer 地址
        producer.start();

        Message msg = new Message("TestTopic", "TagA", "Key1", "Hello RocketMQ Sync".getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

2.2 异步消息

异步发送消息,不需要等待返回结果,可以提高发送效率。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TestTopic", "TagA", "Key2", "Hello RocketMQ Async".getBytes());
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("Send success: %s%n", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.err.printf("Send failed: %s%n", e.getMessage());
            }
        });

        producer.shutdown();
    }
}

2.3 顺序消息

顺序消息适用于需要严格按照发送顺序的场景。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.Executors;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderedProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("OrderedTopic", "TagA", String.valueOf(i).getBytes());
            SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(arg % mqs.size()), i);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

2.4 单向消息

单向发送不需要等待发送结果,适合对响应时间要求不高的场景。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("onewayProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TestTopic", "TagA", "Key3", "Hello RocketMQ Oneway".getBytes());
        producer.sendOneway(msg);
        System.out.println("One-way message sent");

        producer.shutdown();
    }
}

2.5 延迟消息

延迟消息可在特定时间后被消费。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delayedProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("DelayedTopic", "TagA", "Key4", "Hello RocketMQ Delayed".getBytes());
        msg.setDelayTimeLevel(3); // 延迟发送
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

2.6 事务消息

事务消息分为三步:发送、执行本地事务、根据本地事务结果提交或回滚。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                // 返回事务状态
                return LocalTransactionState.COMMIT_MESSAGE; // 提交
            }

            @Override
            public LocalTransactionState checkLocalTransaction(Message msg) {
                // 检查本地事务状态
                return LocalTransactionState.COMMIT_MESSAGE; // 提交
            }
        });

        producer.start();

        Message msg = new Message("TransactionTopic", "TagA", "Key5", "Hello RocketMQ Transaction".getBytes());
        producer.sendMessageInTransaction(msg, null);

        producer.shutdown();
    }
}

结论

通过上述示例,我们能够使用 RocketMQ 实现不同类型的消息发送。RocketMQ 提供了灵活的消息发送方式,用户可以根据实际需求选择合适的发送方式。在实际应用中,建议根据系统架构和业务需求做好消息发送机制的设计与优化。希望本文对你快速入门 RocketMQ 有所帮助!

点赞(0) 打赏

微信小程序

微信扫一扫体验

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部