rabbitmq 消息100%可靠性投递的解决方案实现 ack手动确认方式

代码实例及学习参考内容来自慕课网课程RabbitMQ消息中间件极速入门与实战部分代码自己进行了改造

特点:

  1. RabbitMQ底层使用Erlang语言编写,传递效率高,延迟低
  2. 开源、性能优秀、稳定性较高
  3. 与SpringAMQP完美的整合、API丰富
  4. 集群模式丰富、表达式配置、HA模式、镜像队列模式
  5. 保证数据不丢失的情况下,做到高可用
  6. AMQP全称:Advanced Message Queuing Protocol
  7. AMQP翻译:高级消息队列协议

AMQP核心概念

  • Server:又称Broker,接收客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同的Exchange和Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列

RabbitMQ中有三种常用的交换机类型:

direct: 如果路由键匹配,消息就投递到对应的队列

fanout:投递消息给所有绑定在当前交换机上面的队列

topic:允许实现有趣的消息通信场景,使得5不同源头的消息能够达到同一个队列。topic队列名称有两个特殊的关键字。

* 可以替换一个单词

# 可以替换所有的单词

  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者,多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
  • Prefetch count:如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

RabbitMq投递信息流程图
在这里插入图片描述

RabbitMq百分百推送流程图

在这里插入图片描述

  • Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)

  • Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)

  • Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

  • Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

  • Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

  • Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。

代码测试
  • 1.将test.sql导入本地数据库,将服务端和消费端的rabbitmq配置文件账号密码进行相应的修改

  • 2.首先启动消费服务端 rabbitmq-customer服务的Application.java

  • 3.再启动rabbitmq服务端 rabbitmq-publisher服务的Application.java

  • 4.浏览器调用投递信息的控制器 127.0.0.1:8081/send

  • 5.在消费服务端的控制台可以看到信息处理的日志

项目源码地址: https://github.com/niezhiliang/springboot-rabbitmq

来源:网络


智能推荐

RabbitMQ的消息确认ACK机制

RabbitMQ的消息确认ACK机制 1、什么是消息确认ACK。   答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。 2、ACK的消息确认机制。   答:ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反...

RabbitMQ消费者怎么去手动ack

转载:https://blog.csdn.net/java_green_hand0909/article/details/79698608 1. 在这里不提如何集成rabbit mq到spring。 2. 实现功能的配置都在消费者端: 3. 下面是步骤和说明 (1)在消费者端的mq配置文件上添加,配置  关键代码为 acknowledeg = "manual" ,意为...

RabbitMQ(四)消息Ack确认机制

RabbitMQ(四)消息Ack确认机制 确认种类 RabbitMQ的消息确认有两种。 消息发送确认:这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 消费接收确认。这种是确认消费者是否成功消费了队列中的消息。 环境配置 为了测试,我们先配置rabbit环境 引入Maven依赖 配置文件 Rabbi...

rabbitmq生产端-消息可靠性投递方案(适用于高并发场景)

这个方案相对于我上一篇的可靠性投递方案少了一个入库操作,更适用高并发场景,具体执行流程如下: 1.UpstreamService先投递一个消息到MQ Broker,业务服务模块进行消息监听,监听到消息后进行业务处理,处理完成,投递一个确认消息到MQ Broker,由一个CallbackService服务进行确认消息的监听,监听到确认消息对消息进行入库操作。 2.UpstreamService再投递...

中间件 - 消息队列 - RabbitMQ - 消息的可靠性投递

消息的投递有4个环节, 如下图 . 环节1: 生产者Producer把消息发送给服务器Broker Producer怎么知道Broker有没有接收到消息 服务端确认-Transaction模式 只要channel.txCommit()方法返回, 服务端就一定接收到了消息 缺点: 同步模式, 等Broker返回成功之后 , Producer才会继续发送下一条消息, 大大降低了效率 服务端确认-Con...

猜你喜欢

Blender 插件之 Blender for UE4

Blender 插件之 Blender for UE4 https://zhuanlan.zhihu.com/p/146665394   Blender 插件之 Blender for UE4 WeArt微创意 ​ 腾讯科技有限公司 游戏美术     怎么使用? 使用Blender处理虚幻引擎4的对象包可能很繁琐。这就是为什么我创建加载项:“ Blende...

widows版本oraclexe的安装副本

1.安装好了在crm命令行里输入: sqlplus system/密码 看到如下提示就证明安装成功: 还有一种连接是基于网络通过监听器来完成连接的: 输入:sqlplus system/密码@127.0.0.1:1521/xe 2.继续来配置plsq Developer  进去之后在工具一栏选择:首选项 在其目录下配置路径如下: 连接成功之后以system的权限去登录,并且可以创建用户,...

Flink基础 -- 2.Flink的安装和第一个Demo

Flink的安装 Flink的相关安装步骤如下: 装虚拟机 装系统 装jdk 装scala(不需要不用) 装Hadoop(不需要不用) 装Flink 配置环境变量   如果只是刚开始的自我测试,安装还是很简单的,直接下载包,上传服务器,tar解压,配置了环境变量,source一下,ok,可以用了,这时不放start-cluster.sh一下启动flink吧(这里只是测试,安装了...

SyntaxError: Non-ASCII character '\xe5' in file G:/pycharm/test/Python�����.py on line 2解决方法

这属于中文不支持问题 只需在代码前加一行注释 #coding=utf-8 #encoding = utf-8 #-*-coding:utf-8-*- 任选一个均可,但最好用最后一个,它是正规的Python推荐写法   如果采用是的PyCharm,加注释还不行,可点击File ->Default Settings->File Encoding 改成utf-8。 如果还是不行。返...

webapi文档描述-swagger

  最近做的项目使用mvc+webapi,采取前后端分离的方式,后台提供API接口给前端开发人员。这个过程中遇到一个问题后台开发人员怎么提供接口说明文档给前端开发人员,最初打算使用word文档方式进行交流,实际操作中却很少动手去写。为了解决这个问题,特意在博客园中搜索了一下api接口文档生成的文章,引起我注意的有两种方案。1.微软自带的Microsoft.AspNet.WebApi.HelpPag...

问答精选

How to create spinner in wicket

I am looking for spinner in wicket which should be simillar as JSpinner in java swing. I found class: http://www.jarvana.com/jarvana/view/org/wicketstuff/minis/1.4.9/minis-1.4.9-javadoc.jar!/org/wicke...

Selecting individual elements on mouse click HTML

I am trying to implement selecting individual elements on the click of the mouse in a html page. When clicking, I want to be able to find which element I am clicking on. The end goal is to be able to ...

How to get time from server in android?

Possible Duplicate: Does anyone know of a good JSON time server? Is there any public json or xml present on server which I can parse for current time? I shall use this time for checking the expiration...

Wildcard table matches with _TABLE_SUFFIX and sub-query

The _TABLE_SUFFIX feature is great and exactly what I was looking for to solve my problem - however it is scanning all of the data matched by the wildcard when I use a sub-query to determine which tab...

SQL order by DATE DESC + group on other Column

I have tried a lot of different grouping and ordering syntax but I am really struggling to get what I need. I am trying to order by DATE DESC, but I also want the PROJECTS to stick together (no matter...

相关问题

相关文章

热门文章

推荐文章

相关标签

推荐问答