kafka原理与实操(一):初学者视角学kafka架构(详细、简洁、总结性长文)

1、kafka概述

在大数据处理时候,数据需要从一个应用程序传输到另一个应用程序,消息系统负责处理这项事务。Kafka 是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。kafka基于发布-订阅(pub-pub)模式,可以处理大量的数据,适用于离线和在线消息消费。

  • 可靠性 - Kafka是分布式,分区,复制和容错的。
  • 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
  • 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
  • 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

一般的消息系统主要有两种模式:点对点模式和发布订阅模式:

  • 点对点消息系统:一个或者多个消费者可以消耗队列中的消息,但是消息最多只能被一个消费者消费,一旦有一个消费者将其消费掉,消息就从该队列中消失。(有点类似于订单处理系统)
  • 发布订阅消息系统:消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。(有点类似于订阅公众号)

2、kafka架构

kafka基础框架如图所示,主要可以分为生产者(producer)、消费者(consumer),kafka集群。kafka集群是分布式系统,每一个server被称为一个broker。kafka中消息是以topic为单位,同一类消息被发送到同一个topic中,每个topic具有多个分区,并且对应着多个副本,每个分区对应着磁盘上一个文件,该文件使用偏移量(offset)来标记一条消息。具体的架构信息由下图和介绍文字组成。
在这里插入图片描述
(1)Producer :消息生产者,就是向kafka broker发消息的客户端;
(2)Consumer :消息消费者,向kafka broker取消息的客户端;
(3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
(5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
(6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
(7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
(8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
(9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。

补充两点:

  • 分区和分布式:一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上,存储方式是一个 partition 对应一个文件,每个 broker 负责存储在自己机器上的 partition 中的消息读写。
  • 副本:kafka可以配置副本文件的数量,对于多个副本文件,每个 partition 选举一个broker 作为“leader”,由 leader 负责所有对该分区的读写,其他 server 作为 follower 只需要简单的与 leader 同步,保持跟进即可。如果原来的 leader 失效,会重新选举由其他的 follower 来成为新的 leader。接下来介绍一下两种副本选举机制:

kafka中broker的leader选举机制:
·
broker中的leader(controller leader):broker启动时,都创建一个kafka Controller对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader。
·

Kafka的Leader选举是通过在zookeeper上创建/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中所有大小事务。 当leader和zookeeper失去连接时,对应的Controller Path会自动消失(因为它是ephemeral Node),此时该Watch被fire,所有“活” 着的Broker都会去竞选成为新的Controller (创建新的Controller Path),但是只会有一个竞选成功(这点 Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是一次性的, 被fire一次之后即失效,所以需要重新注册

kafka中partition leader的选举
·
partition leader:负责该分区数据的读写,leader的选举由controller leader执行。具体的操作为
1、从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合。(ISR将在下文介绍)
2、调用配置的分区选择算法选择分区的leader。
在这里插入图片描述
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。

2.1 工作流程

讲完上面的架构,现在看看消息具体怎么传输的,主要介绍生产过程和消费过程。

(1)生产过程
在这里插入图片描述
一条消息可以有四个参数:topic、partition、key和value,其中key和partition是可选的。一条记录需要进行序列化,放入发送队列中。生产者producer将和topic下所有的partition leader保持socket连接,消息通过producer直接通过topic发送到broker,其中partition leader的位置注册在zookeeper中, producer作为zookeeper client,已经注册了watch监听partition leader的变更事件,因此可以准确地知道谁是当前的leader。producer 端采用异步发送:将多条消息暂且在客户端 buffer 起来,并将他们批量的发送到 broker,小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。

再说一下partition的组成:
在这里插入图片描述
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
在这里插入图片描述

(2)消费过程
在这里插入图片描述
每一个消费者属于一个 consumer group,一个 group 包含多个 consumer。特别需要注意的是:订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。如果所有的 Consumer 都具有相同的 group,那么就像是一个点对点的消息系统;如果每个 consumer 都具有不同的 group,那么消息会广播给所有的消费者。

一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费,消费组里的每个消费者是关联到一个 partition 的,因此有这样的说法:对于一个 topic,同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息。同一个消费组的两个消费者不会同时消费一个 partition。

在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立连接之后,主动去 pull(或者说 fetch )消息,首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理,且可以控制消息消费的进度(offset)。

partition 中的消息只有一个 consumer 在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见 kafka broker 端是相当轻量级的。当消息被 consumer 接收之后,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次,在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 中剥离,保存在一个名叫 __consumer_offsets- topic 的 Topic 中,由此可见,consumer 客户端也很轻量级。

事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。

2.2 生产者

上面的内容将kafka几个流程都简单介绍了一下,接下来详解几个流程原理。

2.2.1 分区策略

分区的好处:

  • 方便在集群扩展,可以添加分区,但是不可以减少分区
  • 可以提高并发,可以以partition来进行读写

分区的方法:

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

2.2.2 数据可靠性保证

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。这里并不是说没有收到ack就一直等待,producer是一个异步发送的过程层,它一直在发数据,但是如果没有收到ack,就会重新发送那条没收到ack的数据。
在这里插入图片描述
发送ack的策略如上图所示,kafka选用的是全部follower同步完成之后再发送ack,虽然这种策略网络延迟比较高,但是网络延迟对kafka影响比较小。

Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择不同的ack配置:

  • 0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

如果出现了故障,比如follower或者leader宕机了,该怎么操作呢?
在这里插入图片描述
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

2.2.3 Exactly Once语义

ACK不同级别对应不同的数据发送级别:

  • At Least Once语义:将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据。
  • At Most Once语义:将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次。
  • Exactly Once语义:数据要求不重复也不丢失

0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once

要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

2.3 消费者

2.3.1 消费方式

consumer采用pull(拉)模式从broker中读取数据。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

2.3.2 分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。其中有两个策略:roundrobin和range

  • round robin:轮询调度
  • range:首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假如现在有 10 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6,7,8,9;消费者排序完之后将会是C1-0,C2-0,C3-0。通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。

2.3.3 offset的维护

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。
在这里插入图片描述

  • 如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理;
  • 如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失。

这个偏移量的提交方式有:

  • 自动提交:最简单的方式就是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为true,那么每过 5s,消费者会自动把从 poll() 方法轮询到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认是 5s。与消费者里的其他东西一样,自动提交也是在轮询中进行的。消费者在每次轮询中会检查是否提交该偏移量了,如果是,那么就会提交从上一次轮询中返回的偏移量。
  • 提交当前偏移量:把 auto.commit.offset 设置为 false,可以让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。commitSync() 将会提交由 poll() 返回的最新偏移量,如果处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险,如果发生了在均衡,从最近一批消息到发生在均衡之间的所有消息都将被重复处理。
  • 异步提交:异步提交 commitAsync() 与同步提交 commitSync() 最大的区别在于异步提交不会进行重试,同步提交会一致进行重试。
  • 同步和异步组合提交:一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。
  • 提交特定的偏移量:消费者API允许调用 commitSync() 和 commitAsync() 方法时传入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

2.4 高效读写数据

高效读写数据的原因:

  • 顺序写磁盘:同样的磁盘,顺序写能到到600M/s,而随机写只有100k/s
  • 应用pagecache:Kafka数据持久化是直接持久化到Pagecache中,充分利用所有的空闲内存。
  • 零拷贝技术:所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。

2.5 zookeeper在kafka中作用

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。

kafka中配置的各项目录,用于协调consumer和broker。下图就是leader失效之后,kafka中zookeeper控制选举的过程。原理在上面已经讲过了。
在这里插入图片描述
在这里插入图片描述

2.6 kafka事务

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

2.6.1 Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

2.6.2 Consumer事务

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

如果想完成Consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质中(比如mysql)。这部分知识会在后续项目部分涉及。

结语

如果现在很迷茫,有很多个选择,那就选一个先坚持走下去吧。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>