RocketMQ快速入门:集成Java客户端实现各类消息发送
RocketMQ 是阿里巴巴开源的一款分布式消息队列,它具有高吞吐量、高可用性、可扩展性等特点,广泛应用于分布式应用程序中。本文将介绍如何使用 Java 客户端与 RocketMQ 集成,实现异步、同步、顺序、单向、延迟和事务消息发送,并附上相应的代码示例。
1. 环境准备
1.1 Maven 依赖
在项目的 pom.xml
中添加 RocketMQ 的 Maven 依赖:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
1.2 启动 RocketMQ
确保 RocketMQ 服务已经启动,您可以在本地环境中启动 RocketMQ,或者使用云服务。
2. 创建消息发送代码
接下来,我们实现不同类型的消息发送功能。
2.1 同步消息
同步发送是最常用的消息发送方式,发送者会等待消息发送结果。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
producer.setNamesrvAddr("localhost:9876"); // 设置 NameServer 地址
producer.start();
Message msg = new Message("TestTopic", "TagA", "Key1", "Hello RocketMQ Sync".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
2.2 异步消息
异步发送消息,不需要等待返回结果,可以提高发送效率。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TestTopic", "TagA", "Key2", "Hello RocketMQ Async".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("Send success: %s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.err.printf("Send failed: %s%n", e.getMessage());
}
});
producer.shutdown();
}
}
2.3 顺序消息
顺序消息适用于需要严格按照发送顺序的场景。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.Executors;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderedProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("OrderedTopic", "TagA", String.valueOf(i).getBytes());
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(arg % mqs.size()), i);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
2.4 单向消息
单向发送不需要等待发送结果,适合对响应时间要求不高的场景。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("onewayProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TestTopic", "TagA", "Key3", "Hello RocketMQ Oneway".getBytes());
producer.sendOneway(msg);
System.out.println("One-way message sent");
producer.shutdown();
}
}
2.5 延迟消息
延迟消息可在特定时间后被消费。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class DelayedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delayedProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("DelayedTopic", "TagA", "Key4", "Hello RocketMQ Delayed".getBytes());
msg.setDelayTimeLevel(3); // 延迟发送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
2.6 事务消息
事务消息分为三步:发送、执行本地事务、根据本地事务结果提交或回滚。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
// 返回事务状态
return LocalTransactionState.COMMIT_MESSAGE; // 提交
}
@Override
public LocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE; // 提交
}
});
producer.start();
Message msg = new Message("TransactionTopic", "TagA", "Key5", "Hello RocketMQ Transaction".getBytes());
producer.sendMessageInTransaction(msg, null);
producer.shutdown();
}
}
结论
通过上述示例,我们能够使用 RocketMQ 实现不同类型的消息发送。RocketMQ 提供了灵活的消息发送方式,用户可以根据实际需求选择合适的发送方式。在实际应用中,建议根据系统架构和业务需求做好消息发送机制的设计与优化。希望本文对你快速入门 RocketMQ 有所帮助!