如何保证不丢消息

一、消息丢失的原因有哪些

1、消息生产阶段

  • 生产者配置错误:生产者在发送消息时,配置错误的主体、分区或者消息的过期时间,造成消息无法正确发送到MQ中。
  • 网络故障:生产者与MQ集群之间网络故障。

2、消息存储阶段

  • 磁盘故障:以Kafka举例,如果磁盘出现故障,Kafka中的消息无法落盘,可能导致消息的丢失。
  • 同步策略:消息未同步到follow节点,leader节点就挂了。

3、消息消费阶段

  • 消费者处理失败:在处理消费逻辑时,由于程序bug等原因,造成系统异常,错误应答从而丢失消息。
  • 消费者提交偏移量错误:当消费者消费完消息之后,提交错误的偏移量造成消息的重复消费或者消息丢失。

二、消息丢失的解决方案有哪些

1、消息生产阶段

  • 配置正确的主体、分区、以及TTL。
  • 使用ACK应答,等待消息被MQ写入成功之后在确认为发送成功。
  • ack=-1或者ack=all:生产者需要等待ISR中的所有副本都成功写入消息才为消息发送成功。
  • 消息发送重试。

  • 设置合理的消息缓冲区大小。

buffer.memory:默认33554432。生产者用于缓存一批发送到服务器消息的总内存字节数。

2、消息存储阶段

  • 配置适当的副本数量和ISR。在发生故障的时候消息仍然可以从其他的副本中进行恢复。

  • 使用监控,实时检测消息的复制、磁盘的使用率。

  • 定期备份。

3、消息消费阶段

  • 编写健壮的代码。对于可能产生的异常原因进行分析处理。当发生异常时,可以做如下处理:
    1. 记录错误,有异常处理机制,保证能够正确的处理异常情况。
    2. 消息重试消息。(需要注意消费幂等以及死循环造成消息堆积)
  • 使用手动提交偏移量。(需保证所有的异常情况代码中都有对应的异常处理机制,也就是第一点,健壮的代码)
  • 使用自动提交偏移量。(需要保证消费逻辑正确)
  • 使用监控,监控消费者的消费情况,发现异常立即上报。
  • 正确的消费者组管理,类似消费者重平衡或者重启等造成的消息偏移量丢失。
  • 备份,发生异常或者消息丢失时,可以跟踪到消费者的消费情况,直接使用备份恢复。

以rockermq场景而言

生产者:

  • 同步阳塞的方式发送消息,加上失败重试机制,可能broker存储失败,可以通过查询确认
  • 异步发送需要重写回调方法,检查发送结果
  • ack机制,可能存储CommitLog,存储ConsumerQueue失败,此时对消费者不可见

broker:

同步刷盘、集群模式下采用同步复制、会等待slave复制完成才会返回确认

消费者:

  • offset手动提交,消息消费保证幂等