首页 > 软件开发 > 架构

Rabbit MQ 简单介绍与Springboot 整合

admin 架构 2021-05-25 09:27:26 交换机 队列 rabbitmq Java 
后台-系统设置-扩展变量-手机广告位-内容正文底部

Rabbit MQ 简单介绍与Springboot 整合

什么是消息中间件?

MQ: mssage queue;消息队列是应用程序和应用程序之间的通信方法
为什么使用MQ?
一些无需立即返回,且耗时的操作提取出来,进行异步处理,节省服务器请求响应时间
作用:异步解耦,削峰填谷

市场上常见的消息队列有如下:

  1. ActiveMQ:基于JMS
  2. ZeroMQ:基于C语言开发
  3. RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  4. RocketMQ:基于JMS,阿里巴巴产品
  5. Kafka:类似MQ的产品;分布式消息系统,高吞吐量

RabitMQ 常见的工作模式:

1.简单模式:一个生产者,一个消费者,不需要定义交换机(默认交换机)
在这里插入图片描述

2.工作模式: 一个生产者,多个消费者(竞争关系)。不需要定义交换机(默认交换机)
在这里插入图片描述

3.订阅模式:一个生产者,多个消费者。需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列 (一个消息可以被多个消费者收到,只要订阅即可)
在这里插入图片描述

4.路由模式: 一个生产者,多个消费者。需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
在这里插入图片描述

5.Topic 通配符模式: 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列(与路由模式相比,RoutingKey可以使用通配符)
在这里插入图片描述

通配符说明:
#:匹配一个或多个词,
*:匹配不多不少恰好1个词)
例如:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

**交换机:**只负责转发消息,不具备存储消息的能力,如果队列与交换机绑定,没有符合路由规则的队列,那么消息会丢失。
1.广播式交换器类型(fanout):该类交换器不分析所接收到消息中的Routing Key,默认将消息转发到所有与该交换器绑定的队列中去。广播式交换器转发效率最高,但是安全性较低,消费者应用程序可获取本不属于自己的消息。
在这里插入图片描述

2.直接式交换器类型(direct):该类交换器需要精确匹配Routing Key与BindingKey,如消息的Routing Key = Cloud,那么该条消息只能被转发至Binding Key = Cloud的消息队列中去。直接式交换器的转发效率较高,安全性较好,但是缺乏灵活性,系统配置量较大。
在这里插入图片描述

3.主题式交换器(Topic Exchange):该类交换器通过消息的Routing Key与Binding Key的模式匹配,将消息转发至所有符合绑定规则的队列中。Binding Key支持通配符,其中“”匹配一个词组,“#”匹配多个词组(包括零个)。例如,Binding Key=“.Cloud.#”可转发Routing Key=“OpenStack.Cloud.GD.GZ”、“OpenStack.Cloud.Beijing”以及“OpenStack.Cloud”的消息,但是对于Routing Key=“Cloud.GZ”的消息是无法匹配的。

消息可靠性保证

消息投递流程如下:
在这里插入图片描述

1.生产者发送消息到交换机
2.交换机根据routingkey 转发消息给队列
3.消费者监控队列,获取队列中信息
4.消费成功删除队列中的消息

生产者可靠性消息投递

  • confirm模式
    生产者发送消息到交换机的时机
  • return模式
    交换机转发消息给queue的时机
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true # 默认为false
    publisher-returns: true # 默认为false

设置回调函数 callBack

@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

    /**
     *
     * @param correlationData 消息信息
     * @param ack  确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息
     * @param cause  如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null
     */
    @Override
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        if(ack){
            System.out.println("发送消息到交换机成功,"+cause);
        }else{
            System.out.println("发送消息到交换机失败,原因是:"+cause);
        }
    }
}

设置回调函数 return

@Component
public class MyReturnCallBack implements RabbitTemplate.ReturnCallback {
    /**
     *
     * @param message 消息信息
     * @param replyCode 退回的状态码
     * @param replyText 退回的信息
     * @param exchange 交换机
     * @param routingKey 路由key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("退回的消息是:"+new String(message.getBody()));
        System.out.println("退回的replyCode是:"+replyCode);
        System.out.println("退回的replyText是:"+replyText);
        System.out.println("退回的exchange是:"+exchange);
        System.out.println("退回的routingKey是:"+routingKey);

    }
}

消费者确认机制(ACK)

ACK机制:有三种方式

  • 自动确认 acknowledge=“none”
  • 手动确认 acknowledge=“manual”
  • 根据异常情况来确认(暂时不怎么用) acknowledge=“auto”

其中自动确认是指:
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
其中手动确认方式是指:
则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()等方法,让其按照业务功能进行处理,比如:重新发送,比如拒绝签收进入死信队列等等。

消费端限流说明

如果并发量大的情况下,生产方不停的发送消息,可能处理不了那么多消息,此时消息在队列中堆积很多,当消费端启动,瞬间就会涌入很多消息,消费端有可能瞬间垮掉,这时我们可以在消费端进行限流操作,每秒钟放行多少个消息。这样就可以进行并发量的控制,减轻系统的负载,提供系统的可用性,这种效果往往可以在秒杀和抢购中进行使用。在rabbitmq中也有限流的一些配置。

spring:
  application:
    name: consumer01
  rabbitmq:
    username: admin
    password: 123456
    host: 127.0.0.1
    port: 5672
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual #设置确认模式为手动确认
        prefetch: 10 #设置消费端每秒拉取的消息数量为100 默认为250 用于消费端限流

手动签收消息:

@Component
@RabbitListener(queues = "q4")
public class RabbitMqListener {
    private static Integer count = 0;

    @RabbitHandler
    public void getMessage(Message message, Channel channel, String msg) {
        //接收消息
        System.out.println("消费端接收消息:" + msg);
        try {
            //处理本地业务
            System.out.println("处理本地业务开始======start======");
            // Thread.sleep(2000);
            // int i = 1 / 0;
            System.out.println("处理本地业务结束======end======");
            //签收消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 第一种:签收
            // channel.basicAck()
            // 第二种:拒绝签收 批量处理
            // channel.basicNack()
            // 第三种:拒绝签收 不批量处理
            // channel.basicReject()
        } catch (Exception e) {
            e.printStackTrace();
            //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来
            try {
                // 第三个参数 true 则重回队列 false 则丢弃
                // 重试
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                // 直接丢弃
                // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    }}

SpringBoot 整合 RabbitMQ

1.引入依赖:

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

2.配置连接

spring:
  application:
    name: producer
  rabbitmq:
    username: admin
    password: 123456
    host: 127.0.0.1
    port: 5672
    virtual-host: test
    # 消息发送到交换机上 开启消息发送成功 确认机制
    publisher-confirm-type: correlated
    # 路由转发到队列 开启消息接受 确认机制
    publisher-returns: true
spring:
  application:
    name: consumer01
  rabbitmq:
    username: admin
    password: 123456
    host: 127.0.0.1
    port: 5672
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual #设置确认模式为手动确认
        prefetch: 10 #设置消费端每秒拉取的消息数量为100 默认为250 用于消费端限流

3.创建队列和对应的交换机

    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
    // 队列
    @Bean
    public Queue topicQueue(){
        return new Queue("topicQueue", true);
    }
    // 主题交换机
    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("topicExchange", true, false);
    }

    // 绑定
    @Bean
    Binding bindingTopic(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("aiot.#");
    }

4.发送消息

    @GetMapping("/sendMessage")
    public Result sendDirectMessage() {
        String messageId = String.valueOf(IdUtil.fastUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("topicExchange", "aiot.huxiongjun", map);
        log.info("发送消息成功: "+map);
        return new Result(map);
    }

5.接受消息

@Configuration
@RabbitListener(queues = "topicQueue")
public class RabbitMQListener {

    @RabbitHandler
    public void getMessage(Message message, Channel channel, HashMap messageMap) {
        //接收消息
        System.out.println("消费端接收消息:" + messageMap);
     
    }
}

以上就是关于rabbitMQ 的简单应用!

文章来源:https://blog.csdn.net/weixin_41861506/article/details/117194955

后台-系统设置-扩展变量-手机广告位-内容正文底部
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
本文地址:https://www.jcdi.cn/jiagou/30841.html

留言与评论(共有 0 条评论)
   
验证码:
后台-系统设置-扩展变量-手机广告位-评论底部广告位

教程弟

https://www.jcdi.cn/

统计代码 | 京ICP1234567-2号

Powered By 教程弟 教程弟

使用手机软件扫描微信二维码