King's Studio

SpringBoot整合RabbitMQ消息队列

字数统计: 1.1k阅读时长: 4 min
2020/05/23 Share

RabbitMQ前面已经介绍过原理以及工作机制,现在我们利用SpringBoot进行消息队列的操作。

创建RabbitMQ环境

同样的,为了测试方便,我们在这里直接使用docker部署RabbitMQ,使用下面的命令在docker hub中查找RabbitMQ的版本。

1
docker search rabbitmq

为了便于直观看到RabbitMQ中的情况,我们使用带”-management”后缀的版本,这种版本自带web管理界面,能更好的帮助我们理解RabbitMQ的执行流程。

1
docker pull rabbitmq:3-management

下载完成后查看镜像的下载情况,并进行部署启动。

1
2
docker images
docker run -d -p 5672:5672 -p 15672:15672 --name RabbitMQ rabbitmq:3-management

这边指定的两个端口,第一个端口是RabbitMQ自身的启动端口,第二个端口号是web管理后台的端口号,在启动的时候指定,我们就能直接利用这个端口号访问后台页面,如下图所示。

预览

点击Exchanges查看所有的交换器情况。

交换器页面

点击Queues查看所有的队列的情况。

队列

使用SpringBoot操作RabbitMQ

创建测试工程,引入相关依赖

在SpringBoot2.0版本中,我们直接引入amqp的依赖就可以进行与RabbitMQ的整合,因为RabbitMQ正是属于AMQP类型的消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
<!-- 引入AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 使用fastjson对普通对象进行json转化 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>

然后在application.properties配置文件中配置RabbitMQ的相关信息。

1
2
3
4
5
# RabbitMQ配置信息
spring.rabbitmq.host=服务器地址
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

测试direct点对点模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 测试单播(点对点发送)
*/
@Test
void contextLoads() {
//需要自己定义message,定义消息头和消息体内容
//rabbitTemplate.send(exchange,routingKey,message);

//object当做默认的消息体,只需传入对象,自动序列化发送给rabbitMQ
//rabbitTemplate.convertAndSend(exchange,routingKey,object);
Map<String, Object> map = new HashMap<>();
map.put("msg","这是第一个单播的消息");
map.put("data", Arrays.asList("test",123,true));
//第一个参数是指定哪一个交换器,第二个参数是绑定规则
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news", JSON.toJSONString(new Book("三国演义","罗贯中")));
}

在这里我们使用fastjson将具体的POJO对象转换成json字符串发送到消息队列中进行保存,而在接收的时候,则需要使用fastjson将字符串反序列化为原POJO对象。我们编写一个service进行消息队列监听的操作,使用@RabbitListener注解指定监听哪一个队列接收到消息,一但被我们写的service接收,该条消息即从消息队列中删除。需要提醒的是,如果要开启监听功能,需要在启动类上加上@EnableRabbit注解。

1
2
3
4
5
6
7
8
9
@EnableRabbit//开启基于注解的RabbitMQ
@SpringBootApplication
public class SpringbootAmqpApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootAmqpApplication.class, args);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service("bookService")
public class BookServiceImpl implements BookService {

private static final Logger logger = LoggerFactory.getLogger(BookServiceImpl.class);

@Override
@RabbitListener(queues = "atguigu.news")//指定监听哪一个队列
public void receive(String book) {
logger.info("-------------------接收消息-------------------");
//由于我们是将book对象使用fastJson序列化为字符串之后发送到RabbitMQ中的,因此监听的消费者接收到的是字符串,此处再将字符串反序列化为Book类型进行接收
Book book1 = JSONObject.parseObject(book, Book.class);
System.out.println(book1);
}

@RabbitListener(queues = "atguigu")
public void receive(Message message){
//获得消息体
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}

广播消息

通过指定交换器为fanout类型的,进行发送广播消息,即所有的队列均可以接收到发送的消息。

1
2
3
4
5
6
7
8
9
/**
* 广播消息
*/
@Test
void broadcast(){
Map map = new HashMap();
map.put("msg","这是广播的消息");
rabbitTemplate.convertAndSend("exchange.fanout","",map);
}

操作创建队列、交换器、绑定规则

创建队列、交换器、绑定规则的前提需要引入AmqpAdmin,RabbitMQ的系统功能管理组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Autowired
private AmqpAdmin amqpAdmin;

//创建交换器
@Test
void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));

amqpAdmin.declareQueue(new Queue("amqpAdmin.queue"));
logger.warn("---------------------创建完成--------------------");

//创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpAdmin.queue",Binding.DestinationType.QUEUE,"amqpAdmin.exchange","amqp.binding",null));
}

总结

以上就是SpringBoot简单整合RabbitMQ的流程,在这边总结的目的也就是为了提高开发的效率,不需要再一步一步去找文档怎样进行配置,而着重关注业务逻辑。

原文作者:金奇

原文链接:https://www.rossontheway.com/2020/05/23/SpringBoot整合RabbitMQ消息队列/

发表日期:May 23rd 2020, 12:00:00 am

更新日期:May 23rd 2020, 11:26:11 am

版权声明:本文采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可,除特别声明外,转载请注明出处!

CATALOG
  1. 1. 创建RabbitMQ环境
  2. 2. 使用SpringBoot操作RabbitMQ
    1. 2.1. 创建测试工程,引入相关依赖
    2. 2.2. 测试direct点对点模式
    3. 2.3. 广播消息
    4. 2.4. 操作创建队列、交换器、绑定规则
    5. 2.5. 总结