1. 丢消息
检测消息丢失的方法
一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。
如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。
可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。
首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。
确保消息可靠传递
整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。一条消息从生产到消费完成这个过程,可以划分三个阶段
- 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
- 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
1. 生产阶段
在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以 Kafka 为例,我们看一下如何可靠地发送消息:
同步发送时,只要注意捕获异常即可。
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println(" 消息发送成功。");
} catch (Throwable e) {
System.out.println(" 消息发送失败!");
System.out.println(e);
}
异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println(" 消息发送成功。");
} else {
System.out.println(" 消息发送失败!");
System.out.println(exception);
}
});
2. 存储阶段
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。
对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。消息队列通过消息复制来确保消息的可靠性的。
3. 消费阶段
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下
次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
同样,我们以用 Python 语言消费 RabbitMQ 消息为例,来看一下如何实现一段可靠的消费代码:
def callback(ch, method, properties, body):
print(" [x] 收到消息 %r" % body)
# 在这儿处理收到的消息
database.save(body)
print(" [x] 消费完成 ")
# 完成消费业务逻辑后发送消费确认响应
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
在消费的回调方法 callback 中,正确的顺序先是把消息保存到数据库,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。
两个消费者先后去拉消息是否能拉到同一条消息?
首先,消息队列一般都会有协调机制,不会让这种情况出现,但是由于网络不确定性,这种情况还是在极小概率下会出现的。
在同一个消费组内,A消费者拉走了index=10的这条消息,还没返回确认,这时候这个分区的消费位置还是10,B消费者来拉消息,可能有2种情况:
-
\1. 超时前,Broker认为这个分区还被A占用着,会拒绝B的请求。
-
\2. 超时后,Broker认为A已经超时没返回,这次消费失败,当前消费位置还是10,B再来拉消息,会给它返回10这条消息。
-
在生产阶段,你需要捕获消息发送的错误,并重发消息。
-
在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
-
在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
你在理解了这几个阶段的原理后,如果再出现丢消息的情况,应该可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步深入分析,快速找到问题原因。
2. 重复消息
在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。
消息重复的情况必然存在
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
- At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
这个服务质量标准不仅适用于 MQTT,对所有的消息队列都是适用的。我们现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息
队****列很难保证消息不重复。
Kafka 支持的“Exactly once”和我们刚刚提到的消息传递的服务质量标准“Exactly once”是不一样的,它是 Kafka 提供的另外一个特性,Kafka 中支持的事务也和我们通常意义理解的事务有一定的差异。在 Kafka
中,事务和 Excactly once 主要是为了配合流计算使用的特性。巧妙地用了两个所有人都非常熟悉的概念“事务”和“Exactly once”来包装它的新的特性,实际上它实现的这个事务和 Exactly once 并不是我们通常
理解的那两个特性。
为什么大部分消息队列都选择只提供 At least once 的服务质量,而不是级别更高的 Exactly once?
解决一个问题,往往会引发别的问题。若消息队列实现了exactly once,会引发的问题有:
①消费端在pull消息时,需要检测此消息是否被消费,这个检测机制无疑会拉低消息消费的速度。可以预想到,随着消息的剧增,消费性能势必会急剧下降,导致消息积压;
②检查机制还需要业务端去配合实现,若一条消息长时间未返回ack,消息队列需要去回调看下消费结果(这个类似于事物消息的回查机制)。这样就会增加业务端的压力,与很多的未知因素。
所以,消息队列不实现exactly once,而是at least once + 幂等性,这个幂等性让给我们去处理。
最重要的原因是消息队列即使做到了Exactly once级别,consumer也还是要做幂等。因为在consumer从消息队列取消息这里,如果consumer消费成功,但是ack失败,consumer还是会取到重复的消息,所以消
息队列花大力气做成Exactly once并不能解决业务侧消息重复的问题。
1、At least once + 幂等消费 = Exactly once,所以对于消息队列来讲,要做到Exactly once,其实是需消费端的共同配合(幂等消费)才可完成,消息队列基本只提供At least once的实现;
2、从给的几种幂等消费的方案看,需要引入数据库、条件更新、分布式事务或锁等额外辅助,消息队列如果需要保障Exactly once,会导致消费端代码侵入,例如需要消费端增加消息队列用来处理幂等的client
端,而消费端的形态可是太多了,兼容适配工作量巨大。故这个Exactly once留给用户自己处理,并且具有选择权,毕竟不是所有业务场景都需要Exactly once,例如机房温度上报的案例。
如果队列的实现是At least once,但是为了确保消息不丢失,Broker Service会进行一定的重试,但是不可能一直重试,如果一直重试失败怎么处理了?
有的消息队列会有一个特殊的队列来保存这些总是消费失败的“坏消息”,然后继续消费之后的消息,避免坏消息卡死队列。这种坏消息一般不会是因为网络原因或者消费者死掉导致的,大多都是消息数据本身有
问题,消费者的业务逻辑处理不了导致的。
用幂等性解决重复消息问题
一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。
幂等(Idempotence)是一个数学上的概念,它是这样定义的:
如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。
一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。
比如在不考虑并发的情况下,“将账户 X 的余额设置为 100 元”,执行一次后对系统的影响是,账户 X 的余额变成了 100 元。只要提供的参数 100 元不变,那即使再执行多少次,账户 X 的余额始终都是 100 元,
不会变化,这个操作就是一个幂等的操作。
再比如“将账户 X 的余额加 100 元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。
如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。
从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。
那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。
下面我给你介绍几种常用的设计幂等操作的方法:
1. 利用数据库的唯一约束实现幂等
刚刚提到的那个不具备幂等特性转账的例子:将账户 X 的余额加 100 元。在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。
首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户
ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账
户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,
比如, **Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂****等消费。 ( redis中的hash:**hsetnx ; )
比如,Elasticsearch中的幂等操作: PUT /movie_index/movie/3,加上文档 ID
2. 为更新的数据设置前置条件
另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次
更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
比如,刚刚我们说过,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等
性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本
号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
3. 记录并检查操作
如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别
简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简
单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。
比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:
- t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;
- t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。
这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。
对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。
几种实现幂等操作的方法
- 可以利用数据库的约束来防止重复更新数据,
- 可以为数据更新设置一次性的前置条件,来防止重复消息,如果这两种方法都不适用于你的场景,
- 还可以用“记录并检查操作”的方式来保证幂等,这种方法适用范围最广,但是实现难度和复杂度也比较高,一般不推荐使用。
这些实现幂等的方法,不仅可以用于解决重复消息的问题,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题。比如,我们可以将 HTTP 服务设计成幂等的,解决前端或者 APP 重复提交表单数
据的问题;也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的重复调用问题。这些方法都是通用的。
3. 消息积压问题
在使用消息队列遇到的问题中,消息积压这个问题,应该是最常遇到的问题。
消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。 所以在使用消息队列时,如何来优化代码的性能,避免出现消息积压。
优化性能来避免消息积压
在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于消息队列本身的性能,不需要太关注。
主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可
以通过水平扩展 Broker 的实例数成倍地提升处理能力。而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。所以,对于消
息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。
1. 发送端性能优化
如果说,代码发送消息的性能上不去,需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。
Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。
无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。
比如说,你的消息发送端是一个微服务,主要接受 RPC 请求处理在线业务。很自然的,微服务在处理每次请求的时候,就在当前线程直接发送消息就可以了,因为所有 RPC 框架都是多线程支持多并发的,自
然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响 RPC 服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。
如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数
据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。
2. 消费端性能优化
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。
要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
所以,在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区
(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为对于消费者,在每个分区上实际只能支持单线程消费。
一个解决消费慢的问题常见的错误:
它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务
线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。
这个方法是不是很完美地实现了并发消费?错误! 因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。
在onMessage方法结束后,如果没有抛异常,就自动ACK了。而这个时候,消息只是在内存队列中,并没有被真正处理完。
如果onMessage方法中,收到消息后不确认,等真正处理完消息再确认,就可以了吧,这样就可以用内存队列了
理论上是可以的,但要注意,像RocketMQ,采用默认配置的时候,onMessage方法结束后,如果没抛异常,默认就会自动确认了。
在消费端是否可以通过批量消费的方式来提升消费性能?在什么样场景下,适合使用这种方法?或者说,这种方法有什么局限性?
批量消费即一次取一批消息,等这一批消息都成功了,再提交最后一条消息的位置作为新的消费位置。如果其中任何一条失败,则认为整批都失败。
批量消费应该是与消息处理是需要实时与否有关。如果需要实时处理,如订单相关的,就不能批量,但是发送提醒邮件之类的,就可以。
批量消费有意义的场景要求:
- 1.要么消费端对消息的处理支持批量处理,比如批量入库
- \2. 要么消费端支持多线程/协程并发处理,业务上也允许消息无序。
- \3. 或者网络带宽在考虑因素内,需要减少消息的overhead。
批量消费的局限性:
- \1. 需要一个整体ack的机制,一旦一条靠前的消息消费失败,可能会引起很多消息重试。
- \2. 多线程下批量消费速度受限于最慢的那个线程。
但其实以上局限并没有影响主流MQ的实现了批量功能。
1、要求消费端能够批量处理或者开启多线程进行单条处理 2、批量消费一旦某一条数据消费失败会导致整批数据重复消费 3、对实时性要求不能太高,批量消费需要Broker积累到一定消费数据才会发送到Consumer
消费端进行批量操作,感觉和上面的先将消息放在内存队列中,然后在并发消费消息,如果机器宕机,这些批量消息都会丢失,如果在数据库层面,批量操作在大事务,会导致锁的竞争,并且也会导致主备的不
一致。如果是一些不重要的消息如对日志进行备份,就可以使用批量操作之类的提高消费性能,因为一些日志消息丢失也是可以接受的。
如果使用了批量消费的方式,那么就需要批量确认,如果一次消费十条消息,除了第七条消费失败了,其他的都处理成功了,但是这中情况下broker只能将消费的游标修改成消息7,而之后的消息虽然处理成功
了,但是也只能使用类似于拉回重传的方式再次消费,浪费性能,而且这种批量消费对于消费者的并发我觉得不是很友好,可能消费者1来了取走了十条消息在处理,这时候消费者2过来了也想取十条消息,但是
他需要等待消费者1进行ack才可以取走消息。
如何判断增加多少consumer消费实例的个数?
可以简单计算一下,消费并行度:单实例平均消费tps * 消费并行度 > 生产消息的总tps 消费并行度 = min(consumer实例数,分区数量)
消息积压的紧急处理
还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原
因,迅速解决问题才不至于影响业务。
导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,排查消息积压原因,是有一些相对固定而且比较有效的方法的。
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的
方法是通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这
种情况也会拖慢整个系统的消费速度。
如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在
什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。
**优化消息收发性能,**预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。
对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。
消息积压处理: 1、发送端优化,增加批量和线程并发两种方式处理 2、消费端优化,优化业务逻辑代码、水平扩容增加并发并同步扩容分区数量 查看消息积压的方法: 1、消息队列内置监控,查看发送端发送消息与消费端消费消息的速度变化 2、查看日志是否有大量的消费错误 3、打印堆栈信息,查看消费线程卡点信息
面试解决消息积压的方法: (1)临时扩容,增加消费端,用硬件提升消费速度。 (2)服务降级,关闭一些非核心业务,减少消息生产。 (3)通过日志分析,监控等找到挤压原因,消息队列三部分,上游生产者是否异常生产大量数据,中游消息队列存储层是否出现问题,下游消费速度是否变慢,就能确定哪个环节出了问题 (4)根据排查解决异常部分。 (5)等待积压的消息被消费,恢复到正常状态,撤掉扩容服务器。
4. 如何保证消息的严格顺序?
怎么来保证消息的严格顺序?主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。
如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成 1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。
大部分情况下,并不需要全局严格顺序,只要保证局部有序就可以满足要求了。比如,在传递账户流水记录的时候,只要保证每个账户的流水有序就可以了,不同账户之间的流水记录是不需要保证顺序的。
如果需要保证局部严格顺序,可以这样来实现。在发送端,我们使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同 Key 的消息总是发送到同一
个队列上,这样可以保证相同 Key 的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。