kafka学习笔记(二)

kafka学习笔记(二)

核心概念

Topic

在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有
一个类别,这个类别就是Topic。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多
个消费者去消费其中的消息。如图所示
topic

Partition

每个 topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 topic 下的不同分区包含的消息是不同的。每个
消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset
保证消息在分区内的顺序,offset 的顺序性不跨分区,即 kafka只保证在同一个分区内的消息是有序的,同一个Topic的多个分区内的消息,kafka不保证其顺序性。
part

Topic&Partition 的存储

Partition 是以文件的形式存储在文件系统中,比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在
kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~2,命名规则是<topic_name>-<partition_id>

1
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic firstTopic

Broker

一个单独的Kafka server就是一个Broker。Broker的主要工作就是接受生产者发过来的消息,分配offset,之后保存到磁盘中;同时接受消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。
在一般的生产环境中,一个Broker独占一台物理服务器。

消息分发

kafka 消息分发策略

消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由 key、value 两部分构成,在发送一条消息时,我们可以指定这个 key,那么 producer 会根据 key 和 partition 机
制来判断当前这条消息应该发送并存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。

消息默认的分发机制

默认情况下,kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。这个随机是在这个参
数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。
这个值默认情况下是 10 分钟更新一次。
Metadata是Topic/Partition 和 broker 的映射关系,每一个 topic 的每一个 partition,需要知道对应的 broker 列表是什么,leader是谁,follower 是谁。
这些信息都是存储在 Metadata 这个类里面。

消费端消费是也可以指定分区,这样它就不会接收其他分区的数据

1
2
//消费指定的0号分区
TopicPartition topicPartition = new TopicPartition(topic,0);

消息的消费原理

Consumer Group

在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制。
在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的,kafka存在 Consumer group消费组的概念 ,也就是 group.id 一样的consumer ,这些consumer属于一个消费组,一个consumer只能属于一个消费组,消费组保证其订阅的Topic的每个分区只被分配给该消费组下的一个消费者消费。如果不同的消费组订阅了同一个Topic,消费组之间互不干扰。
如果要实现一个消息可以被多个消费者同时消费(广播)的效果,则将每个消费者放入单独的一个Consumer group消费组中。
如果要实现一个消息只被一个消费者消费(独占)的效果,则将所有的消费者放入一个Consumer group消费组中。

分区分配策略

同一个Consumer group中的消费者对于一个topic中的多个 partition,存在一定的分区分配策略。
在 kafka 中,存在两种分区分配策略,一种是 Range(范围分区,默认)、另一种是 RoundRobin(轮询)。 通过partition.assignment.strategy 这个参数来设置。

Range strategy(范围分区)

Range 策略是对每个Topic而言的,首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设有10个分区,3个消费者,排完序的分区将会是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在例子里面,有10个分区,3个消费者线程,10/3=3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区

假如有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C3-0 将消费 8, 9, 10 分区

假如有 2 个Topic(T1 和 T2),分别有 10 个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1 Topic的 0, 1, 2, 3 分区以及 T2 Topic的 0,1, 2, 3 分区
C2-0 将消费 T1 Topic的 4, 5, 6 分区以及 T2 Topic的 4, 5,6 分区
C3-0 将消费 T1 Topic的 7, 8, 9 分区以及 T2 Topic的 7, 8,9 分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端

RoundRobin strategy(轮询分区)

轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有 consumer 实例的订阅是相同的,那么 partition 会均匀分布。
例如Topic T1有10个分区,4个消费者,假如按照 hashCode 排序完的 topic partitions 组依次为 T1-0, T1-1, T1-2, T1-3, T1-4, T1-5, T1-6, T1-7, T1-8, T1-9,消费者线程排序为 C1-0, C2-0, C3-0, C4-0,最后分区分配的结果为:
C1-0 将消费 T1-0, T1-4, T1-8 分区;
C2-0 将消费 T1-1, T1-5, T1-9 分区;
C3-0 将消费 T1-2, T1-6 分区;
C4-0 将消费 T1-3, T1-7 分区;

使用轮询分区策略必须满足两个条件

  1. 同一个Consumer Group里面的所有消费者的num.streams必须相等
  2. 每个消费者订阅的主题必须是相同的

分区策略触发

当出现以下几种情况时,kafka 会进行一次分区分配操作,也就是 kafka consumer 的 rebalance

  1. 同一个 consumer group 内新增了消费者
  2. 消费者离开当前所属的 consumer group,比如主动停机或者宕机
  3. topic 新增了分区(也就是分区数量发生了变化)

kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。而具体如何执行分区策略,就是上述提到的两种内置的分区策略。而 kafka 对于分配策略这块,提供了可插拔的实现方式,也就是说,除了这两种之外,还可以创建自己的分配机制。

Rebalance的执行 以及管理consumer的group

Kafka 提供了一个角色:coordinator 来执行对于consumer group的管理,当consumer group 的第一个consumer启动的时候,它会去和broker确定谁是它们组的coordinator,之后该group内的所有成员都会和该coordinator进行协调通信。

确定 coordinator

consumer group如何确定自己的 coordinator, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest 请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator。

JoinGroup 的过程

在 rebalance 之前,需要保证 coordinator 是已经确定好了的,整个rebalance的过程分为两个步骤,Join 和 Sync
join: 表示加入到 consumer group 中,在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会选择一个consumer担任leader 角色,并把组成员信息和订阅信息发送消费者

syn:完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向 GroupCoordinator 发送SyncGroupRequest 请求,并且处理 SyncGroupResponse 响应,简单来说,就是leader将消费者对应的 partition 分配方案通过coordinator同步给consumer group中的所有consumer

每个消费者都会向coordinator 发送sync group请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse中。
这样所有成员都知道自己应该消费哪个分区。

consumer group 的分区分配方案是在客户端执行的,Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。

保存消费端的消费位置

offset

每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset的顺序不跨分区,即kafka 只保证在同一个分区内的消息是有序的;对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。

offset 保存位置

在 kafka 中,提供了一个__consumer_offsets_* 的一个topic,把 offset信息写入到这个topic中。
__consumer_offsets保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50 个分区。

可以根据groupid确定consumer_group保存在哪个分区中,计算公式Math.abs("groupid".hashCode())%groupMetadataTopicPartitionCount,默认情况下groupMetadataTopicPartitionCount有50个分区

消息的存储

消息的保存路径

kafka 是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,Log 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的明明规则是(topic_name)_(partition_id)
比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~2

多个分区在集群中的分配

如果对于一个 topic,在集群中创建多个partition,那么partition是如何分布的

  1. 将所有 N 个Broker 和待分配的 i 个Partition排序
  2. 将第 i 个Partition分配到第(i mod n)个Broker上

消息写入的性能

现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。即使是这样,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka 还有一个性能策略

零拷贝

消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者,这个操作上经历了很多步骤。
info

  • 操作系统将数据从磁盘读入到内核空间的页缓存
  • 应用程序将数据从内核空间读入到用户空间缓存中
  • 应用程序将数据写回到内核空间到 socket 缓存中
  • 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出

这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。
通过”零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;
在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法FileChannel.transferTo API
使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。

消息的存储原理

消息的文件存储机制

kafka 是通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是一个逻辑上的概念,一个 LogSegment 对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的,索引文件是用来保存消息的索引。

LogSegment

kafka 以 segment 为单位又把 partition 进行细分。每个 partition 相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中
(每个 segment 文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。

log.segment.bytes=107370 (设置分段大小),默认是1GB,

segment file 由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀”.index”和”.log”分别表示为 segment 索引文件、数据文件。
segment 文件命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。
数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充
通过下面这条命令可以看到 kafka 消息日志的内容

1
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test0/00000000000000000000.log --print-data-log

segment 中 index 和 log 的对应关系

为了提高查找消息的性能,为每一个日志文件添加 2 个索引索引文件:OffsetIndex 和 TimeIndex,分别对应*.index以及*.timeindex,TimeIndex 索引文件格式:它是映射时间戳和相对 offset
例如下图所示
index
如图所示,index 中存储了索引以及物理偏移量。 log 存储了消息的内容。索引文件的元数据执行对应数据文件中message 的物理偏移地址。举个简单的案例来说,以[4053,80899]为例,在 log 文件中,对应的是第4053条记录,物理偏移量(position)为 80899. position 是ByteBuffer的指针位置

在 partition 中通过 offset 查找 message

  1. 根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件
  2. 找到索引文件后,根据 offset 进行定位,找到索引文件中的符合范围的索引。(kafka 采用稀疏索引的方式来提高查找性能)
  3. 得到 position 以后,再到对应的 log 文件中,从 position 出发开始查找 offset 对应的消息,将每条消息的offset与目标 offset 进行比较,直到找到消息

    例如,要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490,最后查找到对应的消息以后返回。

Log 文件的消息内容分析

前面通过kafka提供的命令,可以查看二进制的日志文件信息,一条消息,会包含很多的字段。
例如

1
offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

createTime 表示创建时间、keysize 和 valuesize 表示 key 和 value 的大小、 compresscodec表示压缩编码、payload表示消息的具体内容

打赏

请我喝杯咖啡吧~

支付宝
微信