使用MQ消息队列时需要考虑的问题

1、如何知道有消息丢失? 2、哪些环节可能丢消息? 3、如何确保消息不丢失?   引入 mq 消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)
系统解耦: 上下游系统之间的通信相互依赖,利用 mq 消息队列可以隔离上下游环境变化带来的不稳定因素。 流量控制: 超高并发场景中,引入 mq 可以实现流量 “削峰填谷” 的作用以及服务异步处理,不至于打崩服务。 引入 mq 同样带来其他问题:数据一致性。
在分布式系统中,如果两个节点之间存在数据同步,就会带来数据一致性的问题。消息生产端发送消息到 mq 再到消息消费端需要保证消息不丢失。
所以在使用 mq 消息队列时,需要考虑这 3 个问题:
如何知道有消息丢失?
哪些环节可能丢消息?
如何确保消息不丢失?
1、如何知道有消息丢失? 如何感知消息是否丢失了?可总结如下:
他人反馈: 运营、pm 反馈消息丢失。 监控报警: 监控指定指标,即时报警人工调整。kafka 集群异常、broker 宕机、broker 磁盘挂载问题、消费者异常导致消息积压等都会给用户直接感觉是消息丢失了。 案例:舆情分析中数据采集同步
pm 可自己下发采集调度指令,去采集特定数据。 pm 可通过 es 近实时查询对应数据,若没相应数据可再次下发指令。 当感知消息丢失了,那就需要一种机制来检查消息是否丢失。
检索消息 运维工具有:
查看 kafka 消费位置: > 基于 spring boot + mybatis plus + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能>> * 项目地址:> * 视频教程:# 查看某个topic的message数量$ ./kafka-run-class.sh kafka.tools.getoffsetshell --broker-list localhost:9092 --topic test_topic> 基于 spring cloud alibaba + gateway + nacos + rocketmq + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能>> * 项目地址:> * 视频教程:# 查看consumer group列表$ ./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092# 查看 offset 消费情况$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describegroup                 topic           partition  current-offset  log-end-offset  lag             consumer-id                                                           host            client-idconsole-consumer-1152 test_topic      0          -               4               -               consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1      consumer-console-consumer-1152-1 利用工具:kafka tools 其他可见化界面工具 2、哪些环节可能丢消息? 一条消息从生产到消费完成经历 3 个环节:消息生产者、消息中间件、消息消费者。
哪个环节都有可能出现消息丢失问题。
1)生产端 首先要认识到 kafka 生产端发送消息流程:
调用 send() 方法时,不会立刻把消息发送出去,而是缓存起来,选择恰当时机把缓存里的消息划分成一批数据,通过 sender 线程按批次发送给服务端 broker。
此环节丢失消息的场景有: 即导致 producer 消息没有发送成功
网络波动: 生产者与服务端之间的链路不可达,发送超时。现象是:各端状态正常,但消费端就是没有消费消息,就像丢失消息一样。
*解决措施: *重试 props.put(retries, 10); 不恰当配置: 发送消息无 ack 确认; 发送消息失败无回调,无日志。
producer.send(new producerrecord(topic, messagekey, messagestr),                           new callback(){...}); *解决措施: *设置 acks=1 或者 acks=all。发送消息设置回调。 回顾下重要的参数: acks
acks=0:不需要等待服务器的确认. 这是 retries 设置无效. 响应里来自服务端的 offset 总是 -1,producer只管发不管发送成功与否。延迟低,容易丢失数据。 acks=1:表示 leader 写入成功(但是并没有刷新到磁盘)后即向 producer 响应。延迟中等,一旦 leader 副本挂了,就会丢失数据。 acks=all:等待数据完成副本的复制, 等同于 -1. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置 unclean.leader.election.enable 为 true, 保证当 isr 列表为空时, 选择其他存活的副本作为新的 leader. 2)服务端 先来了解下 kafka broker 写入数据的过程:
broker 接收到一批数据,会先写入内存 pagecache(os cache)中。 操作系统会隔段时间把 os cache 中数据进行刷盘,这个过程会是 「异步批量刷盘」 。 这里就有个隐患,如果数据写入 pagecache 后 kafka broker宕机会怎样?机子宕机/掉电?
kafka broker 宕机: 消息不会丢失。因为数据已经写入 pagecache,只等待操作系统刷盘即可。
机子宕机/掉电: 消息会丢失。因为数据仍在内存里,内存ram 掉电后就会丢失数据。
解决方案 :使用带蓄电池后备电源的缓存 cache,防止系统断电异常。 对比学习 mysql 的 “双1” 策略,基本不使用这个策略,因为 “双1” 会导致频繁的 i/o 操作,也是最慢的一种。 对比学习 redis 的 aof 策略,默认且推荐的策略:**everysec(aof_fsync_everysec) 每一秒钟保存一次(默认):** 。每个写命令执行完, 只是先把日志写到 aof 文件的内存缓冲区, 每隔一秒把缓冲区中的内容写入磁盘。 拓展:kafka 日志刷盘机制
# 推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。# 针对 broker 配置:log.flush.interval.messages=10000 # 日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。log.flush.interval.ms=1000 # 日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。# 针对 topic 配置:flush.messages.flush.ms=1000 # topic下每1s刷盘flush.messages=1 # topic下每个消息都落盘# 查看 linux 后台线程执行配置$ sysctl -a | grep dirtyvm.dirty_background_bytes = 0vm.dirty_background_ratio = 10 # 表示当脏页占总内存的的百分比超过这个值时,后台线程开始刷新脏页。vm.dirty_bytes = 0vm.dirty_expire_centisecs = 3000 # 表示脏数据多久会被刷新到磁盘上(30秒)。vm.dirty_ratio = 20vm.dirty_writeback_centisecs = 500 # 表示多久唤醒一次刷新脏页的后台线程(5秒)。vm.dirtytime_expire_seconds = 43200 broker 的可靠性需要依赖其多副本机制: 一般副本数 3 个(配置参数:replication.factor=3)
leader partition 副本:提供对外读写机制。 follower partition 副本:同步 leader 数据。 副本之间的数据同步也可能出现问题:数据丢失问题和数据不一致问题。
解决方案:isr 和 epoch 机制
isr(in-sync replicas) : 当 le``ader 宕机,可以从 isr 中选择一个 follower 作为 leader。
epoch 机制: 解决 leader 副本高水位更新和 follower 副本高水位更新在时间上是存在错配问题。
tips: kafka 0.11.x 版本才引入 leader epoch 机制解决高水位机制弊端。
对应需要的配置参数如下:
acks=-1 或者 acks=all: 必须所有副本均同步到消息,才能表明消息发送成功。
replication.factor >= 3: 副本数至少有 3 个。
min.insync.replicas > 1: 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1。
举个栗子:leader 宕机了,至少要保证 isr 中有一个 follower,这样这个follwer被选举为leader 且不会丢失数据。
公式:replication.factor = min.insync.replicas + 1
unclean.leader.election.enable=false: 防止不在 isr 中的 follower 被选举为 leader。
kafka 0.11.0.0版本开始默认 unclean.leader.election.enable=false
3)消费端 消费端消息丢失场景有:
消息堆积: 几个分区的消息都没消费,就跟丢消息一样。
解决措施: 一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理。 自动提交: 消费端拉下一批数据,正在处理中自动提交了 offset,这时候消费端宕机了; 重启后,拉到新一批数据,而上一批数据却没处理完。
解决措施: 取消自动提交 auto.commit = false,改为手动 ack。 心跳超时,引发 rebalance: 客户端心跳超时,触发 rebalance被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。
同时避免两次 poll 的间隔时间超过阈值:
max.poll.records:降低该参数值,建议远远小于 * * 的积。
max.poll.interval.ms: 该值要大于 / ( * ) 的值。
解决措施: 客户端版本升级至 0.10.2 以上版本。
案例:凡凡曾遇到数据同步时,消息中的文本需经过 nlp 的 ner 分析,再同步到 es。
这个过程的主要流程是:
数据同步程序从 kafka 中拉取消息。 数据同步程序将消息内的文本发送的 ner 进行分析,得到特征数组。 数据同步程序将消息同步给 es。 现象:线上数据同步程序运行一段时间后,消息就不消费了。
排查日志: 发现有 rebalance 日志,怀疑是客户端消费太慢被踢出了消费组。 本地测试: 发现运行一段时间也会出现 rebalance,且 nlp的ner 服务访问 http 500 报错。 得出结论: 因ner服务异常,导致数据同步程序消费超时。且当时客户端版本为 v0.10.1,consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起,从而也会造成心跳超时。 当时解决措施是:
session.timeout.ms: 设置为 25s,当时没有升级客户端版本,怕带来其他问题。 熔断机制: 增加 hystrix,超过 3 次服务调用异常就熔断,保护客户端正常消费数据。 3、如何确保消息不丢失? 掌握这些技能:
熟悉消息从发送到消费的每个阶段 监控报警 kafka 集群 熟悉方案 “mq 可靠消息投递” 怎么确保消息 100% 不丢失? 到这,总结下:


无人化农业能给农民带来什么好处,开启智慧农业的大门
AccelerComm加入O-RAN联盟,以提高开放5G网络的互操作性并最大限度地提高频谱效率
TDK 高可靠性车载用电源系统电感器的产品阵容扩大与量产
工业互联网将加速推动中国制造向中国智造转型
中国已成为名副其实的5G先锋
使用MQ消息队列时需要考虑的问题
英特尔全新Ice Lake震撼来袭,性能爆表主打灵活高效
区块链场外交易系统搭建数字资产币币交易所开发
苏宁新一代无人仓,相比传统人工拣选效率提升5倍
华数耐高温、耐腐蚀、高防护机器人守护安全生产
win10应用商店正式上架小爱同学UWP
非洲猪瘟检测设备的技术参数
Keyssa和IDT合作,推出可支持的真正“无线缆” 高性能充电和数据连接
华为荣耀8青春版怎么样?荣耀8青春版对比红米note4和360N5,1400元以内你会选谁?
基于STM32单片机的WIFI灌溉系统设计
电连接器过热的预防事项,电连接器常见故障有哪些?
入门滤波器结构-A Beginners Guide to F
无人机在风机叶片损伤和光伏组件巡检中的应用分析
电流电压表型号大全
浅析入耳式耳机的优缺点