一、引言
针对分布式微服务项目中,为实现各个微服务模块功能的高内聚,避免非必要功能的耦合,应采取异步调用的方式实现其他功能。因此,消息队列成为首选技术,对于并发量一般的项目,RabbitMQ能够优秀胜任其工作。
二、操作步骤
1.Docker部署RabbitMQ容器
1)拉取RabbitMQ镜像文件
访问docker官方仓库,检索RabbitMQ镜像文件,查看所需版本Tag。

执行镜像拉取命令,默认为拉取最新版本Tag。
2)部署RabbitMQ容器
成功拉取镜像文件后,通过执行运行命令将镜像部署为容器。
1
| docker run -e RABBITMQ_DEFAULT_USER=rabbitmq -e RABBITMQ_DEFAULT_PASS=rabbitmq -v rabbitmq:/plugins --name rabbitmq -p 15672:15672 -p 5672:5672 -d rabbitmq
|
RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS对应RabbitMQ控制台的账号与密码。
3)访问控制台
通过浏览器访问http://192.168.19.130:15672即可进入RabbitMQ控制台,成功登陆后进入管理界面。

进入Admin菜单栏可创建新用户,针对不同项目可对应创建不同用户与虚拟机。其他用户对各自的虚拟机具有足够的开发权限,可对通过Exchanges和Queues菜单栏针对消息交换机和消息队列进行业务配置。
2.与Spring及其子框架集成
1)引入AMQP依赖
基于RabbitMQ采用AMQP协议的特性,Spring提供了SpringAMQP作为消息收发的模板工具。通过Maven导入相关依赖即可快速使用。
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
依赖版本version由SpringBoot框架进行管理并自动适配。
2)资源文件Application.yml配置
设置Spring应用程序与RabbitMQ服务器的连接参数。
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.19.130 port: 5672 virtual-host: / username: rabbitmq password: rabbitmq
|
3.基本消息队列处理
1)仅队列消息转发
一般消息队列处理模式为向指定消息交换机发送消息,消息交换机根据其路由断言,将消息转发至指定消息队列,再由消息队列发送消息。此处仅演示最简单的无交换机仅队列消息转发。
通过测试用例代码向RabbitMQ服务器发送消息,SpringAMQP自动注入RabbitTmplate的Bean对象,由它来完成消息的发送。
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest @Slf4j public class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate;
@Test void testPublishOnlyQueue() { String queue = "only_queue"; rabbitTemplate.convertAndSend(queue, message); } }
|
声明消费者及其监听的消息队列,通过@RabbitListener注解声明当前消费者监听的消息队列为only_queue。消费者类应使用@Component注解注册为Spring管理的Bean对象。
1 2 3 4 5 6 7 8
| @Component @Slf4j public class RabbitMqListener { @RabbitListener(queues = "work_queue") public void listenOnlyQueue(String msg) { System.out.println("消费者收到消息:" + msg); } }
|
2)fanout模式消息转发
fanout模式为广播转发,当fanout类型消息交换机接受消息后,会将消息转发至所有与其绑定的消息队列。
将消息发送至amq.fanout消息交换机,再由RabbitMQ服务器自行转发消息。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @SpringBootTest @Slf4j public class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate;
@Test void testPublishFanoutQueue() { String exchange = "amq.fanout"; String message = "Hello, RabbitMQ!"; rabbitTemplate.convertAndSend(exchange, null, message); } }
|
声明与amq.fanout消息交换机所绑定的两个消息队列,RabbitMQ服务器将会向两个消息队列均发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component @Slf4j public class RabbitMqListener { @RabbitListener(queues = "amq.fanout.queue1") public void listenFanoutQueue1(String msg) { log.info("消费者(1)收到消息:" + msg); }
@RabbitListener(queues = "amq.fanout.queue2") public void listenFanoutQueue2(String msg) { log.info("消费者(2)收到消息:" + msg); } }
|
3)direct模式消息转发
direct模式为指向模式,当direct类型消息交换机接受消息后,会根据路由绑定key将消息转发至对应与其绑定的消息队列。
将消息发送至amq.direct消息交换机,再由RabbitMQ服务器自行转发消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @SpringBootTest @Slf4j public class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate;
@Test void testPublishDirectQueue() { String exchange = "amq.direct"; String routingKey = "direct";
Map<String, Object> message = Map.of("language", "Java", "framework", "Spring");
rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
|
由于转发的消息类型为Map,转发方法的底层消息转换器会将其序列化为不可读字段,因此可以将消息转换器自定义为Json格式转换器。
1 2 3 4
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Configuration @Slf4j public class RabbitmqConfig { @Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void initRabbitTemplate() { Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter); } }
|
声明与amq.direct消息交换机所绑定的消息队列,消息交换机amq.direct与消息队列amq.direct.queue通过direct作为路由绑定key进行绑定,RabbitMQ服务器将会根据路由绑定key向消息队列发送消息。
1 2 3 4 5 6 7 8 9 10
| @Component @Slf4j public class RabbitMqListener { @RabbitListener(bindings = @QueueBinding(value = @Queue("amq.direct.queue", durable = "true"), exchange = @Exchange("amq.direct"), key = "direct")) public void listenDirectQueue(Map<?, ?> msg) { log.info("消费者收到消息:" + msg); } }
|
通过@RabbitListener注解的bindings参数配置可以通过代码实现消息交换机与消息队列的绑定,无需通过RabbitMQ控制台进行手动操作。
4)topic模式消息转发
topic模式为话题模式,对比direct模式单个绑定,可理解为一组相似的消息队列绑定。借助通配符来实现多个相似绑定,#匹配一个或多个词,*仅匹配一个词。其实现形式与direct模式消息转发类似。
4.进阶消息队列处理
1)消费者确认及失败重复机制
当消费者监听得到消息并实现消费逻辑后,应向RabbitMQ服务器发送反馈回执。
最优解决方式为SpringAMQP利用AOP对消息处理逻辑进行环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果。如果是业务异常,会自动返回nack。如果是消息处理或校验异常,自动返回reject。
1 2 3 4 5
| spring: rabbitmq: listener: simple: acknowledge-mode: auto
|
将acknowledge-mode模式设置为auto,即可实现自动处理。
此时若消息被消费失败,则会无限回到消息队列重复投递给消费者,造成死循环。需要对失败消息的重新投递次数做一定限制,以避免此极端情况的发生。
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: listener: simple: retry: enabled: true initial-interval: 1000 multiplier: 1 max-attempts: 3 stateless: true
|
当执行消费逻辑失败后,等待一秒后再次投递消息,最多重复三次即放弃此次消息。此消息将被废弃,无法寻回。
针对多次重试仍然失败的消息,较好的处理方式是,通过配置类中自定义Bean的方式,声明一组处理失败消息的消息交换机和消息队列。当消费失败,重复尝试三次仍然失败之后,失败消息将被投递至该组消息交换机和消息队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Configuration @Slf4j public class RabbitmqConfig { @Autowired private RabbitTemplate rabbitTemplate;
@Bean public DirectExchange errorExchange() { return new DirectExchange("error.exchange", true, false); }
@Bean public Queue errorQueue() { return new Queue("error.queue", true, false, false); }
@Bean public Binding errorBinding(DirectExchange errorExchange, Queue errorQueue) { return BindingBuilder.bind(errorQueue) .to(errorExchange) .with("error"); }
@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error"); } }
|
2)发送延迟消息
特殊业务环境下,需要向RabbitMQ中发送延迟消息。传统的延迟消息需要借助死信交换机,实现起来相对麻烦。RabbitMQ提供了一个延迟消息插件DelayExchange来实现相同的效果,能够大幅度减少消息交换机和消息队列的多余配置。

访问DelayExchange插件的仓库,通过Release下载插件文件。
1 2
| docker volume ps docker volume inspect mq-plugins
|
查看RabbitMQ容器的文件挂载路径,将插件文件上传至该目录,再进入容器启用该插件即可实现简化延迟消息发送的功能。
1
| docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
声明延迟消息消费者,将延迟消息交换机delay.exchange与延迟消息队列delay.queue通过delay作为路由绑定key进行绑定。
1 2 3 4 5 6 7 8 9 10
| @Component @Slf4j public class RabbitMqListener { @RabbitListener(bindings = @QueueBinding(value = @Queue("delay.queue"), exchange = @Exchange(value = "delay.exchange", delayed = "true"), key = "delay")) public void listenDelayQueue(String msg) { log.info("消费者<delay.queue>收到消息:" + msg); } }
|
向延迟消息交换机delay.exchange指定路由绑定key为delay发送延时10s的消息delay。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @SpringBootTest @Slf4j public class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate;
@Test void testPublishDelayQueue() { rabbitTemplate.convertAndSend("delay.exchange", "delay", "delay", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(1000 * 10); return message; } }); } }
|
例如网购下单三十分钟之后未支付即自动取消订单,就使用了延迟消息发送功能。当下单成功后即发送一个延时消息,等待指定时间之后,再判断订单是否成功支付。由此可见,发送延迟消息此功能在实际业务中应用的广泛性。
三、写在最后
通过RabbitMQ实现分布式微服务之间的异步调用,具有耦合度更低、性能更好、业务拓展性强、故障隔离以避免级联失败等诸多优点,但同时依然存在完全依赖于Broker的可靠性、安全性和性能,架构复杂,后期维护和调试麻烦等缺陷。总的来说,依然是利大于弊,其优点自不必说,部分缺点是可以通过良好的软件开发模式和习惯不同程度的缓解的。