零碎
- 每个
topic
是分区的,消息按照topic
分类 - 一个
partition
同时只能被同一个consumer group
中的某一个consumer
消费 topic
是逻辑上概念,partition
是物理概念- 一般修改
kafka
的config
配置文件server.properties
broker.id
:唯一整数delete.topic.enable=true
:能否删除主题log.dirs
:运行日志存储地址zookeeper.connect
:zk
集群地址
- 创建
topic
后,会在配置的log.dirs
下生成文件名为[topicName]-[partitionName]
的几个文件夹,用来存放数据00000000000000000000.log
:配置项为log.segment.bytes
,只存储数据,默认存7天,默认大小1G,超过了就会重新创建,其命名以当前segment
的第一条消息的offset
命名00000000000000000000.index
:存储相应log
的索引与偏移量、大小等,之后去log
中迅速查找消息00000000000000000000.timeindex
:leader-epoch-checkpoint
:
- 每个
partition
中维护一个从头开始的offset
,仅区内有序,不一定全局有序
生产者
1. 分区策略
- 指明
partition
,直接使用其值 - 未指明
partition
但有key
值,将key
的hash
值与topic
的partition
数取余,作为partition
数 - 未指明
partition
也没有key
值,第一次调用时生成一个随机整数的绝对值(之后再其上自增),将其值与topic
的可用partition
数取余,作为partition
数,这种方式称为round-robin 算法
(轮询算法)
2. 数据可靠保证(ISR)
topic
的每个partition
的leader
收到生产者发送的数据后,会向生产者发送ack
,生产者收到后会进行下一轮发送,否则会重新发送
2.1. 副本数据同步策略(发送 ack 时机)
策略 | 优点 | 缺点 |
---|---|---|
半数以上完成 | 延迟低 | 选举新leader 时,容忍n 台节点故障,需要2n+1 个副本 |
全部完成 | 选举新leader 时,容忍n 台节点故障,需要n+1 个副本 |
延迟高 |
2.2. kafka
的同步策略(全部完成的升级版)
ISR(in-sysc replica set)
,由leader
维护,意为与leader
保持同步的的follower
集合,集合中的follower
完成数据同步后,则会发送ack
。当其长时间未向leader
同步数据,则将该follower
踢出ISR
。时间阈值配置项为replica.lag.time.max.ms
。
2.3. ack
应答机制
对于不太重要的数据,允许少量丢失,所以没必要等待ISR
全部接收成功,此时提供了三种可靠性级别,即ack
参数
0
:生产者不等待broker
的ack
,延迟最低。broker
一接收到还没有落盘就返回,broker
故障时可能丢失数据1
:生产者等待broker
的ack
,partition
的leader
落盘成功后返回ack
。如果在follower
同步成功前leader
故障,可能丢失数据-1(all)
:生产者等待broker
的ack
,并且在follower
同步成功前后返回ack
。如果在follower
同步成功后,broker
发送ack
前,leader
故障,则会造成重复数据
2.4. 副本数据一致性
LEO(Log End Offset)
:每个副本中最后一个(即最大的)offset
HW(High Watermark)
:指消费者所能见到的最大的offset
,ISR
中最小的LEO
,HW
之前的数据才对消费者可见
2.4.1. follower
故障
follower
故障后,会被临时踢出ISR
,恢复后会读取本地磁盘的HW
,并将高于部分截掉,从HW
重新开始同步,等到其LEO
大于等于该partition
的HW
时,重新加入ISR
2.4.2. leader
故障
leader
故障后,会选举出新的leader
,其余follower
会将各自高于HW
的部分截掉,从新的leader
开始同步
3. Exactly-Once
3.1. 几个概念
At Least Once
:至少一次,即ack=-1
时,此时不会丢数据,但可能重复数据At Most Once
:只多一次,即ack=0
时,此时保证每条消息只会被发送一次,不会重复数据,但可能会丢数据幂等性
:无论发送多少次重复数据,都只会保留一条。配置项为enable.idompotence=true
Exactly-Once
:等于At Least Once
+幂等性
3.2. 语义实现
开启幂等性的生产者初始化时会生成一个PID
,发往同一个partition
的消息会附带sequence number
,而broker
端会对<PID,partition,sequence number>
做缓存,相同主键的消息只会持久化一条。但是由于PID
重启生产者会变化,不同的partition
也有不同主键,所以无法做到跨分区跨回话的Exactly-Once
消费者
1. 消费方式
pull
模式,优点是适应了消费能力,缺点是如果没有数据会陷入循环中。可使用timeout
,消费者会等待一段时间才返回
2. 分区分配策略
2.1. RoundRobin
消费者组的所有消费者,必须订阅相同的主题时,其将所有订阅的主题看成一个整体,按照hash
值排序,按照顺序依次分配
2.2. Range
按照相同的topic
分别分组,之后在按顺序分配
3. offset
消费者组+topic
+partition
,唯一确定一个offset
3.1. 保存位置
- 0.9之前:默认将
offset
保存在Zookeeper
- 0.9开始:默认将
offset
保存在Kafka
一个内置topic
,叫__consumer_offsets
中
3.2. 如何查看
高效读写
- 顺序写磁盘
- 零拷贝
事务
为了实现跨分区会话事务,引入了TransactionID
,其与PID
绑定,当生产者重启后,可以通过它找回原先的PID
__consumer_offsets
由于 zookeeper
不擅长大批量的频繁写操作,kafka_1.0.2
将 consumer
的位移信息保存在内部的 topic
中,即 __consumer_offsets
,并提供了 kafka_consumer_groups.sh
脚本供用户查看 consumer
信息