博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka-一文读懂架构、源码、调优与面试
阅读量:4119 次
发布时间:2019-05-25

本文共 10105 字,大约阅读时间需要 33 分钟。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,Kafka结合了三个关键功能:

  1. 发布(写入)和订阅(读取)事件流,包括从其他系统连续导入/导出数据;
  2. 持久可靠地存储事件流;
  3. 当事件发生或追溯时处理事件流。

所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云中。您可以在自我管理Kafka环境和使用各种供应商提供的完全托管服务之间进行选择。

1.1、消息队列

1.1.1、消息队列两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消费生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费后,queue中不在有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但对一个消息而言,只会有一个消费者可以消费消费。
消息队列点对点
(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic中,同事有多个消费者消费(订阅)消费该消息,和点对点方式不同,发布到topic的消息会被所有订阅者消费。
消息队列一对多

  • 拉取模式:消费者主动拉取,需维护长轮询
  • 主动推送:队列主动推送,存在消费者速度跟不上

1.2、架构(broker)

Kafka是一个分布式基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

Kafka架构
如上图所示,一个典型的 Kafka 体系架构包括若干 Producer(可以是服务器日志,业务数据,页面前端产生的 page view 等等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer (Group),以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置选举 leader,以及在 consumerGroup 发生变化时进行 rebalance。Producer 使用 push(推) 模式将消息发布到 broker,Consumer 使用 pull(拉) 模式从 broker 订阅并消费消息。
消息特点

1.2.1、Topic&Partition

一个topic可以认为一个一类消息每个topic将被分成多个partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。每条消息都被 append 到 partition 中实现顺序写磁盘,因此效率非常高。

Topic分区
每一条消息被发送到broker中,会根据partition规则选择被存储到哪一个 partition。如果partition规则(设计分区策略)设置的合理,所有消息可以均匀分布到不同的 partition 里,这样就实现了水平扩展。

1.3、高可靠性存储分析

Kafka 的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数,可以使得 Kafka 在性能和可靠性之间运转的游刃有余。

1.3.1、Kafka 文件存储机制

Kafka中消息按照Topic进行分类,生产者通过topic向Kafka broker 发送消息,消费者通过topic读取数据。然而 topic在物理层面又能以partition为分组,一个topic 可以分成若干个 partition,那么 topic 以及 partition 又是怎么存储的呢?partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成,那么这些 segment 又是什么呢?

创建一个名称为topic_vms_test的topic,partitions为4,此时在 /tmp/kafka-logs 目录中可以看到生成了 4 个文件目录:

partitions分区目录
在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录partition 的名称规则为:topic 名称 + 有序序号,第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是实际物理上的概念,而 topic 是逻辑上的概念。
分区处理
每个 partition(目录) 相当于一个巨型文件被平均分配到多个大小相等的 segment(段) 数据文件中(每个 segment 文件中消息数量不一定相等)这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 partition 只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定,一般设定1GB

index 和 log 文件名称以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log文件的结构示意图。

日志文件文件组织结构
“.index文件存储大量的索引信息,“.log文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

1.3.2、文件查找

如何从partition中通过offset查找message呢?文件名称命名:文件名称是通过起始偏移量大小命名;

  • 通过偏移量定位segment索引文件
  • 按照二分查找定位文件具体位置
1.3.3、复制原理和同步方式

为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1) 是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移当 Kafka 集群中一个 broker 失效情况下仍然保证服务可用。在 Kafka 中发生复制时确保 partition 的日志能有序地写到其他节点上,N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower, leader 处理 partition 的所有读写请求,与此同时,follower 会被动定期地去复制 leader 上的数据。

复制原理和同步方式
leader 负责维护和跟踪 ISR(In-Sync Replicas 的缩写,表示副本同步队列) 中所有 follower 滞后的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果 follower“落后”太多或者失效,leader 将会把它从 ISR 中删除。注意:ISR包括leader和follower

1.4、Kafka生产者

1.4.1、分区策略

(1)分区原因

  • 方便在集群中扩展:每个Partition可以通过调整机器以适应所在的机器,而一个topic又可以由多个Partition组成,因此可适应任意大小的数据;
  • 可以提高并发:因为可以以Partition为单位读写;

(2)分区原则

我们需要将producer发送的数据封装陈一个ProducerRecord对象。

  • 指明partition情况下,直接将指明的值直接作为partition值;
  • 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition;
  • 既没有partition值也没有key情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用partition总数取余得到partition,也就是常说的round-robin轮询算法(rocketMq通过平均策略实现)。
1.4.1、数据可靠性

为了保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

卡夫卡数据可靠性

1.4.1.1、副本数据同步策略

副本同步策略选择

Kafka选择第二种方案,原因如下:

  • 第二种方案只需要n+1个副本,而Kafka的每个分区都要大量数据,第一种方案会造成大量数据冗余;
  • 虽然第二种方案网络延迟会比较高,但网络延迟对Kafka的影响较小;
1.4.1.2、ISR(同步副本队列):

采用第二种方案后,设想以下情景:leader收到数据后,所有follower都开始同步数据,但只有一个follower因为某种故障,迟迟不能与leader进行同步,那么leader就要一直等下去,直到它完成同步,才发送ack,这个问题怎么解决?

Leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据同步之后,leader就会给follower发送ack,如果follower长时间未向leader同步数据,则该follower将被剔出ISR,该时间阖值由replica.lag.max.ms参数设定。

  • Assigned Replicas: 所有的副本(replicas);
  • ISR: 同步副本集合(Leader负责维护和跟踪的follower集合,ISR 中包括leader 和 follower);
  • OSR:超过时间阈值被踢出ISR,存入到OSR(Outof_Sync Replicas);
  • 新加入follower: 会先存放到OSR中;
  • ISR和AR关系: ISR是AR的一个子集;
  • AR = ISR + OSR;
  • HW(高水位):leader等待该消息所有ISR中的replicas同步后更新HW此消息才能被消费
1.4.1.3、Ack应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。

所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下配置。

(1)acks参配置(保证生产者数据一致性):

  • 0(不管直接发):producer不等待broker的ack,这一操作提供最低的延迟,broker一接收到没有写入磁盘就已经返回,当broker故障时有可能丢失数据
  • 1(只等待leader写完):producer等待broker的ack,partition的leader罗盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
    在这里插入图片描述
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复
    在这里插入图片描述
1.4.1.4、(选举)故障处理细节

高水位HW

LEO:指的是每个副本最大的offset;
HW指的是消费者能见到的最大的offset,ISR队列(数据最少的follower)中最小的LEO,保证消费者数据一致性
(1)follower故障
follower发生故障后会被踢出ISR,待该 follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步数。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR(条件)。

(2)leader故障

leader发送故障之后,会从ISR中顺序选出一个新的Leader(不是按照LEO高低选举),之后,为了保证多个副本之间的数据一致性,其余的follower会先将搁置的log文件高于HW的部分截取掉,然后从新的Leader同步数据。

注意:这只能保证副本之间数据一致性,并不能保证数据不丢失或者重复。
1.4.2、Exactly Once语义(RocketMq无)

将服务器的ACK级别设置为-1,可以保证producer和consumer之间不会丢失数据,即At Least Once(至少一次)语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义

At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能威力的,只能保证数据不丢失,再下游消费者对数据做全局去重。对于多个下游应用情况,每个都需要单独做全局去重,对下游消费者性能影响较大。

0.11版本Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:

At Least Once + 幂等性 = Exactly Once;

要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做得去重放到数据上游。开启幂等性的Producer在初始化的时候会分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端对
做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。但是PID重启会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

(1)消息传递保证三个级别

  • At most once:消息可能会丢失,但绝对不会重复传递;
  • At least once:消息绝不会丢失,但消息可能会重复传递;
  • Exactly once:每条消息只会被传递一次(当节点故障时不能保证);

1.5、Kafka消费者

1.5.1、消费方式

Cnsumer采用pull(拉)模式从broker中读取数据。

Push(推)模式很难适应消费速率不同的消费者,因为消费发送速率是由broker决定的。

Pull 模式不足之处,如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时候传入一个时长参数timeout。

1.5.2、分区分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然涉及到partition的分配问题,既确定那个partition由那个consumer来消费。

  • RoundRobin(轮询)
    按照group来分,topic的partition被按照轮询方式分配给group的consumer。
  • Range(默认)
    按照topic划分分区(类似RocketMq的Average),将topic里面partition平均分配给consumer,排在前面获取最多(例如:5-part,2-cons; cons1(1-3p),cons2(4-5p)。
1.5.3、offset的维护(类似rocketMq的abort文件)

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障的位置继续消费,所以consumer需要实时记录自己消费到的那个offset,以便故障恢复后继续消费。

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets

修改配置文件consumer.properties才能查看系统topic。exclude.internal.topics = false;读取offset:--consumer.config config/consumer.properties;
1.5.4、消费组案例

在这里插入图片描述

1.6、Kafka高效读写数据

1.6.1、顺序写磁盘

按照分区写提高并行度,按照Segment进行顺序写,顺序写600MB/s,而随机写100k/s;

1.6.2、零拷贝技术

使用PageCache和NIC(SocetCache),减少内存交换;

1.6.3、分布式

1.7、Zookeepr作用

1.7.1、ZK节点维护(交互)

Kafka 的 ISR 的管理最终都会反馈到 Zookeeper 节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个 Zookeeper 的节点进行维护:

在这里插入图片描述
(1)Controller 来维护Kafka 集群中的其中一个 Broker 会被选举为 Controller主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。符合某些特定条件下,Controller下的 LeaderSelector会选举新的 leaderISR 和新的leader_epoch及controller_epoch写入 Zookeeper的相关节点中。同时发起 LeaderAndIsrRequest通知所有的 replicas。
(2)leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR, 如果发现 ISR 变化,则会将新的 ISR 的信息返回到 Zookeeper 的相关节点中

1.7.2、Controller选举(针对broker服务器)

(1)第一个启动broker会成为controller(首次)

会在ZK上创建一个临时节点/controller,其他broker也会尝试创建,但会失败,此时这些broker会在ZK的/controlelr节点上创建一个监控(Watch)。

(2)抢占式创建成功成为controller

如果controller所在的broker宕机或断开与zk的连接,zk会自动删除/controller节点,其他broker收到通知后,抢占式创建/controller节点,创建成功的broker成为controller;
作用:
每一个Kafka集群任意时刻都只能有一个Controller,所有broker都参与竞选,最终只能有一个broker胜出。

  • 管理每一台broker上对应的分区副本;
  • 管理每一个Topic分区状态;
  • 选取leader。
1.7.3、Leader选举(针对topic的replica)

(1)少数服从多数(NO)

保证较高容错率,必须要有大量副本,大量副本导致性能急剧下降。如果容忍1个follower挂掉,至少需要3个以上副本,如果容忍2个follower挂掉,必须要有5个以上的副本。这种算法更多用在 Zookeeper 这种共享集群配置的系统中而很少在需要大量数据的系统中使用的原因。HDFS 的 HA 功能也是基于“少数服从多数”的方式,但是其数据存储并不是采用这样的方式。

leader 选举的算法非常多,比如 Zookeeper 的 Zab 、 Raft 以及 Viewstamped Replication 。而 Kafka 所使用的 leader 选举算法更像是微软的 PacificA 算法。

(2)ISR中是有follower

从ISR顺序选择follower作为leader;

(3)ISR中没有follower

  • 等待ISR中任意一个replica“活”过来,并选举它作为leader;
  • 选择第一个“活”过来的replica(并不一定是ISR中)作为leader;
  • 默认情况下,Kafka采用第二种策略,即unclean.leader.election.enable=false,也可以将此参数设置为false来启动第一种策略。

1.8、rocketmq与kafka区别

1.8.1、文件存储
  • 可靠性前者异步实时/同步刷盘(可靠性高),同步/异步复制;后者异步刷盘,异步/同步复制;

  • 性能比对:前者稍微低于后者;

  • 消息实时性:前者长轮询毫秒延迟,消息可重试;后者短轮询,消息不重试;

  • 消息顺序:前者严格消息顺序,Broker宕机不会乱序;后者支持消息顺序,Broker宕机后乱序;

  • 定时消息:前者支持定时消息,后者不支持定时消息;

  • 文件存储

    Kafka按topic单独存盘:Kafka按照Partition分区,分区划分为多个Segment,每个Segment由index和log文件组成;

    RocketMQ不同topic消息共存:RocketMq使用CommitLog和IndexFile,不同的topic的消息可能会存储到相同的文件;

  • 文件读取:Kafka能顺序读取,充分利用OS的预读机制;Rocketmq随机读,先读取ConsumerQueue,再随机读取CommitLog日志文件;

1.8.2、适用范围(阿里测试)

在消息发送端,消费端共存的场景下,随着Topic数的增加Kafka吞吐量会急剧下降而RocketMQ则表现稳定。因此,Kafka适合Topic和消费端都比较少的业务场景,而RocketMQ更适合多Topic多消费端的业务场景,Topic数量64是一个分水岭

kafka和rocketMq比对
可以看到,不论Topic数量是多少,Kafka和RocketMQ均能保证发送端和消费端的TPS持平,就是说,保证了消息没有累积。根据Topic数量的变化,画出二者的消息处理能力的对比曲线如下图:
在这里插入图片描述
从图上可以看出:

  • Kafka在Topic数量由64增长到256时,吞吐量下降了98.37%。

  • RocketMQ在Topic数量由64增长到256时,吞吐量只下降了16%。

    为什么两个产品的表现如此悬殊呢?这是因为Kafka的每个Topic、每个分区(在分段)都会对应一个物理文件。当Topic数量增加时,消息分散的落盘策略会导致磁盘IO竞争激烈成为瓶颈(每个Segment文件是1G)。而RocketMQ所有的消息是保存在同一个物理文件中的,Topic和分区数对RocketMQ也只是逻辑概念上的划分,所以Topic数的增加对RocketMQ的性能不会造成太大的影响。

    因此:在topic数量不同场景下,文件存储影响了RocketMq处理能力和Kafka差异

1.9、Kafka使用要点

1.9.1、Kafka压测

Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh,kafka-producer-perf-test.sh)。Kafka压测时,可以查看那些地方出现瓶颈(CPU、内存、网络IO)。一般都是网络IO达到瓶颈

1.9.2、Kafka的机器数量

Kafka机器数量=2* (峰值生产速度*副本数/100)+1;

1.9.3、Kafka日志保存时间

7天;

1.9.4、硬盘大小

每天数据量7天;

1.9.5、Kafka中数据量计算

每天数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条每秒钟

平均每秒钟:1150条;

低谷每秒钟:400条;

高峰每秒钟:1150 * (2-20倍)=2300条-23000条;

每条日志大小:0.5k-2k;

每秒多少数据量:2.3M-20MB;

1.9.6、Kafka挂掉

Flume记录;

日志有记录;

短期没事;

1.9.7、Kafka消息数据积压,Kafka消费能力不足怎么处理

如果是Kafka消费能力不足,则可以考虑增加Topic分区数,并且同时提升消费组的消费数量,消费者数=分区数(两者缺一不可)。部署。

转载地址:http://zjcpi.baihongyu.com/

你可能感兴趣的文章
127个超级实用的JavaScript 代码片段,你千万要收藏好(下)
查看>>
【web素材】03-24款后台管理系统网站模板
查看>>
【5分钟代码练习】02—用CSS Grid实现响应式图片布局的效果
查看>>
Flex 布局教程:语法篇
查看>>
年薪50万+的90后程序员都经历了什么?
查看>>
2019年哪些外快收入可达到2万以上?
查看>>
【JavaScript 教程】标准库—Date 对象
查看>>
前阿里手淘前端负责人@winter:前端人如何保持竞争力?
查看>>
【JavaScript 教程】面向对象编程——实例对象与 new 命令
查看>>
我在网易做了6年前端,想给求职者4条建议
查看>>
SQL1015N The database is in an inconsistent state. SQLSTATE=55025
查看>>
RQP-DEF-0177
查看>>
Linux查看mac地址
查看>>
Linux修改ip
查看>>
MySQL字段类型的选择与MySQL的查询效率
查看>>
Java的Properties配置文件用法【续】
查看>>
JAVA操作properties文件的代码实例
查看>>
IPS开发手记【一】
查看>>
Java通用字符处理类
查看>>
文件上传时生成“日期+随机数”式文件名前缀的Java代码
查看>>