RabbitMQ是一种流行的开源消息中间件,广泛应用于分布式系统中。RabbitMQ支持多种工作模式,以下是七种主要的工作模式详细介绍,以及相应的代码示例。

1. 点对点模式(P2P)

在点对点模式中,消息生产者将消息发送到队列,消费者从该队列中获取消息。每条消息只有一个消费者会处理,这种模式适用于任务分发场景。

示例代码

import pika

# 发送消息
def send_message(message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2))
    print(f" [x] Sent {message}")
    connection.close()

send_message('Hello World!')

2. 发布/订阅模式(Pub/Sub)

在发布/订阅模式中,消息生产者将消息发送到交换机,多个消费者可以从这个交换机接收到相同的消息。这种模式适合需要广播消息的场景。

示例代码

import pika

# 发送消息
def publish_message(message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(f" [x] Sent {message}")
    connection.close()

publish_message('Hello Subscribers!')

3. 路由模式(Routing)

在路由模式中,生产者将消息发送到交换机,并通过路由键将消息路由到特定队列。消费者根据绑定的路由键接收消息。

示例代码

import pika

def emit_log(severity, message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
    print(f" [x] Sent {severity}:{message}")
    connection.close()

emit_log('info', 'Hello Info!')

4. 主题模式(Topics)

主题模式允许消费者根据特定的主题模式接收消息。生产者可以将消息发送到一个主题交换机,并使用主题模式来过滤需要接收的消息。

示例代码

import pika

def publish_topic_message(routing_key, message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(f" [x] Sent {routing_key}:{message}")
    connection.close()

publish_topic_message('kern.critical', 'A critical kernel error!')

5. 工作队列模式(Work Queue)

在工作队列模式中,多个消费者从同一个队列中获取任务,支持并发处理。这种模式适用于场景如处理任务的负载均衡。

示例代码

import pika
import time

def worker():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)

    def callback(ch, method, properties, body):
        time.sleep(body.count(b'.'))
        print(f" [x] Done {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)  # 确保一次只分派一个任务
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    print(" [*] Waiting for messages.")
    channel.start_consuming()

# 启动消费者
worker()

6. 延迟队列(Delayed Queues)

延迟队列允许在指定的时间之后将消息投递到队列。适用于推迟消息处理的场景。

示例代码(需要使用RabbitMQ的插件):

import pika

def send_delayed_message(message, delay):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='delayed_queue', durable=True)

    headers = {"x-delay": delay}
    properties = pika.BasicProperties(headers=headers)

    channel.basic_publish(exchange='',
                          routing_key='delayed_queue',
                          body=message,
                          properties=properties)
    print(f" [x] Sent {message} with delay {delay}")
    connection.close()

send_delayed_message('Delayed Hello!', 5000)  # 延迟5秒

7. 确认机制(Acknowledge)

在RabbitMQ中,消费者可以在处理完消息后发送确认(ack)信号,确保消息不会被丢失。这对于确保消息的可靠传递非常重要。

示例代码(包含确认机制):

import pika

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_with_ack():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='ack_queue', durable=True)
    channel.basic_qos(prefetch_count=1)  
    channel.basic_consume(queue='ack_queue', on_message_callback=callback)
    print(" [*] Waiting for messages.")
    channel.start_consuming()

# 启动消费者
consume_with_ack()

通过掌握这些工作模式,可以更好地利用RabbitMQ在不同场景下进行消息传递,满足系统间的通信需求。这些模式为开发人员提供了丰富的选择,以提高系统的灵活性和可靠性。

点赞(0) 打赏

微信小程序

微信扫一扫体验

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部