前面已经在Windows环境下成功安装并运行起了RabbitMQ服务,上一篇:《消息中间件RabbitMQ(二)Windows安装Erlang和RabbitMQ》,这里将分享在实际springboot项目中,整合RabbitMQ的学习心得。
一. SpringBoot项目初始化
这里新建SpringBoot项目,就简单的使用Spring initalizr来初始化,打开网站:Spring initalizr
二. 导入项目-准备工作
1.pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.trhyme</groupId>
<artifactId>FenBushi</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>FenBushi</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<!--SpringBoot整合RabbitMQ的Jar包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.application.yml配置
server:
servlet:
#访问项目名称路径
context-path: /fenbushi
#项目访问端口
port: 4399
spring:
application:
name: springboot-rabbit-mq
#配置RabbitMQ
rabbitmq:
#RabbitMQ的地址
host: 127.0.0.1
#AMQP协议端口
port: 5672
#RabbitMQ的账号
username: guest
#RabbitMQ的登录密码
password: guest
#RabbitMQ的虚拟机
virtual-host: /
#解读:公平分发就是根据谁执行的效率高,那么就给其多分发消息进行处理,正所谓能者多劳。
listener:
simple:
prefetch: 1
三. 发布/订阅模式
1.编写配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/***
* 发布/订阅模式(Fanout)
* 配置类
*/
@Configuration
public class FanoutRabbitConfig {
/**
* 创建三个队列 :fanout.ShortMessage fanout.Email fanout.WeChat
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueShortMessage() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("fanout.ShortMessage",true);
}
@Bean
public Queue queueEmail() {
return new Queue("fanout.Email",true);
}
@Bean
public Queue queueWeChat() {
return new Queue("fanout.WeChat",true);
}
//Fanout交换机 起名:fanoutExchange
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//绑定 将队列和交换机绑定
@Bean
Binding bindingExchangeShortMessage() {
return BindingBuilder.bind(queueShortMessage()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeEmail() {
return BindingBuilder.bind(queueEmail()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeWeChat() {
return BindingBuilder.bind(queueWeChat()).to(fanoutExchange());
}
}
2.消费者系统类
(1)邮件系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:邮件系统
*/
@Component
@RabbitListener(queues = "fanout.Email")
public class Email {
/***
* 处理消息
* @param testMessage
*/
@RabbitHandler
public void process(Map testMessage) {
System.out.println("邮件系统收到消息 : " +testMessage.toString());
}
}
(2)短信系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:短信系统
*/
@Component
@RabbitListener(queues = "fanout.ShortMessage")
public class ShortMessage {
/***
* 处理消息
* @param testMessage
*/
@RabbitHandler
public void process(Map testMessage) {
System.out.println("短信系统收到消息 : " + testMessage.toString());
}
}
(3)微信系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:微信系统
*/
@Component
@RabbitListener(queues = "fanout.WeChat")
public class WeChat {
/***
* 消息业务处理
* @param testMessage
*/
@RabbitHandler
public void process(Map testMessage) {
System.out.println("微信系统收到消息 : " +testMessage.toString());
}
}
3.Controller消息推送类
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/***
* 测试订单消息推送的类
*/
@RestController
@RequestMapping("/order")
public class OrderController {
//使用RabbitTemplate,这提供了接收/发送等等方法
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/***
* 请求测试
*/
@RequestMapping(value = "/send-message", method = RequestMethod.GET)
public String sendOutMessageMethod(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "Send Success";
}
}
4.启动项目并运行
启动项目后,在浏览器中输入地址:http://127.0.0.1:4399/fenbushi/order/send-message
四.Work模式
4.1.轮询模式
所谓轮询分发就是有两个消费者监听同一个队列,那么当我们发大量消息的时候,交换器会将消息平均分配给两个消费者,就算其中一个消费者的处理效率比另一个高,也同样只能分配一样的消息数量。
4.1.1.编写配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/***
* Work模式
* 轮询模式
* 解释:所谓轮询分发就是有两个消费者监听同一个队列,那么当我们发大量消息的时候,
* 交换器会将消息平均分配给两个消费者,就算其中一个消费者的处理效率比另一个高,也同样只能分配一样的消息数量。
*/
//@Configuration
public class WorkRabbitMQConfig {
@Bean
public Queue queueWork() {
return new Queue("queue_work",true);
}
@Bean
FanoutExchange workExchange() {
return new FanoutExchange("WorkExchange");
}
//绑定 将队列和交换机绑定
@Bean
Binding bindingExchangeShortMessage() {
return BindingBuilder.bind(queueWork()).to(workExchange());
}
}
4.1.2.编写消费者类
- (1)邮件系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/***
* 消费者:邮件系统
*/
@Component
@RabbitListener(queues = "queue_work")
public class Email2 {
@RabbitHandler
public void process(String testMessage) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("邮件系统收到消息 : " +testMessage);
}
}
- (2)短信系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/***
* 消费者:邮件系统
*/
@Component
@RabbitListener(queues = "queue_work")
public class ShortMessage2 {
@RabbitHandler
public void process(String testMessage) {
//休眠300毫秒,表示效率相比Email低
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("短信系统收到消息 : " +testMessage);
}
}
4.1.3.Controller消息推送类
package com.trhyme.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/***
* work模式轮询测试类
*/
@RestController
@RequestMapping("/order")
public class SendMessageController {
//使用RabbitTemplate,这提供了接收/发送等等方法
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/***
* Work模式请求测试
*/
@RequestMapping(value = "/send-work-message", method = RequestMethod.GET)
public String sendDirectMessage() {
for(int i=0;i<10;i++) {
String message="收到消息:"+i;
//将消息携带绑定键值:shortmessage 发送到交换机DirectExchange
rabbitTemplate.convertAndSend("WorkExchange", "", message);
System.out.println("发送成功:"+i);
}
return "消息发送成功!";
}
}
4.1.4.启动项目并运行
启动项目后,在浏览器中输入地址:http://127.0.0.1:4399/fenbushi/order/send-work-message
4.2.公平分发模式
公平分发就是根据谁执行的效率高,那么就给其多分发消息进行处理,正所谓能者多劳。
实现公平分发很简单,在基于轮询分发的基础上,我们只需要在消费者项目的配置文件中加入以下代码:
spring:
listener:
simple:
prefetch: 1
五.路由模式
1.编写配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/***
* 路由模式(Direct)
* 配置类
*/
@Configuration
public class DirectRabbitConfig {
/**
* 创建三个队列 :direct.ShortMessage direct.Email direct.WeChat
* 将三个队列都绑定在交换机 DirectExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueShortMessage() {
return new Queue("direct.ShortMessage",true);
}
@Bean
public Queue queueEmail() {
return new Queue("direct.Email",true);
}
@Bean
public Queue queueWeChat() {
return new Queue("direct.WeChat",true);
}
//Direct交换机 起名:DirectExchange
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("DirectExchange",true,false);
}
//绑定 将队列和交换机绑定,并设置用于匹配键
@Bean
Binding bindingExchangeShortMessage() {
return BindingBuilder.bind(queueShortMessage()).to(DirectExchange()).with("shortmessage");
}
@Bean
Binding bindingExchangeEmail() {
return BindingBuilder.bind(queueEmail()).to(DirectExchange()).with("email");
}
@Bean
Binding bindingExchangeWeChat() {
return BindingBuilder.bind(queueWeChat()).to(DirectExchange()).with("wechat");
}
}
2.消费者系统类
(1)邮件系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:邮件系统
*/
@Component
@RabbitListener(queues = "direct.Email")
public class Email3 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("33邮件系统收到消息 : " +testMessage.toString());
}
}
(2)短信系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:短信系统
*/
@Component
@RabbitListener(queues = "direct.ShortMessage")
public class ShortMessage3 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("33短信系统收到消息 : " +testMessage.toString());
}
}
(3)微信系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/***
* 消费者:微信系统
*/
@Component
@RabbitListener(queues = "direct.WeChat")
public class WeChat3 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("33微信系统收到消息 : " +testMessage.toString());
}
}
3.Controller消息推送类
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/***
* work模式轮询测试类
*/
@RestController
@RequestMapping("/order")
public class SendMessageController {
//使用RabbitTemplate,这提供了接收/发送等等方法
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/***
* 2.路由模式测试类
*/
@RequestMapping(value = "/send-direct-message", method = RequestMethod.GET)
public String sendDirectMessageTwo() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Direct:用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:shortmessage 发送到交换机DirectExchange
rabbitTemplate.convertAndSend("DirectExchange", "shortmessage", map);
rabbitTemplate.convertAndSend("DirectExchange", "email", map);
rabbitTemplate.convertAndSend("DirectExchange", "wechat", map);
return "消息发送成功!";
}
}
4.启动项目并运行
启动项目后,在浏览器中输入地址:http://127.0.0.1:4399/fenbushi/order/send-direct-message
六.主题模式
1.编写配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/***
* 主题模式(Topic)
* 配置类
*/
//@Configuration
public class TopicRabbitConfig {
@Bean
public Queue queueShortMessage() {
return new Queue("topic.ShortMessage");
}
@Bean
public Queue queueEmail() {
return new Queue("topic.Email");
}
@Bean
public Queue queueWeChat() {
return new Queue("topic.WeChat");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("TopicExchange");
}
//将queueShortMessage和TopicExchange绑定,而且绑定的键值为topic.shortmessage
//这样只要是消息携带的路由键是topic.shortmessage,才会分发到该队列
@Bean
Binding bindingExchangeShortMessage() {
return BindingBuilder.bind(queueShortMessage()).to(exchange()).with("topic.shortmessage");
}
//将queueEmail和TopicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeEmail() {
return BindingBuilder.bind(queueEmail()).to(exchange()).with("topic.#");
}
//只要是消息携带的路由键是topic.wechat,才会分发到该队列
@Bean
Binding bindingExchangeWeChat() {
return BindingBuilder.bind(queueEmail()).to(exchange()).with("topic.wechat");
}
}
2.消费者系统类
(1)邮件系统类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "topic.Email")
public class Email4 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("邮件系统收到消息 : " +testMessage.toString());
}
}
(2)短信系统
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "topic.ShortMessage")
public class ShortMessage4 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("短信系统收到消息 : " +testMessage.toString());
}
}
(3)微信系统
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "topic.WeChat")
public class WeChat4 {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("微信系统收到消息 : " +testMessage.toString());
}
}
3.Controller消息推送类
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/***
* work模式轮询测试类
*/
@RestController
@RequestMapping("/order")
public class SendMessageController {
//使用RabbitTemplate,这提供了接收/发送等等方法
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/***
* 3.Topic模式请求测试
*/
@RequestMapping(value = "/send-topic-message", method = RequestMethod.GET)
public String sendDirectMessageThree() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Topic:用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:topic.shortmessage 发送到交换机TopicExchange
rabbitTemplate.convertAndSend("TopicExchange", "topic.shortmessage", map);
return "消息发送成功!";
}
}
4.启动项目并运行
启动项目后,在浏览器中输入地址:http://127.0.0.1:4399/fenbushi/order/send-topic-message