Kafka 事务相关总结

Updated on with 0 views and 0 comments

Apache Kafka 在 Exactly-Once Semantics(EOS)上三种粒度的保证如下:

  1. Idempotent Producer:Exactly-once,in-order,delivery per partition
  2. Transactions:Atomic writes across partitions
  3. Exactly-Once stream processing across read-process-write tasks

幂等性

  • 幂等性是来解决什么问题的?主要是解决数据重复的问题
  • 幂等性是有条件的:
    1. 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)
    2. 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步
  • 两个重要机制:
    1. PID(Producer ID),用来标识每个 producer client
    2. sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复
  • TransactionManager(Producer 端)主要功能:
    1. 记录本地的事务状态(事务性时必须)
    2. 记录一些状态信息以保证幂等性,比如:每个 topic-partition 对应的下一个 sequence numbers 和 last acked batch(最近一个已经确认的 batch)的最大的 sequence number 等
    3. 记录 ProducerIdAndEpoch 信息(PID 信息)
  • 简单来说,其实现机制概括为:
    1. Server 端验证 batch 的 sequence number 值,不连续时,直接返回异常
    2. Client 端请求重试时,batch 在 reenqueue 时会根据 sequence number 值放到合适的位置(有序保证之一)
    3. Sender 线程发送时,在遍历 queue 中的 batch 时,会检查这个 batch 是否是重试的 batch,如果是的话,只有这个 batch 是最旧的那个需要重试的 batch,才允许发送,否则本次发送跳过这个 Topic-Partition 数据的发送等待下次发送

事务性

  • 从 Producer 的角度来看
    1. 跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性
    2. 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort)
    3. 跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态
  • 从 Consumer 的角度来看
    1. 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖
    2. 一个事务内的数据,可能会跨多个 log segment,如果旧的 segment 数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了
    3. Consumer 在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费
    4. Consumer 可能没有订阅这个事务涉及的全部 Partition
  • 关于 Kafka 事务性语义提供的保证主要以下三个:
    1. Atomic writes across multiple partitions
    2. All messages in a transaction are made visible together, or none are
    3. Consumers must be configured to skip uncommitted messages
  • 事务性要解决的问题,其实更多的是解决幂等性中没有解决的问题
  • 事务性实现的关键,Kafka借鉴了 XA 协议的思想
    1. 如何管理事务相关的状态信息?引入 Kafka Server 端的 TransactionCoordinator 角色
    2. TransactionCoordinator 如何实现高可用? min.isr + ack 机制,所有事务状态信息都会持久化到 __transaction_state 这个内部 topic
    3. 每次 Producer 在重启时,PID 都会更新为一个新值,这种情况应该怎么办?Kafka 在 Producer 端引入了一个 TransactionalId 来解决这个问题,这个 txn.id 是由应用来配置的
    4. 如何标识一个事务操作的开始、进行、完成的状态?事务状态转移,简化 TransactionCoordinator 对于事务的管理
  • TransactionCoordinator 主要功能:
    1. 处理事务相关的请求
    2. 维护事务的状态信息
    3. 向 Broker 发送 Transaction Marker 数据
  • 什么是 Transaction Marker?
    Transaction Marker 也叫做 control messages,它的作用主要是告诉这个事务操作涉及的 Topic-Partition Set 的 leaders 当前的事务操作已经完成,可以执行 commit 或者 abort(Marker 主要的内容就是 commit 或 abort),这个 marker 数据由该事务的 TransactionCoordinator 来发送的
  • 事务性的整体流程
    1. Finding a TransactionCoordinator
    2. Getting a PID
    3. Starting a Transaction
    4. Consume-Porcess-Produce Loop
    5. Committing or Aborting a Transaction
  • 几个问题
    1. txn.id 是否可以被多 Producer 使用,如果有多个 Producer 使用了这个 txn.id 会出现什么问题?
    2. TransactionCoordinator Fencing 和 Producer Fencing 分别是什么,它们是用来解决什么问题的?
    3. 对于事务的数据,Consumer 端是如何消费的,一个事务可能会 commit,也可能会 abort,这个在 Consumer 端是如何体现的?
    4. 对于一个 Topic,如果既有事务数据写入又有其他 topic 数据写入,消费时,其顺序性时怎么保证的?
    5. 如果 txn.id 长期不使用,server 端怎么处理?
    6. PID Snapshot 是做什么的?是用来解决什么问题?

标题:Kafka 事务相关总结
作者:yanghao
地址:http://solo.fancydigital.com.cn/articles/2021/09/01/1630467775072.html