深入理解 Kafka
# 基本概念
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。 在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
# 整体架构
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
- 在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
- Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory
配置,默认值为 33554432B,即 32MB。如果生产者生产消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send()
方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms
的配置,此参数的默认值为 60000,即 60 秒。
- ProducerRecord(生产者生产的消息)
- ProducerBatch 存放多个 ProducerRecord
- ProducerBatch 转换为 Kafka 格式的请求 Request
- InFightRequests 缓存已经发送还没有收到响应的的请求,先缓存请求再发送到 Kafka,每个连接默认只能存放 5 个,通过数量就能知道 node 的负载和网络连接情况。
# 消息转化处理
需要最消息进行追加或者修改,使用拦截器
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
interceptor:
classes: com.starry.guide.config.MyProducerInterceptor
实现接口
public class MyProducerInterceptor implements ProducerInterceptor<String, Object> {
@Override
public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
System.out.println("进入拦截器");
Object value = record.value();
if (record.value() instanceof String) {
// 自定义拦截逻辑
value = (String) record.value() + new Date();
}
return new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
value,
record.headers()
);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
- KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的
onSend()
方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩(Log Compaction)的功能。 - KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的
onAcknowledgement()
方法,优先于用户设定的Callback
之前执行。这个方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。 close()
方法主要用于在关闭拦截器时执行一些资源的清理工作。
测试
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Test
void contextLoads() {
for (int i = 0; i < 3; i++) {
kafkaTemplate.send(new ProducerRecord<>("test-topic", "key", "value"));
}
}
# 自定义序列化
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
properties:
interceptor:
classes: com.starry.guide.config.MyProducerInterceptor
public class MySerializer implements Serializer<Order> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, Order data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new RuntimeException("Error serializing object", e);
}
}
}
# 自定义分区器
默认分区器(DefaultPartitioner)
- 没指定 key 使用黏性分区算法
- 如果 key 在 cache 中直接使用对应的分区
- 如果 key 不在 cache 中从可用的分区中随机选一个
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- 指定 key 就使用哈希取模的方式
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
properties:
interceptor:
classes: com.starry.guide.config.MyProducerInterceptor
partitioner:
class: com.starry.guide.config.MyPartitioner
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
System.out.println("自定义分区");
if (null == key) {
return 0;
}
if (String.valueOf(key).startsWith("key-1")) {
return 1;
}
Integer partitionCount = cluster.partitionCountForTopic(topic);
return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
# 消息回调
- 拦截器的
onAcknowledgement
方法可以知道消息是否发送成功,但是是在 IO 线程,如果逻辑复杂,影响发消息的速度。 send
方法返回一个Feature
通过get
方法可以获取结果,但是get
的阻塞的。- 可以指定一个
callback
,消息回调来处理,异步的。
public class MyCallback implements ListenableFutureCallback<SendResult<String, Object>> {
@Override
public void onFailure(Throwable ex) {
if (ex instanceof KafkaProducerException) {
ProducerRecord<Object, Object> record = ((KafkaProducerException) ex).getFailedProducerRecord();
System.out.println("record.key() = " + record.key());
System.out.println("record.value() = " + record.value());
System.out.println("record.partition() = " + record.partition());
System.out.println("record.topic() = " + record.topic());
ex.printStackTrace();
}
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("success" + result.getProducerRecord().value());
}
}
private final ListenableFutureCallback<SendResult<String, Object>> callback = new MyCallback();
@Test
void contextLoads() {
for (int i = 0; i < 3; i++) {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>("test-topic1", "key-" + i, new Order(i, StateEnum.ORDER_WAIT_SEND)));
future.addCallback(callback);
}
}
# acks 参数
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks 参数有 3 种类型的值(都是字符串类型)。
- acks=1。默认值即为 1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。
- acks=0。生产者发送消息之后不需要等待任何服务端的响应。
- acks=-1 或 acks=all。生产者在消息发送之后,需要等待所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
properties:
interceptor:
classes: com.starry.guide.config.MyProducerInterceptor
partitioner:
class: com.starry.guide.config.MyPartitioner
acks: all
# 消息批量发送
生产者发送的消息先进行缓存,缓存达到一定大小或者指定时间后才发送(任意一个条件满足即可)。进而减少网络传输的资源消耗以提升性能。
- 默认缓存的内存大小为 32MB,
buffer.memory
参数控制。 如果 32 MB 不够用,扩大大概率还是不够用,高并发下就是晚几秒报错/阻塞,如何解决- 增大网络带宽
- 生产者降低生产速度
- 增加节点
- 每个批次的大小默认为 16KB,
batch.size
参数控制。只有batch.size
大小的内存会进行复用,超过这个大小的内存使用完会进行释放。通过java.io.ByteBuffer
实现消息内存的创建和释放。 - 如果缓存到达上限,生产者的
send()
方法要么阻塞,要么抛出异常,阻塞时长由max.block.ms
控制,默认 60 秒。 linger.ms
参数控制生产者发送的消息需要等待多久才会发送到 Kafka,默认 0ms。即默认来了消息就会发送到 Kafka,即瞬时能承受的并发消息大小为 16KB。可以根据并发情况适当增大批次大小。
# 大消息如何处理
什么是大消息?
- 超过 16KB 的消息可以算是大消息了,此时消息发送到消息累加器需要额外开辟内存(Java.io.ByteBuffer),超过一个批次大小了,直接发送到 Kafka,处理完,需要进行空间的释放(由于不是默认 16KB)。
超过 1MB 的消息如何处理?
- 修改客户端参数
max.request.size
,最大消息消息大小,默认 1MB。 - 修改 broker 端的参数
message.max.bytes
,broker 能处理的消息大小。 - 修改消费者的参数
fetch.max.bytes
,消费者处理消息大小限制。
超过 10MB 的消息如何处理?
- 不建议使用 Kafka,Kafka 适合处理小型消息,可以使用 nas、文件传输协议来进行处理。
- 进行消息压缩,
compression.type
参数指定消息压缩格式,默认none
,可选 gzip、snappy、lz4。确保消息没有超过消息大小限制。
# retry 机制
消息发送失败可以使用 callback
回调,将消息保存告警,后续进行处理。
也可以使用 retry 机制,消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,可以进行重试。消息超过消息大小 max.request.size
等错误是不会进行重试的。
retries
重试次数,默认 0 次。重试次数到上限还是失败,就抛出异常。retry.backoff.ms
每次重试间隔,默认 100ms,尽量设置大于 leader 副本的选举时间,防止过早抛出异常。
# 顺序消息
消息发送到同一个主题同一分区。 retry 机制下如何保证顺序消息?
生产者发送 A、B、C 三条消息,发送到 Kafka 时,A 发送失败,进行重试,B 和 C 发送成功,A 重试成功,此时顺序为 BCA。
max.in.flight.requests.per.connection
参数来限制缓存连接未响应的请求数量。默认为 5,即可以允许 5 个连接请求未响应,就会出现上面的情况。需要设置为 1 来保证顺序发送消息,不进行缓存。
# 消费者组
消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(ConsumerGroup)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。 一个主题 4 个分区
- 消费者组 A 有 4 个消费者,4 个消费者正好对应 4 个分区。消费者数量和分区数量最好保持一致,这样可以达到最大的消费并行。
- 消费者组 B 有 2 个消费者,每个消费者对应 2 个分区。
- 每个消费者组之间互不影响,一个消息会被多个消费者组消费,每个消息只会被消费者组中的一个消费者消费。
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
consumer:
bootstrap-servers:
- 127.0.0.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.starry.guide.config.MyDeserializer
@Configuration
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "myGroupA")
public void consumerA(ConsumerRecord<String, Order> record) {
System.out.println("KafkaUtils.getConsumerGroupId() = " + KafkaUtils.getConsumerGroupId());
System.out.println("record.key() = " + record.key());
System.out.println("record.value() = " + record.value());
}
@KafkaListener(topics = "test-topic", groupId = "myGroupB")
public void consumerB(ConsumerRecord<String, Order> record) {
System.out.println("KafkaUtils.getConsumerGroupId() = " + KafkaUtils.getConsumerGroupId());
System.out.println("record.key() = " + record.key());
System.out.println("record.value() = " + record.value());
}
}
# 再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
# 反序列化
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
consumer:
bootstrap-servers:
- 127.0.0.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.starry.guide.config.MyDeserializer
public class MyDeserializer implements Deserializer<Order> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public Order deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, Order.class);
} catch (Exception e) {
throw new RuntimeException("Error deserializing object", e);
}
}
}
# 消费者的 poll 有什么作用
- 获取消息,支持阻塞
- 分区再均衡(poll 就是从分区获取消息,离分区关系最近)
- 群组协调(加入新的消费者,进行 joingroup 操作)
- 发送心跳(维护消费者组)
# 消费者拦截器
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.starry.guide.config.MySerializer
consumer:
bootstrap-servers:
- 127.0.0.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.starry.guide.config.MyDeserializer
properties:
interceptor:
classes: com.starry.guide.config.MyConsumerInterceptor
public class MyConsumerInterceptor implements ConsumerInterceptor<String, Order> {
@Override
public ConsumerRecords<String, Order> onConsume(ConsumerRecords<String, Order> records) {
System.out.println("进入消费者拦截器");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
- onConsume:
KafkaConsumer.poll()
方法返回对象之前调用。- 可以进行消息幂等、过期拦截(消息中携带时间戳)处理。
- onCommit:提交 offset 后调用。
# 消费者提交位移量
- 自动提交:可能会重复消息和丢失消息。默认间隔提价,还没处理完间隔到了也提交,就丢消息了。
enable.auto.commit
参数控制是否自动提交。auto.commint.interver.ms
参数控制多久提交一次 offset,默认 5s。 - 手动提交:
- 同步提交:提交偏移量时阻塞,降低性能。安全高。
- 异步提交:异步提交偏移量,可能会失败,就可能重复消费。
异步提交 X 偏移量,异步失败重试,消费者继续消费,异步提交到 X+Y 成功。之前提交到 X 的重试成功了。此时如果发送异常(或者再均衡),恢复后的消费者就会从 X 开始消费。重复消费。如何解决? 程序内维护一个递增序号来维护异步提交的顺序,每次提交记录序号。重试时如果当前提交偏移量小于序号就不管(有更大的偏移量提交了),偏移量等于序号的话进行重试。
# 分区分配策略
Kafka 提供了消费者客户端参数 partition.assignment.strategy
来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor
,即采用 RangeAssignor
分配策略。除此之外,Kafka 还提供了另外两种分配策略:RoundRobinAssignor
和 StickyAssignor
。
# RangeAssignor
RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。 分配方式:n = 分区数 / 消费者数量,m = 分区数 % 消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量 - m)个消费者每个分配 n 个分区。 假设消费组内有 2 个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有 4 个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:
消费者 C0 | t0p0 | t0p1 | t1p0 | t1p1 |
---|---|---|---|---|
消费者 C1 | t0p2 | t0p3 | t1p2 | t1p3 |
假设上面例子中 2 个主题都只有 3 个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者 C0 | t0p0 | t0p1 | t1p0 | t1p1 |
---|---|---|---|---|
消费者 C1 | t0p2 | t1p2 |
可以明显地看到这样的分配并不均匀,如果将类似的情形扩大,则有可能出现部分消费者过载的情况。
# RoundRobinAssignor
RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor 分配策略对应的 partition.assignment.strategy
参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor
。
假设消费组中有 2 个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有 3 个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者 C0 | t0p0 | t0p2 | t1p1 |
---|---|---|---|
消费者 C1 | t0p1 | t1p0 | t1p2 |
假设消费组内有 3 个消费者(C0、C1 和 C2),它们共订阅了 3 个主题(t0、t1、t2),这 3 个主题分别有 1、2、3 个分区,即整个消费组订阅了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这 6 个分区。具体而言,消费者 C0订阅的是主题 t0,消费者 C1 订阅的是主题 t0 和 t1,消费者 C2 订阅的是主题 t0、t1 和 t2,那么最终的分配结果为:
消费者 C0 | t0p0 | |||
---|---|---|---|---|
消费者 C1 | t1p0 | |||
消费者 C2 | t1p1 | t2p0 | t2p1 | t2p2 |
# StickyAssignor分配策略
“sticky”这个单词可以翻译为“黏性的”,Kafka 从 0.11.x 版本开始引入这种分配策略,它主要有两个目的:
- 分区的分配要尽可能均匀。
- 分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。 假设消费组内有 3 个消费者(C0、C1 和 C2),它们都订阅了 4 个主题(t0、t1、t2、t3),并且每个主题有 2 个分区。也就是说,整个消费组订阅了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这 8 个分区。最终的分配结果如下:
消费者 C0 | t0p0 | t1p1 | t3p0 |
---|---|---|---|
消费者 C1 | t0p1 | t2p0 | t3p1 |
消费者 C2 | t1p0 | t2p1 |
假设此时消费者 C1 脱离了消费组,那么消费组就会执行再均衡操作,进而消费分区会重新分配。如果采用 RoundRobinAssignor 分配策略,那么此时的分配结果如下:
消费者 C0 | t0p0 | t1p0 | t2p0 | t3p0 |
---|---|---|---|---|
消费者 C2 | t0p1 | t1p1 | t2p1 | t3p1 |
如果此时使用的是 StickyAssignor 分配策略,那么分配结果为:
消费者 C0 | t0p0 | t1p1 | t3p0 | t2p0 |
---|---|---|---|---|
消费者 C2 | t1p0 | t2p1 | t0p1 | t3p1 |
可以看到分配结果中保留了上一次分配中对消费者 C0 和 C2 的所有分配结果,并将原来消费者 C1 的“负担”分配给了剩余的两个消费者 C0 和 C2,最终 C0 和 C2 的分配还保持了均衡。
同样消费组内有 3 个消费者(C0、C1 和 C2),集群中有 3 个主题(t0、t1 和 t2),这 3 个主题分别有 1、2、3 个分区。也就是说,集群中有 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这 6 个分区。消费者 C0 订阅了主题 t0,消费者 C1 订阅了主题 t0 和 t1,消费者 C2 订阅了主题 t0、t1 和 t2。 如果此时采用 RoundRobinAssignor 分配策略,那么最终的分配结果为:
消费者 C0 | t0p0 | |||
---|---|---|---|---|
消费者 C1 | t1p0 | |||
消费者 C2 | t1p1 | t2p0 | t2p1 | t2p2 |
如果此时采用的是 StickyAssignor 分配策略,那么最终的分配结果为:
消费者 C0 | t0p0 | ||
---|---|---|---|
消费者 C1 | t1p0 | t1p1 | |
消费者 C2 | t2p0 | t2p1 | t2p2 |
可以看到这才是一个最优解(消费者 C0 没有订阅主题 t1 和 t2,所以不能分配主题 t1 和 t2 中的任何分区给它,对于消费者 C1 也可同理推断)。
# 再均衡原理
每个消费组的子集在服务端对应一个 GroupCoordinator 对其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。 ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作,包括前分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:
- 有新的消费者加入消费组。
- 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线。
- 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了
unsubscrible()
方法取消对某些主题的订阅。 - 消费组所对应的 GroupCoorinator 节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。
- 第一阶段(FIND_COORDINATOR) 消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 broker,并创建与该 broker 相互通信的网络连接。
- 第二阶段(JOIN_GROUP) 在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
- 第三阶段(SYNC_GROUP) 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者。
- 第四阶段(HEARTBEAT) 进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。 ## 第一阶段(FIND_COORDINATOR) 消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 broker,并创建与该 broker 相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的负载最小的节点(leastLoadedNode)发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator。
FindCoordinatorRequest 请求体中只有两个域(Field):coordinator_key 和 coordinator_type。
- coordinator_key 在这里就是消费组的名称,即 groupId。
- coordinator_type 置为0。
Kafka 在收到 FindCoordinatorRequest 请求之后,会根据 coordinator_key(也就是 groupId)查找对应的 GroupCoordinator 节点,如果找到对应的 GroupCoordinator 则会返回其相对应的 node_id、host 和 port 信息。
具体查找 GroupCoordinator 的方式是先根据消费组 groupId
的哈希值计算 __consumer_offsets
中的分区编号:
Utils.abs(groupid.hashCode) % groupMetadataTopicPartitionCount
- groupId.hashCode 就是使用 Java 中 String 类的
hashCode()
方法获得的。 - groupMetadataTopicPartitionCount 为主题
__consumer_offsets
的分区个数,这个可以通过 broker 端参数offsets.topic.num.partitions
来配置,默认值为 50。
找到对应的 __consumer_offsets
中的分区之后,再寻找此分区 leader 副本所在的 broker 节点,该 broker 节点即为这个 groupId 所对应的 GroupCoordinator 节点。
消费者 groupId 最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区 leader 副本所在的 broker 节点,让此 broker 节点既扮演 GroupCoordinator 的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
# 第二阶段(JOIN_GROUP)
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。 JoinGroupRequest 的结构包含多个域:
- group_id 就是消费组的 id,通常也表示为 groupId。
- session_timout 对应消费端参数
session.timeout.ms
, 默认值为10000, 即 10 秒。GroupCoordinator 超过 session_timeout 指定的时间内没有收到心跳报文则认为此消费者已经下线。 - rebalance_timeout 对应消费端参数
max.poll.interval.ms
,默认值为 300000,即 5 分钟。表示当消费组再平衡的时候,GroupCoordinator 等待各个消费者重新加入的最长等待时间。 - member_id 表示 GroupCoordinator 分配给消费者的 id 标识。消费者第一次发送 JoinGroupRequest 请求的时候此字段设置为 null。
- protocol_type 表示消费组实现的协议,对于消费者而言此字段值为“consumer”。
- group_protocols 域为数组类型,其中可以囊括多个分区分配策略,这个主要取决于消费者客户端参数
partition.assignment.strategy
的配置。如果配置了多种策略, 那么 JoinGroupRequest 中就会包含多 个protocol_name
和protocol_metadata
。
服务端在收到 JoinGroupRequest 请求后会交由 GroupCoordinator 来进行处理。GroupCoordinator 首先会对 JoinGroupRequest 请求做合法性校验,比如 group_id 是否为空、当前 broker 节点是否是请求的消费者组所对应的组协调器、rebalance_timeout 的值是否在合理的范围之内。如果消费者是第一次请求加入消费组,那么 JoinGroupRequest 请求中的 member_id 值为 null,即没有它自身的唯一标志,此时组协调器负责为此消费者生成一个 member_id。
String memberid = clientid + "-" + UUID.randomUUID().toString();
# 选举消费组的 leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader ,分两种情况分析。
- 如果消费组内还没有 leader ,那么第一个加入消费组的消费者即为消费组的 leader 。
- 如果某一时刻 leader 消费者由于某些原因退出了消费组, 那么会重新选举一个新的 leader。
//scala code.
private val members = new mutable.HashMap[String, MemberMetadata]
var leaderId = members.keys.head
在 GroupCoordinator 中消费者的信息是以 HashMap 的形式存储的,其中 key 为消费者的 member_id,而 value是消费者相关的元数据信息。leaderId 表示 leader 消费者的 member_id,它的取值为 HashMap 中的第一个键值对的 key,这种选举的方式基本上和随机无异。
# 选举分区分配策略
每个消费者都可以设置自己的分区分配策略,这个分区分配的选举并非由 leader 消费者决定,而是根据消费组内的各个消费者投票来决定的。
- 收集各个消费者支持的所有分配策略,组成候选集 candidates。
- 每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。
- 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略,那么就会报出异常 IllegalArgumentException:Member does not support protocol。需要注意的是,这里所说的“消费者所支持的分配策略”是指 partition.assignment.strategy
参数配置的策略,如果这个参数值只配置了 RangeAssignor,那么这个消费者客户端只支持 RangeAssignor 分配策略,而不是消费者客户端代码中实现的 3 种分配策略及可能的自定义分配策略。
在此之后,Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者,leader 消费者和其他普通消费者收到的响应内容并不相同。 JoinGroupResponse 包含了多个域:
- generation_id 用来标识当前消费组的年代信息,避免受到过期请求的影响。
- leader_id表示消费组leader消费者的member_id。
- members 为数组类型,其中包含各个成员信息。 Kafka发送给普通消费者的 JoinGroupResponse 中的 members 内容为空,而只有 leader 消费者的JoinGroupResponse 中的 members 包含有效数据。member_metadata 为消费者的订阅信息。
# 第三阶段(SYNC_GROUP)
leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时 leader 消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向 GroupCoordinator 发送 SyncGroupRequest 请求来同步分配方案。
只有 leader 消费者发送的 SyncGroupRequest 请求中才包含具体的分区分配方案,这个分配方案保存在 group_assignment 中,而其余消费者发送的 SyncGroupRequest 请求中的 group_assignment 为空。
group_assignment 是一个数组类型,其中包含了各个消费者对应的具体分配方案:member_id 表示消费者的唯一标识,而 member_assignment 是与消费者对应的分配方案,它还可以做更具体的划分。
服务端在收到消费者发送的 SyncGroupRequest 请求之后会交由 GroupCoordinator 来负责具体的逻辑处理。 GroupCoordinator 同样会先对 SyncGroupRequest 请求做合法性校验,在此之后会将从 leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka 的 **__consumer_offsets**
主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。
# 第四阶段(HEARTBEAT)
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator,并且 GroupCoordinator 将其保存到了 Kafka 内部的 __consumer_offsets
主题中,此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消费位移并从此处继续消费。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数 heartbeat.interval.ms
指定,默认值为 3000,即 3 秒,这个参数必须比 session.timeout.ms
参数设定的值要小,一般情况下 heartbeat.interval.ms
的配置值不能超过 session.timeout.ms
配置值的 1/3。
# _consumer offsets 剖析
位移提交的内容最终会保存到 Kafka 的内部主题 __consumer_offsets 中,一般情况下,当集群中第一次有消费者消费消息时会自动创建主题 __consumer_offsets。
offsets.topic.replication.factor
为该主题分区的副本数,默认为 3;- 分区数可以通过
offsets.topic.num.partitions
参数设置,默认为 50。
图中展示了消费位移对应的消息内容格式,上面是消息的 key,下面是消息的 value。可以看到 key 和 value 中都包含了 version 字段,这个用来标识具体的 key 和 value 的版本信息,不同的版本对应的内容格式可能并不相同。就目前版本而言,key 和 value 的 version 值都为 1。 key 中除了 version 字段还有 group、topic、partition 字段,分别表示消费组的 groupId、主题名称和分区编号。虽然 key 中包含了 4 个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。 value 中包含了 5 个字段,除 version 字段外,其余的 offset、metadata、commit_timestamp、expire_timestamp 字段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳。
与此同时,消费组的元数据信息也会存入 Kafka 的 __consumer_offsets 主题中。
- protocol_type:消费组实现的协议,这里的值为“consumer”。
- generation:标识当前消费组的年代信息,避免收到过期请求的影响。
- protocol:消费组选取的分区分配策略。
- leader:消费组的 leader 消费者的名称。
- members:数组类型,其中包含了消费组的各个消费者成员信息。
# 分区副本的分配规则
当 broker 端配置参数auto.create.topics.enable
设置为true
(默认值就是true
),那么以下情况将自动创建一个主题:
- 生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为
num.partitions
(默认值为1)和副本因子为default.replication.factor
(默认值为1)的主题。 - 当一个消费者开始从未知主题中读取消息时,或者任意一个客户端向未知主题发送元数据请求时,也会按照配置参数
num.partitions
和default.replication.factor
的值来创建相应的主题。
然而,这种自动创建主题的行为通常是不被期望的。除非有特殊的应用需求,否则不建议将auto.create.topics.enable
参数设置为true
,因为它会增加主题管理和维护的难度。
# 指定 broker 分区
这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号 ,
隔开,分区内多个副本用冒号 :
隔开。并且在使用 replica-assignment
参数创建主题时不需要原本必备的partitions
和replication-factor
这两个参数。
- 逗号分隔的是分区,4 个分区
- 冒号分隔的是副本,每个分区 2 个副本
- 第1个分区的副本在 broker 2 和 broker 0上
- 第2个分区的副本在 broker 0 和 broker 1上
- 第3个分区的副本在 broker 1 和 broker 2上
- 第4个分区的副本在 broker 2 和 broker 1上
注意:
- 同一个分区内的副本不能有重复,比如 0:0,1,1
- 分区之间的副本数要相同,比如 0:1,0,1:0
- 不能跳过分区,比如 0:1,,1:0
# 指定机架信息
在创建主题时,如果使用了 replica-assignment
参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用 replica-assignment
参数,那么就需要按照内部的逻辑来计算分配方案了。使用 kafka-topics.sh
脚本创建主题时的内部分配逻辑按照机架信息划分成两种策略:未指定机架信息和指定机架信息。如果集群中所有的 broker 节点都没有配置 broker.rack
参数,或者使用 disable-rack-aware
参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略。
指定机架分配。kafka 服务就是一个 broker,我们可以再一台服务器上部署 kafka,那么这台服务器上就是一个 kafka 的 broker,我们可以再 broker 上配置 broker.rack="rack1"
来将我们的当前的 broker 定义到机架 rack1 上。如果另外一台服务器上我们也安装了一个 kafka,也指定 broker.rack="rack1"
,那么我们就可以说:rack1 机架上有 2 个 broker。
举例:比如我们有 3 个机架,分别为 rack1,rack2,rack3,每台机架上有 2 个 broker,brokerid 分别为 0 1 2 3 4 5。
- rack1: 0,1
- rack2: 2, 3
- rack3: 4, 5
在所有的 broker 都指定了
broker.rack
信息的前提下,我们才能够使用指定机架分配的策略。
满足以下任意一个条件的 broker 不能被添加到当前分区的副本列表之中:
- 如果此 broker 所在的机架中已经存在一个 broker 拥有该分区的副本,并且还有其他的机架中没有任何一个 broker 拥有该分区的副本。
- 如果此 broker 中已经拥有该分区的副本,并且还有其他 broker 中没有该分区的副本。
就是要尽量保证分别分配在不同的 broker 上
# Leader 副本的选举
分区使用多副本机制来提升可靠性,但只有 leader 副本对外提供读写服务,而 follower 副本只负责在内部进行消息的同步。如果一个分区的 leader 副本不可用,那么就意味着整个分区变得不可用,此时就需要 Kafka 从剩余的 follower 副本中挑选一个新的 leader 副本来继续对外提供服务。
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个 broker 节点上,对应的 leader 副本的分配也比较均匀。 针对同一个分区而言,同一个 broker 节点中不可能出现它的多个副本,即 Kafka 集群的一个 broker 中最多只能有它的一个副本,我们可以将 leader 副本所在的 broker 节点叫作分区的 leader 节点,而 follower 副本所在的 broker 节点叫作分区的 follower 节点。 随着时间的更替,Kafka 集群的 broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的 leader 节点发生故障时,其中一个 follower 节点就会成为新的 leader 节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。当原来的 leader 节点恢复之后重新加入集群时,它只能成为一个新的 follower 节点而不再对外提供服务。比如我们将 brokerId 为 2 的节点重启,那么主题 topic-partitions 新的分布信息如下: 可以看到原本分区 1 的 leader 节点为 2,现在变成了 0,如此一来原本均衡的负载变成了失衡:节点 0 的负载最高,而节点 1 的负载最低。 概念:
- AR(All Replicas)表示当前分区的所有的副本集合
- 分区 0 的 AR就是 [1, 2 0]
- 分区 1 的 AR就是 [2, 0, 1]
- 分区 2 的 AR就是 [0, 1, 2]
- AR 包含 ISR,ISR 即 leader 副本 + 与 leader 副本数据同步的分区副本所在的节点
- OSR 仅包含与 leader 副本数据不同步的 follower 副本所在的节点
一旦 leader 节点下线,就从 ISR 中寻找第一个可用的节点成为 leader 副本。
# 重选 Leader 副本
kafka-perferred-replica-election.sh
脚本中提供了 path-to-json-file
参数来小批量地对部分分区执行优先副本的选举操作。通过 path-to-json-file
参数来指定一个 JSON 文件,这个 JSON 文件里保存需要执行优先副本选举的分区清单。
创建一个 JSON 文件,文件名假定为 election.json
。
执行命令
在生产环境中推荐使用配置文件的方式分批、手动的执行 leader 副本重选举,而不是自动重选举。重选举期间会占用大量磁盘和网络带宽,应该在业务低峰期进行。
# 分区重分配
Kafka 提供了 kafka-reassign-partitions.sh
脚本来执行分区重分配的工作,它可以在集群扩容、
broker 节点失效的场景下对分区进行迁移。
kafka-reassign-partitions.sh
脚本的使用分为 3 个步骤:
- 首先创建需要一个包含主题清单的 JSON 文件,
- 其次根据主题清单和 broker 节点清单生成一份重分配方案,
- 最后根据这份方案执行具体的重分配动作。
首先在一个由 3 个节点(broker 0、broker 1、broker 2)组成的集群中创建一个主题 topic-reassign,主题中
包含 4 个分区和 2 个副本:
可以观察到主题 topic-reassign 在 3 个节点中都有相应的分区副本分布。由于某种原因,我们想要下线 brokerId 为 1 的 broker 节点,在此之前,我们要做的就是将其上的分区副本迁移出去。使用 kafka-reassign-partitions.sh
脚本的第一步就是要创建一个 JSON 文件(文件的名称假定为 reassign.json ),文件内容为要进行分区重分配的主题清单。对主题 topic-reassign 而言,示例如下:
第二步就是根据这个 JSON 文件和指定所要分配的 broker 节点列表来生成一份候选的重分配方案,具体内容参考如下:
broker-list 用来指定所要分配的 broker 节点列表。
上面示例中打印出了两个 JSON 格式的内容。
- 第一个“Current partition replica assignment”所对应的 JSON 内容为当前的分区副本分配情况,在执行分区重分配的时候最好将这个内容保存起来,以备后续的回滚操作。
- 第二个“Proposed partition reassignment configuration”所对应的JSON 内容为重分配的候选方案,注意这里只是生成一份可行性的方案,并没有真正执行重分配的动作。生成的可行性方案的具体算法和创建主题时的一样,这里也包含了机架信息。
将第二个 JSON 内容保存在一个 JSON 文件中,假定这个文件的名称为 project.json
。
第三步执行具体的重分配动作,详细参考如下:
对于分区重分配而言,这里还有可选的第四步操作,即验证查看分区重分配的进度。只需将上面的 execute
替换为 verify
即可,具体示例如下:
分区重分配的基本原理是先通过控制器为每个分区添加新副本,新的副本将从分区的 leader 副本那里复制所有的数据。在复制完成之后,控制器将旧副本从副本清单里移除。
分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低,这一点和优先副本的选举有异曲同工之妙。 还需要注意的是,如果要将某个 broker 下线,那么在执行分区重分配动作之前最好先关闭或重启 broker。这样这个 broker 就不再是任何分区的 leader 节点了,它的分区就可以被分配给集群中的其他 broker。这样可以减少 broker 间的流量复制,以此提升重分配的性能,以及减少对集群的影响。
# 分区副本复制限流
副本间的复制限流有两种实现方式:kafka-config.sh
脚本和 kafka-reassign-partitions.sh
脚本。
kafka-config.sh 脚本主要以动态配置的方式来达到限流的目的,在 broker 级别有两个与复制限流相关的配置参数:follower.replication.trottled.rate
和 leader.replication.throttled.rate
,前者用于设置 follower 副本复制的速度,后者用于设置 leader 副本传输的速度,它们的单位都是 B/s。通常情况下,两者的配置值是相同的。下面的示例中将 broker 1 中的 leader 副本和 follower 副本的复制速度限制在 1024B/s 之内,即 1KB/s:
kafka-reassign-partitions.sh 脚本本身也提供了限流的功能,只需一个 throttle
参数即可,具体用法如下:
复制限流,一定要在使用完成后恢复。
# 修改副本数量
编写 JSON 文件 执行命令
分区不支持减少,副本支持减少
# 控制器
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中的控制器选举工作依赖于 ZooKeeper,成功竞选为控制器的 broker 会在 ZooKeeper 中创建/controller 这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
- 其中 version 在目前版本中固定为 1,
- brokerid 表示成为控制器的 broker 的 id 编号,
- timestamp 表示竞选成为控制器时的时间戳。
# 选举过程
在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试读取/controller
节点的 brokerid 的值,如果读取到 brokerid 的值不为 -1,则表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 ZooKeeper 中不存在/controller 节点,或者这个节点中的数据异常,那么就会尝试去创建 /controller
节点。当前 broker 去创建节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId
。
ZooKeeper 中还有一个与控制器有关的 /controller_epoch
节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的 controller_epoch
值。controller_epoch
用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。
controller_epoch
的初始值为 1,即集群中第一个控制器的纪元为 1,当控制器发生变更时,每选出一个新的控制器就将该字段值加 1。每个和控制器交互的请求都会携带 controller_epoch
这个字段,如果请求的 controller_epoch
值小于内存中的 controller_epoch
值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的 controller_epoch
值大于内存中的 controller_epoch
值,那么说明已经有新的控制器当选了。由此可见,Kafka 通过 controller_epoch
来保证控制器的唯一性,进而保证相关操作的一致性。
# 控制器职责
具备控制器身份的 broker 需要比其他普通的 broker 多一份职责,具体细节如下:
- 监听分区相关的变化。 为 ZooKeeper 中 的
/admin/reassign_partitions
节点注册PartitionReassignmentHandler
,用来处理分区重分配的动作。 为 ZooKeeper 中的/isr_change_notification
节点注册IsrChangeNotificetionHandler
,用来处理 ISR 集合变更的动作。为 ZooKeeper 中的/admin/preferred-replica-election
节点添加PreferredReplicaElectionHandler
,用来处理优先副本的选举动作。 - 监听主题相关的变化。为 ZooKeeper 中的
/brokers/topics
节点添加TopicChangeHandler
,用来处理主题增减的变化; 为 ZooKeeper 中 的/admin/delete_topics
节点添加TopicDeletionHandler
,用来处理删除主题的动作。 - 监听 broker 相关的变化。为 ZooKeeper 中的
/brokers/ids
节点添加BrokerChangeHandler
,用来处理 broker 增减的变化。 - 从 ZooKeeper 中读取获取当前所有与主题、分区及 broker 有关的信息并进行相应的管理。对所有主题对应 的 ZooKeeper 中的
/brokers/topics/<topic>
节点添加PartitionModificationsHandler
,用来监听主题中的分区分配变化。 - 启动并管理分区状态机和副本状态机。
- 更新集群的元数据信息。
- 如果参数
auto.leader.rebalance.enable
设置为 true,则还会开启一个名为auto-leader-rebalance-task
的定时任务来负责维护分区的优先副本的均衡。(这个参数,在生产上一定要设置为 false,我门不能开启自动的优选副本的均衡,这个参数如果开启,kafka 会每 5 分钟进行一次均衡率的计算,如果不均衡律超过 10% 就会触发一次自动的副本均衡。)
# 为什么使用磁盘存储还是很快
# 顺序写
Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。 有关测试结果表明,一个由 6 块 7200r/min 的 RAID-5 阵列组成的磁盘簇的线性(顺序)写入速度可以达到 600MB/s,而随机写入速度只有 100KB/s,两者性能相差 6000 倍。操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。 Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作。
# 页缓存
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过 log.flush.interval.messages
、log.flush.interval.ms
等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。
# 零拷贝
所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux 操作系统而言,零拷贝技术依赖于底层的sendfile()
方法实现。对应于 Java 语言,FileChannal.transferTo()
方法的底层实现就是 sendfile()
方法。
零拷贝技术通过 DMA(DirectMemoryAccess)技术将文件内容复制到内核模式下的 ReadBuffer 中。不过没有数据被复制到 SocketBuffer,相反只有包含数据的位置和长度的信息的文件描述符被加到 SocketBuffer 中。DMA 引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了 2 次复制就从磁盘中传送出去了,并且上下文切换也变成了 2 次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。
# 日志文件
不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。
Log 对应了一个命名形式为<topic>-<partition>
的文件夹。举个例子,假设有一个名为“topic-log”的主题,此主题中具有 4 个分区,那么在实际物理存储上表现为“topic-log-0”“topic-log-1”“topic-log-2”“topic-log-3”这 4 个文件夹:
向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。为了方便描述,我们将最后一个 LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegment 满足一定的条件时,就需要创建新的 activeSegment,之后追加的消息将写入新的 activeSegment。
为了便于消息的检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示 当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log。
第 2 个 LogSegment 对应的基准位移是 133,也说明了该 LogSegment 中的第一条消息的偏移量为 133,同时可以反映出第一个 LogSegment 中共有 133 条消息(偏移量从 0 至 132 的消息)。
注意每个 LogSegment 中不只包含“.log”“.index”“.timeindex”这 3 种文件,还可能包含“.deleted”“.cleaned”“.swap”等临时文件,以及可能的“.snapshot”“.txnindex”“leader-epoch-checkpoint”等文件。
从更加宏观的视角上看,Kafka 中的文件不只上面提及的这些文件,比如还有一些检查点文件,当一个 Kafka 服务第一次启动的时候,默认的根目录下就会创建以下 5 个文件:
# 日志分隔条件
日志分段文件切分包含以下几个条件,满足其一即可。
- 当前日志分段文件的大小超过了 broker 端参数
log.segment.bytes
配置的值。log.segment.bytes
参数的默认值为 1073741824,即 1GB。 - 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于
log.roll.ms
或log.roll.hours
参数配置的值。如果同时配置了log.roll.ms
和log.roll.hours
参数,那么log.roll.ms
的优先级高。默认情况下,只配置了log.roll.hours
参数,其值为 168,即 7 天。 - 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数
log.index.size.max.bytes
配置的值。log.index.size.max.bytes
的默认值为 10485760,即 10MB。 - 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于
Integer.MAX_VALUE
,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset > Integer.MAX_VALUE)。
# 日志索引
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
- 偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;
- 时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparseindex)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes
指定,默认值为 4096,即 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes
的值,对应地可以增加或缩小索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。
- 偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
- 时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
# 偏移量索引
每个索引项占用 8 个字节,分为两个部分。
- relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。
- position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
如果我们要查找偏移量为 23 的消息,那么应该怎么做呢? 首先通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即[22,656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。
# 时间戳索引
每个索引项占用 12 个字节,分为两个部分。
- timestamp:当前日志分段最大的时间戳。
- relativeOffset:时间戳所对应的消息的相对偏移量。
# 日志清理
Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个 Log,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka 提供了两种日志清理策略。
- 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
- 日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
我们可以通过 broker 端参数 log.cleanup.policy
来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将 log.cleanup.policy
设置为“compact”,并且还需要将 log.cleaner.enable
(默认值为 true)设定为 true。通过将 log.cleanup.policy
参数设置为“delete, compact”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与 log.cleanup.policy
对应的主题级别的参数为 cleanup.policy
。
# 日志删除
在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms
来配置,默认值为 300000,即 5 分钟。当前日志分段的保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。
- 默认情况下日志分段文件的保留时间为 7 天。
- broker 端参数
log.retention.bytes
来配置,默认值为-1,表示无穷大。
# 事务消息
KafkaProducer 类提供了与事务相关的以下 5 个方法:
- initTransactions():该方法用于初始化事务。在使用事务之前,需要调用此方法。它会为 KafkaProducer 实例分配必要的资源来支持事务。
- beginTransaction():该方法用于开始一个事务。在发送事务消息之前,需要先调用此方法来标识一个事务的开始。一旦调用了 beginTransaction() 方法,后续发送的消息将被视为属于同一事务。
- sendOffsetsToTransaction():该方法用于将消费者的位移提交到事务。当使用事务消息时,通常还需要对消费者的位移进行管理和提交。通过调用 sendOffsetsToTransaction() 方法,可以将消费者组内各分区的位移提交到当前事务中。
- commitTransaction():该方法用于提交事务。在所有的消息发送和其他操作完成后,可以调用 commitTransaction() 方法来提交整个事务。如果提交成功,则表示事务中的所有消息都成功处理;否则,事务将回滚并丢弃所有消息。
- abortTransaction():该方法用于回滚事务。在任何步骤出现错误或处理失败时,可以调用 abortTransaction() 方法来回滚事务。调用此方法将放弃事务中的所有消息,并释放相关的资源。
这些方法提供了在 Kafka 中使用事务的必要功能。它们使得能够将一组相关的消息作为一个原子单元进行处理,并提供了事务提交和回滚的机制,以确保消息的一致性和可靠性。
事务消息是对生产者发送消息到 broker 的保证,对消费者并不提供事务方法
# 时间轮
Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。
- Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
- TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
- 时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize 计算得出。
- 时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 中的所有任务。