Kafka原理与实践
type
status
date
slug
summary
tags
category
difficulty
icon
password
为什么选择Kafka
kafka提供高吞吐量(10万级TPS)、低延迟(ms级)、高可用(分区多副本策略)、消息持久化等特性,提供强大的MQ及流式计算能力。
特点如下:
- 消息传输使用纯二进制的字节序列
- 支持点对点模型、发布订阅模型
- 不但是消息引擎系统,还是分布式流处理平台
- 基于零拷贝机制的网络和磁盘数据传输
- 顺序读写磁盘,规避随机读写磁盘慢的劣势
- 数据多分区、多副本冗余存储提高并发处理能力
- 提供基于selector多路复用的java客户端
Kafka的架构原理
kafka概念
Broker:kafka集群中包含的多个服务器节点,如图中Server1、Server2。负责接收客户端发送过来的请求,以及对消息进行持久化。
Topic:kafka中发布订阅的对象就是主题,主题是kafka集群中用来存储消息的第一层结构。
Partitioning:分区,如图中的P0、P1、P2、P3,kafka中每个主题包含多个分区,每个分区是一组有序的消息日志,分区是kafka集群中用来存储消息的第二层结构。
Replica:副本,每个分区都有多个副本,副本类型包含Leader副本和Follower副本,由Leader副本对外对接客户端提供服务,而Follower副本保持与Leader副本同步,并在必要的时候切换为Leader副本实现HA。
Consumer Group:消费者组,多个消费者实例共同组成一个消费者组来消费一组主题,增加消费端的吞吐量TPS。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。
Consumer Offset:消费者位移,每个消费者在消费消息的过程中记录它当前消费到了分区的哪个位置上的字段。
图中:Broker0上的分区0、1、2都在Broker1上有对应副本。
绿色为Leader副本,蓝色为Follower副本。Follower副本从Leader副本上同步分区消息。
同一个分区的所有副本中只能有一个Leader副本,N-1个Follower副本。
分区0的消息位移指向9的位置,说明当前分区下次消费将从9位置开始,也就是之前的消息已经被消费过。
Kafka基于领导者(Leader-based)的副本机制
领导者-追随者副本的机制设计意图
- 方便实现“Read-your-writes”
即当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息,不会出现读取不到的情况。
- 方便实现单调读(Monotonic Reads)
就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。就是说kafka不会因为异步复制出现读取不一致的情况。
领导者副本-追随者副本的同步管理机制-ISR
- ISR副本集合
也就是 In-sync Replicas,ISR副本集合的特点是ISR集合中的副本都是与Leader进行同步的副本(包括Leader副本自己)。ISR集合之外的副本就是与Leader副本不同步的副本。
- ISR同步的含义
ISR副本的同步指的是Follower副本落后Leader副本的最长时间间隔,默认 replica.lag.time.max.ms=10s(broker端参数) ,也就是说需要赶上来的时间超过10s那就是不同步,要被排除在ISR副本集合之外。
- ISR集合动态管理
Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。 此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。如果该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。
领导者选举
当Leader副本所在Broker宕机,kafka依托于zookeeper则可以监控到节点丢失,并开启新一轮的Leader选举,从所有Follower副本中选一个作为新Leader副本。
老的Leader副本恢复后只能作为Follower副本加入集群中。
UnClean选举
由于非ISR副本的存在,这些非ISR集合中的副本跟Leader副本不同步,如果它们也参与选举并成为Leader副本的话,则会导致数据丢失。所以生产建议设置
unclean.leader.election.enable=false,也就是禁止UnClean选举,虽然开启这种UnClean选举可以保证Leader副本一直存在不至于对外停止服务,但是会造成数据的丢失一般不被允许。
kafka分区设计
kafka之所以采用分区设计,是为了能够提供负载均衡的能力,实现消息存储的伸缩能力,不同分分区将被放置到不同的broker节点上,数据的读写请求将被分散在
独立的broker节点的分区上进行。必要时可以通过增加broker节点对主题的分区进行水平扩容,提升集群的整体吞吐量。
kafka消息持久化
Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。
当生产者发送一条消息后,kafka若干broker节点收到消息将会写入日志文件然后响应生产者消息写入成功。当然,也可以选择只要有一个broker写入消息成功,就响应生产者。可以设置生产这参数acks=all或其他值来设置。
对于写入成功的消息,如果是保存在了N个broker上,那么消息不丢失的条件是N≥1.
kafka的Reblance机制
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
一般发生Reblance有三个场景:
- 消费者组成员数量发生变化。(比如消费者心跳未到达broker导致消费者实例被踢出组外,或者消费者poll之后消费时间过长超过两次poll间隔主动发起离开Group)
- 消费者组订阅的主题数量发生了变化。(比如正则方式订阅主题后主题增加或减少)
- 消费者组订阅的主题分区数发生变化。(比如重新给主题增加分区提升主题的整体并发量)
Reblance的缺点:
- 在Rebalance 发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与,全部被重新分配所有分区
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成
- Reblance过程本身比较慢,Reblance一次可能花费很长时间。
在实际场景中,需要尽量避免不必要的Reblance,比如
- Reblance‘发生的第一个场景中如果是由于网络不稳定导致心跳未到达导致消费者实例被踢出组,可以通过设置
session.timeout.ms(Coordinator 在 N秒之内没有到消费者心跳则将其移除) 和 heartbeat.interval.ms(发送心跳请求频率)来增加发送频率,减少未收到心跳的概率。
- 如果是消费者处理消息过长导致poll间隔过大,则可以适当调长max.poll.interval.ms参数。
kafka Java API应用
消费端如何找到消息对应的Leader副本所在Broker节点?
消费者在调用poll方法时,内部会建立三次TCP连接。
- 第一次TCP找到协调者Broker
消费者客户端发送FindCoordinator请求,获取元数据来找到kafaka集群中的协调者Broker。
将请求发送给集群中负载最少的Broker。负载最少的好意思是 看消费者连接的所有 Broker 中,谁的待发送请求最少。
- 第二次TCP跟协调者Broker建立连接进行 消费者与分区的协调管理。
- 第三次TCP向消息的Leader副本分区(多个分区对应Leader的Broker)所在Broker发送请求获取消息。此时废弃第一次建立的用来获取元数据的TCP连接,之后复用本次TCP连接来定期获取元数据。
消费者关闭TCP的方式:
消费者端参数 connection.max.idle.ms 控制的,默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求的话,那么消费者会强行杀掉这个 Socket 连接。
消费位移
- Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。Consumer需要为分配给它的每个分区提交消费位移。
- 提交消费位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 __consumer_offsets中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
位移提交方式
自动提交
- 自动提交也就是kafka的消费客户端在后台自动提交,无需手动干预。自动提交时默认开启的,由参数 enable.auto.commit=true控制。
当consumer调用poll方法时会先提交上次poll拉取的所有消息的位移,然后才拉取并处理下一批消息。
消费端还有另外一个参数 auto.commit.interval.ms=5s来控制提交位移的周期,5s是其默认值。
- 位移自动提交有重复消费的可能
比如自动提交位移之后的3s后发生了Reblance,而此时poll拉取的消息的位移还没有提交,等Reblance过后,所有Consumer都会从上一次提交的位移处继续消费,也就是3s前提交的位移处,导致重复消费。
一个优化的方式时减少auto.commit.interval.ms的周期,这样可以以更小的周期提交位移,减少重复消费的窗口。
注意,可能出现自动提交位移导致的撑满磁盘问题:
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。
由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。 显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。
这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。
解决办法 :只需要最新的一次位移数据就可以了,删除掉过期的位移数据。 Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner 。
手动提交
手动提交时使用如kafka的java api自己程序提交,首先设置 enable.auto.commit=false。然后使用consumer客户端API的
commitSync或者 commitAsync来提交位移。
Kafak监控运维实践
常用命令
- 启动kafka
- 创建topic
其中 --partitions 和 --replication-factor 分别设置了主题的分区数以及每个分区下的副本数。
- 查看topic列表
- 查看单个topic详细信息
- 查看消费者组信息
kafka0.9版本之后消费者组和offsets信息不再存储zk,而是存在broker节点上。
- 增加主题分区数
目前 Kafka 不允许减少某个主题的分区数。你可以使用 kafka-topics 脚本,结合 --alter 参数来增加某个主题的分区数
- 修改主题级别参数
比如动态设置主题级别参数 max.message.bytes
- 删除主题
删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka 会在后台默默地开启主题删除操作
- 生产消息
- 消费消息
异常场景模拟
重分配主题分区
test主题原来只有一个分区在broker0上,这里先创建三个分区,然后编写分区对应副本文件进行执行,这样就可以给test增加分区和副本了。
编写分配方案:topics - to - move . json文件
注意:一定得先创建三个分区存在,不然会报错!
执行命令:
结果:
分区复制异常
分区复制中,第三个分区也就是2号分区只在第一台broker,在1号和2号上都没有存在。 Under Replicated显示为true。
查看各个broker机器kafka-logs下分区副本,发现只有broker0上有三个分区。
在broker0上查看server.log日志找到:
连接到broker失败,经查找时hosts文件没有配置对应虚拟域名。
网络恢复之后,复制正常:
重启Broker引起的问题
Broker上的Leader分区副本存在倾斜。
重启broker1和broker2之后,集群中分区发生了Leader副本的重新分配。
broker0和broker1的Preferred Leader变为false,因为broker0和broker1的Leader分区副本在broker0上,与Replicas中的第一个副本编号不一致,Preferred Leader变为false!
以上问题在broker恢复的时候可能出现,几分钟后自己恢复了!
这是因为kafka集群默认提供了分区自动平衡机制,对应broker端参数为:auto.leader.reblance.auto.enable=true.在kafka服务端控制器会启动一个定时任务,来轮训所有broker节点,
计算每个broker节点的分区不平衡率( 非优先副本的Leader个数 / 分区总数 )是否超过leader.imblance.per.broker.percentage参数配置的比值,默认10%,如果计算超过此值则会启动
优先副本的选举动作,以求分区平衡。执行周期默认5分钟。
对于异常情况无法恢复的,可以通过 kafka-reassign-partitions.sh 调整分区和副本位置来解决。
除此之外,还有比如broker下线导致分区和副本迁移到其他机器、分区数过多导致Too Many Open Files问题等异常,也可以进行操作或模拟。
Kafka性能调优
吞吐量调优
吞吐量,也就是 TPS,是指 Broker 端进程或 Client 端应用程序每秒能处理的字节数或消息数,这个值越大越好。
Consumer端调优比较有限,采用Consumer端多线程编程消费消息或者设置fetch.min.bytes让broker端在返回Consumer端数据时多累积一些,默认1字节就返回给Consumer。
以下主要针对Producer和Broker端调优。
Producer端调优
Producer发送消息分为两个阶段:缓存消息和发送消息。
缓存消息阶段:是将消息发送到内存中的缓冲区,而发送消息涉及网络 I/O 传输将消息发送到broker。
- batch.size:增大消息批次大小,默认16KB,增大消息批次可以累积更多的消息一次性批量发出去,增加吞吐量。
- linger.ms:增大批次缓存时间,缓存消息的停留时间,默认值为0,消息立马发送出去,没有延时。适当调大此值可以增加发送出去的总消息数。
- compression.type:启用消息压缩,比如lz4或zstd。
- acks:设置acks=0或者1,不需要所有的broker都同步消息副本成功,提高响应速度。
- buffer.memory:多线程共用producer实例时,增大此参数确保缓冲区够用。
Broker端调优
- num.replica.fetchers:消息同步时,Follower副本使用多少个线程拉取Leader副本消息,默认1个线程。
- 避免FULLGC:调优GC参数避免FULL GC导致的STW。
延时调优
延时表示从 Producer 端发送消息到 Broker 端持久化完成之间的时间间隔,或者从 Producer 发送消息到 Consumer 成功消费该消息的总时长,这个值越小越好。
- Broker端:减小延时只要Follower副本同步Leader副本速度块就可以满足要求,主要增加num.replica.fetchers参数多线程同步。
- Producer端:生产者减小延时,那就是生产完立马发出去,设置linger.ms=0不要停留,而且不要启用压缩消息,也不要设置acks=all,可以设置为0或者1.
- Consumer端:保持fetch.min.bytes=1字节就可以,broker’有消息立马可以被获取到缩短延时。
其他的调优比如调大ulimit -n文件描述符大小避免Too many open files,还有JVM调优避免内存溢出,以及选用合适垃圾回收器增强GC效率。
Kafka版本变更列表
- 0.7 基础消息队列功能
- 0.8 引入副本功能
- 0.8.2.0 引入新版本 Producer API
- 0.9.0.0 增加了基础的安全认证 / 权限功能,同时使用 Java 重写了新版本消费者 API,另外还引入了 Kafka Connect 组件用于实现高性能的数据抽取.
- 0.10.0.0 引入了 Kafka Streams。
- 0.10.1 和 0.10.2 主要功能变更都是在 Kafka Streams 组件上.
- 0.11.0.0 引入了两个重量级的功能变更:
- 提供幂等性 Producer API 以及事务(Transaction) API;
- 对 Kafka 消息格式做了重构。
- 0.11.0.3 这个版本的消息引擎功能已经非常完善了。
- 1.0 和 2.0 这两个大版本主要还是 Kafka Streams 的各种改进,在消息引擎方面并未引入太多的重大功能特性。
- 作者:Episkey
- 链接:https://episkey.top/article/llKafka
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。